Skip to content

Commit

Permalink
Merge pull request open-telemetry#6 from dmolenda-sumo/persistence
Browse files Browse the repository at this point in the history
feat: persist tracking value in storage
  • Loading branch information
andrzej-stencel authored May 16, 2023
2 parents a63f68b + 8268a3b commit 55c8add
Show file tree
Hide file tree
Showing 9 changed files with 449 additions and 89 deletions.
16 changes: 10 additions & 6 deletions receiver/sqlqueryreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ The configuration supports the following top-level fields:
e.g. _host=localhost port=5432 user=me password=s3cr3t sslmode=disable_
- `queries`(required): A list of queries, where a query is a sql statement and one or more `logs` and/or `metrics` sections (details below).
- `collection_interval`(optional): The time interval between query executions. Defaults to _10s_.
- `storage` (optional, default `""`): The ID of a [storage][storage_extension] extension to be used to [track processed results](#tracking-processed-results).

[storage_extension]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage

### Queries

Expand All @@ -35,10 +38,10 @@ are quite different.

Additionally, each `query` section supports the following properties:

- `tracking_column` (optional, default "") Applies only to logs. In case of a parameterized query,
- `tracking_column` (optional, default `""`) Applies only to logs. In case of a parameterized query,
defines the column to retrieve the value of the parameter on subsequent query runs.
See the below section [Tracking processed results](#tracking-processed-results).
- `tracking_start_value` (optional, default 0) Applies only to logs. In case of a parameterized query, defines the initial value for the parameter.
- `tracking_start_value` (optional, default `""`) Applies only to logs. In case of a parameterized query, defines the initial value for the parameter.
See the below section [Tracking processed results](#tracking-processed-results).

Example:
Expand All @@ -50,7 +53,7 @@ receivers:
datasource: "host=localhost port=5432 user=postgres password=s3cr3t sslmode=disable"
queries:
- sql: "select * from my_logs where log_id > $1"
tracking_start_value: 10000
tracking_start_value: "10000"
tracking_column: log_id
logs:
- body_column: log_body
Expand All @@ -73,9 +76,9 @@ the receiver will run the same query every collection interval, which can cause
over and over again, unless there's an external actor removing the old rows from the `my_logs` table.

To prevent reading the same rows on every collection interval, use a parameterized query like `select * from my_logs where id_column > ?`,
together with the `tracking_start_value` configuration property that specifies the initial value for the parameter.
together with the `tracking_start_value` and `tracking_column` configuration properties.
The receiver will use the configured `tracking_start_value` as the value for the query parameter when running the query for the first time.
On each query run, the receiver will retrieve the last value from the `tracking_column` from the result set and use it as the value for the query parameter on next collection interval. To prevent duplicate log downloads, make sure to sort the query results in ascending order by the tracking_column value.
After each query run, the receiver will store the value of the `tracking_column` from the last row of the result set and use it as the value for the query parameter on next collection interval. To prevent duplicate log downloads, make sure to sort the query results in ascending order by the tracking_column value.

Note that the notation for the parameter depends on the database backend. For example in MySQL this is `?`, in PostgreSQL this is `$1`, in Oracle this is any string identifier starting with a colon `:`, for example `:my_parameter`.

Expand Down Expand Up @@ -107,9 +110,10 @@ receivers:
sqlquery:
driver: postgres
datasource: "host=localhost port=5432 user=postgres password=s3cr3t sslmode=disable"
storage: file_storage
queries:
- sql: "select * from my_logs where log_id > $1"
tracking_start_value: 10000
tracking_start_value: "10000"
tracking_column: log_id
logs:
- body_column: log_body
Expand Down
7 changes: 4 additions & 3 deletions receiver/sqlqueryreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (

type Config struct {
scraperhelper.ScraperControllerSettings `mapstructure:",squash"`
Driver string `mapstructure:"driver"`
DataSource string `mapstructure:"datasource"`
Queries []Query `mapstructure:"queries"`
Driver string `mapstructure:"driver"`
DataSource string `mapstructure:"datasource"`
Queries []Query `mapstructure:"queries"`
StorageID *component.ID `mapstructure:"storage"`
}

func (c Config) Validate() error {
Expand Down
3 changes: 0 additions & 3 deletions receiver/sqlqueryreceiver/db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,3 @@ func (cl dbSQLClient) queryRows(ctx context.Context, args ...any) ([]stringMap,
}
return out, warnings
}

// getRowsSinceId(ctx context.Context, id int)
// getRowsSinceTimestamp(ctx context.Context, timestamp time.Time)
11 changes: 9 additions & 2 deletions receiver/sqlqueryreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ require (
github.com/docker/go-connections v0.4.0
github.com/go-sql-driver/mysql v1.7.1
github.com/lib/pq v1.10.9
github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.77.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.77.0
github.com/sijms/go-ora/v2 v2.7.4
github.com/snowflakedb/gosnowflake v1.6.18
github.com/stretchr/testify v1.8.2
github.com/testcontainers/testcontainers-go v0.20.1
go.opentelemetry.io/collector v0.77.0
go.opentelemetry.io/collector/component v0.77.0
go.opentelemetry.io/collector/confmap v0.77.0
go.opentelemetry.io/collector/consumer v0.77.0
Expand All @@ -28,6 +31,7 @@ require (
github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/antonmedv/expr v1.12.5 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
github.com/aws/aws-sdk-go-v2 v1.18.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8 // indirect
Expand All @@ -43,6 +47,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.27.11 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/containerd v1.6.19 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
Expand Down Expand Up @@ -78,6 +83,8 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/observiq/ctimefmt v1.0.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.77.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
github.com/opencontainers/runc v1.1.5 // indirect
Expand All @@ -87,7 +94,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector v0.77.0 // indirect
go.opentelemetry.io/collector/exporter v0.77.0 // indirect
go.opentelemetry.io/collector/featuregate v0.77.0 // indirect
go.opentelemetry.io/otel v1.15.1 // indirect
Expand All @@ -102,7 +108,8 @@ require (
golang.org/x/term v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
gonum.org/v1/gonum v0.13.0 // indirect
google.golang.org/genproto v0.0.0-20230320184635-7606e756e683 // indirect
google.golang.org/grpc v1.55.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
19 changes: 16 additions & 3 deletions receiver/sqlqueryreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 55c8add

Please sign in to comment.