From 1bf2547d0bcec9870536267b45e6bd63892a8c9a Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Wed, 14 Jun 2023 22:40:06 +0200 Subject: [PATCH] [receiver/sqlquery] add support for logs (#20730) Fixes #20284 This introduces initial support for retrieving rows from SQL databases into logs. This PR aims to provide an initial, not feature rich, but production ready implementation. The following features are available: - Use `body_column` to select the column to use to fill the Body field of the created log - Use `tracking_start_value` and `tracking_column` properties to track rows that were already ingested - Use `storage` property to persist the tracking value across collector restarts In this state and marked as "development" stability, the component can be used for experimentation and to guide future development. There are definitely more things that need to be implemented for this component to be considered "alpha" quality - like filling in other [log fields](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/logs/data-model.md#log-and-event-record-definition) like Timestamp, ObservedTimestamp and others. I would like to add them in subsequent pull requests, as this pull request is already way too big. --------- Co-authored-by: Dominika Molenda Co-authored-by: Dominika Molenda <73838995+dmolenda-sumo@users.noreply.github.com> Co-authored-by: Katarzyna Kujawa <73836361+kkujawa-sumo@users.noreply.github.com> Co-authored-by: Katarzyna Kujawa --- .chloggen/sqlquery-receiver-add-logs.yaml | 16 + receiver/sqlqueryreceiver/README.md | 100 +++++- receiver/sqlqueryreceiver/config.go | 35 +- receiver/sqlqueryreceiver/config_test.go | 35 +- receiver/sqlqueryreceiver/db_client.go | 6 +- receiver/sqlqueryreceiver/db_client_test.go | 10 +- receiver/sqlqueryreceiver/factory.go | 3 +- receiver/sqlqueryreceiver/factory_test.go | 7 + receiver/sqlqueryreceiver/go.mod | 14 +- receiver/sqlqueryreceiver/go.sum | 8 + receiver/sqlqueryreceiver/integration_test.go | 324 +++++++++++++++++- .../internal/metadata/generated_status.go | 1 + receiver/sqlqueryreceiver/logs_receiver.go | 314 +++++++++++++++++ receiver/sqlqueryreceiver/metadata.yaml | 1 + receiver/sqlqueryreceiver/receiver.go | 17 +- receiver/sqlqueryreceiver/receiver_test.go | 30 +- receiver/sqlqueryreceiver/row_scanner.go | 4 + receiver/sqlqueryreceiver/scraper.go | 7 +- ... config-invalid-missing-logs-metrics.yaml} | 0 .../config-logs-missing-body-column.yaml | 8 + .../testdata/config-logs.yaml | 10 + .../testdata/integration/mysql/init.sql | 15 + .../testdata/integration/oracle/init.sql | 19 + .../testdata/integration/postgresql/init.sql | 16 + 24 files changed, 954 insertions(+), 46 deletions(-) create mode 100755 .chloggen/sqlquery-receiver-add-logs.yaml create mode 100644 receiver/sqlqueryreceiver/logs_receiver.go rename receiver/sqlqueryreceiver/testdata/{config-invalid-missing-metrics.yaml => config-invalid-missing-logs-metrics.yaml} (100%) create mode 100644 receiver/sqlqueryreceiver/testdata/config-logs-missing-body-column.yaml create mode 100644 receiver/sqlqueryreceiver/testdata/config-logs.yaml diff --git a/.chloggen/sqlquery-receiver-add-logs.yaml b/.chloggen/sqlquery-receiver-add-logs.yaml new file mode 100755 index 000000000000..95cc4b444dc3 --- /dev/null +++ b/.chloggen/sqlquery-receiver-add-logs.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: sqlqueryreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for logs + +# One or more tracking issues related to the change +issues: [20284] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/receiver/sqlqueryreceiver/README.md b/receiver/sqlqueryreceiver/README.md index 8a5e4e23759a..5fcff4f84eba 100644 --- a/receiver/sqlqueryreceiver/README.md +++ b/receiver/sqlqueryreceiver/README.md @@ -4,9 +4,11 @@ | Status | | | ------------- |-----------| | Stability | [alpha]: metrics | +| | [development]: logs | | Distributions | [contrib], [observiq], [splunk], [sumo] | [alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha +[development]: https://github.com/open-telemetry/opentelemetry-collector#development [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib [observiq]: https://github.com/observIQ/observiq-otel-collector [splunk]: https://github.com/signalfx/splunk-otel-collector @@ -28,29 +30,88 @@ The configuration supports the following top-level fields: a driver-specific string usually consisting of at least a database name and connection information. This is sometimes referred to as the "connection string" in driver documentation. e.g. _host=localhost port=5432 user=me password=s3cr3t sslmode=disable_ -- `queries`(required): A list of queries, where a query is a sql statement and one or more metrics (details below). +- `queries`(required): A list of queries, where a query is a sql statement and one or more `logs` and/or `metrics` sections (details below). - `collection_interval`(optional): The time interval between query executions. Defaults to _10s_. +- `storage` (optional, default `""`): The ID of a [storage][storage_extension] extension to be used to [track processed results](#tracking-processed-results). + +[storage_extension]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage ### Queries -A _query_ consists of a sql statement and one or more _metrics_, where each metric consists of a +A _query_ consists of a sql statement and one or more `logs` and/or `metrics` section. +At least one `logs` or one `metrics` section is required. +Note that technically you can put both `logs` and `metrics` sections in a single query section, +but it's probably not a real world use case, as the requirements for logs and metrics queries +are quite different. + +Additionally, each `query` section supports the following properties: + +- `tracking_column` (optional, default `""`) Applies only to logs. In case of a parameterized query, + defines the column to retrieve the value of the parameter on subsequent query runs. + See the below section [Tracking processed results](#tracking-processed-results). +- `tracking_start_value` (optional, default `""`) Applies only to logs. In case of a parameterized query, defines the initial value for the parameter. + See the below section [Tracking processed results](#tracking-processed-results). + +Example: + +```yaml +receivers: + sqlquery: + driver: postgres + datasource: "host=localhost port=5432 user=postgres password=s3cr3t sslmode=disable" + queries: + - sql: "select * from my_logs where log_id > $$1" + tracking_start_value: "10000" + tracking_column: log_id + logs: + - body_column: log_body + - sql: "select count(*) as count, genre from movie group by genre" + metrics: + - metric_name: movie.genres + value_column: "count" +``` + +#### Logs Queries + +The `logs` section is in development. + +- `body_column` (required) defines the column to use as the log record's body. + +##### Tracking processed results + +With the default configuration and a non-parameterized logs query like `select * from my_logs`, +the receiver will run the same query every collection interval, which can cause reading the same rows +over and over again, unless there's an external actor removing the old rows from the `my_logs` table. + +To prevent reading the same rows on every collection interval, use a parameterized query like `select * from my_logs where id_column > ?`, +together with the `tracking_start_value` and `tracking_column` configuration properties. +The receiver will use the configured `tracking_start_value` as the value for the query parameter when running the query for the first time. +After each query run, the receiver will store the value of the `tracking_column` from the last row of the result set and use it as the value for the query parameter on next collection interval. To prevent duplicate log downloads, make sure to sort the query results in ascending order by the tracking_column value. + +Note that the notation for the parameter depends on the database backend. For example in MySQL this is `?`, in PostgreSQL this is `$1`, in Oracle this is any string identifier starting with a colon `:`, for example `:my_parameter`. + +Use the `storage` configuration property of the receiver to persist the tracking value across collector restarts. + +#### Metrics queries + +Each `metrics` section consists of a `metric_name`, a `value_column`, and additional optional fields. Each _metric_ in the configuration will produce one OTel metric per row returned from its sql query. -* `metric_name`(required): the name assigned to the OTel metric. -* `value_column`(required): the column name in the returned dataset used to set the value of the metric's datapoint. +- `metric_name`(required): the name assigned to the OTel metric. +- `value_column`(required): the column name in the returned dataset used to set the value of the metric's datapoint. This may be case-sensitive, depending on the driver (e.g. Oracle DB). -* `attribute_columns`(optional): a list of column names in the returned dataset used to set attibutes on the datapoint. +- `attribute_columns`(optional): a list of column names in the returned dataset used to set attibutes on the datapoint. These attributes may be case-sensitive, depending on the driver (e.g. Oracle DB). -* `data_type` (optional): can be `gauge` or `sum`; defaults to `gauge`. -* `value_type` (optional): can be `int` or `double`; defaults to `int`. -* `monotonic` (optional): boolean; whether a cumulative sum's value is monotonically increasing (i.e. never rolls over +- `data_type` (optional): can be `gauge` or `sum`; defaults to `gauge`. +- `value_type` (optional): can be `int` or `double`; defaults to `int`. +- `monotonic` (optional): boolean; whether a cumulative sum's value is monotonically increasing (i.e. never rolls over or resets); defaults to false. -* `aggregation` (optional): only applicable for `data_type=sum`; can be `cumulative` or `delta`; defaults +- `aggregation` (optional): only applicable for `data_type=sum`; can be `cumulative` or `delta`; defaults to `cumulative`. -* `description` (optional): the description applied to the metric. -* `unit` (optional): the units applied to the metric. -* `static_attributes` (optional): static attributes applied to the metrics +- `description` (optional): the description applied to the metric. +- `unit` (optional): the units applied to the metric. +- `static_attributes` (optional): static attributes applied to the metrics ### Example @@ -59,12 +120,18 @@ receivers: sqlquery: driver: postgres datasource: "host=localhost port=5432 user=postgres password=s3cr3t sslmode=disable" + storage: file_storage queries: + - sql: "select * from my_logs where log_id > $$1" + tracking_start_value: "10000" + tracking_column: log_id + logs: + - body_column: log_body - sql: "select count(*) as count, genre from movie group by genre" metrics: - metric_name: movie.genres value_column: "count" - attribute_columns: [ "genre" ] + attribute_columns: ["genre"] static_attributes: dbinstance: mydbinstance ``` @@ -72,7 +139,7 @@ receivers: Given a `movie` table with three rows: | name | genre | -|-----------|--------| +| --------- | ------ | | E.T. | sci-fi | | Star Wars | sci-fi | | Die Hard | action | @@ -80,7 +147,7 @@ Given a `movie` table with three rows: If there are two rows returned from the query `select count(*) as count, genre from movie group by genre`: | count | genre | -|-------|--------| +| ----- | ------ | | 2 | sci-fi | | 1 | action | @@ -94,7 +161,7 @@ Descriptor: NumberDataPoints #0 Data point attributes: -> genre: STRING(sci-fi) - -> dbinstance: STRING(mydbinstance) + -> dbinstance: STRING(mydbinstance) Value: 2 Metric #1 @@ -121,4 +188,3 @@ Oracle DB driver to connect and query the same table schema and contents as the The Oracle DB driver documentation can be found [here.](https://github.com/sijms/go-ora) Another usage example is the `go_ora` example [here.](https://blogs.oracle.com/developers/post/connecting-a-go-application-to-oracle-database) - diff --git a/receiver/sqlqueryreceiver/config.go b/receiver/sqlqueryreceiver/config.go index 280ef15c9ff9..925a9ba8827b 100644 --- a/receiver/sqlqueryreceiver/config.go +++ b/receiver/sqlqueryreceiver/config.go @@ -17,9 +17,10 @@ import ( type Config struct { scraperhelper.ScraperControllerSettings `mapstructure:",squash"` - Driver string `mapstructure:"driver"` - DataSource string `mapstructure:"datasource"` - Queries []Query `mapstructure:"queries"` + Driver string `mapstructure:"driver"` + DataSource string `mapstructure:"datasource"` + Queries []Query `mapstructure:"queries"` + StorageID *component.ID `mapstructure:"storage"` } func (c Config) Validate() error { @@ -41,8 +42,11 @@ func (c Config) Validate() error { } type Query struct { - SQL string `mapstructure:"sql"` - Metrics []MetricCfg `mapstructure:"metrics"` + SQL string `mapstructure:"sql"` + Metrics []MetricCfg `mapstructure:"metrics"` + Logs []LogsCfg `mapstructure:"logs"` + TrackingColumn string `mapstructure:"tracking_column"` + TrackingStartValue string `mapstructure:"tracking_start_value"` } func (q Query) Validate() error { @@ -50,8 +54,13 @@ func (q Query) Validate() error { if q.SQL == "" { errs = multierr.Append(errs, errors.New("'query.sql' cannot be empty")) } - if len(q.Metrics) == 0 { - errs = multierr.Append(errs, errors.New("'query.metrics' cannot be empty")) + if len(q.Logs) == 0 && len(q.Metrics) == 0 { + errs = multierr.Append(errs, errors.New("at least one of 'query.logs' and 'query.metrics' must not be empty")) + } + for _, logs := range q.Logs { + if err := logs.Validate(); err != nil { + errs = multierr.Append(errs, err) + } } for _, metric := range q.Metrics { if err := metric.Validate(); err != nil { @@ -61,6 +70,18 @@ func (q Query) Validate() error { return errs } +type LogsCfg struct { + BodyColumn string `mapstructure:"body_column"` +} + +func (config LogsCfg) Validate() error { + var errs error + if config.BodyColumn == "" { + errs = multierr.Append(errs, errors.New("'body_column' must not be empty")) + } + return errs +} + type MetricCfg struct { MetricName string `mapstructure:"metric_name"` ValueColumn string `mapstructure:"value_column"` diff --git a/receiver/sqlqueryreceiver/config_test.go b/receiver/sqlqueryreceiver/config_test.go index 4a844ba2ffab..56893e428f94 100644 --- a/receiver/sqlqueryreceiver/config_test.go +++ b/receiver/sqlqueryreceiver/config_test.go @@ -96,15 +96,44 @@ func TestLoadConfig(t *testing.T) { errorMessage: "'driver' cannot be empty", }, { - fname: "config-invalid-missing-metrics.yaml", + fname: "config-invalid-missing-logs-metrics.yaml", id: component.NewIDWithName(metadata.Type, ""), - errorMessage: "'query.metrics' cannot be empty", + errorMessage: "at least one of 'query.logs' and 'query.metrics' must not be empty", }, { fname: "config-invalid-missing-datasource.yaml", id: component.NewIDWithName(metadata.Type, ""), errorMessage: "'datasource' cannot be empty", }, + { + fname: "config-logs.yaml", + id: component.NewIDWithName(metadata.Type, ""), + expected: &Config{ + ScraperControllerSettings: scraperhelper.ScraperControllerSettings{ + CollectionInterval: 10 * time.Second, + InitialDelay: time.Second, + }, + Driver: "mydriver", + DataSource: "host=localhost port=5432 user=me password=s3cr3t sslmode=disable", + Queries: []Query{ + { + SQL: "select * from test_logs where log_id > ?", + TrackingColumn: "log_id", + TrackingStartValue: "10", + Logs: []LogsCfg{ + { + BodyColumn: "log_body", + }, + }, + }, + }, + }, + }, + { + fname: "config-logs-missing-body-column.yaml", + id: component.NewIDWithName(metadata.Type, ""), + errorMessage: "'body_column' must not be empty", + }, { fname: "config-unnecessary-aggregation.yaml", id: component.NewIDWithName(metadata.Type, ""), @@ -113,7 +142,7 @@ func TestLoadConfig(t *testing.T) { } for _, tt := range tests { - t.Run(tt.id.String(), func(t *testing.T) { + t.Run(tt.fname, func(t *testing.T) { cm, err := confmaptest.LoadConf(filepath.Join("testdata", tt.fname)) require.NoError(t, err) diff --git a/receiver/sqlqueryreceiver/db_client.go b/receiver/sqlqueryreceiver/db_client.go index 80282217bc61..4c5056c07dc4 100644 --- a/receiver/sqlqueryreceiver/db_client.go +++ b/receiver/sqlqueryreceiver/db_client.go @@ -20,7 +20,7 @@ import ( type stringMap map[string]string type dbClient interface { - metricRows(ctx context.Context) ([]stringMap, error) + queryRows(ctx context.Context, args ...any) ([]stringMap, error) } type dbSQLClient struct { @@ -37,8 +37,8 @@ func newDbClient(db db, sql string, logger *zap.Logger) dbClient { } } -func (cl dbSQLClient) metricRows(ctx context.Context) ([]stringMap, error) { - sqlRows, err := cl.db.QueryContext(ctx, cl.sql) +func (cl dbSQLClient) queryRows(ctx context.Context, args ...any) ([]stringMap, error) { + sqlRows, err := cl.db.QueryContext(ctx, cl.sql, args...) if err != nil { return nil, err } diff --git a/receiver/sqlqueryreceiver/db_client_test.go b/receiver/sqlqueryreceiver/db_client_test.go index ed8d0c932998..4fb591c68be4 100644 --- a/receiver/sqlqueryreceiver/db_client_test.go +++ b/receiver/sqlqueryreceiver/db_client_test.go @@ -21,7 +21,7 @@ func TestDBSQLClient_SingleRow(t *testing.T) { logger: zap.NewNop(), sql: "", } - rows, err := cl.metricRows(context.Background()) + rows, err := cl.queryRows(context.Background()) require.NoError(t, err) assert.Len(t, rows, 1) assert.EqualValues(t, map[string]string{ @@ -42,7 +42,7 @@ func TestDBSQLClient_MultiRow(t *testing.T) { logger: zap.NewNop(), sql: "", } - rows, err := cl.metricRows(context.Background()) + rows, err := cl.queryRows(context.Background()) require.NoError(t, err) assert.Len(t, rows, 2) assert.EqualValues(t, map[string]string{ @@ -69,7 +69,7 @@ func TestDBSQLClient_Nulls(t *testing.T) { logger: zap.NewNop(), sql: "", } - rows, err := cl.metricRows(context.Background()) + rows, err := cl.queryRows(context.Background()) assert.Error(t, err) assert.True(t, errors.Is(err, errNullValueWarning)) assert.Len(t, rows, 1) @@ -88,7 +88,7 @@ func TestDBSQLClient_Nulls_MultiRow(t *testing.T) { logger: zap.NewNop(), sql: "", } - rows, err := cl.metricRows(context.Background()) + rows, err := cl.queryRows(context.Background()) assert.Error(t, err) errs := multierr.Errors(err) for _, err := range errs { @@ -152,7 +152,7 @@ type fakeDBClient struct { err error } -func (c *fakeDBClient) metricRows(context.Context) ([]stringMap, error) { +func (c *fakeDBClient) queryRows(context.Context, ...any) ([]stringMap, error) { if c.err != nil { return nil, c.err } diff --git a/receiver/sqlqueryreceiver/factory.go b/receiver/sqlqueryreceiver/factory.go index 5c2334d01c3d..b72c1c37a2f2 100644 --- a/receiver/sqlqueryreceiver/factory.go +++ b/receiver/sqlqueryreceiver/factory.go @@ -15,6 +15,7 @@ func NewFactory() receiver.Factory { return receiver.NewFactory( metadata.Type, createDefaultConfig, - receiver.WithMetrics(createReceiverFunc(sql.Open, newDbClient), metadata.MetricsStability), + receiver.WithLogs(createLogsReceiverFunc(sql.Open, newDbClient), metadata.LogsStability), + receiver.WithMetrics(createMetricsReceiverFunc(sql.Open, newDbClient), metadata.MetricsStability), ) } diff --git a/receiver/sqlqueryreceiver/factory_test.go b/receiver/sqlqueryreceiver/factory_test.go index 1577eca00f44..40a99baa78a2 100644 --- a/receiver/sqlqueryreceiver/factory_test.go +++ b/receiver/sqlqueryreceiver/factory_test.go @@ -21,4 +21,11 @@ func TestNewFactory(t *testing.T) { consumertest.NewNop(), ) require.NoError(t, err) + _, err = factory.CreateLogsReceiver( + context.Background(), + receivertest.NewNopCreateSettings(), + factory.CreateDefaultConfig(), + consumertest.NewNop(), + ) + require.NoError(t, err) } diff --git a/receiver/sqlqueryreceiver/go.mod b/receiver/sqlqueryreceiver/go.mod index 120d4d50dd5f..14c2fe960577 100644 --- a/receiver/sqlqueryreceiver/go.mod +++ b/receiver/sqlqueryreceiver/go.mod @@ -8,15 +8,19 @@ require ( github.com/docker/go-connections v0.4.0 github.com/go-sql-driver/mysql v1.7.1 github.com/lib/pq v1.10.9 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.79.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.79.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.79.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.77.0 github.com/sijms/go-ora/v2 v2.7.6 github.com/snowflakedb/gosnowflake v1.6.18 github.com/stretchr/testify v1.8.4 github.com/testcontainers/testcontainers-go v0.20.1 + go.opentelemetry.io/collector v0.79.1-0.20230609201858-ed8547a8e5d6 go.opentelemetry.io/collector/component v0.79.1-0.20230609201858-ed8547a8e5d6 go.opentelemetry.io/collector/confmap v0.79.1-0.20230609201858-ed8547a8e5d6 go.opentelemetry.io/collector/consumer v0.79.1-0.20230609201858-ed8547a8e5d6 + go.opentelemetry.io/collector/extension v0.0.0-20230609200026-525adf4a682a go.opentelemetry.io/collector/pdata v1.0.0-rcv0012.0.20230609201858-ed8547a8e5d6 go.opentelemetry.io/collector/receiver v0.79.1-0.20230609201858-ed8547a8e5d6 go.uber.org/multierr v1.11.0 @@ -30,6 +34,7 @@ require ( github.com/Azure/azure-storage-blob-go v0.15.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Microsoft/go-winio v0.5.2 // indirect + github.com/antonmedv/expr v1.12.5 // indirect github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect github.com/aws/aws-sdk-go-v2 v1.18.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect @@ -81,6 +86,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/mtibben/percent v0.2.1 // indirect + github.com/observiq/ctimefmt v1.0.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.79.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0-rc2 // indirect @@ -91,7 +97,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/collector v0.79.1-0.20230609201858-ed8547a8e5d6 // indirect go.opentelemetry.io/collector/exporter v0.79.1-0.20230609201858-ed8547a8e5d6 // indirect go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012.0.20230609201858-ed8547a8e5d6 // indirect go.opentelemetry.io/collector/processor v0.0.0-20230609193203-89d1060c7606 // indirect @@ -107,6 +112,7 @@ require ( golang.org/x/term v0.8.0 // indirect golang.org/x/text v0.10.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect + gonum.org/v1/gonum v0.13.0 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect google.golang.org/grpc v1.55.0 // indirect google.golang.org/protobuf v1.30.0 // indirect @@ -122,8 +128,12 @@ retract ( v0.65.0 ) -replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage => ../../extension/storage replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza => ../../pkg/stanza diff --git a/receiver/sqlqueryreceiver/go.sum b/receiver/sqlqueryreceiver/go.sum index c7ccd1196d27..410ac3f60ab7 100644 --- a/receiver/sqlqueryreceiver/go.sum +++ b/receiver/sqlqueryreceiver/go.sum @@ -33,6 +33,7 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= github.com/Microsoft/hcsshim v0.9.7 h1:mKNHW/Xvv1aFH87Jb6ERDzXTJTLPlmzfZ28VBFD/bfg= +github.com/Mottl/ctimefmt v0.0.0-20190803144728-fd2ac23a585a/go.mod h1:eyj2WSIdoPMPs2eNTLpSmM6Nzqo4V80/d6jHpnJ1SAI= github.com/SAP/go-hdb v1.3.6 h1:8j4FLYwp6s6JV/bjfs7P15rBqKBofGHKX0KLCV9BmiI= github.com/SAP/go-hdb v1.3.6/go.mod h1:oHuCjMonQNRgjWLPaHkDZPlJhjBUMciFB6mDNQaPN1s= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= @@ -42,6 +43,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/antonmedv/expr v1.12.5 h1:Fq4okale9swwL3OeLLs9WD9H6GbgBLJyN/NUHRv+n0E= +github.com/antonmedv/expr v1.12.5/go.mod h1:FPC8iWArxls7axbVLsW+kpg1mz29A1b2M6jt+hZfDkU= github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 h1:q4dksr6ICHXqG5hm0ZW5IHyeEJXoIJSOZeBLmWPNeIQ= github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= @@ -384,6 +387,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= +github.com/observiq/ctimefmt v1.0.0 h1:r7vTJ+Slkrt9fZ67mkf+mA6zAdR5nGIJRMTzkUyvilk= +github.com/observiq/ctimefmt v1.0.0/go.mod h1:mxi62//WbSpG/roCO1c6MqZ7zQTvjVtYheqHN3eOjvc= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= @@ -497,6 +502,8 @@ go.opentelemetry.io/collector/consumer v0.79.1-0.20230609201858-ed8547a8e5d6 h1: go.opentelemetry.io/collector/consumer v0.79.1-0.20230609201858-ed8547a8e5d6/go.mod h1:mgVk2Eaf5E+nxN9dAsGMMkfA8Imt/FzVUZ91dc6dQDo= go.opentelemetry.io/collector/exporter v0.79.1-0.20230609201858-ed8547a8e5d6 h1:Q7aFEA/FBsVG+G0lZo4nVQMWUIpV62gqKlZtLY5oPb4= go.opentelemetry.io/collector/exporter v0.79.1-0.20230609201858-ed8547a8e5d6/go.mod h1:DmtA7fKx1oftCr3PMqv+0g+e0rn04Sgm/A9NzOHObcI= +go.opentelemetry.io/collector/extension v0.0.0-20230609200026-525adf4a682a h1:I8WVnt7cj/JweYOaaiu+Zzx8szYwVR5jBVxBgzkFxaA= +go.opentelemetry.io/collector/extension v0.0.0-20230609200026-525adf4a682a/go.mod h1:7vkZTU69L7UBMLhIPWeIl/vQZHpwVLiu0AW6lhojfDw= go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012.0.20230609201858-ed8547a8e5d6 h1:q/bCQl8udc8eZvfM1ReHLd7MVEngrK7rHit1kMDPAsQ= go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012.0.20230609201858-ed8547a8e5d6/go.mod h1:0mE3mDLmUrOXVoNsuvj+7dV14h/9HFl/Fy9YTLoLObo= go.opentelemetry.io/collector/pdata v1.0.0-rcv0012.0.20230609201858-ed8547a8e5d6 h1:5owemI2Qf9djArnnA/l83hn/Un7cMeSEaDWI0LPlQ9k= @@ -683,6 +690,7 @@ gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJ gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= gonum.org/v1/gonum v0.9.3/go.mod h1:TZumC3NeyVQskjXqmyWt4S3bINhy7B4eYwW69EbyX+0= gonum.org/v1/gonum v0.13.0 h1:a0T3bh+7fhRyqeNbiC3qVHYmkiQgit3wnNan/2c0HMM= +gonum.org/v1/gonum v0.13.0/go.mod h1:/WPYRckkfWrhWefxyYTfrTtQR0KH4iyHNuzxqXAKyAU= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= gonum.org/v1/plot v0.9.0/go.mod h1:3Pcqqmp6RHvJI72kgb8fThyUnav364FOsdDo2aGW5lY= diff --git a/receiver/sqlqueryreceiver/integration_test.go b/receiver/sqlqueryreceiver/integration_test.go index 18d32040ebc4..c25f2f90da8b 100644 --- a/receiver/sqlqueryreceiver/integration_test.go +++ b/receiver/sqlqueryreceiver/integration_test.go @@ -7,17 +7,28 @@ package sqlqueryreceiver import ( + "context" "fmt" + "io" "path/filepath" "runtime" + "strings" "testing" "time" "github.com/docker/go-connections/nat" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/scraperinttest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) @@ -28,7 +39,297 @@ const ( mysqlPort = "3306" ) -func TestPostgresqlIntegration(t *testing.T) { +func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) { + // Start Postgres container. + externalPort := "15430" + dbContainer := startPostgresDbContainer(t, externalPort) + defer func() { + require.NoError(t, dbContainer.Terminate(context.Background())) + }() + + // Start the SQL Query receiver. + receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort) + config.CollectionInterval = time.Second + config.Queries = []Query{ + { + SQL: "select * from simple_logs where id > $1", + Logs: []LogsCfg{ + { + BodyColumn: "body", + }, + }, + TrackingColumn: "id", + TrackingStartValue: "0", + }, + } + host := componenttest.NewNopHost() + err := receiver.Start(context.Background(), host) + require.NoError(t, err) + + // Verify there's 5 logs received. + require.Eventuallyf( + t, + func() bool { + return consumer.LogRecordCount() > 0 + }, + 3*time.Second, + 1*time.Second, + "failed to receive more than 0 logs", + ) + require.Equal(t, 5, consumer.LogRecordCount()) + testAllSimpleLogs(t, consumer.AllLogs()) + + // Stop the SQL Query receiver. + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) + + // Start new SQL Query receiver with the same configuration. + receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort) + config.CollectionInterval = time.Second + config.Queries = []Query{ + { + SQL: "select * from simple_logs where id > $1", + Logs: []LogsCfg{ + { + BodyColumn: "body", + }, + }, + TrackingColumn: "id", + TrackingStartValue: "0", + }, + } + err = receiver.Start(context.Background(), host) + require.NoError(t, err) + + // Wait for some logs to come in. + require.Eventuallyf( + t, + func() bool { + return consumer.LogRecordCount() > 0 + }, + 3*time.Second, + 1*time.Second, + "failed to receive more than 0 logs", + ) + + // stop the SQL Query receiver + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) + + // Verify that the same logs are collected again. + require.Equal(t, 5, consumer.LogRecordCount()) + testAllSimpleLogs(t, consumer.AllLogs()) +} + +func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) { + // start Postgres container + externalPort := "15431" + dbContainer := startPostgresDbContainer(t, externalPort) + defer func() { + require.NoError(t, dbContainer.Terminate(context.Background())) + }() + + // create a File Storage extension writing to a temporary directory in local filesystem + storageDir := t.TempDir() + storageExtension := storagetest.NewFileBackedStorageExtension("test", storageDir) + + // create SQL Query receiver configured with the File Storage extension + receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort) + config.CollectionInterval = time.Second + config.StorageID = &storageExtension.ID + config.Queries = []Query{ + { + SQL: "select * from simple_logs where id > $1", + Logs: []LogsCfg{ + { + BodyColumn: "body", + }, + }, + TrackingColumn: "id", + TrackingStartValue: "0", + }, + } + + // start the SQL Query receiver + host := storagetest.NewStorageHost().WithExtension(storageExtension.ID, storageExtension) + err := receiver.Start(context.Background(), host) + require.NoError(t, err) + + // Wait for logs to come in. + require.Eventuallyf( + t, + func() bool { + return consumer.LogRecordCount() > 0 + }, + 3*time.Second, + 1*time.Second, + "failed to receive more than 0 logs", + ) + + // stop the SQL Query receiver + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) + + // verify there's 5 logs received + initialLogCount := 5 + require.Equal(t, initialLogCount, consumer.LogRecordCount()) + testAllSimpleLogs(t, consumer.AllLogs()) + + // start the SQL Query receiver again + receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort) + config.CollectionInterval = time.Second + config.StorageID = &storageExtension.ID + config.Queries = []Query{ + { + SQL: "select * from simple_logs where id > $1", + Logs: []LogsCfg{ + { + BodyColumn: "body", + }, + }, + TrackingColumn: "id", + TrackingStartValue: "0", + }, + } + err = receiver.Start(context.Background(), host) + require.NoError(t, err) + + // Wait for some logs to come in. + time.Sleep(3 * time.Second) + + // stop the SQL Query receiver + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) + + // Verify that no new logs came in + require.Equal(t, 0, consumer.LogRecordCount()) + + // write a number of new logs to the database + newLogCount := 3 + insertPostgresSimpleLogs(t, dbContainer, initialLogCount, newLogCount) + + // start the SQL Query receiver again + receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort) + config.CollectionInterval = time.Second + config.StorageID = &storageExtension.ID + config.Queries = []Query{ + { + SQL: "select * from simple_logs where id > $1", + Logs: []LogsCfg{ + { + BodyColumn: "body", + }, + }, + TrackingColumn: "id", + TrackingStartValue: "0", + }, + } + err = receiver.Start(context.Background(), host) + require.NoError(t, err) + + // Wait for new logs to come in. + require.Eventuallyf( + t, + func() bool { + return consumer.LogRecordCount() > 0 + }, + 3*time.Second, + 1*time.Second, + "failed to receive more than 0 logs", + ) + + // stop the SQL Query receiver + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) + + // Verify that the newly added logs were received. + require.Equal(t, newLogCount, consumer.LogRecordCount()) + printLogs(consumer.AllLogs()) +} + +func startPostgresDbContainer(t *testing.T, externalPort string) testcontainers.Container { + req := testcontainers.ContainerRequest{ + Image: "postgres:9.6.24", + Env: map[string]string{ + "POSTGRES_USER": "root", + "POSTGRES_PASSWORD": "otel", + "POSTGRES_DB": "otel", + }, + Files: []testcontainers.ContainerFile{{ + HostFilePath: filepath.Join("testdata", "integration", "postgresql", "init.sql"), + ContainerFilePath: "/docker-entrypoint-initdb.d/init.sql", + FileMode: 700, + }}, + ExposedPorts: []string{externalPort + ":" + postgresqlPort}, + WaitingFor: wait.ForListeningPort(nat.Port(postgresqlPort)). + WithStartupTimeout(2 * time.Minute), + } + + container, err := testcontainers.GenericContainer( + context.Background(), + testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }, + ) + require.NoError(t, err) + return container +} + +func createTestLogsReceiverForPostgres(t *testing.T, externalPort string) (*logsReceiver, *Config, *consumertest.LogsSink) { + factory := NewFactory() + config := factory.CreateDefaultConfig().(*Config) + config.CollectionInterval = time.Second + config.Driver = "postgres" + config.DataSource = fmt.Sprintf("host=localhost port=%s user=otel password=otel sslmode=disable", externalPort) + + consumer := &consumertest.LogsSink{} + receiverCreateSettings := receivertest.NewNopCreateSettings() + receiverCreateSettings.Logger = zap.NewExample() + receiver, err := factory.CreateLogsReceiver( + context.Background(), + receiverCreateSettings, + config, + consumer, + ) + require.NoError(t, err) + return receiver.(*logsReceiver), config, consumer +} + +func printLogs(allLogs []plog.Logs) { + for logIndex := 0; logIndex < len(allLogs); logIndex++ { + logs := allLogs[logIndex] + for resourceIndex := 0; resourceIndex < logs.ResourceLogs().Len(); resourceIndex++ { + resource := logs.ResourceLogs().At(resourceIndex) + for scopeIndex := 0; scopeIndex < resource.ScopeLogs().Len(); scopeIndex++ { + scope := resource.ScopeLogs().At(scopeIndex) + for recordIndex := 0; recordIndex < scope.LogRecords().Len(); recordIndex++ { + logRecord := scope.LogRecords().At(recordIndex) + fmt.Printf("log %v resource %v scope %v log %v body: %v\n", logIndex, resourceIndex, scopeIndex, recordIndex, logRecord.Body().Str()) + } + } + } + } +} + +func insertPostgresSimpleLogs(t *testing.T, container testcontainers.Container, existingLogID, newLogCount int) { + for newLogID := existingLogID + 1; newLogID <= existingLogID+newLogCount; newLogID++ { + query := fmt.Sprintf("insert into simple_logs (id, insert_time, body) values (%d, now(), 'another log %d');", newLogID, newLogID) + returnValue, returnMessageReader, err := container.Exec(context.Background(), []string{ + "psql", "-U", "otel", "-c", query, + }) + require.NoError(t, err) + returnMessageBuffer := new(strings.Builder) + _, err = io.Copy(returnMessageBuffer, returnMessageReader) + require.NoError(t, err) + returnMessage := returnMessageBuffer.String() + + assert.Equal(t, 0, returnValue) + assert.Contains(t, returnMessage, "INSERT 0 1") + } +} + +func TestPostgresqlIntegrationMetrics(t *testing.T) { scraperinttest.NewIntegrationTest( NewFactory(), scraperinttest.WithContainerRequest( @@ -141,7 +442,7 @@ func TestPostgresqlIntegration(t *testing.T) { // This test ensures the collector can connect to an Oracle DB, and properly get metrics. It's not intended to // test the receiver itself. -func TestOracleDBIntegration(t *testing.T) { +func TestOracleDBIntegrationMetrics(t *testing.T) { if runtime.GOARCH == "arm64" { t.Skip("Incompatible with arm64") } @@ -197,7 +498,7 @@ func TestOracleDBIntegration(t *testing.T) { ).Run(t) } -func TestMysqlIntegration(t *testing.T) { +func TestMysqlIntegrationMetrics(t *testing.T) { scraperinttest.NewIntegrationTest( NewFactory(), scraperinttest.WithContainerRequest( @@ -301,3 +602,20 @@ func TestMysqlIntegration(t *testing.T) { ), ).Run(t) } + +func testAllSimpleLogs(t *testing.T, logs []plog.Logs) { + assert.Equal(t, 1, len(logs)) + assert.Equal(t, 1, logs[0].ResourceLogs().Len()) + assert.Equal(t, 1, logs[0].ResourceLogs().At(0).ScopeLogs().Len()) + expectedEntries := []string{ + "- - - [03/Jun/2022:21:59:26 +0000] \"GET /api/health HTTP/1.1\" 200 6197 4 \"-\" \"-\" 445af8e6c428303f -", + "- - - [03/Jun/2022:21:59:26 +0000] \"GET /api/health HTTP/1.1\" 200 6205 5 \"-\" \"-\" 3285f43cd4baa202 -", + "- - - [03/Jun/2022:21:59:29 +0000] \"GET /api/health HTTP/1.1\" 200 6233 4 \"-\" \"-\" 579e8362d3185b61 -", + "- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6207 5 \"-\" \"-\" 8c6ac61ae66e509f -", + "- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6200 4 \"-\" \"-\" c163495861e873d8 -", + } + assert.Equal(t, len(expectedEntries), logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len()) + for i := range expectedEntries { + assert.Equal(t, expectedEntries[i], logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Body().Str()) + } +} diff --git a/receiver/sqlqueryreceiver/internal/metadata/generated_status.go b/receiver/sqlqueryreceiver/internal/metadata/generated_status.go index 826800fef4b7..f1ed4309364b 100644 --- a/receiver/sqlqueryreceiver/internal/metadata/generated_status.go +++ b/receiver/sqlqueryreceiver/internal/metadata/generated_status.go @@ -9,4 +9,5 @@ import ( const ( Type = "sqlquery" MetricsStability = component.StabilityLevelAlpha + LogsStability = component.StabilityLevelDevelopment ) diff --git a/receiver/sqlqueryreceiver/logs_receiver.go b/receiver/sqlqueryreceiver/logs_receiver.go new file mode 100644 index 000000000000..ddbe3312a603 --- /dev/null +++ b/receiver/sqlqueryreceiver/logs_receiver.go @@ -0,0 +1,314 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sqlqueryreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sqlqueryreceiver" + +import ( + "context" + "database/sql" + "fmt" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/extension/experimental/storage" + "go.opentelemetry.io/collector/obsreport" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/multierr" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sqlqueryreceiver/internal/metadata" +) + +type logsReceiver struct { + config *Config + settings receiver.CreateSettings + createConnection dbProviderFunc + createClient clientProviderFunc + queryReceivers []*logsQueryReceiver + nextConsumer consumer.Logs + + isStarted bool + collectionIntervalTicker *time.Ticker + shutdownRequested chan struct{} + + id component.ID + storageClient storage.Client + obsrecv *obsreport.Receiver +} + +func newLogsReceiver( + config *Config, + settings receiver.CreateSettings, + sqlOpenerFunc sqlOpenerFunc, + createClient clientProviderFunc, + nextConsumer consumer.Logs, +) (*logsReceiver, error) { + + obsr, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + ReceiverID: settings.ID, + ReceiverCreateSettings: settings, + }) + if err != nil { + return nil, err + } + + receiver := &logsReceiver{ + config: config, + settings: settings, + createConnection: func() (*sql.DB, error) { + return sqlOpenerFunc(config.Driver, config.DataSource) + }, + createClient: createClient, + nextConsumer: nextConsumer, + shutdownRequested: make(chan struct{}), + id: settings.ID, + obsrecv: obsr, + } + + return receiver, nil +} + +func (receiver *logsReceiver) Start(ctx context.Context, host component.Host) error { + if receiver.isStarted { + receiver.settings.Logger.Debug("requested start, but already started, ignoring.") + return nil + } + receiver.settings.Logger.Debug("starting...") + receiver.isStarted = true + + var err error + receiver.storageClient, err = adapter.GetStorageClient(ctx, host, receiver.config.StorageID, receiver.settings.ID) + if err != nil { + return fmt.Errorf("error connecting to storage: %w", err) + } + + err = receiver.createQueryReceivers() + if err != nil { + return err + } + + for _, queryReceiver := range receiver.queryReceivers { + err := queryReceiver.start(ctx) + if err != nil { + return err + } + } + receiver.startCollecting() + receiver.settings.Logger.Debug("started.") + return nil +} + +func (receiver *logsReceiver) createQueryReceivers() error { + receiver.queryReceivers = nil + for i, query := range receiver.config.Queries { + if len(query.Logs) == 0 { + continue + } + id := fmt.Sprintf("query-%d: %s", i, query.SQL) + queryReceiver := newLogsQueryReceiver( + id, + query, + receiver.createConnection, + receiver.createClient, + receiver.settings.Logger, + receiver.storageClient, + ) + receiver.queryReceivers = append(receiver.queryReceivers, queryReceiver) + } + return nil +} + +func (receiver *logsReceiver) startCollecting() { + receiver.collectionIntervalTicker = time.NewTicker(receiver.config.CollectionInterval) + + go func() { + for { + select { + case <-receiver.collectionIntervalTicker.C: + receiver.collect() + case <-receiver.shutdownRequested: + return + } + } + }() +} + +func (receiver *logsReceiver) collect() { + logsChannel := make(chan plog.Logs) + for _, queryReceiver := range receiver.queryReceivers { + go func(queryReceiver *logsQueryReceiver) { + logs, err := queryReceiver.collect(context.Background()) + if err != nil { + receiver.settings.Logger.Error("error collecting logs", zap.Error(err), zap.String("query", queryReceiver.ID())) + } + logsChannel <- logs + }(queryReceiver) + } + + allLogs := plog.NewLogs() + for range receiver.queryReceivers { + logs := <-logsChannel + logs.ResourceLogs().MoveAndAppendTo(allLogs.ResourceLogs()) + } + + logRecordCount := allLogs.LogRecordCount() + if logRecordCount > 0 { + ctx := receiver.obsrecv.StartLogsOp(context.Background()) + err := receiver.nextConsumer.ConsumeLogs(context.Background(), allLogs) + receiver.obsrecv.EndLogsOp(ctx, metadata.Type, logRecordCount, err) + if err != nil { + receiver.settings.Logger.Error("failed to send logs: %w", zap.Error(err)) + } + } +} + +func (receiver *logsReceiver) Shutdown(ctx context.Context) error { + if !receiver.isStarted { + receiver.settings.Logger.Debug("Requested shutdown, but not started, ignoring.") + return nil + } + + receiver.settings.Logger.Debug("stopping...") + receiver.stopCollecting() + for _, queryReceiver := range receiver.queryReceivers { + queryReceiver.shutdown(ctx) + } + + var errors error + if receiver.storageClient != nil { + errors = multierr.Append(errors, receiver.storageClient.Close(ctx)) + } + + receiver.isStarted = false + receiver.settings.Logger.Debug("stopped.") + + return errors +} + +func (receiver *logsReceiver) stopCollecting() { + if receiver.collectionIntervalTicker != nil { + receiver.collectionIntervalTicker.Stop() + } + close(receiver.shutdownRequested) +} + +type logsQueryReceiver struct { + id string + query Query + createDb dbProviderFunc + createClient clientProviderFunc + logger *zap.Logger + + db *sql.DB + client dbClient + trackingValue string + // TODO: Extract persistence into its own component + storageClient storage.Client + trackingValueStorageKey string +} + +func newLogsQueryReceiver( + id string, + query Query, + dbProviderFunc dbProviderFunc, + clientProviderFunc clientProviderFunc, + logger *zap.Logger, + storageClient storage.Client, +) *logsQueryReceiver { + queryReceiver := &logsQueryReceiver{ + id: id, + query: query, + createDb: dbProviderFunc, + createClient: clientProviderFunc, + logger: logger, + storageClient: storageClient, + } + queryReceiver.trackingValue = queryReceiver.query.TrackingStartValue + queryReceiver.trackingValueStorageKey = fmt.Sprintf("%s.%s", queryReceiver.id, "trackingValue") + return queryReceiver +} + +func (queryReceiver *logsQueryReceiver) ID() string { + return queryReceiver.id +} + +func (queryReceiver *logsQueryReceiver) start(ctx context.Context) error { + var err error + queryReceiver.db, err = queryReceiver.createDb() + if err != nil { + return fmt.Errorf("failed to open db connection: %w", err) + } + queryReceiver.client = queryReceiver.createClient(dbWrapper{queryReceiver.db}, queryReceiver.query.SQL, queryReceiver.logger) + + queryReceiver.trackingValue = queryReceiver.retrieveTrackingValue(ctx) + + return nil +} + +// retrieveTrackingValue retrieves the tracking value from storage, if storage is configured. +// Otherwise, it returns the tracking value configured in `tracking_start_value`. +func (queryReceiver *logsQueryReceiver) retrieveTrackingValue(ctx context.Context) string { + trackingValueFromConfig := queryReceiver.query.TrackingStartValue + if queryReceiver.storageClient == nil { + return trackingValueFromConfig + } + + storedTrackingValueBytes, err := queryReceiver.storageClient.Get(ctx, queryReceiver.trackingValueStorageKey) + if err != nil || storedTrackingValueBytes == nil { + return trackingValueFromConfig + } + + return string(storedTrackingValueBytes) + +} + +func (queryReceiver *logsQueryReceiver) collect(ctx context.Context) (plog.Logs, error) { + logs := plog.NewLogs() + + var rows []stringMap + var err error + if queryReceiver.query.TrackingColumn != "" { + rows, err = queryReceiver.client.queryRows(ctx, queryReceiver.trackingValue) + } else { + rows, err = queryReceiver.client.queryRows(ctx) + } + if err != nil { + return logs, fmt.Errorf("error getting rows: %w", err) + } + + var errs error + scopeLogs := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + for logsConfigIndex, logsConfig := range queryReceiver.query.Logs { + for _, row := range rows { + rowToLog(row, logsConfig, scopeLogs.AppendEmpty()) + if logsConfigIndex == 0 { + errs = multierr.Append(errs, queryReceiver.storeTrackingValue(ctx, row)) + } + } + } + return logs, nil +} + +func (queryReceiver *logsQueryReceiver) storeTrackingValue(ctx context.Context, row stringMap) error { + if queryReceiver.query.TrackingColumn == "" { + return nil + } + queryReceiver.trackingValue = row[queryReceiver.query.TrackingColumn] + if queryReceiver.storageClient != nil { + err := queryReceiver.storageClient.Set(ctx, queryReceiver.trackingValueStorageKey, []byte(queryReceiver.trackingValue)) + if err != nil { + return err + } + } + return nil +} + +func rowToLog(row stringMap, config LogsCfg, logRecord plog.LogRecord) { + logRecord.Body().SetStr(row[config.BodyColumn]) +} + +func (queryReceiver *logsQueryReceiver) shutdown(_ context.Context) { +} diff --git a/receiver/sqlqueryreceiver/metadata.yaml b/receiver/sqlqueryreceiver/metadata.yaml index ea5adb67c56c..5b0a6e159a6e 100644 --- a/receiver/sqlqueryreceiver/metadata.yaml +++ b/receiver/sqlqueryreceiver/metadata.yaml @@ -4,4 +4,5 @@ status: class: receiver stability: alpha: [metrics] + development: [logs] distributions: [contrib, splunk, observiq, sumo] diff --git a/receiver/sqlqueryreceiver/receiver.go b/receiver/sqlqueryreceiver/receiver.go index 6863e8bebe42..ef28b3c0092f 100644 --- a/receiver/sqlqueryreceiver/receiver.go +++ b/receiver/sqlqueryreceiver/receiver.go @@ -21,7 +21,19 @@ type dbProviderFunc func() (*sql.DB, error) type clientProviderFunc func(db, string, *zap.Logger) dbClient -func createReceiverFunc(sqlOpenerFunc sqlOpenerFunc, clientProviderFunc clientProviderFunc) receiver.CreateMetricsFunc { +func createLogsReceiverFunc(sqlOpenerFunc sqlOpenerFunc, clientProviderFunc clientProviderFunc) receiver.CreateLogsFunc { + return func( + ctx context.Context, + settings receiver.CreateSettings, + config component.Config, + consumer consumer.Logs, + ) (receiver.Logs, error) { + sqlQueryConfig := config.(*Config) + return newLogsReceiver(sqlQueryConfig, settings, sqlOpenerFunc, clientProviderFunc, consumer) + } +} + +func createMetricsReceiverFunc(sqlOpenerFunc sqlOpenerFunc, clientProviderFunc clientProviderFunc) receiver.CreateMetricsFunc { return func( ctx context.Context, settings receiver.CreateSettings, @@ -31,6 +43,9 @@ func createReceiverFunc(sqlOpenerFunc sqlOpenerFunc, clientProviderFunc clientPr sqlCfg := cfg.(*Config) var opts []scraperhelper.ScraperControllerOption for i, query := range sqlCfg.Queries { + if len(query.Metrics) == 0 { + continue + } id := component.NewIDWithName("sqlqueryreceiver", fmt.Sprintf("query-%d: %s", i, query.SQL)) mp := &scraper{ id: id, diff --git a/receiver/sqlqueryreceiver/receiver_test.go b/receiver/sqlqueryreceiver/receiver_test.go index 8a111fa28267..f4cfd4b34bdf 100644 --- a/receiver/sqlqueryreceiver/receiver_test.go +++ b/receiver/sqlqueryreceiver/receiver_test.go @@ -17,8 +17,34 @@ import ( "go.uber.org/zap" ) -func TestCreateReceiver(t *testing.T) { - createReceiver := createReceiverFunc(fakeDBConnect, mkFakeClient) +func TestCreateLogsReceiver(t *testing.T) { + createReceiver := createLogsReceiverFunc(fakeDBConnect, mkFakeClient) + ctx := context.Background() + receiver, err := createReceiver( + ctx, + receivertest.NewNopCreateSettings(), + &Config{ + ScraperControllerSettings: scraperhelper.ScraperControllerSettings{ + CollectionInterval: 10 * time.Second, + }, + Driver: "mydriver", + DataSource: "my-datasource", + Queries: []Query{{ + SQL: "select * from foo", + Logs: []LogsCfg{ + {}, + }, + }}, + }, + consumertest.NewNop(), + ) + require.NoError(t, err) + err = receiver.Start(ctx, componenttest.NewNopHost()) + require.NoError(t, err) +} + +func TestCreateMetricsReceiver(t *testing.T) { + createReceiver := createMetricsReceiverFunc(fakeDBConnect, mkFakeClient) ctx := context.Background() receiver, err := createReceiver( ctx, diff --git a/receiver/sqlqueryreceiver/row_scanner.go b/receiver/sqlqueryreceiver/row_scanner.go index 9ab685ce07ab..99c71ac0299a 100644 --- a/receiver/sqlqueryreceiver/row_scanner.go +++ b/receiver/sqlqueryreceiver/row_scanner.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "reflect" + "time" "go.uber.org/multierr" ) @@ -30,6 +31,9 @@ func newRowScanner(colTypes []colType) *rowScanner { return "", errNullValueWarning } format := "%v" + if t, isTime := v.(time.Time); isTime { + return t.Format(time.RFC3339), nil + } if reflect.TypeOf(v).Kind() == reflect.Slice { // The Postgres driver returns a []uint8 (ascii string) for decimal and numeric types, // which we want to render as strings. e.g. "4.1" instead of "[52, 46, 49]". diff --git a/receiver/sqlqueryreceiver/scraper.go b/receiver/sqlqueryreceiver/scraper.go index 767b6cda1efb..3ad80d028ad0 100644 --- a/receiver/sqlqueryreceiver/scraper.go +++ b/receiver/sqlqueryreceiver/scraper.go @@ -51,7 +51,7 @@ func (s *scraper) Start(context.Context, component.Host) error { func (s *scraper) Scrape(ctx context.Context) (pmetric.Metrics, error) { out := pmetric.NewMetrics() - rows, err := s.client.metricRows(ctx) + rows, err := s.client.queryRows(ctx) if err != nil { if errors.Is(err, errNullValueWarning) { s.logger.Warn("problems encountered getting metric rows", zap.Error(err)) @@ -81,5 +81,8 @@ func (s *scraper) Scrape(ctx context.Context) (pmetric.Metrics, error) { } func (s *scraper) Shutdown(_ context.Context) error { - return s.db.Close() + if s.db != nil { + return s.db.Close() + } + return nil } diff --git a/receiver/sqlqueryreceiver/testdata/config-invalid-missing-metrics.yaml b/receiver/sqlqueryreceiver/testdata/config-invalid-missing-logs-metrics.yaml similarity index 100% rename from receiver/sqlqueryreceiver/testdata/config-invalid-missing-metrics.yaml rename to receiver/sqlqueryreceiver/testdata/config-invalid-missing-logs-metrics.yaml diff --git a/receiver/sqlqueryreceiver/testdata/config-logs-missing-body-column.yaml b/receiver/sqlqueryreceiver/testdata/config-logs-missing-body-column.yaml new file mode 100644 index 000000000000..3d52afa9fad5 --- /dev/null +++ b/receiver/sqlqueryreceiver/testdata/config-logs-missing-body-column.yaml @@ -0,0 +1,8 @@ +sqlquery: + collection_interval: 10s + driver: mydriver + datasource: "host=localhost port=5432 user=me password=s3cr3t sslmode=disable" + queries: + - sql: "select * from test_logs" + logs: + - {} diff --git a/receiver/sqlqueryreceiver/testdata/config-logs.yaml b/receiver/sqlqueryreceiver/testdata/config-logs.yaml new file mode 100644 index 000000000000..898510a6516c --- /dev/null +++ b/receiver/sqlqueryreceiver/testdata/config-logs.yaml @@ -0,0 +1,10 @@ +sqlquery: + collection_interval: 10s + driver: mydriver + datasource: "host=localhost port=5432 user=me password=s3cr3t sslmode=disable" + queries: + - sql: "select * from test_logs where log_id > ?" + tracking_start_value: 10 + tracking_column: log_id + logs: + - body_column: log_body diff --git a/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql b/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql index ad57610b4bf1..d8b0bc4905d9 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql @@ -17,3 +17,18 @@ insert into movie (title, genre, imdb_rating) values ('Die Hard', 'Action', 8.2); insert into movie (title, genre, imdb_rating) values ('Mission Impossible', 'Action', 7.1); + +create table simple_logs +( + id integer, + insert_time timestamp, + body text, + primary key (id) +); + +insert into simple_logs (id, insert_time, body) values +(1, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -'), +(2, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -'), +(3, '2022-06-03 21:59:29', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -'), +(4, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -'), +(5, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -'); diff --git a/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql b/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql index 178e6c160213..c3581a8b8cbb 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql @@ -22,3 +22,22 @@ alter session set "_ORACLE_SCRIPT"=true; CREATE USER OTEL IDENTIFIED BY "p@ssw%rd"; GRANT CREATE SESSION TO OTEL; GRANT ALL ON movie TO OTEL; + +create table simple_logs +( + id number primary key, + insert_time timestamp with time zone, + body varchar2(4000) +); +grant select on simple_logs to otel; + +insert into simple_logs (id, insert_time, body) values +(1, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -'); +insert into simple_logs (id, insert_time, body) values +(2, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -'); +insert into simple_logs (id, insert_time, body) values +(3, TIMESTAMP '2022-06-03 21:59:29 +00:00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -'); +insert into simple_logs (id, insert_time, body) values +(4, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -'); +insert into simple_logs (id, insert_time, body) values +(5, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -'); diff --git a/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql b/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql index 65c6d6563d1a..5f0ebfd2bed0 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql @@ -20,3 +20,19 @@ insert into movie (title, genre, imdb_rating) values ('Mission Impossible', 'Action', 7.1); grant select on movie to otel; + +create table simple_logs +( + id integer primary key, + insert_time timestamp, + body text +); +grant select, insert on simple_logs to otel; + +insert into simple_logs (id, insert_time, body) values +(1, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -'), +(2, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -'), +(3, '2022-06-03 21:59:29+00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -'), +(4, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -'), +(5, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -'); +