diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 875ea52b350..049081b97df 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -168,13 +168,13 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error { } // Shutdown is a method to turn off receiving. -func (r *otlpReceiver) Shutdown(context.Context) error { +func (r *otlpReceiver) Shutdown(ctx context.Context) error { var err error r.stopOnce.Do(func() { err = nil if r.serverHTTP != nil { - err = r.serverHTTP.Close() + err = r.serverHTTP.Shutdown(ctx) } if r.serverGRPC != nil { diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 27f2bc405c2..55ef8028d7e 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -370,15 +370,13 @@ func TestProtoHttp(t *testing.T) { } } } -func testHTTPProtobufRequest( + +func createHTTPProtobufRequest( t *testing.T, url string, - tSink *consumertest.TracesSink, encoding string, traceBytes []byte, - expectedErr error, - wantOtlp []*otlptrace.ResourceSpans, -) { +) *http.Request { var buf *bytes.Buffer var err error switch encoding { @@ -388,11 +386,25 @@ func testHTTPProtobufRequest( default: buf = bytes.NewBuffer(traceBytes) } - tSink.SetConsumeError(expectedErr) req, err := http.NewRequest("POST", url, buf) require.NoError(t, err, "Error creating trace POST request: %v", err) req.Header.Set("Content-Type", "application/x-protobuf") req.Header.Set("Content-Encoding", encoding) + return req +} + +func testHTTPProtobufRequest( + t *testing.T, + url string, + tSink *consumertest.TracesSink, + encoding string, + traceBytes []byte, + expectedErr error, + wantOtlp []*otlptrace.ResourceSpans, +) { + tSink.SetConsumeError(expectedErr) + + req := createHTTPProtobufRequest(t, url, encoding, traceBytes) client := &http.Client{} resp, err := client.Do(req) @@ -402,14 +414,14 @@ func testHTTPProtobufRequest( require.NoError(t, err, "Error reading response from trace grpc-gateway") require.NoError(t, resp.Body.Close(), "Error closing response body") - allTraces := tSink.AllTraces() - require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type") + allTraces := tSink.AllTraces() + if expectedErr == nil { require.Equal(t, 200, resp.StatusCode, "Unexpected return status") tmp := &collectortrace.ExportTraceServiceResponse{} - err = tmp.Unmarshal(respBytes) + err := tmp.Unmarshal(respBytes) require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto") require.Len(t, allTraces, 1) @@ -556,6 +568,29 @@ func TestHTTPStartWithoutConsumers(t *testing.T) { require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } +func createSingleSpanTrace() *collectortrace.ExportTraceServiceRequest { + return &collectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{ + { + InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ + { + Spans: []*otlptrace.Span{ + { + TraceId: data.NewTraceID( + [16]byte{ + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, + 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, + }, + ), + }, + }, + }, + }, + }, + }, + } +} + // TestOTLPReceiverTrace_HandleNextConsumerResponse checks if the trace receiver // is returning the proper response (return and metrics) when the next consumer // in the pipeline reports error. The test changes the responses returned by the @@ -595,26 +630,7 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) { } addr := testutil.GetAvailableLocalAddress(t) - req := &collectortrace.ExportTraceServiceRequest{ - ResourceSpans: []*otlptrace.ResourceSpans{ - { - InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ - { - Spans: []*otlptrace.Span{ - { - TraceId: data.NewTraceID( - [16]byte{ - 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, - 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, - }, - ), - }, - }, - }, - }, - }, - }, - } + req := createSingleSpanTrace() exportBidiFn := func( t *testing.T, @@ -775,3 +791,103 @@ func compressGzip(body []byte) (*bytes.Buffer, error) { return &buf, nil } + +type senderFunc func(msg *collectortrace.ExportTraceServiceRequest) + +func TestShutdown(t *testing.T) { + endpointGrpc := testutil.GetAvailableLocalAddress(t) + endpointHTTP := testutil.GetAvailableLocalAddress(t) + + nextSink := new(consumertest.TracesSink) + + // Create OTLP receiver with gRPC and HTTP protocols. + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.SetName(otlpReceiverName) + cfg.GRPC.NetAddr.Endpoint = endpointGrpc + cfg.HTTP.Endpoint = endpointHTTP + ocr := newReceiver(t, factory, cfg, nextSink, nil) + require.NotNil(t, ocr) + require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost())) + + conn, err := grpc.Dial(endpointGrpc, grpc.WithInsecure(), grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + doneSignalGrpc := make(chan bool) + doneSignalHTTP := make(chan bool) + + senderGrpc := func(msg *collectortrace.ExportTraceServiceRequest) { + // Send request via OTLP/gRPC. + client := collectortrace.NewTraceServiceClient(conn) + client.Export(context.Background(), msg) + } + senderHTTP := func(msg *collectortrace.ExportTraceServiceRequest) { + // Send request via OTLP/HTTP. + traceBytes, err2 := msg.Marshal() + if err2 != nil { + t.Errorf("Error marshaling protobuf: %v", err2) + } + url := fmt.Sprintf("http://%s/v1/traces", endpointHTTP) + req := createHTTPProtobufRequest(t, url, "", traceBytes) + client := &http.Client{} + resp, err2 := client.Do(req) + if err2 == nil { + ioutil.ReadAll(resp.Body) + } + } + + // Send traces to the receiver until we signal via done channel, and then + // send one more trace after that. + go generateTraces(senderGrpc, doneSignalGrpc) + go generateTraces(senderHTTP, doneSignalHTTP) + + // Wait until the receiver outputs anything to the sink. + assert.Eventually(t, func() bool { + return nextSink.SpansCount() > 0 + }, time.Second, 1*time.Millisecond) + + // Now shutdown the receiver, while continuing sending traces to it. + ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) + defer cancelFn() + err = ocr.Shutdown(ctx) + assert.NoError(t, err) + + // Remember how many spans the sink received. This number should not change after this + // point because after Shutdown() returns the component is not allowed to produce + // any more data. + sinkSpanCountAfterShutdown := nextSink.SpansCount() + + // Now signal to generateTraces to exit the main generation loop, then send + // one more trace and stop. + doneSignalGrpc <- true + doneSignalHTTP <- true + + // Wait until all follow up traces are sent. + <-doneSignalGrpc + <-doneSignalHTTP + + // The last, additional trace should not be received by sink, so the number of spans in + // the sink should not change. + assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpansCount()) +} + +func generateTraces(senderFn senderFunc, doneSignal chan bool) { + // Continuously generate spans until signaled to stop. +loop: + for { + select { + case <-doneSignal: + break loop + default: + } + senderFn(createSingleSpanTrace()) + } + + // After getting the signal to stop, send one more span and then + // finally stop. We should never receive this last span. + senderFn(createSingleSpanTrace()) + + // Indicate that we are done. + close(doneSignal) +}