[sumologicexporter] Enable metrics pipeline (#2117)
* [sumologicexporter] Rename logs related buffer names to include log string

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>

* [sumologicexporter] Add metrics support to sender

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>

* [sumologicexporter] Initialise prometheusFormatter in exporter

* [sumologicexporter] Use prometheusFormatter in sendMetrics

* [sumologicexporter] Add support for PrometheusFormat in sender

* [sumologicexporter] Add tests for sendMetrics and fix sender

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>

* [sumologicexporter] Add and enable metrics for exporter

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>

* [sumologicexporter] Use consts for sender

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>

* [sumologicexporter] Skip some metrics tests due to time factor

* [sumologicexporter] Set prometheus as default metrics format

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>

* Fix default value used in tests

Co-authored-by: Przemek Maciolek <pmaciolek@sumologic.com>
sumo-drosiek and pmm-sumo authored Feb 20, 2021
1 parent 15b531d commit acda163
Showing 11 changed files with 768 additions and 115 deletions.
exporter/sumologicexporter/README.md (2 additions & 1 deletion)
@@ -10,7 +10,8 @@ Empty string means no compression
 - `max_request_body_size` (optional): Max HTTP request body size in bytes before compression (if applied). By default `1_048_576` (1MB) is used.
 - `metadata_attributes` (optional): List of regexes for attributes which should be sent as metadata
 - `log_format` (optional) (logs only): Format to use when sending logs to Sumo. (default `json`) (possible values: `json`, `text`)
-- `metric_format` (optional) (metrics only): Format of the metrics to be sent, either graphite, carbon2 or prometheus (default is carbon2).
+- `metric_format` (optional) (metrics only): Format of the metrics to be sent (default is `prometheus`).
+  `carbon2` and `graphite` will be supported soon.
 - `source_category` (optional): Desired source category. Useful if you want to override the source category configured for the source.
 - `source_name` (optional): Desired source name. Useful if you want to override the source name configured for the source.
 - `source_host` (optional): Desired host name. Useful if you want to override the source host configured for the source.
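For orientation, here is a minimal configuration that exercises the new default. This snippet is not part of the diff: the endpoint URL is a placeholder, and `metric_format` is spelled out only for clarity since `prometheus` is now the default.

```yaml
exporters:
  sumologic:
    # Placeholder URL - replace with your own HTTP source endpoint.
    endpoint: https://collectors.sumologic.com/receiver/v1/http/XXXX
    # prometheus is now the default; shown explicitly for clarity.
    metric_format: prometheus
    log_format: json
    source_category: otel/metrics
```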
exporter/sumologicexporter/config.go (2 additions & 2 deletions)
@@ -43,7 +43,7 @@ type Config struct {
     LogFormat LogFormatType `mapstructure:"log_format"`
 
     // Metrics related configuration
-    // The format of metrics you will be sending, either graphite or carbon2 or prometheus (Default is carbon2)
+    // The format of metrics you will be sending, either graphite or carbon2 or prometheus (Default is prometheus)
     MetricFormat MetricFormatType `mapstructure:"metric_format"`
 
     // List of regexes for attributes which should be sent as metadata
@@ -114,7 +114,7 @@ const (
     // DefaultLogFormat defines default LogFormat
     DefaultLogFormat LogFormatType = JSONFormat
     // DefaultMetricFormat defines default MetricFormat
-    DefaultMetricFormat MetricFormatType = Carbon2Format
+    DefaultMetricFormat MetricFormatType = PrometheusFormat
     // DefaultSourceCategory defines default SourceCategory
     DefaultSourceCategory string = ""
     // DefaultSourceName defines default SourceName
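As a quick illustration (not part of this commit), the new default could be pinned down with a unit-test style sketch like the one below. The test itself is hypothetical and assumes it lives in the `sumologicexporter` package next to `config.go`, where the constants above are visible.

```go
package sumologicexporter

import "testing"

// TestDefaultMetricFormatSketch is illustrative only and not a test from this
// commit. It asserts the constant changed above: the default metric format is
// now prometheus instead of carbon2.
func TestDefaultMetricFormatSketch(t *testing.T) {
    if DefaultMetricFormat != PrometheusFormat {
        t.Fatalf("expected prometheus as the default metric format, got %v", DefaultMetricFormat)
    }
}
```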
exporter/sumologicexporter/exporter.go (131 additions & 11 deletions)
@@ -28,10 +28,11 @@ import (
 )
 
 type sumologicexporter struct {
-    sources sourceFormats
-    config  *Config
-    client  *http.Client
-    filter  filter
+    sources             sourceFormats
+    config              *Config
+    client              *http.Client
+    filter              filter
+    prometheusFormatter prometheusFormatter
 }
 
 func initExporter(cfg *Config) (*sumologicexporter, error) {
@@ -72,16 +72,22 @@ func initExporter(cfg *Config) (*sumologicexporter, error) {
         return nil, err
     }
 
+    pf, err := newPrometheusFormatter()
+    if err != nil {
+        return nil, err
+    }
+
     httpClient, err := cfg.HTTPClientSettings.ToClient()
     if err != nil {
         return nil, fmt.Errorf("failed to create HTTP Client: %w", err)
     }
 
     se := &sumologicexporter{
-        config:  cfg,
-        sources: sfs,
-        client:  httpClient,
-        filter:  f,
+        config:              cfg,
+        sources:             sfs,
+        client:              httpClient,
+        filter:              f,
+        prometheusFormatter: pf,
     }
 
     return se, nil
@@ -108,6 +115,27 @@ func newLogsExporter(
     )
 }
 
+func newMetricsExporter(
+    cfg *Config,
+    params component.ExporterCreateParams,
+) (component.MetricsExporter, error) {
+    se, err := initExporter(cfg)
+    if err != nil {
+        return nil, err
+    }
+
+    return exporterhelper.NewMetricsExporter(
+        cfg,
+        params.Logger,
+        se.pushMetricsData,
+        // Disable exporterhelper Timeout, since we are using a custom mechanism
+        // within the exporter itself
+        exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
+        exporterhelper.WithRetry(cfg.RetrySettings),
+        exporterhelper.WithQueue(cfg.QueueSettings),
+    )
+}
+
 // pushLogsData groups data with common metadata and sends them as separate batched requests.
 // It returns the number of unsent logs and an error which contains a list of dropped records
 // so they can be handled by the OTC retry mechanism
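An aside before the next hunk: `newMetricsExporter` above passes `cfg.RetrySettings` and `cfg.QueueSettings` to exporterhelper, so retries and queuing should be tunable from the collector configuration. The sketch below is an assumption rather than something this diff shows: it presumes the exporter exposes these settings under the standard exporterhelper keys `retry_on_failure` and `sending_queue`.

```yaml
exporters:
  sumologic:
    endpoint: https://collectors.sumologic.com/receiver/v1/http/XXXX  # placeholder
    metric_format: prometheus
    # Key names assume the standard exporterhelper conventions; values are examples.
    retry_on_failure:
      enabled: true
      initial_interval: 5s
      max_interval: 30s
      max_elapsed_time: 120s
    sending_queue:
      enabled: true
      num_consumers: 10
      queue_size: 5000
```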
@@ -124,7 +152,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) (i
     if err != nil {
         return 0, consumererror.PartialLogsError(fmt.Errorf("failed to initialize compressor: %w", err), ld)
     }
-    sdr := newSender(se.config, se.client, se.filter, se.sources, c)
+    sdr := newSender(se.config, se.client, se.filter, se.sources, c, se.prometheusFormatter)
 
     // Iterate over ResourceLogs
     rls := ld.ResourceLogs()
@@ -159,15 +187,15 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) (i
                         errs = append(errs, err)
                         droppedRecords = append(droppedRecords, dropped...)
                     }
-                    sdr.cleanBuffer()
+                    sdr.cleanLogsBuffer()
                 }
 
                 // assign metadata
                 previousMetadata = currentMetadata
 
                 // add log to the buffer
                 var dropped []pdata.LogRecord
-                dropped, err = sdr.batch(ctx, log, previousMetadata)
+                dropped, err = sdr.batchLog(ctx, log, previousMetadata)
                 if err != nil {
                     droppedRecords = append(droppedRecords, dropped...)
                     errs = append(errs, err)
@@ -202,3 +230,95 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) (i

     return 0, nil
 }
+
+// pushMetricsData groups data with common metadata and sends them as separate batched requests.
+// It returns the number of unsent metrics and an error which contains a list of dropped records
+// so they can be handled by the OTC retry mechanism
+func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pdata.Metrics) (int, error) {
+    var (
+        currentMetadata  fields
+        previousMetadata fields
+        errs             []error
+        droppedRecords   []metricPair
+        attributes       pdata.AttributeMap
+    )
+
+    c, err := newCompressor(se.config.CompressEncoding)
+    if err != nil {
+        return 0, consumererror.PartialMetricsError(fmt.Errorf("failed to initialize compressor: %w", err), md)
+    }
+    sdr := newSender(se.config, se.client, se.filter, se.sources, c, se.prometheusFormatter)
+
+    // Iterate over ResourceMetrics
+    rms := md.ResourceMetrics()
+    for i := 0; i < rms.Len(); i++ {
+        rm := rms.At(i)
+
+        attributes = rm.Resource().Attributes()
+
+        // iterate over InstrumentationLibraryMetrics
+        ilms := rm.InstrumentationLibraryMetrics()
+        for j := 0; j < ilms.Len(); j++ {
+            ilm := ilms.At(j)
+
+            // iterate over Metrics
+            ms := ilm.Metrics()
+            for k := 0; k < ms.Len(); k++ {
+                m := ms.At(k)
+                mp := metricPair{
+                    metric:     m,
+                    attributes: attributes,
+                }
+
+                currentMetadata = sdr.filter.filterIn(attributes)
+
+                // If metadata differs from currently buffered, flush the buffer
+                if currentMetadata.string() != previousMetadata.string() && previousMetadata.string() != "" {
+                    var dropped []metricPair
+                    dropped, err = sdr.sendMetrics(ctx, previousMetadata)
+                    if err != nil {
+                        errs = append(errs, err)
+                        droppedRecords = append(droppedRecords, dropped...)
+                    }
+                    sdr.cleanMetricBuffer()
+                }
+
+                // assign metadata
+                previousMetadata = currentMetadata
+                var dropped []metricPair
+                // add metric to the buffer
+                dropped, err = sdr.batchMetric(ctx, mp, currentMetadata)
+                if err != nil {
+                    droppedRecords = append(droppedRecords, dropped...)
+                    errs = append(errs, err)
+                }
+            }
+        }
+    }
+
+    // Flush pending metrics
+    dropped, err := sdr.sendMetrics(ctx, previousMetadata)
+    if err != nil {
+        droppedRecords = append(droppedRecords, dropped...)
+        errs = append(errs, err)
+    }
+
+    if len(droppedRecords) > 0 {
+        // Move all dropped records to Metrics
+        droppedMetrics := pdata.NewMetrics()
+        rms := droppedMetrics.ResourceMetrics()
+        rms.Resize(len(droppedRecords))
+        for num, record := range droppedRecords {
+            rm := droppedMetrics.ResourceMetrics().At(num)
+            record.attributes.CopyTo(rm.Resource().Attributes())
+
+            ilms := rm.InstrumentationLibraryMetrics()
+            ilms.Resize(1)
+            ilms.At(0).Metrics().Append(record.metric)
+        }
+
+        return len(droppedRecords), consumererror.PartialMetricsError(componenterror.CombineErrors(errs), droppedMetrics)
+    }
+
+    return 0, nil
+}
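To make the control flow of `pushLogsData` and `pushMetricsData` easier to follow, here is a simplified, self-contained sketch (not code from this commit) of the same flush-on-metadata-change pattern: records are buffered while their metadata stays the same, flushed whenever it changes, and flushed once more at the end. All names in it are illustrative.

```go
package main

import "fmt"

// record stands in for a log or metric paired with its metadata key.
type record struct {
    metadata string
    value    string
}

// flush stands in for the sender's sendLogs/sendMetrics call.
func flush(metadata string, buffer []record) {
    fmt.Printf("sending %d record(s) with metadata %q\n", len(buffer), metadata)
}

func main() {
    records := []record{
        {metadata: "env=prod", value: "m1"},
        {metadata: "env=prod", value: "m2"},
        {metadata: "env=dev", value: "m3"},
        {metadata: "env=dev", value: "m4"},
    }

    var (
        buffer           []record
        previousMetadata string
    )

    for _, r := range records {
        // Flush whenever the metadata changes, mirroring the
        // currentMetadata/previousMetadata comparison in pushMetricsData.
        if previousMetadata != "" && r.metadata != previousMetadata {
            flush(previousMetadata, buffer)
            buffer = buffer[:0]
        }
        previousMetadata = r.metadata
        buffer = append(buffer, r)
    }

    // Final flush for whatever is still buffered.
    if len(buffer) > 0 {
        flush(previousMetadata, buffer)
    }
}
```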
(Diffs for the remaining 8 changed files are not shown.)
