Skip to content

Commit

Permalink
[receiver/sqlquery] support attributes for logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Grandys committed Aug 11, 2024
1 parent d46a7c3 commit 7e3252e
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 39 deletions.
27 changes: 27 additions & 0 deletions .chloggen/sqlqueryreceiver-logs-attrbutes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sqlqueryreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support populating log attributes from sql query

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24459]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
3 changes: 2 additions & 1 deletion internal/sqlquery/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func (q Query) Validate() error {
}

type LogsCfg struct {
BodyColumn string `mapstructure:"body_column"`
BodyColumn string `mapstructure:"body_column"`
AttributeColumns []string `mapstructure:"attribute_columns"`
}

func (config LogsCfg) Validate() error {
Expand Down
4 changes: 2 additions & 2 deletions receiver/sqlqueryreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Additionally, each `query` section supports the following properties:
See the below section [Tracking processed results](#tracking-processed-results).
- `tracking_start_value` (optional, default `""`) Applies only to logs. In case of a parameterized query, defines the initial value for the parameter.
See the below section [Tracking processed results](#tracking-processed-results).
- `attribute_columns`(optional): a list of column names in the returned dataset used to set attributes on the signal.
These attributes may be case-sensitive, depending on the driver (e.g. Oracle DB).

Example:

Expand Down Expand Up @@ -104,8 +106,6 @@ Each _metric_ in the configuration will produce one OTel metric per row returned
- `metric_name`(required): the name assigned to the OTel metric.
- `value_column`(required): the column name in the returned dataset used to set the value of the metric's datapoint.
This may be case-sensitive, depending on the driver (e.g. Oracle DB).
- `attribute_columns`(optional): a list of column names in the returned dataset used to set attibutes on the datapoint.
These attributes may be case-sensitive, depending on the driver (e.g. Oracle DB).
- `data_type` (optional): can be `gauge` or `sum`; defaults to `gauge`.
- `value_type` (optional): can be `int` or `double`; defaults to `int`.
- `monotonic` (optional): boolean; whether a cumulative sum's value is monotonically increasing (i.e. never rolls over
Expand Down
35 changes: 25 additions & 10 deletions receiver/sqlqueryreceiver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) {
SQL: "select * from simple_logs where id > $1",
Logs: []sqlquery.LogsCfg{
{
BodyColumn: "body",
BodyColumn: "body",
AttributeColumns: []string{"attribute"},
},
},
TrackingColumn: "id",
Expand Down Expand Up @@ -93,7 +94,8 @@ func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) {
SQL: "select * from simple_logs where id > $1",
Logs: []sqlquery.LogsCfg{
{
BodyColumn: "body",
BodyColumn: "body",
AttributeColumns: []string{"attribute"},
},
},
TrackingColumn: "id",
Expand Down Expand Up @@ -145,7 +147,8 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) {
SQL: "select * from simple_logs where id > $1",
Logs: []sqlquery.LogsCfg{
{
BodyColumn: "body",
BodyColumn: "body",
AttributeColumns: []string{"attribute"},
},
},
TrackingColumn: "id",
Expand Down Expand Up @@ -187,7 +190,8 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) {
SQL: "select * from simple_logs where id > $1",
Logs: []sqlquery.LogsCfg{
{
BodyColumn: "body",
BodyColumn: "body",
AttributeColumns: []string{"attribute"},
},
},
TrackingColumn: "id",
Expand Down Expand Up @@ -220,7 +224,8 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) {
SQL: "select * from simple_logs where id > $1",
Logs: []sqlquery.LogsCfg{
{
BodyColumn: "body",
BodyColumn: "body",
AttributeColumns: []string{"attribute"},
},
},
TrackingColumn: "id",
Expand Down Expand Up @@ -316,7 +321,7 @@ func printLogs(allLogs []plog.Logs) {

func insertPostgresSimpleLogs(t *testing.T, container testcontainers.Container, existingLogID, newLogCount int) {
for newLogID := existingLogID + 1; newLogID <= existingLogID+newLogCount; newLogID++ {
query := fmt.Sprintf("insert into simple_logs (id, insert_time, body) values (%d, now(), 'another log %d');", newLogID, newLogID)
query := fmt.Sprintf("insert into simple_logs (id, insert_time, body, attribute) values (%d, now(), 'another log %d', 'TLSv1.2');", newLogID, newLogID)
returnValue, returnMessageReader, err := container.Exec(context.Background(), []string{
"psql", "-U", "otel", "-c", query,
})
Expand Down Expand Up @@ -610,15 +615,25 @@ func testAllSimpleLogs(t *testing.T, logs []plog.Logs) {
assert.Equal(t, 1, len(logs))
assert.Equal(t, 1, logs[0].ResourceLogs().Len())
assert.Equal(t, 1, logs[0].ResourceLogs().At(0).ScopeLogs().Len())
expectedEntries := []string{
expectedLogBodies := []string{
"- - - [03/Jun/2022:21:59:26 +0000] \"GET /api/health HTTP/1.1\" 200 6197 4 \"-\" \"-\" 445af8e6c428303f -",
"- - - [03/Jun/2022:21:59:26 +0000] \"GET /api/health HTTP/1.1\" 200 6205 5 \"-\" \"-\" 3285f43cd4baa202 -",
"- - - [03/Jun/2022:21:59:29 +0000] \"GET /api/health HTTP/1.1\" 200 6233 4 \"-\" \"-\" 579e8362d3185b61 -",
"- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6207 5 \"-\" \"-\" 8c6ac61ae66e509f -",
"- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6200 4 \"-\" \"-\" c163495861e873d8 -",
}
assert.Equal(t, len(expectedEntries), logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len())
for i := range expectedEntries {
assert.Equal(t, expectedEntries[i], logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Body().Str())
expectedLogAttributes := []string{
"TLSv1.2",
"TLSv1",
"TLSv1.2",
"TLSv1",
"TLSv1.2",
}
assert.Equal(t, len(expectedLogBodies), logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len())
for i := range expectedLogBodies {
logRecord := logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i)
assert.Equal(t, expectedLogBodies[i], logRecord.Body().Str())
logAttribute, _ := logRecord.Attributes().Get("attribute")
assert.Equal(t, expectedLogAttributes[i], logAttribute.Str())
}
}
13 changes: 11 additions & 2 deletions receiver/sqlqueryreceiver/logs_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (queryReceiver *logsQueryReceiver) collect(ctx context.Context) (plog.Logs,
for logsConfigIndex, logsConfig := range queryReceiver.query.Logs {
for _, row := range rows {
logRecord := scopeLogs.AppendEmpty()
rowToLog(row, logsConfig, logRecord)
errs = append(errs, rowToLog(row, logsConfig, logRecord))
logRecord.SetObservedTimestamp(observedAt)
if logsConfigIndex == 0 {
errs = append(errs, queryReceiver.storeTrackingValue(ctx, row))
Expand All @@ -315,8 +315,17 @@ func (queryReceiver *logsQueryReceiver) storeTrackingValue(ctx context.Context,
return nil
}

func rowToLog(row sqlquery.StringMap, config sqlquery.LogsCfg, logRecord plog.LogRecord) {
func rowToLog(row sqlquery.StringMap, config sqlquery.LogsCfg, logRecord plog.LogRecord) error {
logRecord.Body().SetStr(row[config.BodyColumn])
attrs := logRecord.Attributes()
for _, columnName := range config.AttributeColumns {
if attrVal, found := row[columnName]; found {
attrs.PutStr(columnName, attrVal)
} else {
return fmt.Errorf("rowToLog: attribute_column not found: '%s'", columnName)
}
}
return nil
}

func (queryReceiver *logsQueryReceiver) shutdown(_ context.Context) error {
Expand Down
13 changes: 7 additions & 6 deletions receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ create table simple_logs
id integer,
insert_time timestamp,
body text,
attribute text,
primary key (id)
);

insert into simple_logs (id, insert_time, body) values
(1, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -'),
(2, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -'),
(3, '2022-06-03 21:59:29', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -'),
(4, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -'),
(5, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -');
insert into simple_logs (id, insert_time, body, attribute) values
(1, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2'),
(2, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1'),
(3, '2022-06-03 21:59:29', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2'),
(4, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -', 'TLSv1'),
(5, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2');
23 changes: 12 additions & 11 deletions receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ create table simple_logs
(
id number primary key,
insert_time timestamp with time zone,
body varchar2(4000)
body varchar2(4000),
attribute varchar2(100)
);
grant select on simple_logs to otel;

insert into simple_logs (id, insert_time, body) values
(1, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -');
insert into simple_logs (id, insert_time, body) values
(2, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -');
insert into simple_logs (id, insert_time, body) values
(3, TIMESTAMP '2022-06-03 21:59:29 +00:00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -');
insert into simple_logs (id, insert_time, body) values
(4, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -');
insert into simple_logs (id, insert_time, body) values
(5, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -');
insert into simple_logs (id, insert_time, body, attribute) values
(1, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2');
insert into simple_logs (id, insert_time, body, attribute) values
(2, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1');
insert into simple_logs (id, insert_time, body, attribute) values
(3, TIMESTAMP '2022-06-03 21:59:29 +00:00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2');
insert into simple_logs (id, insert_time, body, attribute) values
(4, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -', 'TLSv1');
insert into simple_logs (id, insert_time, body, attribute) values
(5, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2');
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ create table simple_logs
(
id integer primary key,
insert_time timestamp,
body text
body text,
attribute text
);
grant select, insert on simple_logs to otel;

insert into simple_logs (id, insert_time, body) values
(1, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -'),
(2, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -'),
(3, '2022-06-03 21:59:29+00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -'),
(4, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -'),
(5, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -');
insert into simple_logs (id, insert_time, body, attribute) values
(1, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2'),
(2, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1'),
(3, '2022-06-03 21:59:29+00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2'),
(4, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -', 'TLSv1'),
(5, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2');

0 comments on commit 7e3252e

Please sign in to comment.