Skip to content

Commit

Permalink
[receiver/sqlquery] Add support of optional Start and Stop Timestamp (#…
Browse files Browse the repository at this point in the history
…19160)

This PR adds support for reading the Start and Stop Timestamps from values
stored in the query result rows. This makes SQL queries more expressive and
lets users define the period of aggregation themselves, instead of depending
only on the default system time captured at processing.

**Link to tracking Issue:** 
#18925 
#14146

---------

Co-authored-by: Juraci Paixão Kröhling <juraci.github@kroehling.de>
  • Loading branch information
Pranav-SA and jpkrohling authored Jun 27, 2023
1 parent 1edf7c9 commit c4dd245
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 1 deletion.
16 changes: 16 additions & 0 deletions .chloggen/add-start-end-ts-sqlquery-receiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sqlqueryreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for Start and End Timestamp columns in the metric configuration.

# One or more tracking issues related to the change
issues: [18925, 14146]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
7 changes: 6 additions & 1 deletion receiver/sqlqueryreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ Each _metric_ in the configuration will produce one OTel metric per row returned
to `cumulative`.
- `description` (optional): the description applied to the metric.
- `unit` (optional): the units applied to the metric.
- `static_attributes` (optional): static attributes applied to the metrics
- `static_attributes` (optional): static attributes applied to the metrics.
- `start_ts_column` (optional): the name of the column containing the start timestamp, the value of which is applied to
the metric's start timestamp (otherwise the current time is used). Only applies if the metric is of type cumulative
sum.
- `ts_column` (optional): the name of the column containing the timestamp, the value of which is applied to the
  metric's timestamp. If not set, the current time at the moment of scraping is used.

### Example

Expand Down
2 changes: 2 additions & 0 deletions receiver/sqlqueryreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type MetricCfg struct {
Unit string `mapstructure:"unit"`
Description string `mapstructure:"description"`
StaticAttributes map[string]string `mapstructure:"static_attributes"`
StartTsColumn string `mapstructure:"start_ts_column"`
TsColumn string `mapstructure:"ts_column"`
}

func (c MetricCfg) Validate() error {
Expand Down
22 changes: 22 additions & 0 deletions receiver/sqlqueryreceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,28 @@ func rowToMetric(row stringMap, cfg MetricCfg, dest pmetric.Metric, startTime pc
dest.SetUnit(cfg.Unit)
dataPointSlice := setMetricFields(cfg, dest)
dataPoint := dataPointSlice.AppendEmpty()
if cfg.StartTsColumn != "" {
if val, found := row[cfg.StartTsColumn]; found {
timestamp, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.StartTsColumn, val, err)
}
startTime = pcommon.Timestamp(timestamp)
} else {
return fmt.Errorf("rowToMetric: start_ts_column not found")
}
}
if cfg.TsColumn != "" {
if val, found := row[cfg.TsColumn]; found {
timestamp, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.TsColumn, val, err)
}
ts = pcommon.Timestamp(timestamp)
} else {
return fmt.Errorf("rowToMetric: ts_column not found")
}
}
setTimestamp(cfg, dataPoint, startTime, ts, scrapeCfg)
value, found := row[cfg.ValueColumn]
if !found {
Expand Down
82 changes: 82 additions & 0 deletions receiver/sqlqueryreceiver/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/scrapererror"
"go.uber.org/zap"
Expand Down Expand Up @@ -398,3 +399,84 @@ func TestScraper_FakeDB_MultiRows_Error(t *testing.T) {
assert.Error(t, err)
assert.True(t, scrapererror.IsPartialScrapeError(err))
}

// TestScraper_StartAndTSColumn verifies that when both StartTsColumn and
// TsColumn are configured, the timestamps stored in the row ("StartTs" and
// "Ts") are parsed and applied to the emitted data point instead of the
// scraper's default times.
func TestScraper_StartAndTSColumn(t *testing.T) {
	dbClient := &fakeDBClient{
		stringMaps: [][]stringMap{{
			{
				"mycol":   "42",
				"StartTs": "1682417791",
				"Ts":      "1682418264",
			},
		}},
	}
	s := scraper{
		client: dbClient,
		query: Query{
			Metrics: []MetricCfg{{
				MetricName:    "my.name",
				ValueColumn:   "mycol",
				TsColumn:      "Ts",
				StartTsColumn: "StartTs",
				DataType:      MetricTypeSum,
				Aggregation:   MetricAggregationCumulative,
			}},
		},
	}
	metrics, err := s.Scrape(context.Background())
	require.NoError(t, err)
	// One resource/scope/metric is produced; inspect its single data point.
	dp := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0)
	assert.Equal(t, pcommon.Timestamp(1682417791), dp.StartTimestamp())
	assert.Equal(t, pcommon.Timestamp(1682418264), dp.Timestamp())
}

// TestScraper_StartAndTS_ErrorOnColumnNotFound checks that scraping reports
// an error when a configured timestamp column ("Ts" here) is missing from
// the row returned by the query.
func TestScraper_StartAndTS_ErrorOnColumnNotFound(t *testing.T) {
	dbClient := &fakeDBClient{
		stringMaps: [][]stringMap{{
			{
				// Note: no "Ts" key — the configured TsColumn is absent.
				"mycol":   "42",
				"StartTs": "1682417791",
			},
		}},
	}
	s := scraper{
		client: dbClient,
		query: Query{
			Metrics: []MetricCfg{{
				MetricName:    "my.name",
				ValueColumn:   "mycol",
				TsColumn:      "Ts",
				StartTsColumn: "StartTs",
				DataType:      MetricTypeSum,
				Aggregation:   MetricAggregationCumulative,
			}},
		},
	}
	_, err := s.Scrape(context.Background())
	assert.Error(t, err)
}

// TestScraper_StartAndTS_ErrorOnParse checks that scraping reports an error
// when the value in the configured start-timestamp column is not a valid
// integer timestamp.
func TestScraper_StartAndTS_ErrorOnParse(t *testing.T) {
	dbClient := &fakeDBClient{
		stringMaps: [][]stringMap{{
			{
				"mycol":   "42",
				"StartTs": "blah", // not parseable as an integer timestamp
			},
		}},
	}
	s := scraper{
		client: dbClient,
		query: Query{
			Metrics: []MetricCfg{{
				MetricName:    "my.name",
				ValueColumn:   "mycol",
				StartTsColumn: "StartTs",
				DataType:      MetricTypeSum,
				Aggregation:   MetricAggregationCumulative,
			}},
		},
	}
	_, err := s.Scrape(context.Background())
	assert.Error(t, err)
}

0 comments on commit c4dd245

Please sign in to comment.