Skip to content

Commit

Permalink
Fix OTLP receiver Shutdown() bug
Browse files Browse the repository at this point in the history
Ensure that the receiver finishes all ongoing requests, and does not accept any new request after shutdown is finished.

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Mar 1, 2021
1 parent e6319ac commit d60dd7a
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 32 deletions.
6 changes: 3 additions & 3 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,17 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
}

// Shutdown is a method to turn off receiving.
func (r *otlpReceiver) Shutdown(context.Context) error {
func (r *otlpReceiver) Shutdown(ctx context.Context) error {
var err error
r.stopOnce.Do(func() {
err = nil

if r.serverHTTP != nil {
err = r.serverHTTP.Close()
err = r.serverHTTP.Shutdown(ctx)
}

if r.serverGRPC != nil {
r.serverGRPC.Stop()
r.serverGRPC.GracefulStop()
}
})
return err
Expand Down
157 changes: 128 additions & 29 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,15 +370,13 @@ func TestProtoHttp(t *testing.T) {
}
}
}
func testHTTPProtobufRequest(

func createHTTPProtobufRequest(
t *testing.T,
url string,
tSink *consumertest.TracesSink,
encoding string,
traceBytes []byte,
expectedErr error,
wantOtlp []*otlptrace.ResourceSpans,
) {
) *http.Request {
var buf *bytes.Buffer
var err error
switch encoding {
Expand All @@ -388,11 +386,25 @@ func testHTTPProtobufRequest(
default:
buf = bytes.NewBuffer(traceBytes)
}
tSink.SetConsumeError(expectedErr)
req, err := http.NewRequest("POST", url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", encoding)
return req
}

func testHTTPProtobufRequest(
t *testing.T,
url string,
tSink *consumertest.TracesSink,
encoding string,
traceBytes []byte,
expectedErr error,
wantOtlp []*otlptrace.ResourceSpans,
) {
tSink.SetConsumeError(expectedErr)

req := createHTTPProtobufRequest(t, url, encoding, traceBytes)

client := &http.Client{}
resp, err := client.Do(req)
Expand All @@ -402,14 +414,14 @@ func testHTTPProtobufRequest(
require.NoError(t, err, "Error reading response from trace grpc-gateway")
require.NoError(t, resp.Body.Close(), "Error closing response body")

allTraces := tSink.AllTraces()

require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type")

allTraces := tSink.AllTraces()

if expectedErr == nil {
require.Equal(t, 200, resp.StatusCode, "Unexpected return status")
tmp := &collectortrace.ExportTraceServiceResponse{}
err = tmp.Unmarshal(respBytes)
err := tmp.Unmarshal(respBytes)
require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto")

require.Len(t, allTraces, 1)
Expand Down Expand Up @@ -556,6 +568,12 @@ func TestHTTPStartWithoutConsumers(t *testing.T) {
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
}

func createSingleSpanTrace() *collectortrace.ExportTraceServiceRequest {
return &collectortrace.ExportTraceServiceRequest{
ResourceSpans: pdata.TracesToOtlp(testdata.GenerateTraceDataOneSpan()),
}
}

// TestOTLPReceiverTrace_HandleNextConsumerResponse checks if the trace receiver
// is returning the proper response (return and metrics) when the next consumer
// in the pipeline reports error. The test changes the responses returned by the
Expand Down Expand Up @@ -595,26 +613,7 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) {
}

addr := testutil.GetAvailableLocalAddress(t)
req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{
{
TraceId: data.NewTraceID(
[16]byte{
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10,
},
),
},
},
},
},
},
},
}
req := createSingleSpanTrace()

exportBidiFn := func(
t *testing.T,
Expand Down Expand Up @@ -775,3 +774,103 @@ func compressGzip(body []byte) (*bytes.Buffer, error) {

return &buf, nil
}

type senderFunc func(msg *collectortrace.ExportTraceServiceRequest)

func TestShutdown(t *testing.T) {
endpointGrpc := testutil.GetAvailableLocalAddress(t)
endpointHTTP := testutil.GetAvailableLocalAddress(t)

nextSink := new(consumertest.TracesSink)

// Create OTLP receiver with gRPC and HTTP protocols.
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.SetName(otlpReceiverName)
cfg.GRPC.NetAddr.Endpoint = endpointGrpc
cfg.HTTP.Endpoint = endpointHTTP
ocr := newReceiver(t, factory, cfg, nextSink, nil)
require.NotNil(t, ocr)
require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()))

conn, err := grpc.Dial(endpointGrpc, grpc.WithInsecure(), grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

doneSignalGrpc := make(chan bool)
doneSignalHTTP := make(chan bool)

senderGrpc := func(msg *collectortrace.ExportTraceServiceRequest) {
// Send request via OTLP/gRPC.
client := collectortrace.NewTraceServiceClient(conn)
client.Export(context.Background(), msg)
}
senderHTTP := func(msg *collectortrace.ExportTraceServiceRequest) {
// Send request via OTLP/HTTP.
traceBytes, err2 := msg.Marshal()
if err2 != nil {
t.Errorf("Error marshaling protobuf: %v", err2)
}
url := fmt.Sprintf("http://%s/v1/traces", endpointHTTP)
req := createHTTPProtobufRequest(t, url, "", traceBytes)
client := &http.Client{}
resp, err2 := client.Do(req)
if err2 == nil {
ioutil.ReadAll(resp.Body)
}
}

// Send traces to the receiver until we signal via done channel, and then
// send one more trace after that.
go generateTraces(senderGrpc, doneSignalGrpc)
go generateTraces(senderHTTP, doneSignalHTTP)

// Wait until the receiver outputs anything to the sink.
assert.Eventually(t, func() bool {
return nextSink.SpansCount() > 0
}, time.Second, 1*time.Millisecond)

// Now shutdown the receiver, while continuing sending traces to it.
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFn()
err = ocr.Shutdown(ctx)
assert.NoError(t, err)

// Remember how many spans the sink received. This number should not change after this
// point because after Shutdown() returns the component is not allowed to produce
// any more data.
sinkSpanCountAfterShutdown := nextSink.SpansCount()

// Now signal to generateTraces to exit the main generation loop, then send
// one more trace and stop.
doneSignalGrpc <- true
doneSignalHTTP <- true

// Wait until all follow up traces are sent.
<-doneSignalGrpc
<-doneSignalHTTP

// The last, additional trace should not be received by sink, so the number of spans in
// the sink should not change.
assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpansCount())
}

func generateTraces(senderFn senderFunc, doneSignal chan bool) {
// Continuously generate spans until signaled to stop.
loop:
for {
select {
case <-doneSignal:
break loop
default:
}
senderFn(createSingleSpanTrace())
}

// After getting the signal to stop, send one more span and then
// finally stop. We should never receive this last span.
senderFn(createSingleSpanTrace())

// Indicate that we are done.
close(doneSignal)
}

0 comments on commit d60dd7a

Please sign in to comment.