Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent batch processor features → batch processor #11248

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
remove panic/recover logic
  • Loading branch information
jmacd committed Sep 23, 2024
commit 20403c09074ed01af3a2f9daabbb24ca89932f7d
15 changes: 3 additions & 12 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,12 +615,6 @@ func newBatchLogsProcessor(set processor.Settings, next consumer.Logs, cfg *Conf
return newBatchProcessor(set, cfg, func() batch { return newBatchLogs(next) })
}

// recoverError converts a recovered panic value into an error stored in
// *retErr. It must be invoked directly via defer so that recover observes
// the panic raised by the enclosing function; if no panic is in flight,
// *retErr is left untouched.
func recoverError(retErr *error) {
	p := recover()
	if p == nil {
		return
	}
	*retErr = fmt.Errorf("%v", p)
}

type batchTraces struct {
nextConsumer consumer.Traces
traceData ptrace.Traces
Expand Down Expand Up @@ -649,8 +643,7 @@ func (bt *batchTraces) sizeBytes(data any) int {
return bt.sizer.TracesSize(data.(ptrace.Traces))
}

func (bt *batchTraces) export(ctx context.Context, req any) (retErr error) {
defer recoverError(&retErr)
func (bt *batchTraces) export(ctx context.Context, req any) error {
td := req.(ptrace.Traces)
return bt.nextConsumer.ConsumeTraces(ctx, td)
}
Expand Down Expand Up @@ -690,8 +683,7 @@ func (bm *batchMetrics) sizeBytes(data any) int {
return bm.sizer.MetricsSize(data.(pmetric.Metrics))
}

func (bm *batchMetrics) export(ctx context.Context, req any) (retErr error) {
defer recoverError(&retErr)
func (bm *batchMetrics) export(ctx context.Context, req any) error {
md := req.(pmetric.Metrics)
return bm.nextConsumer.ConsumeMetrics(ctx, md)
}
Expand Down Expand Up @@ -743,8 +735,7 @@ func (bl *batchLogs) sizeBytes(data any) int {
return bl.sizer.LogsSize(data.(plog.Logs))
}

func (bl *batchLogs) export(ctx context.Context, req any) (retErr error) {
defer recoverError(&retErr)
func (bl *batchLogs) export(ctx context.Context, req any) error {
ld := req.(plog.Logs)
return bl.nextConsumer.ConsumeLogs(ctx, ld)
}
Expand Down
104 changes: 0 additions & 104 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,110 +122,6 @@ func (pc *panicConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

// TestBatchProcessorSpansPanicRecover verifies that a panic raised by the
// downstream consumer surfaces to every blocked ConsumeTraces caller as an
// error containing the panic message.
func TestBatchProcessorSpansPanicRecover(t *testing.T) {
	cfg := createDefaultConfig().(*Config)
	cfg.SendBatchSize = 128
	cfg.Timeout = 10 * time.Second
	creationSet := processortest.NewNopSettings()
	creationSet.MetricsLevel = configtelemetry.LevelDetailed
	bp, err := newBatchTracesProcessor(creationSet, &panicConsumer{}, cfg)
	require.NoError(t, err)
	require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))

	const (
		requestCount    = 10
		spansPerRequest = 100
	)
	sentResourceSpans := ptrace.NewTraces().ResourceSpans()
	var wg sync.WaitGroup
	for reqIdx := 0; reqIdx < requestCount; reqIdx++ {
		traces := testdata.GenerateTraces(spansPerRequest)
		spanSlice := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
		for i := 0; i < spansPerRequest; i++ {
			spanSlice.At(i).SetName(getTestSpanName(reqIdx, i))
		}
		traces.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty())
		// ConsumeTraces blocks until the batch size is reached, so each
		// request must run in its own goroutine to avoid deadlock.
		wg.Add(1)
		go func() {
			defer wg.Done()
			consumeErr := bp.ConsumeTraces(context.Background(), traces)
			assert.Contains(t, consumeErr.Error(), "testing panic")
		}()
	}

	wg.Wait()
	require.NoError(t, bp.Shutdown(context.Background()))
}

// TestBatchProcessorMetricsPanicRecover verifies that a panic raised by the
// downstream consumer surfaces to every blocked ConsumeMetrics caller as an
// error containing the panic message.
func TestBatchProcessorMetricsPanicRecover(t *testing.T) {
	cfg := createDefaultConfig().(*Config)
	cfg.SendBatchSize = 128
	cfg.Timeout = 10 * time.Second
	creationSet := processortest.NewNopSettings()
	creationSet.MetricsLevel = configtelemetry.LevelDetailed
	bp, err := newBatchMetricsProcessor(creationSet, &panicConsumer{}, cfg)
	require.NoError(t, err)
	require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))

	const (
		requestCount      = 10
		metricsPerRequest = 100
	)
	sentResourceMetrics := pmetric.NewMetrics().ResourceMetrics()
	var wg sync.WaitGroup
	for reqIdx := 0; reqIdx < requestCount; reqIdx++ {
		metricsData := testdata.GenerateMetrics(metricsPerRequest)
		metricSlice := metricsData.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
		for i := 0; i < metricsPerRequest; i++ {
			metricSlice.At(i).SetName(getTestMetricName(reqIdx, i))
		}
		metricsData.ResourceMetrics().At(0).CopyTo(sentResourceMetrics.AppendEmpty())
		// ConsumeMetrics blocks until the batch size is reached, so each
		// request must run in its own goroutine to avoid deadlock.
		wg.Add(1)
		go func() {
			defer wg.Done()
			consumeErr := bp.ConsumeMetrics(context.Background(), metricsData)
			assert.Contains(t, consumeErr.Error(), "testing panic")
		}()
	}

	wg.Wait()
	require.NoError(t, bp.Shutdown(context.Background()))
}

// TestBatchProcessorLogsPanicRecover verifies that a panic raised by the
// downstream consumer surfaces to every blocked ConsumeLogs caller as an
// error containing the panic message.
func TestBatchProcessorLogsPanicRecover(t *testing.T) {
	cfg := createDefaultConfig().(*Config)
	cfg.SendBatchSize = 128
	cfg.Timeout = 10 * time.Second
	creationSet := processortest.NewNopSettings()
	creationSet.MetricsLevel = configtelemetry.LevelDetailed
	bp, err := newBatchLogsProcessor(creationSet, &panicConsumer{}, cfg)
	require.NoError(t, err)
	require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))

	const (
		requestCount   = 10
		logsPerRequest = 100
	)
	sentResourceLogs := plog.NewLogs().ResourceLogs()
	var wg sync.WaitGroup
	for reqIdx := 0; reqIdx < requestCount; reqIdx++ {
		logsData := testdata.GenerateLogs(logsPerRequest)
		records := logsData.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
		for i := 0; i < logsPerRequest; i++ {
			records.At(i).SetSeverityText(getTestLogSeverityText(reqIdx, i))
		}
		logsData.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty())
		// ConsumeLogs blocks until the batch size is reached, so each
		// request must run in its own goroutine to avoid deadlock.
		wg.Add(1)
		go func() {
			defer wg.Done()
			consumeErr := bp.ConsumeLogs(context.Background(), logsData)
			assert.Contains(t, consumeErr.Error(), "testing panic")
		}()
	}

	wg.Wait()
	require.NoError(t, bp.Shutdown(context.Background()))
}

type blockingConsumer struct {
lock sync.Mutex
numItems int
Expand Down