Skip to content

Commit

Permalink
[exporter/splunkhec] apply multimetric metric merge for the whole batch
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Jun 13, 2023
1 parent 9f854da commit cc3938c
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 1 deletion.
20 changes: 20 additions & 0 deletions .chloggen/splunkhec-exporter-multimetric-batch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: splunkhecexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Apply multi-metric merge at the level of the whole batch rather than within events emitted for one metric.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23365]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
65 changes: 64 additions & 1 deletion exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS

// Parsing metric record to Splunk event.
events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)

tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics))
if c.config.UseMultiMetricFormat {
merged, err := mergeEventsToMultiMetricFormat(events)
Expand Down Expand Up @@ -298,6 +299,62 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS
return iterState{done: true}, permanentErrors
}

// fillMetricsBufferMultiMetrics converts every remaining metric in the batch
// (starting from the iteration state) into Splunk events, merges them all into
// the multi-metric format in one pass, and writes the merged events to buf.
// Merging at batch level (rather than per metric) lets datapoints from
// different metrics share a single multi-metric event.
//
// The whole batch is always consumed in one call, so the returned iterState is
// always done. Unrecoverable per-event failures are collected and returned as
// permanent errors rather than aborting the batch.
func (c *client) fillMetricsBufferMultiMetrics(metrics pmetric.Metrics, buf buffer, is iterState) (iterState, []error) {
	var permanentErrors []error
	jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
	defer jsonStreamPool.Put(jsonStream)

	var events []*splunk.Event

	for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ {
		rm := metrics.ResourceMetrics().At(i)
		for j := is.library; j < rm.ScopeMetrics().Len(); j++ {
			is.library = 0 // Reset library index for next resource.
			sm := rm.ScopeMetrics().At(j)
			for k := is.record; k < sm.Metrics().Len(); k++ {
				is.record = 0 // Reset record index for next library.
				metric := sm.Metrics().At(k)

				// Parsing metric record to Splunk event.
				events = append(events, mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)...)
			}
		}
	}

	merged, err := mergeEventsToMultiMetricFormat(events)
	if err != nil {
		permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
			"error merging events: %w", err)))
	}

	tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics))
	for _, event := range merged {
		// JSON encoding event and writing to buffer.
		b, jsonErr := marshalEvent(event, c.config.MaxEventSize, jsonStream)
		if jsonErr != nil {
			// Wrap jsonErr — the marshaling failure — not the (possibly nil)
			// merge error from above.
			permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonErr)))
			continue
		}
		tempBuf.Write(b)
	}

	// Continue adding events to buffer up to capacity.
	b := tempBuf.Bytes()
	_, err = buf.Write(b)
	if err == nil {
		return iterState{done: true}, permanentErrors
	}
	if errors.Is(err, errOverCapacity) {
		permanentErrors = append(permanentErrors, consumererror.NewPermanent(
			fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+
				" content length %d bytes", len(b), c.config.MaxContentLengthMetrics)))
	} else {
		permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
			"error writing the event: %w", err)))
	}
	return iterState{done: true}, permanentErrors
}

func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState) (iterState, []error) {
var permanentErrors []error
jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
Expand Down Expand Up @@ -356,7 +413,13 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric

for !is.done {
buf.Reset()
latestIterState, batchPermanentErrors := c.fillMetricsBuffer(md, buf, is)
var latestIterState iterState
var batchPermanentErrors []error
if c.config.UseMultiMetricFormat {
latestIterState, batchPermanentErrors = c.fillMetricsBufferMultiMetrics(md, buf, is)
} else {
latestIterState, batchPermanentErrors = c.fillMetricsBuffer(md, buf, is)
}
permanentErrors = append(permanentErrors, batchPermanentErrors...)
if !buf.Empty() {
if err := c.postEvents(ctx, buf, headers); err != nil {
Expand Down

0 comments on commit cc3938c

Please sign in to comment.