From acda1638ca5c054d8ce879a0972e4beb6f1b7ea5 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek <58699848+sumo-drosiek@users.noreply.github.com> Date: Sat, 20 Feb 2021 18:55:26 +0100 Subject: [PATCH] [sumologicexporter] Enable metrics pipeline (#2117) * [sumologicexporter] Rename logs related buffer names to include log string Signed-off-by: Dominik Rosiek * [sumologicexporter] Add metrics support to sender Signed-off-by: Dominik Rosiek * [sumologicexporter] Initialise prometheusFormatter in exporter * [sumologicexporter] Use prometheusFormatter in sendMetrics * [sumologicexporter] Add support for PrometheusFormat in sender * [sumologicexporter] Add tests for sendMetrics and fix sender Signed-off-by: Dominik Rosiek * [sumologicexporter] Add and enable metrics for exporter Signed-off-by: Dominik Rosiek * [sumologicexporter] Use consts for sender Signed-off-by: Dominik Rosiek * [sumologicexporter] Skip some metrics tests due to time factor * [sumologicexporter] Set prometheus as default metrics format Signed-off-by: Dominik Rosiek * Fix default value used in tests Co-authored-by: Przemek Maciolek --- exporter/sumologicexporter/README.md | 3 +- exporter/sumologicexporter/config.go | 4 +- exporter/sumologicexporter/exporter.go | 142 ++++++++- exporter/sumologicexporter/exporter_test.go | 198 +++++++++++++ exporter/sumologicexporter/factory.go | 14 + exporter/sumologicexporter/factory_test.go | 2 +- .../sumologicexporter/prometheus_formatter.go | 6 +- .../prometheus_formatter_test.go | 37 ++- exporter/sumologicexporter/sender.go | 188 +++++++++--- exporter/sumologicexporter/sender_test.go | 277 +++++++++++++++--- exporter/sumologicexporter/test_data.go | 12 + 11 files changed, 768 insertions(+), 115 deletions(-) diff --git a/exporter/sumologicexporter/README.md b/exporter/sumologicexporter/README.md index fdc154dff74e..70c6843c61ec 100644 --- a/exporter/sumologicexporter/README.md +++ b/exporter/sumologicexporter/README.md @@ -10,7 +10,8 @@ Empty string means no compression 
- `max_request_body_size` (optional): Max HTTP request body size in bytes before compression (if applied). By default `1_048_576` (1MB) is used. - `metadata_attributes` (optional): List of regexes for attributes which should be send as metadata - `log_format` (optional) (logs only): Format to use when sending logs to Sumo. (default `json`) (possible values: `json`, `text`) -- `metric_format` (optional) (metrics only): Format of the metrics to be sent, either graphite, carbon2 or prometheus (default is carbon2). +- `metric_format` (optional) (metrics only): Format of the metrics to be sent (default is `prometheus`). + `carbon2` and `graphite` are going to be supported soon. - `source_category` (optional): Desired source category. Useful if you want to override the source category configured for the source. - `source_name` (optional): Desired source name. Useful if you want to override the source name configured for the source. - `source_host` (optional): Desired host name. Useful if you want to override the source host configured for the source. 
diff --git a/exporter/sumologicexporter/config.go b/exporter/sumologicexporter/config.go index f8639b792629..db1de62df95a 100644 --- a/exporter/sumologicexporter/config.go +++ b/exporter/sumologicexporter/config.go @@ -43,7 +43,7 @@ type Config struct { LogFormat LogFormatType `mapstructure:"log_format"` // Metrics related configuration - // The format of metrics you will be sending, either graphite or carbon2 or prometheus (Default is carbon2) + // The format of metrics you will be sending, either graphite or carbon2 or prometheus (Default is prometheus) MetricFormat MetricFormatType `mapstructure:"metric_format"` // List of regexes for attributes which should be send as metadata @@ -114,7 +114,7 @@ const ( // DefaultLogFormat defines default LogFormat DefaultLogFormat LogFormatType = JSONFormat // DefaultMetricFormat defines default MetricFormat - DefaultMetricFormat MetricFormatType = Carbon2Format + DefaultMetricFormat MetricFormatType = PrometheusFormat // DefaultSourceCategory defines default SourceCategory DefaultSourceCategory string = "" // DefaultSourceName defines default SourceName diff --git a/exporter/sumologicexporter/exporter.go b/exporter/sumologicexporter/exporter.go index d89d8674dac5..ddae45963247 100644 --- a/exporter/sumologicexporter/exporter.go +++ b/exporter/sumologicexporter/exporter.go @@ -28,10 +28,11 @@ import ( ) type sumologicexporter struct { - sources sourceFormats - config *Config - client *http.Client - filter filter + sources sourceFormats + config *Config + client *http.Client + filter filter + prometheusFormatter prometheusFormatter } func initExporter(cfg *Config) (*sumologicexporter, error) { @@ -72,16 +73,22 @@ func initExporter(cfg *Config) (*sumologicexporter, error) { return nil, err } + pf, err := newPrometheusFormatter() + if err != nil { + return nil, err + } + httpClient, err := cfg.HTTPClientSettings.ToClient() if err != nil { return nil, fmt.Errorf("failed to create HTTP Client: %w", err) } se := &sumologicexporter{ 
- config: cfg, - sources: sfs, - client: httpClient, - filter: f, + config: cfg, + sources: sfs, + client: httpClient, + filter: f, + prometheusFormatter: pf, } return se, nil @@ -108,6 +115,27 @@ func newLogsExporter( ) } +func newMetricsExporter( + cfg *Config, + params component.ExporterCreateParams, +) (component.MetricsExporter, error) { + se, err := initExporter(cfg) + if err != nil { + return nil, err + } + + return exporterhelper.NewMetricsExporter( + cfg, + params.Logger, + se.pushMetricsData, + // Disable exporterhelper Timeout, since we are using a custom mechanism + // within exporter itself + exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), + exporterhelper.WithRetry(cfg.RetrySettings), + exporterhelper.WithQueue(cfg.QueueSettings), + ) +} + // pushLogsData groups data with common metadata and sends them as separate batched requests. // It returns the number of unsent logs and an error which contains a list of dropped records // so they can be handled by OTC retry mechanism @@ -124,7 +152,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) (i if err != nil { return 0, consumererror.PartialLogsError(fmt.Errorf("failed to initialize compressor: %w", err), ld) } - sdr := newSender(se.config, se.client, se.filter, se.sources, c) + sdr := newSender(se.config, se.client, se.filter, se.sources, c, se.prometheusFormatter) // Iterate over ResourceLogs rls := ld.ResourceLogs() @@ -159,7 +187,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) (i errs = append(errs, err) droppedRecords = append(droppedRecords, dropped...) 
} - sdr.cleanBuffer() + sdr.cleanLogsBuffer() } // assign metadata @@ -167,7 +195,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) (i // add log to the buffer var dropped []pdata.LogRecord - dropped, err = sdr.batch(ctx, log, previousMetadata) + dropped, err = sdr.batchLog(ctx, log, previousMetadata) if err != nil { droppedRecords = append(droppedRecords, dropped...) errs = append(errs, err) @@ -202,3 +230,95 @@ return 0, nil } + +// pushMetricsData groups data with common metadata and sends them as separate batched requests +// it returns the number of unsent metrics and an error which contains a list of dropped records +// so they can be handled by the OTC retry mechanism +func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pdata.Metrics) (int, error) { + var ( + currentMetadata fields + previousMetadata fields + errs []error + droppedRecords []metricPair + attributes pdata.AttributeMap + ) + + c, err := newCompressor(se.config.CompressEncoding) + if err != nil { + return 0, consumererror.PartialMetricsError(fmt.Errorf("failed to initialize compressor: %w", err), md) + } + sdr := newSender(se.config, se.client, se.filter, se.sources, c, se.prometheusFormatter) + + // Iterate over ResourceMetrics + rms := md.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + rm := rms.At(i) + + attributes = rm.Resource().Attributes() + + // iterate over InstrumentationLibraryMetrics + ilms := rm.InstrumentationLibraryMetrics() + for j := 0; j < ilms.Len(); j++ { + ilm := ilms.At(j) + + // iterate over Metrics + ms := ilm.Metrics() + for k := 0; k < ms.Len(); k++ { + m := ms.At(k) + mp := metricPair{ + metric: m, + attributes: attributes, + } + + currentMetadata = sdr.filter.filterIn(attributes) + + // If metadata differs from currently buffered, flush the buffer + if currentMetadata.string() != previousMetadata.string() && previousMetadata.string() != "" { +
var dropped []metricPair + dropped, err = sdr.sendMetrics(ctx, previousMetadata) + if err != nil { + errs = append(errs, err) + droppedRecords = append(droppedRecords, dropped...) + } + sdr.cleanMetricBuffer() + } + + // assign metadata + previousMetadata = currentMetadata + var dropped []metricPair + // add metric to the buffer + dropped, err = sdr.batchMetric(ctx, mp, currentMetadata) + if err != nil { + droppedRecords = append(droppedRecords, dropped...) + errs = append(errs, err) + } + } + } + } + + // Flush pending metrics + dropped, err := sdr.sendMetrics(ctx, previousMetadata) + if err != nil { + droppedRecords = append(droppedRecords, dropped...) + errs = append(errs, err) + } + + if len(droppedRecords) > 0 { + // Move all dropped records to Metrics + droppedMetrics := pdata.NewMetrics() + rms := droppedMetrics.ResourceMetrics() + rms.Resize(len(droppedRecords)) + for num, record := range droppedRecords { + rm := droppedMetrics.ResourceMetrics().At(num) + record.attributes.CopyTo(rm.Resource().Attributes()) + + ilms := rm.InstrumentationLibraryMetrics() + ilms.Resize(1) + ilms.At(0).Metrics().Append(record.metric) + } + + return len(droppedRecords), consumererror.PartialMetricsError(componenterror.CombineErrors(errs), droppedMetrics) + } + + return 0, nil +} diff --git a/exporter/sumologicexporter/exporter_test.go b/exporter/sumologicexporter/exporter_test.go index 63454287e769..660e8eaf69a2 100644 --- a/exporter/sumologicexporter/exporter_test.go +++ b/exporter/sumologicexporter/exporter_test.go @@ -277,6 +277,7 @@ func TestPushFailedBatch(t *testing.T) { defer func() { test.srv.Close() }() logs := LogRecordsToLogs(exampleLog()) + logs.ResourceLogs().Resize(maxBufferSize + 1) log := logs.ResourceLogs().At(0) for i := 0; i < maxBufferSize; i++ { @@ -287,3 +288,200 @@ func TestPushFailedBatch(t *testing.T) { assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") assert.Equal(t, maxBufferSize, count) } + +func 
TestAllMetricsSuccess(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + expected := `test_metric_data{test="test_value",test2="second_value"} 14500 1605534165000 +gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 +gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` + assert.Equal(t, expected, body) + assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) + }, + }) + defer func() { test.srv.Close() }() + test.exp.config.MetricFormat = PrometheusFormat + + metrics := metricPairToMetrics([]metricPair{ + exampleIntMetric(), + exampleIntGaugeMetric(), + }) + + _, err := test.exp.pushMetricsData(context.Background(), metrics) + assert.NoError(t, err) +} + +func TestAllMetricsFailed(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(500) + + body := extractBody(t, req) + expected := `test_metric_data{test="test_value",test2="second_value"} 14500 1605534165000 +gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 +gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` + assert.Equal(t, expected, body) + assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) + }, + }) + defer func() { test.srv.Close() }() + test.exp.config.MetricFormat = PrometheusFormat + + metrics := metricPairToMetrics([]metricPair{ + exampleIntMetric(), + exampleIntGaugeMetric(), + }) + + dropped, err := test.exp.pushMetricsData(context.Background(), metrics) + assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.Equal(t, 2, dropped) + + partial, ok := err.(consumererror.PartialError) + require.True(t, 
ok) + assert.Equal(t, metrics, partial.GetMetrics()) +} + +func TestMetricsPartiallyFailed(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(500) + + body := extractBody(t, req) + expected := `test_metric_data{test="test_value",test2="second_value"} 14500 1605534165000` + assert.Equal(t, expected, body) + assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) + }, + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + expected := `gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 +gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` + assert.Equal(t, expected, body) + assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) + }, + }) + defer func() { test.srv.Close() }() + test.exp.config.MetricFormat = PrometheusFormat + test.exp.config.MaxRequestBodySize = 1 + + records := []metricPair{ + exampleIntMetric(), + exampleIntGaugeMetric(), + } + metrics := metricPairToMetrics(records) + expected := metricPairToMetrics(records[:1]) + + dropped, err := test.exp.pushMetricsData(context.Background(), metrics) + assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.Equal(t, 1, dropped) + + partial, ok := err.(consumererror.PartialError) + require.True(t, ok) + assert.Equal(t, expected, partial.GetMetrics()) +} + +func TestPushMetricsInvalidCompressor(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + assert.Equal(t, `Example log`, body) + assert.Equal(t, "", req.Header.Get("X-Sumo-Fields")) + }, + }) + defer func() { test.srv.Close() }() + + metrics := metricPairToMetrics([]metricPair{ + exampleIntMetric(), + 
exampleIntGaugeMetric(), + }) + + test.exp.config.CompressEncoding = "invalid" + + _, err := test.exp.pushMetricsData(context.Background(), metrics) + assert.EqualError(t, err, "failed to initialize compressor: invalid format: invalid") +} + +func TestMetricsDifferentMetadata(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(500) + + body := extractBody(t, req) + expected := `test_metric_data{test="test_value",test2="second_value",key1="value1"} 14500 1605534165000` + assert.Equal(t, expected, body) + assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) + }, + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + expected := `gauge_metric_name{foo="bar",key2="value2",remote_name="156920",url="http://example_url"} 124 1608124661166 +gauge_metric_name{foo="bar",key2="value2",remote_name="156955",url="http://another_url"} 245 1608124662166` + assert.Equal(t, expected, body) + assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) + }, + }) + defer func() { test.srv.Close() }() + test.exp.config.MetricFormat = PrometheusFormat + test.exp.config.MaxRequestBodySize = 1 + + f, err := newFilter([]string{`key\d`}) + require.NoError(t, err) + test.exp.filter = f + + records := []metricPair{ + exampleIntMetric(), + exampleIntGaugeMetric(), + } + + records[0].attributes.InsertString("key1", "value1") + records[1].attributes.InsertString("key2", "value2") + + metrics := metricPairToMetrics(records) + expected := metricPairToMetrics(records[:1]) + + dropped, err := test.exp.pushMetricsData(context.Background(), metrics) + assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.Equal(t, 1, dropped) + + partial, ok := err.(consumererror.PartialError) + require.True(t, ok) + assert.Equal(t, expected, partial.GetMetrics()) +} + +func 
TestPushMetricsFailedBatch(t *testing.T) { + t.Skip("Skip test due to prometheus format complexity. Execution can take over 30s") + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(500) + body := extractBody(t, req) + + expected := fmt.Sprintf( + "%s%s", + strings.Repeat("test_metric_data{test=\"test_value\",test2=\"second_value\"} 14500 1605534165000\n", maxBufferSize-1), + `test_metric_data{test="test_value",test2="second_value"} 14500 1605534165000`, + ) + + assert.Equal(t, expected, body) + }, + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(200) + body := extractBody(t, req) + + assert.Equal(t, `test_metric_data{test="test_value",test2="second_value"} 14500 1605534165000`, body) + }, + }) + defer func() { test.srv.Close() }() + test.exp.config.MetricFormat = PrometheusFormat + test.exp.config.MaxRequestBodySize = 1024 * 1024 * 1024 * 1024 + + metrics := metricPairToMetrics([]metricPair{exampleIntMetric()}) + metrics.ResourceMetrics().Resize(maxBufferSize + 1) + metric := metrics.ResourceMetrics().At(0) + + for i := 0; i < maxBufferSize; i++ { + metrics.ResourceMetrics().Append(metric) + } + + count, err := test.exp.pushMetricsData(context.Background(), metrics) + assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.Equal(t, maxBufferSize, count) +} diff --git a/exporter/sumologicexporter/factory.go b/exporter/sumologicexporter/factory.go index a83ab3f0fb94..2794db937a78 100644 --- a/exporter/sumologicexporter/factory.go +++ b/exporter/sumologicexporter/factory.go @@ -34,6 +34,7 @@ func NewFactory() component.ExporterFactory { typeStr, createDefaultConfig, exporterhelper.WithLogs(createLogsExporter), + exporterhelper.WithMetrics(createMetricsExporter), ) } @@ -74,3 +75,16 @@ func createLogsExporter( return exp, nil } + +func createMetricsExporter( + _ context.Context, + params component.ExporterCreateParams, + 
cfg configmodels.Exporter, +) (component.MetricsExporter, error) { + exp, err := newMetricsExporter(cfg.(*Config), params) + if err != nil { + return nil, fmt.Errorf("failed to create the metrics exporter: %w", err) + } + + return exp, nil +} diff --git a/exporter/sumologicexporter/factory_test.go b/exporter/sumologicexporter/factory_test.go index 2fa4d2c04918..0b5e46c69a5d 100644 --- a/exporter/sumologicexporter/factory_test.go +++ b/exporter/sumologicexporter/factory_test.go @@ -45,7 +45,7 @@ func TestCreateDefaultConfig(t *testing.T) { CompressEncoding: "gzip", MaxRequestBodySize: 1_048_576, LogFormat: "json", - MetricFormat: "carbon2", + MetricFormat: "prometheus", SourceCategory: "", SourceName: "", SourceHost: "", diff --git a/exporter/sumologicexporter/prometheus_formatter.go b/exporter/sumologicexporter/prometheus_formatter.go index fd953077195d..bbe70c2b3e36 100644 --- a/exporter/sumologicexporter/prometheus_formatter.go +++ b/exporter/sumologicexporter/prometheus_formatter.go @@ -42,16 +42,16 @@ const ( prometheusInfValue string = "+Inf" ) -func newPrometheusFormatter() prometheusFormatter { +func newPrometheusFormatter() (prometheusFormatter, error) { sanitNameRegex, err := regexp.Compile(`[^0-9a-zA-Z]`) if err != nil { - return prometheusFormatter{} + return prometheusFormatter{}, err } return prometheusFormatter{ sanitNameRegex: sanitNameRegex, replacer: strings.NewReplacer(`\`, `\\`, `"`, `\"`), - } + }, nil } // PrometheusLabels returns all attributes as sanitized prometheus labels string diff --git a/exporter/sumologicexporter/prometheus_formatter_test.go b/exporter/sumologicexporter/prometheus_formatter_test.go index ba4ffb85a50f..8d67f5ce9207 100644 --- a/exporter/sumologicexporter/prometheus_formatter_test.go +++ b/exporter/sumologicexporter/prometheus_formatter_test.go @@ -18,11 +18,13 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer/pdata" ) func 
TestSanitizeKey(t *testing.T) { - f := newPrometheusFormatter() + f, err := newPrometheusFormatter() + require.NoError(t, err) key := "&^*123-abc-ABC!?" expected := "___123_abc_ABC__" @@ -30,7 +32,8 @@ func TestSanitizeKey(t *testing.T) { } func TestSanitizeValue(t *testing.T) { - f := newPrometheusFormatter() + f, err := newPrometheusFormatter() + require.NoError(t, err) value := `&^*123-abc-ABC!?"\\n` expected := `&^*123-abc-ABC!?\"\\\n` @@ -38,7 +41,8 @@ func TestSanitizeValue(t *testing.T) { } func TestTags2StringNoLabels(t *testing.T) { - f := newPrometheusFormatter() + f, err := newPrometheusFormatter() + require.NoError(t, err) mp := exampleIntMetric() mp.attributes.InitEmptyWithCapacity(0) @@ -46,7 +50,8 @@ func TestTags2StringNoLabels(t *testing.T) { } func TestTags2String(t *testing.T) { - f := newPrometheusFormatter() + f, err := newPrometheusFormatter() + require.NoError(t, err) mp := exampleIntMetric() assert.Equal( @@ -57,7 +62,8 @@ func TestTags2String(t *testing.T) { } func TestTags2StringNoAttributes(t *testing.T) { - f := newPrometheusFormatter() + f, err := newPrometheusFormatter() + require.NoError(t, err) mp := exampleIntMetric() mp.attributes.InitEmptyWithCapacity(0) @@ -65,7 +71,8 @@ func TestTags2StringNoAttributes(t *testing.T) { } func TestPrometheusMetricDataTypeIntGauge(t *testing.T) { - f := newPrometheusFormatter() + f, err := newPrometheusFormatter() + require.NoError(t, err) metric := exampleIntGaugeMetric() result := f.metric2String(metric) @@ -75,7 +82,8 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1 } func TestPrometheusMetricDataTypeDoubleGauge(t *testing.T) { - f := newPrometheusFormatter() + f, err := newPrometheusFormatter() + require.NoError(t, err) metric := exampleDoubleGaugeMetric() result := f.metric2String(metric) @@ -85,7 +93,8 @@ gauge_metric_name_double_test{foo="bar",local_name="156155",endpoint="http://ano } func TestPrometheusMetricDataTypeIntSum(t *testing.T) { - f := 
newPrometheusFormatter() + f, err := newPrometheusFormatter() + require.NoError(t, err) metric := exampleIntSumMetric() result := f.metric2String(metric) @@ -95,7 +104,8 @@ sum_metric_int_test{foo="bar",name="156155",address="http://another_url"} 1238 1 } func TestPrometheusMetricDataTypeDoubleSum(t *testing.T) { - f := newPrometheusFormatter() + f, err := newPrometheusFormatter() + require.NoError(t, err) metric := exampleDoubleSumMetric() result := f.metric2String(metric) @@ -105,7 +115,8 @@ sum_metric_double_test{foo="bar",pod_name="opsum",namespace="kube-config"} 1238. } func TestPrometheusMetricDataTypeDoubleSummary(t *testing.T) { - f := newPrometheusFormatter() + f, err := newPrometheusFormatter() + require.NoError(t, err) metric := exampleDoubleSummaryMetric() result := f.metric2String(metric) @@ -119,7 +130,8 @@ summary_metric_double_test_count{foo="bar",pod_name="sit",namespace="main"} 7 16 } func TestPrometheusMetricDataTypeIntHistogram(t *testing.T) { - f := newPrometheusFormatter() + f, err := newPrometheusFormatter() + require.NoError(t, err) metric := exampleIntHistogramMetric() result := f.metric2String(metric) @@ -143,7 +155,8 @@ histogram_metric_int_test_count{foo="bar",pod_name="sit",namespace="main"} 5 160 } func TestPrometheusMetricDataTypeDoubleHistogram(t *testing.T) { - f := newPrometheusFormatter() + f, err := newPrometheusFormatter() + require.NoError(t, err) metric := exampleDoubleHistogramMetric() result := f.metric2String(metric) diff --git a/exporter/sumologicexporter/sender.go b/exporter/sumologicexporter/sender.go index a368765a82f2..1416f9937981 100644 --- a/exporter/sumologicexporter/sender.go +++ b/exporter/sumologicexporter/sender.go @@ -42,18 +42,34 @@ type metricPair struct { } type sender struct { - buffer []pdata.LogRecord - config *Config - client *http.Client - filter filter - sources sourceFormats - compressor compressor + logBuffer []pdata.LogRecord + metricBuffer []metricPair + config *Config + client *http.Client + 
filter filter + sources sourceFormats + compressor compressor + prometheusFormatter prometheusFormatter } const ( logKey string = "log" - // maxBufferSize defines size of the buffer (maximum number of pdata.LogRecord entries) + // maxBufferSize defines size of the logBuffer (maximum number of pdata.LogRecord entries) maxBufferSize int = 1024 * 1024 + + headerContentType string = "Content-Type" + headerContentEncoding string = "Content-Encoding" + headerClient string = "X-Sumo-Client" + headerHost string = "X-Sumo-Host" + headerName string = "X-Sumo-Name" + headerCategory string = "X-Sumo-Category" + headerFields string = "X-Sumo-Fields" + + contentTypeLogs string = "application/x-www-form-urlencoded" + contentTypePrometheus string = "application/vnd.sumologic.prometheus" + + contentEncodingGzip string = "gzip" + contentEncodingDeflate string = "deflate" ) func newAppendResponse() appendResponse { @@ -68,13 +84,15 @@ func newSender( f filter, s sourceFormats, c compressor, + pf prometheusFormatter, ) *sender { return &sender{ - config: cfg, - client: cl, - filter: f, - sources: s, - compressor: c, + config: cfg, + client: cl, + filter: f, + sources: s, + compressor: c, + prometheusFormatter: pf, } } @@ -92,35 +110,39 @@ func (s *sender) send(ctx context.Context, pipeline PipelineType, body io.Reader // Add headers switch s.config.CompressEncoding { case GZIPCompression: - req.Header.Set("Content-Encoding", "gzip") + req.Header.Set(headerContentEncoding, contentEncodingGzip) case DeflateCompression: - req.Header.Set("Content-Encoding", "deflate") + req.Header.Set(headerContentEncoding, contentEncodingDeflate) case NoCompression: default: return fmt.Errorf("invalid content encoding: %s", s.config.CompressEncoding) } - req.Header.Add("X-Sumo-Client", s.config.Client) + req.Header.Add(headerClient, s.config.Client) if s.sources.host.isSet() { - req.Header.Add("X-Sumo-Host", s.sources.host.format(flds)) + req.Header.Add(headerHost, s.sources.host.format(flds)) } if 
s.sources.name.isSet() { - req.Header.Add("X-Sumo-Name", s.sources.name.format(flds)) + req.Header.Add(headerName, s.sources.name.format(flds)) } if s.sources.category.isSet() { - req.Header.Add("X-Sumo-Category", s.sources.category.format(flds)) + req.Header.Add(headerCategory, s.sources.category.format(flds)) } switch pipeline { case LogsPipeline: - req.Header.Add("Content-Type", "application/x-www-form-urlencoded") - req.Header.Add("X-Sumo-Fields", flds.string()) + req.Header.Add(headerContentType, contentTypeLogs) + req.Header.Add(headerFields, flds.string()) case MetricsPipeline: - // ToDo: Implement metrics pipeline - return errors.New("current sender version doesn't support metrics") + switch s.config.MetricFormat { + case PrometheusFormat: + req.Header.Add(headerContentType, contentTypePrometheus) + default: + return fmt.Errorf("unsupported metrics format: %s", s.config.MetricFormat) + } default: return errors.New("unexpected pipeline") } @@ -153,7 +175,7 @@ func (s *sender) logToJSON(record pdata.LogRecord) (string, error) { return bytes.NewBuffer(nextLine).String(), nil } -// sendLogs sends log records from the buffer formatted according +// sendLogs sends log records from the logBuffer formatted according // to configured LogFormat and as the result of execution // returns array of records which has not been sent correctly and error func (s *sender) sendLogs(ctx context.Context, flds fields) ([]pdata.LogRecord, error) { @@ -164,7 +186,7 @@ func (s *sender) sendLogs(ctx context.Context, flds fields) ([]pdata.LogRecord, currentRecords []pdata.LogRecord ) - for _, record := range s.buffer { + for _, record := range s.logBuffer { var formattedLine string var err error @@ -206,9 +228,73 @@ func (s *sender) sendLogs(ctx context.Context, flds fields) ([]pdata.LogRecord, } } - if err := s.send(ctx, LogsPipeline, strings.NewReader(body.String()), flds); err != nil { - errs = append(errs, err) - droppedRecords = append(droppedRecords, currentRecords...) 
+ if body.Len() > 0 { + if err := s.send(ctx, LogsPipeline, strings.NewReader(body.String()), flds); err != nil { + errs = append(errs, err) + droppedRecords = append(droppedRecords, currentRecords...) + } + } + + if len(errs) > 0 { + return droppedRecords, componenterror.CombineErrors(errs) + } + return droppedRecords, nil +} + +// sendMetrics sends metrics in the right format based on s.config.MetricFormat +func (s *sender) sendMetrics(ctx context.Context, flds fields) ([]metricPair, error) { + var ( + body strings.Builder + errs []error + droppedRecords []metricPair + currentRecords []metricPair + ) + + for _, record := range s.metricBuffer { + var formattedLine string + var err error + + switch s.config.MetricFormat { + case PrometheusFormat: + formattedLine = s.prometheusFormatter.metric2String(record) + default: + err = fmt.Errorf("unexpected metric format: %s", s.config.MetricFormat) + } + + if err != nil { + droppedRecords = append(droppedRecords, record) + errs = append(errs, err) + continue + } + + ar, err := s.appendAndSend(ctx, formattedLine, MetricsPipeline, &body, flds) + if err != nil { + errs = append(errs, err) + if ar.sent { + droppedRecords = append(droppedRecords, currentRecords...) + } + + if !ar.appended { + droppedRecords = append(droppedRecords, record) + } + } + + // If data was sent, clear the currentRecords buffer + if ar.sent { + currentRecords = currentRecords[:0] + } + + // If the metric has been appended to body, add it to currentRecords + if ar.appended { + currentRecords = append(currentRecords, record) + } + } + + if body.Len() > 0 { + if err := s.send(ctx, MetricsPipeline, strings.NewReader(body.String()), flds); err != nil { + errs = append(errs, err) + droppedRecords = append(droppedRecords, currentRecords...) 
+ } } if len(errs) > 0 { @@ -218,7 +304,7 @@ func (s *sender) sendLogs(ctx context.Context, flds fields) ([]pdata.LogRecord, } // appendAndSend appends line to the request body that will be sent and sends -// the accumulated data if the internal buffer has been filled (with maxBufferSize elements). +// the accumulated data if the internal logBuffer has been filled (with maxBufferSize elements). // It returns appendResponse func (s *sender) appendAndSend( ctx context.Context, @@ -260,26 +346,50 @@ func (s *sender) appendAndSend( return ar, nil } -// cleanBuffer zeroes buffer -func (s *sender) cleanBuffer() { - s.buffer = (s.buffer)[:0] +// cleanLogsBuffer zeroes logBuffer +func (s *sender) cleanLogsBuffer() { + s.logBuffer = (s.logBuffer)[:0] } -// batch adds log to the buffer and flushes them if buffer is full to avoid overflow +// batchLog adds log to the logBuffer and flushes them if logBuffer is full to avoid overflow // returns list of log records which were not sent successfully -func (s *sender) batch(ctx context.Context, log pdata.LogRecord, metadata fields) ([]pdata.LogRecord, error) { - s.buffer = append(s.buffer, log) +func (s *sender) batchLog(ctx context.Context, log pdata.LogRecord, metadata fields) ([]pdata.LogRecord, error) { + s.logBuffer = append(s.logBuffer, log) - if s.count() >= maxBufferSize { + if s.countLogs() >= maxBufferSize { dropped, err := s.sendLogs(ctx, metadata) - s.cleanBuffer() + s.cleanLogsBuffer() + return dropped, err + } + + return nil, nil +} + +// countLogs returns number of logs in logBuffer +func (s *sender) countLogs() int { + return len(s.logBuffer) +} + +// cleanMetricBuffer zeroes metricBuffer +func (s *sender) cleanMetricBuffer() { + s.metricBuffer = (s.metricBuffer)[:0] +} + +// batchMetric adds metric to the metricBuffer and flushes them if metricBuffer is full to avoid overflow +// returns list of metric records which were not sent successfully +func (s *sender) batchMetric(ctx context.Context, metric metricPair, 
metadata fields) ([]metricPair, error) { + s.metricBuffer = append(s.metricBuffer, metric) + + if s.countMetrics() >= maxBufferSize { + dropped, err := s.sendMetrics(ctx, metadata) + s.cleanMetricBuffer() return dropped, err } return nil, nil } -// count returns number of logs in buffer -func (s *sender) count() int { - return len(s.buffer) +// countMetrics returns number of metrics in metricBuffer +func (s *sender) countMetrics() int { + return len(s.metricBuffer) } diff --git a/exporter/sumologicexporter/sender_test.go b/exporter/sumologicexporter/sender_test.go index f62eca4eef2c..27564ec854ff 100644 --- a/exporter/sumologicexporter/sender_test.go +++ b/exporter/sumologicexporter/sender_test.go @@ -64,6 +64,9 @@ func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http. c, err := newCompressor(NoCompression) require.NoError(t, err) + pf, err := newPrometheusFormatter() + require.NoError(t, err) + return &senderTest{ srv: testServer, exp: exp, @@ -79,6 +82,7 @@ func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http. 
name: getTestSourceFormat(t, "source_name"), }, c, + pf, ), } } @@ -126,7 +130,7 @@ func exampleTwoDifferentLogs() []pdata.LogRecord { return buffer } -func TestSend(t *testing.T) { +func TestSendLogs(t *testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) @@ -138,13 +142,13 @@ func TestSend(t *testing.T) { }) defer func() { test.srv.Close() }() - test.s.buffer = exampleTwoLogs() + test.s.logBuffer = exampleTwoLogs() _, err := test.s.sendLogs(context.Background(), fields{"key1": "value", "key2": "value2"}) assert.NoError(t, err) } -func TestSendSplit(t *testing.T) { +func TestSendLogsSplit(t *testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) @@ -157,12 +161,12 @@ func TestSendSplit(t *testing.T) { }) defer func() { test.srv.Close() }() test.s.config.MaxRequestBodySize = 10 - test.s.buffer = exampleTwoLogs() + test.s.logBuffer = exampleTwoLogs() _, err := test.s.sendLogs(context.Background(), fields{}) assert.NoError(t, err) } -func TestSendSplitFailedOne(t *testing.T) { +func TestSendLogsSplitFailedOne(t *testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) @@ -178,14 +182,14 @@ func TestSendSplitFailedOne(t *testing.T) { defer func() { test.srv.Close() }() test.s.config.MaxRequestBodySize = 10 test.s.config.LogFormat = TextFormat - test.s.buffer = exampleTwoLogs() + test.s.logBuffer = exampleTwoLogs() dropped, err := test.s.sendLogs(context.Background(), fields{}) assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") - assert.Equal(t, test.s.buffer[0:1], dropped) + assert.Equal(t, test.s.logBuffer[0:1], dropped) } -func TestSendSplitFailedAll(t *testing.T) { +func TestSendLogsSplitFailedAll(t 
*testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) @@ -203,7 +207,7 @@ func TestSendSplitFailedAll(t *testing.T) { defer func() { test.srv.Close() }() test.s.config.MaxRequestBodySize = 10 test.s.config.LogFormat = TextFormat - test.s.buffer = exampleTwoLogs() + test.s.logBuffer = exampleTwoLogs() dropped, err := test.s.sendLogs(context.Background(), fields{}) assert.EqualError( @@ -211,10 +215,10 @@ func TestSendSplitFailedAll(t *testing.T) { err, "[error during sending data: 500 Internal Server Error; error during sending data: 404 Not Found]", ) - assert.Equal(t, test.s.buffer[0:2], dropped) + assert.Equal(t, test.s.logBuffer[0:2], dropped) } -func TestSendJson(t *testing.T) { +func TestSendLogsJson(t *testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) @@ -228,13 +232,13 @@ func TestSendJson(t *testing.T) { }) defer func() { test.srv.Close() }() test.s.config.LogFormat = JSONFormat - test.s.buffer = exampleTwoLogs() + test.s.logBuffer = exampleTwoLogs() _, err := test.s.sendLogs(context.Background(), fields{"key": "value"}) assert.NoError(t, err) } -func TestSendJsonSplit(t *testing.T) { +func TestSendLogsJsonSplit(t *testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) @@ -248,13 +252,13 @@ func TestSendJsonSplit(t *testing.T) { defer func() { test.srv.Close() }() test.s.config.LogFormat = JSONFormat test.s.config.MaxRequestBodySize = 10 - test.s.buffer = exampleTwoLogs() + test.s.logBuffer = exampleTwoLogs() _, err := test.s.sendLogs(context.Background(), fields{}) assert.NoError(t, err) } -func TestSendJsonSplitFailedOne(t *testing.T) { +func TestSendLogsJsonSplitFailedOne(t *testing.T) { test := prepareSenderTest(t, 
[]func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) @@ -270,14 +274,14 @@ func TestSendJsonSplitFailedOne(t *testing.T) { defer func() { test.srv.Close() }() test.s.config.LogFormat = JSONFormat test.s.config.MaxRequestBodySize = 10 - test.s.buffer = exampleTwoLogs() + test.s.logBuffer = exampleTwoLogs() dropped, err := test.s.sendLogs(context.Background(), fields{}) assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") - assert.Equal(t, test.s.buffer[0:1], dropped) + assert.Equal(t, test.s.logBuffer[0:1], dropped) } -func TestSendJsonSplitFailedAll(t *testing.T) { +func TestSendLogsJsonSplitFailedAll(t *testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) @@ -295,7 +299,7 @@ func TestSendJsonSplitFailedAll(t *testing.T) { defer func() { test.srv.Close() }() test.s.config.LogFormat = JSONFormat test.s.config.MaxRequestBodySize = 10 - test.s.buffer = exampleTwoLogs() + test.s.logBuffer = exampleTwoLogs() dropped, err := test.s.sendLogs(context.Background(), fields{}) assert.EqualError( @@ -303,20 +307,22 @@ func TestSendJsonSplitFailedAll(t *testing.T) { err, "[error during sending data: 500 Internal Server Error; error during sending data: 404 Not Found]", ) - assert.Equal(t, test.s.buffer[0:2], dropped) + assert.Equal(t, test.s.logBuffer[0:2], dropped) } -func TestSendUnexpectedFormat(t *testing.T) { +func TestSendLogsUnexpectedFormat(t *testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { }, }) defer func() { test.srv.Close() }() test.s.config.LogFormat = "dummy" - test.s.buffer = exampleTwoLogs() + logs := exampleTwoLogs() + test.s.logBuffer = logs - _, err := test.s.sendLogs(context.Background(), fields{}) + dropped, err := test.s.sendLogs(context.Background(), fields{}) 
assert.Error(t, err) + assert.Equal(t, logs, dropped) } func TestOverrideSourceName(t *testing.T) { @@ -328,7 +334,7 @@ func TestOverrideSourceName(t *testing.T) { defer func() { test.srv.Close() }() test.s.sources.name = getTestSourceFormat(t, "Test source name/%{key1}") - test.s.buffer = exampleLog() + test.s.logBuffer = exampleLog() _, err := test.s.sendLogs(context.Background(), fields{"key1": "test_name"}) assert.NoError(t, err) @@ -343,7 +349,7 @@ func TestOverrideSourceCategory(t *testing.T) { defer func() { test.srv.Close() }() test.s.sources.category = getTestSourceFormat(t, "Test source category/%{key1}") - test.s.buffer = exampleLog() + test.s.logBuffer = exampleLog() _, err := test.s.sendLogs(context.Background(), fields{"key1": "test_name"}) assert.NoError(t, err) @@ -358,34 +364,34 @@ func TestOverrideSourceHost(t *testing.T) { defer func() { test.srv.Close() }() test.s.sources.host = getTestSourceFormat(t, "Test source host/%{key1}") - test.s.buffer = exampleLog() + test.s.logBuffer = exampleLog() _, err := test.s.sendLogs(context.Background(), fields{"key1": "test_name"}) assert.NoError(t, err) } -func TestBuffer(t *testing.T) { +func TestLogsBuffer(t *testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) defer func() { test.srv.Close() }() - assert.Equal(t, test.s.count(), 0) + assert.Equal(t, test.s.countLogs(), 0) logs := exampleTwoLogs() - droppedLogs, err := test.s.batch(context.Background(), logs[0], fields{}) + droppedLogs, err := test.s.batchLog(context.Background(), logs[0], fields{}) require.NoError(t, err) assert.Nil(t, droppedLogs) - assert.Equal(t, 1, test.s.count()) - assert.Equal(t, []pdata.LogRecord{logs[0]}, test.s.buffer) + assert.Equal(t, 1, test.s.countLogs()) + assert.Equal(t, []pdata.LogRecord{logs[0]}, test.s.logBuffer) - droppedLogs, err = test.s.batch(context.Background(), logs[1], fields{}) + droppedLogs, err = test.s.batchLog(context.Background(), logs[1], fields{}) 
require.NoError(t, err) assert.Nil(t, droppedLogs) - assert.Equal(t, 2, test.s.count()) - assert.Equal(t, logs, test.s.buffer) + assert.Equal(t, 2, test.s.countLogs()) + assert.Equal(t, logs, test.s.logBuffer) - test.s.cleanBuffer() - assert.Equal(t, 0, test.s.count()) - assert.Equal(t, []pdata.LogRecord{}, test.s.buffer) + test.s.cleanLogsBuffer() + assert.Equal(t, 0, test.s.countLogs()) + assert.Equal(t, []pdata.LogRecord{}, test.s.logBuffer) } func TestInvalidEndpoint(t *testing.T) { @@ -393,7 +399,7 @@ func TestInvalidEndpoint(t *testing.T) { defer func() { test.srv.Close() }() test.s.config.HTTPClientSettings.Endpoint = ":" - test.s.buffer = exampleLog() + test.s.logBuffer = exampleLog() _, err := test.s.sendLogs(context.Background(), fields{}) assert.EqualError(t, err, `parse ":": missing protocol scheme`) @@ -404,35 +410,37 @@ func TestInvalidPostRequest(t *testing.T) { defer func() { test.srv.Close() }() test.s.config.HTTPClientSettings.Endpoint = "" - test.s.buffer = exampleLog() + test.s.logBuffer = exampleLog() _, err := test.s.sendLogs(context.Background(), fields{}) assert.EqualError(t, err, `Post "": unsupported protocol scheme ""`) } -func TestBufferOverflow(t *testing.T) { +func TestLogsBufferOverflow(t *testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) defer func() { test.srv.Close() }() test.s.config.HTTPClientSettings.Endpoint = ":" log := exampleLog() - for test.s.count() < maxBufferSize-1 { - _, err := test.s.batch(context.Background(), log[0], fields{}) + for test.s.countLogs() < maxBufferSize-1 { + _, err := test.s.batchLog(context.Background(), log[0], fields{}) require.NoError(t, err) } - _, err := test.s.batch(context.Background(), log[0], fields{}) + _, err := test.s.batchLog(context.Background(), log[0], fields{}) assert.EqualError(t, err, `parse ":": missing protocol scheme`) - assert.Equal(t, 0, test.s.count()) + assert.Equal(t, 0, test.s.countLogs()) } -func TestMetricsPipeline(t *testing.T) 
{ +func TestInvalidMetricFormat(t *testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) defer func() { test.srv.Close() }() + test.s.config.MetricFormat = "invalid" + err := test.s.send(context.Background(), MetricsPipeline, strings.NewReader(""), fields{}) - assert.EqualError(t, err, `current sender version doesn't support metrics`) + assert.EqualError(t, err, `unsupported metrics format: invalid`) } func TestInvalidPipeline(t *testing.T) { @@ -512,3 +520,180 @@ func TestInvalidContentEncoding(t *testing.T) { err := test.s.send(context.Background(), LogsPipeline, reader, fields{}) assert.EqualError(t, err, "invalid content encoding: test") } + +func TestSendMetrics(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + expected := `test_metric_data{test="test_value",test2="second_value"} 14500 1605534165000 +gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 +gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` + assert.Equal(t, expected, body) + assert.Equal(t, "otelcol", req.Header.Get("X-Sumo-Client")) + assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) + }, + }) + defer func() { test.srv.Close() }() + + test.s.config.MetricFormat = PrometheusFormat + test.s.metricBuffer = []metricPair{ + exampleIntMetric(), + exampleIntGaugeMetric(), + } + _, err := test.s.sendMetrics(context.Background(), fields{"key1": "value", "key2": "value2"}) + assert.NoError(t, err) +} + +func TestSendMetricsSplit(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + expected := `test_metric_data{test="test_value",test2="second_value"} 14500 1605534165000` + assert.Equal(t, 
expected, body) + }, + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + expected := `gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 +gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` + assert.Equal(t, expected, body) + }, + }) + defer func() { test.srv.Close() }() + test.s.config.MaxRequestBodySize = 10 + test.s.config.MetricFormat = PrometheusFormat + test.s.metricBuffer = []metricPair{ + exampleIntMetric(), + exampleIntGaugeMetric(), + } + + _, err := test.s.sendMetrics(context.Background(), fields{}) + assert.NoError(t, err) +} + +func TestSendMetricsSplitFailedOne(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(500) + + body := extractBody(t, req) + expected := `test_metric_data{test="test_value",test2="second_value"} 14500 1605534165000` + assert.Equal(t, expected, body) + }, + func(w http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + expected := `gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 +gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` + assert.Equal(t, expected, body) + }, + }) + defer func() { test.srv.Close() }() + test.s.config.MaxRequestBodySize = 10 + test.s.config.MetricFormat = PrometheusFormat + test.s.metricBuffer = []metricPair{ + exampleIntMetric(), + exampleIntGaugeMetric(), + } + + dropped, err := test.s.sendMetrics(context.Background(), fields{}) + assert.EqualError(t, err, "error during sending data: 500 Internal Server Error") + assert.Equal(t, test.s.metricBuffer[0:1], dropped) +} + +func TestSendMetricsSplitFailedAll(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(500) + + 
body := extractBody(t, req) + expected := `test_metric_data{test="test_value",test2="second_value"} 14500 1605534165000` + assert.Equal(t, expected, body) + }, + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(404) + + body := extractBody(t, req) + expected := `gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 +gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` + assert.Equal(t, expected, body) + }, + }) + defer func() { test.srv.Close() }() + test.s.config.MaxRequestBodySize = 10 + test.s.config.MetricFormat = PrometheusFormat + test.s.metricBuffer = []metricPair{ + exampleIntMetric(), + exampleIntGaugeMetric(), + } + + dropped, err := test.s.sendMetrics(context.Background(), fields{}) + assert.EqualError( + t, + err, + "[error during sending data: 500 Internal Server Error; error during sending data: 404 Not Found]", + ) + assert.Equal(t, test.s.metricBuffer[0:2], dropped) +} + +func TestSendMetricsUnexpectedFormat(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + }, + }) + defer func() { test.srv.Close() }() + test.s.config.MetricFormat = "invalid" + metrics := []metricPair{ + exampleIntMetric(), + } + test.s.metricBuffer = metrics + + dropped, err := test.s.sendMetrics(context.Background(), fields{}) + assert.EqualError(t, err, "unexpected metric format: invalid") + assert.Equal(t, dropped, metrics) +} + +func TestMetricsBuffer(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) + defer func() { test.srv.Close() }() + + assert.Equal(t, test.s.countMetrics(), 0) + metrics := []metricPair{ + exampleIntMetric(), + exampleIntGaugeMetric(), + } + + droppedMetrics, err := test.s.batchMetric(context.Background(), metrics[0], fields{}) + require.NoError(t, err) + assert.Nil(t, droppedMetrics) + assert.Equal(t, 1, 
test.s.countMetrics()) + assert.Equal(t, metrics[0:1], test.s.metricBuffer) + + droppedMetrics, err = test.s.batchMetric(context.Background(), metrics[1], fields{}) + require.NoError(t, err) + assert.Nil(t, droppedMetrics) + assert.Equal(t, 2, test.s.countMetrics()) + assert.Equal(t, metrics, test.s.metricBuffer) + + test.s.cleanMetricBuffer() + assert.Equal(t, 0, test.s.countMetrics()) + assert.Equal(t, []metricPair{}, test.s.metricBuffer) +} + +func TestMetricsBufferOverflow(t *testing.T) { + t.Skip("Skip test due to prometheus format complexity. Execution can take over 30s") + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) + defer func() { test.srv.Close() }() + + test.s.config.HTTPClientSettings.Endpoint = ":" + test.s.config.MetricFormat = PrometheusFormat + test.s.config.MaxRequestBodySize = 1024 * 1024 * 1024 * 1024 + metric := exampleIntMetric() + + for test.s.countMetrics() < maxBufferSize-1 { + _, err := test.s.batchMetric(context.Background(), metric, fields{}) + require.NoError(t, err) + } + + _, err := test.s.batchMetric(context.Background(), metric, fields{}) + assert.EqualError(t, err, `parse ":": missing protocol scheme`) + assert.Equal(t, 0, test.s.countMetrics()) +} diff --git a/exporter/sumologicexporter/test_data.go b/exporter/sumologicexporter/test_data.go index c5be40745440..7d54ce01c169 100644 --- a/exporter/sumologicexporter/test_data.go +++ b/exporter/sumologicexporter/test_data.go @@ -259,3 +259,15 @@ func exampleDoubleHistogramMetric() metricPair { return metric } + +func metricPairToMetrics(mp []metricPair) pdata.Metrics { + metrics := pdata.NewMetrics() + metrics.ResourceMetrics().Resize(len(mp)) + for num, record := range mp { + record.attributes.CopyTo(metrics.ResourceMetrics().At(num).Resource().Attributes()) + metrics.ResourceMetrics().At(num).InstrumentationLibraryMetrics().Resize(1) + metrics.ResourceMetrics().At(num).InstrumentationLibraryMetrics().At(0).Metrics().Append(record.metric) + } + + 
return metrics +}