diff --git a/.chloggen/otelarrow-receiver-timeout.yaml b/.chloggen/otelarrow-receiver-timeout.yaml
new file mode 100644
index 000000000000..540e2b379265
--- /dev/null
+++ b/.chloggen/otelarrow-receiver-timeout.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: otelarrowreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add gRPC timeout propagation.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [34742]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/internal/otelarrow/test/e2e_test.go b/internal/otelarrow/test/e2e_test.go
index 5d67b56a64b2..a0fcfcd723de 100644
--- a/internal/otelarrow/test/e2e_test.go
+++ b/internal/otelarrow/test/e2e_test.go
@@ -48,9 +48,15 @@ import (
 type testParams struct {
 	threadCount  int
 	requestUntil func(*testConsumer) bool
+
+	// missingDeadline is configured so the zero value implies a deadline,
+	// which is the default.
+	missingDeadline bool
 }
 
 type testConsumer struct {
+	t *testing.T
+
 	sink      consumertest.TracesSink
 	sentSpans atomic.Int64
@@ -62,6 +68,8 @@ type testConsumer struct {
 	recvSpans *tracetest.InMemoryExporter
 	expSpans  *tracetest.InMemoryExporter
+
+	expectDeadline bool
 }
 
 var _ consumer.Traces = &testConsumer{}
@@ -80,6 +88,19 @@ func (*testConsumer) Capabilities() consumer.Capabilities {
 func (tc *testConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
 	time.Sleep(time.Duration(float64(time.Millisecond) * (1 + rand.Float64())))
+
+	dead, hasDeadline := ctx.Deadline()
+	timeout := time.Until(dead)
+
+	require.Equal(tc.t, tc.expectDeadline, hasDeadline, "deadline set or not set: %v", timeout)
+	if tc.expectDeadline {
+		// expect allows 1/6 of the deadline to elapse in transit,
+		// so 1m becomes 50s.
+		expect := tc.expCfg.TimeoutSettings.Timeout * 5 / 6
+		require.Less(tc.t, expect, timeout)
+		require.Greater(tc.t, tc.expCfg.TimeoutSettings.Timeout, timeout)
+	}
+
 	return tc.sink.ConsumeTraces(ctx, td)
 }
@@ -100,7 +121,7 @@ func testLoggerSettings(_ *testing.T) (component.TelemetrySettings, *observer.Ob
 	return tset, obslogs, exp
 }
 
-func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces, receiver.Traces) {
+func basicTestConfig(t *testing.T, tp testParams, cfgF CfgFunc) (*testConsumer, exporter.Traces, receiver.Traces) {
 	ctx := context.Background()
 
 	efact := otelarrowexporter.NewFactory()
@@ -115,6 +136,7 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces
 	addr := testutil.GetAvailableLocalAddress(t)
 
 	receiverCfg.Protocols.GRPC.NetAddr.Endpoint = addr
+
 	exporterCfg.ClientConfig.Endpoint = addr
 	exporterCfg.ClientConfig.WaitForReady = true
 	exporterCfg.ClientConfig.TLSSetting.Insecure = true
@@ -123,6 +145,7 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces
 	exporterCfg.RetryConfig.Enabled = true
 	exporterCfg.Arrow.NumStreams = 1
 	exporterCfg.Arrow.MaxStreamLifetime = 5 * time.Second
+	exporterCfg.Arrow.DisableDowngrade = true
 
 	if cfgF != nil {
 		cfgF(exporterCfg, receiverCfg)
@@ -132,6 +155,8 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces
 	recvTset, recvLogs, recvSpans := testLoggerSettings(t)
 
 	testCon := &testConsumer{
+		t: t,
+
 		recvCfg: receiverCfg,
 		expCfg:  exporterCfg,
@@ -140,6 +165,8 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces
 		recvSpans: recvSpans,
 		expSpans:  expSpans,
+
+		expectDeadline: !tp.missingDeadline,
 	}
 
 	receiver, err := rfact.CreateTracesReceiver(ctx, receiver.Settings{
@@ -161,7 +188,7 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces
 func testIntegrationTraces(ctx context.Context, t *testing.T, tp testParams, cfgf CfgFunc, mkgen MkGen, errf ConsumerErrFunc, endf EndFunc) {
 	host := componenttest.NewNopHost()
 
-	testCon, exporter, receiver := basicTestConfig(t, cfgf)
+	testCon, exporter, receiver := basicTestConfig(t, tp, cfgf)
 
 	var startWG sync.WaitGroup
 	var exporterShutdownWG sync.WaitGroup
@@ -426,6 +453,33 @@ func TestIntegrationTracesSimple(t *testing.T) {
 	}
 }
 
+func TestIntegrationDeadlinePropagation(t *testing.T) {
+	for _, hasDeadline := range []bool{false, true} {
+		t.Run(fmt.Sprint("deadline=", hasDeadline), func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			// Until at least one span is written.
+			var params = testParams{
+				threadCount: 1,
+				requestUntil: func(test *testConsumer) bool {
+					return test.sink.SpanCount() < 1
+				},
+				missingDeadline: !hasDeadline,
+			}
+
+			testIntegrationTraces(ctx, t, params, func(ecfg *ExpConfig, _ *RecvConfig) {
+				if !hasDeadline {
+					// 0 disables the exporthelper-set timeout.
+					ecfg.TimeoutSettings.Timeout = 0
+				} else {
+					ecfg.TimeoutSettings.Timeout = 37 * time.Minute
+				}
+			}, func() GenFunc { return makeTestTraces }, consumerSuccess, standardEnding)
+		})
+	}
+}
+
 func TestIntegrationMemoryLimited(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
diff --git a/receiver/otelarrowreceiver/go.mod b/receiver/otelarrowreceiver/go.mod
index e7343e2db2ff..50390d5b0501 100644
--- a/receiver/otelarrowreceiver/go.mod
+++ b/receiver/otelarrowreceiver/go.mod
@@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelar
 go 1.22.0
 
 require (
+	github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil v0.109.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow v0.109.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.109.0
 	github.com/open-telemetry/otel-arrow v0.26.0
diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow.go b/receiver/otelarrowreceiver/internal/arrow/arrow.go
index e4c8a894fd21..717dbed42f0c 100644
--- a/receiver/otelarrowreceiver/internal/arrow/arrow.go
+++ b/receiver/otelarrowreceiver/internal/arrow/arrow.go
@@ -41,6 +41,7 @@ import (
 	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/proto"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
 	internalmetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata"
@@ -173,7 +174,9 @@ func newHeaderReceiver(streamCtx context.Context, as auth.Server, includeMetadat
 // client.Info with additional key:values associated with the arrow batch.
 func (h *headerReceiver) combineHeaders(ctx context.Context, hdrsBytes []byte) (context.Context, map[string][]string, error) {
 	if len(hdrsBytes) == 0 && len(h.streamHdrs) == 0 {
-		return ctx, nil, nil
+		// Note: call newContext in this case to ensure that
+		// connInfo is added to the context, for Auth.
+		return h.newContext(ctx, nil), nil, nil
 	}
 
 	if len(hdrsBytes) == 0 {
@@ -420,8 +423,8 @@ func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retEr
 	}
 }
 
-func (r *receiverStream) newInFlightData(ctx context.Context, method string, batchID int64, pendingCh chan<- batchResp) (context.Context, *inFlightData) {
-	ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_inflight")
+func (r *receiverStream) newInFlightData(ctx context.Context, method string, batchID int64, pendingCh chan<- batchResp) *inFlightData {
+	_, span := r.tracer.Start(ctx, "otel_arrow_stream_inflight")
 
 	r.inFlightWG.Add(1)
 	r.telemetryBuilder.OtelArrowReceiverInFlightRequests.Add(ctx, 1)
@@ -433,7 +436,7 @@ func (r *receiverStream) newInFlightData(ctx context.Context, method string, bat
 		span:      span,
 	}
 	id.refs.Add(1)
-	return ctx, id
+	return id
 }
 
 // inFlightData is responsible for storing the resources held by one request.
@@ -549,35 +552,43 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
 
 	// Receive a batch corresponding with one ptrace.Traces, pmetric.Metrics,
 	// or plog.Logs item.
-	req, err := serverStream.Recv()
+	req, recvErr := serverStream.Recv()
+
+	// the incoming stream context is the parent of the in-flight context,
+	// which carries a span covering sequential stream-processing work.
+	// the context is severed at this point, with flight.span a contextless
+	// child that will be finished in recvDone().
+	flight := r.newInFlightData(streamCtx, method, req.GetBatchId(), pendingCh)
 
 	// inflightCtx is carried through into consumeAndProcess on the success path.
-	inflightCtx, flight := r.newInFlightData(streamCtx, method, req.GetBatchId(), pendingCh)
+	// this inherits the stream context so that its auth headers are present
+	// when the per-data Auth call is made.
+	inflightCtx := streamCtx
 	defer flight.recvDone(inflightCtx, &retErr)
 
-	if err != nil {
-		if errors.Is(err, io.EOF) {
-			return err
+	if recvErr != nil {
+		if errors.Is(recvErr, io.EOF) {
+			return recvErr
 
-		} else if errors.Is(err, context.Canceled) {
+		} else if errors.Is(recvErr, context.Canceled) {
 			// This is a special case to avoid introducing a span error
 			// for a canceled operation.
 			return io.EOF
 
-		} else if status, ok := status.FromError(err); ok && status.Code() == codes.Canceled {
+		} else if status, ok := status.FromError(recvErr); ok && status.Code() == codes.Canceled {
 			// This is a special case to avoid introducing a span error
 			// for a canceled operation.
 			return io.EOF
 		}
 		// Note: err is directly from gRPC, should already have status.
-		return err
+		return recvErr
 	}
 
 	// Check for optional headers and set the incoming context.
-	inflightCtx, authHdrs, err := hrcv.combineHeaders(inflightCtx, req.GetHeaders())
-	if err != nil {
+	inflightCtx, authHdrs, hdrErr := hrcv.combineHeaders(inflightCtx, req.GetHeaders())
+	if hdrErr != nil {
 		// Failing to parse the incoming headers breaks the stream.
-		return status.Errorf(codes.Internal, "arrow metadata error: %v", err)
+		return status.Errorf(codes.Internal, "arrow metadata error: %v", hdrErr)
 	}
 
 	// start this span after hrcv.combineHeaders returns extracted context. This will allow this span
@@ -601,9 +612,29 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
 		// This is a compressed size so make sure to acquire the difference when request is decompressed.
 		prevAcquiredBytes = int64(proto.Size(req))
 	} else {
-		prevAcquiredBytes, err = strconv.ParseInt(uncompSizeHeaderStr[0], 10, 64)
-		if err != nil {
-			return status.Errorf(codes.Internal, "failed to convert string to request size: %v", err)
+		var parseErr error
+		prevAcquiredBytes, parseErr = strconv.ParseInt(uncompSizeHeaderStr[0], 10, 64)
+		if parseErr != nil {
+			return status.Errorf(codes.Internal, "failed to convert string to request size: %v", parseErr)
+		}
+	}
+
+	var callerCancel context.CancelFunc
+	if encodedTimeout, has := authHdrs["grpc-timeout"]; has && len(encodedTimeout) == 1 {
+		if timeout, decodeErr := grpcutil.DecodeTimeout(encodedTimeout[0]); decodeErr != nil {
+			r.telemetry.Logger.Debug("grpc-timeout parse error", zap.Error(decodeErr))
+		} else {
+			// timeout parsed successfully
+			inflightCtx, callerCancel = context.WithTimeout(inflightCtx, timeout)
+
+			// if we return before the new goroutine is started below,
+			// cancel the context. callerCancel will be non-nil until
+			// the new goroutine is created at the end of this function.
+			defer func() {
+				if callerCancel != nil {
+					callerCancel()
+				}
+			}()
 		}
 	}
 
@@ -612,19 +643,19 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
 	// Use the bounded queue to memory limit based on incoming
 	// uncompressed request size and waiters.  Acquire will fail
 	// immediately if there are too many waiters, or will
 	// otherwise block until timeout or enough memory becomes
 	// available.
-	err = r.boundedQueue.Acquire(inflightCtx, prevAcquiredBytes)
-	if err != nil {
-		return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue: %v", err)
+	acquireErr := r.boundedQueue.Acquire(inflightCtx, prevAcquiredBytes)
+	if acquireErr != nil {
+		return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue: %v", acquireErr)
 	}
 	flight.numAcquired = prevAcquiredBytes
 
-	data, numItems, uncompSize, err := r.consumeBatch(ac, req)
+	data, numItems, uncompSize, consumeErr := r.consumeBatch(ac, req)
 
-	if err != nil {
-		if errors.Is(err, arrowRecord.ErrConsumerMemoryLimit) {
-			return status.Errorf(codes.ResourceExhausted, "otel-arrow decode: %v", err)
+	if consumeErr != nil {
+		if errors.Is(consumeErr, arrowRecord.ErrConsumerMemoryLimit) {
+			return status.Errorf(codes.ResourceExhausted, "otel-arrow decode: %v", consumeErr)
 		}
-		return status.Errorf(codes.Internal, "otel-arrow decode: %v", err)
+		return status.Errorf(codes.Internal, "otel-arrow decode: %v", consumeErr)
 	}
 
 	flight.uncompSize = uncompSize
@@ -633,27 +664,35 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
 	r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(inflightCtx, uncompSize)
 	r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(inflightCtx, int64(numItems))
 
-	numAcquired, err := r.acquireAdditionalBytes(inflightCtx, prevAcquiredBytes, uncompSize, hrcv.connInfo.Addr, uncompSizeHeaderFound)
+	numAcquired, secondAcquireErr := r.acquireAdditionalBytes(inflightCtx, prevAcquiredBytes, uncompSize, hrcv.connInfo.Addr, uncompSizeHeaderFound)
 
 	flight.numAcquired = numAcquired
-	if err != nil {
-		return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue re-acquire: %v", err)
+	if secondAcquireErr != nil {
+		return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue re-acquire: %v", secondAcquireErr)
 	}
 
 	// Recognize that the request is still in-flight via consumeAndRespond()
 	flight.refs.Add(1)
 
 	// consumeAndRespond consumes the data and returns control to the sender loop.
-	go r.consumeAndRespond(inflightCtx, data, flight)
+	go func(callerCancel context.CancelFunc) {
+		if callerCancel != nil {
+			defer callerCancel()
+		}
+		r.consumeAndRespond(inflightCtx, streamCtx, data, flight)
+	}(callerCancel)
+
+	// Reset callerCancel so the deferred function above does not call it here.
+	callerCancel = nil
 
 	return nil
 }
 
 // consumeAndRespond finishes the span started in recvOne and logs the
 // result after invoking the pipeline to consume the data.
-func (r *Receiver) consumeAndRespond(ctx context.Context, data any, flight *inFlightData) { +func (r *Receiver) consumeAndRespond(ctx, streamCtx context.Context, data any, flight *inFlightData) { var err error - defer flight.consumeDone(ctx, &err) + defer flight.consumeDone(streamCtx, &err) // recoverErr is a special function because it recovers panics, so we // keep it in a separate defer than the processing above, which will diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go index 845f5e606511..11621acbba5c 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go @@ -138,6 +138,21 @@ func (u unhealthyTestChannel) onConsume(ctx context.Context) error { } } +type blockingTestChannel struct { + t *testing.T + cf func(context.Context) +} + +func newBlockingTestChannel(t *testing.T, cf func(context.Context)) *blockingTestChannel { + return &blockingTestChannel{t: t, cf: cf} +} + +func (h blockingTestChannel) onConsume(ctx context.Context) error { + h.cf(ctx) + <-ctx.Done() + return status.Error(codes.DeadlineExceeded, ctx.Err().Error()) +} + type recvResult struct { payload *arrowpb.BatchArrowRecords err error @@ -303,6 +318,14 @@ func statusUnavailableFor(batchID int64, msg string) *arrowpb.BatchStatus { } } +func statusDeadlineExceededFor(batchID int64, msg string) *arrowpb.BatchStatus { + return &arrowpb.BatchStatus{ + BatchId: batchID, + StatusCode: arrowpb.StatusCode_DEADLINE_EXCEEDED, + StatusMessage: msg, + } +} + func (ctc *commonTestCase) newRealConsumer() arrowRecord.ConsumerAPI { mock := arrowRecordMock.NewMockConsumerAPI(ctc.ctrl) cons := arrowRecord.NewConsumer() @@ -603,6 +626,50 @@ func TestReceiverSendError(t *testing.T) { requireUnavailableStatus(t, err) } +func TestReceiverTimeoutError(t *testing.T) { + tc := newBlockingTestChannel(t, func(ctx context.Context) { + deadline, has := ctx.Deadline() + require.True(t, has, "context has deadline") + timeout := time.Until(deadline) + require.Less(t, time.Second/2, timeout) + require.GreaterOrEqual(t, time.Second, timeout) + }) + ctc := newCommonTestCase(t, tc) + + ld := testdata.GenerateLogs(2) + batch, err := ctc.testProducer.BatchArrowRecordsFromLogs(ld) + require.NoError(t, err) + + ctc.stream.EXPECT().Send(statusDeadlineExceededFor(batch.BatchId, "context deadline exceeded")).Times(1).Return(nil) + + var hpb bytes.Buffer + hpe := hpack.NewEncoder(&hpb) + err = hpe.WriteField(hpack.HeaderField{ + Name: "grpc-timeout", + Value: "1000m", + }) + assert.NoError(t, err) + batch.Headers = make([]byte, hpb.Len()) + copy(batch.Headers, hpb.Bytes()) + + ctc.start(ctc.newRealConsumer, defaultBQ()) + ctc.putBatch(batch, nil) + + assert.EqualValues(t, ld, (<-ctc.consume).Data) + + start := time.Now() + for time.Since(start) < 5*time.Second { + if ctc.ctrl.Satisfied() { + break + } + time.Sleep(time.Second) + } + + close(ctc.receive) + err = ctc.wait() + require.NoError(t, err) +} + func TestReceiverConsumeError(t *testing.T) { stdTesting := otelAssert.NewStdUnitTest(t)