From 98124e0b6d79f72b7e130b6a7510b7ce0f29310e Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Tue, 14 Nov 2023 11:06:44 -0700 Subject: [PATCH] Fix panic when LogRecordCount is called after ConsumeLogs (#29274) Resolves #29107 Resolves #27469 --- .chloggen/consume-logs-race-fix-2.yaml | 27 +++++++++++++++++++++ .chloggen/consume-logs-race-fix-3.yaml | 27 +++++++++++++++++++++ .chloggen/consume-logs-race-fix-4.yaml | 27 +++++++++++++++++++++ .chloggen/consume-logs-race-fix-5.yaml | 27 +++++++++++++++++++++ .chloggen/consume-logs-race-fix-6.yaml | 27 +++++++++++++++++++++ .chloggen/consume-logs-race-fix.yaml | 27 +++++++++++++++++++++ pkg/stanza/adapter/receiver.go | 3 ++- receiver/fluentforwardreceiver/collector.go | 5 ++-- receiver/k8sobjectsreceiver/receiver.go | 3 ++- receiver/kafkareceiver/kafka_receiver.go | 4 +-- receiver/lokireceiver/loki.go | 6 +++-- receiver/otlpjsonfilereceiver/file.go | 5 ++-- 12 files changed, 178 insertions(+), 10 deletions(-) create mode 100755 .chloggen/consume-logs-race-fix-2.yaml create mode 100755 .chloggen/consume-logs-race-fix-3.yaml create mode 100755 .chloggen/consume-logs-race-fix-4.yaml create mode 100755 .chloggen/consume-logs-race-fix-5.yaml create mode 100755 .chloggen/consume-logs-race-fix-6.yaml create mode 100755 .chloggen/consume-logs-race-fix.yaml diff --git a/.chloggen/consume-logs-race-fix-2.yaml b/.chloggen/consume-logs-race-fix-2.yaml new file mode 100755 index 000000000000..23a21b9afc0e --- /dev/null +++ b/.chloggen/consume-logs-race-fix-2.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix issue where counting number of logs emitted could cause panic + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27469, 29107] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/consume-logs-race-fix-3.yaml b/.chloggen/consume-logs-race-fix-3.yaml new file mode 100755 index 000000000000..d71a1f2d6646 --- /dev/null +++ b/.chloggen/consume-logs-race-fix-3.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: lokireceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix issue where counting number of logs emitted could cause panic + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27469, 29107] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/consume-logs-race-fix-4.yaml b/.chloggen/consume-logs-race-fix-4.yaml new file mode 100755 index 000000000000..1d6019aaa432 --- /dev/null +++ b/.chloggen/consume-logs-race-fix-4.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix issue where counting number of logs emitted could cause panic + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27469, 29107] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/consume-logs-race-fix-5.yaml b/.chloggen/consume-logs-race-fix-5.yaml new file mode 100755 index 000000000000..3421a70c8788 --- /dev/null +++ b/.chloggen/consume-logs-race-fix-5.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sobjectsreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix issue where counting number of logs emitted could cause panic + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27469, 29107] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/consume-logs-race-fix-6.yaml b/.chloggen/consume-logs-race-fix-6.yaml new file mode 100755 index 000000000000..5b8967b43eb8 --- /dev/null +++ b/.chloggen/consume-logs-race-fix-6.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: fluentforwardreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix issue where counting number of logs emitted could cause panic + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27469, 29107] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/consume-logs-race-fix.yaml b/.chloggen/consume-logs-race-fix.yaml new file mode 100755 index 000000000000..b90afbd340e9 --- /dev/null +++ b/.chloggen/consume-logs-race-fix.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otlpjsonfilereceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix issue where counting number of logs emitted could cause panic + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27469, 29107] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/pkg/stanza/adapter/receiver.go b/pkg/stanza/adapter/receiver.go index fc4682231d4d..ffdeeb1b9417 100644 --- a/pkg/stanza/adapter/receiver.go +++ b/pkg/stanza/adapter/receiver.go @@ -121,11 +121,12 @@ func (r *receiver) consumerLoop(ctx context.Context) { continue } obsrecvCtx := r.obsrecv.StartLogsOp(ctx) + logRecordCount := pLogs.LogRecordCount() cErr := r.consumer.ConsumeLogs(ctx, pLogs) if cErr != nil { r.logger.Error("ConsumeLogs() failed", zap.Error(cErr)) } - r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", pLogs.LogRecordCount(), cErr) + r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr) } } } diff --git a/receiver/fluentforwardreceiver/collector.go b/receiver/fluentforwardreceiver/collector.go index 26448a4c0473..415ef203bb17 100644 --- a/receiver/fluentforwardreceiver/collector.go +++ b/receiver/fluentforwardreceiver/collector.go @@ -54,10 +54,11 @@ func (c *Collector) processEvents(ctx context.Context) { // efficiency on LogResource allocations. c.fillBufferUntilChanEmpty(logSlice) - stats.Record(context.Background(), observ.RecordsGenerated.M(int64(out.LogRecordCount()))) + logRecordCount := out.LogRecordCount() + stats.Record(context.Background(), observ.RecordsGenerated.M(int64(logRecordCount))) obsCtx := c.obsrecv.StartLogsOp(ctx) err := c.nextConsumer.ConsumeLogs(obsCtx, out) - c.obsrecv.EndLogsOp(obsCtx, "fluent", out.LogRecordCount(), err) + c.obsrecv.EndLogsOp(obsCtx, "fluent", logRecordCount, err) } } } diff --git a/receiver/k8sobjectsreceiver/receiver.go b/receiver/k8sobjectsreceiver/receiver.go index 636bb7f9c41d..b19265f6852b 100644 --- a/receiver/k8sobjectsreceiver/receiver.go +++ b/receiver/k8sobjectsreceiver/receiver.go @@ -139,8 +139,9 @@ func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsC } else if len(objects.Items) > 0 { logs := pullObjectsToLogData(objects, time.Now(), config) obsCtx := kr.obsrecv.StartLogsOp(ctx) + logRecordCount := logs.LogRecordCount() err = kr.consumer.ConsumeLogs(obsCtx, logs) - kr.obsrecv.EndLogsOp(obsCtx, metadata.Type, logs.LogRecordCount(), err) + kr.obsrecv.EndLogsOp(obsCtx, metadata.Type, logRecordCount, err) } case <-stopperChan: return diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 5f10d5478783..9758a5e87743 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -676,9 +676,9 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess return err } c.headerExtractor.extractHeadersLogs(logs, message) + logRecordCount := logs.LogRecordCount() err = c.nextConsumer.ConsumeLogs(session.Context(), logs) - // TODO - c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err) + c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logRecordCount, err) if err != nil { if c.messageMarking.After && c.messageMarking.OnError { session.MarkMessage(message, "") diff --git a/receiver/lokireceiver/loki.go b/receiver/lokireceiver/loki.go index f2c77fd3bf39..33c1bd436a2f 100644 --- a/receiver/lokireceiver/loki.go +++ b/receiver/lokireceiver/loki.go @@ -163,8 +163,9 @@ func (r *lokiReceiver) Push(ctx context.Context, pushRequest *push.PushRequest) return &push.PushResponse{}, err } ctx = r.obsrepGRPC.StartLogsOp(ctx) + logRecordCount := logs.LogRecordCount() err = r.nextConsumer.ConsumeLogs(ctx, logs) - r.obsrepGRPC.EndLogsOp(ctx, "protobuf", logs.LogRecordCount(), err) + r.obsrepGRPC.EndLogsOp(ctx, "protobuf", logRecordCount, err) return &push.PushResponse{}, nil } @@ -218,8 +219,9 @@ func handleLogs(resp http.ResponseWriter, req *http.Request, r *lokiReceiver) { return } ctx := r.obsrepHTTP.StartLogsOp(req.Context()) + logRecordCount := logs.LogRecordCount() err = r.nextConsumer.ConsumeLogs(ctx, logs) - r.obsrepHTTP.EndLogsOp(ctx, "json", logs.LogRecordCount(), err) + r.obsrepHTTP.EndLogsOp(ctx, "json", logRecordCount, err) resp.WriteHeader(http.StatusNoContent) } diff --git a/receiver/otlpjsonfilereceiver/file.go b/receiver/otlpjsonfilereceiver/file.go index c34ffbf89cf9..e7eabc633c89 100644 --- a/receiver/otlpjsonfilereceiver/file.go +++ b/receiver/otlpjsonfilereceiver/file.go @@ -80,10 +80,11 @@ func createLogsReceiver(_ context.Context, settings receiver.CreateSettings, con if err != nil { obsrecv.EndLogsOp(ctx, metadata.Type, 0, err) } else { - if l.ResourceLogs().Len() != 0 { + logRecordCount := l.LogRecordCount() + if logRecordCount != 0 { err = logs.ConsumeLogs(ctx, l) } - obsrecv.EndLogsOp(ctx, metadata.Type, l.LogRecordCount(), err) + obsrecv.EndLogsOp(ctx, metadata.Type, logRecordCount, err) } return nil })