Skip to content

Commit

Permalink
Allow multiple OTLP to be created (#2743)
Browse files Browse the repository at this point in the history
* Allow multiple OTLP to be created

* Add stop wait group to ensure proper shutdown

* Fix lifecycle test

* Rename shutdown WaitGroup

* Remove local lifecycle test, use the defaultcomponents one

* Remove shutdown receiver and undo register function changes

* Left over from previous interaction

* Add comments on removing receiver from map
  • Loading branch information
pjanotti authored Mar 22, 2021
1 parent 3657cf6 commit 705e996
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
2 changes: 2 additions & 0 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,6 @@ func createReceiver(cfg configmodels.Receiver, logger *zap.Logger) (*otlpReceive
// We maintain this map because the Factory is asked trace and metric receivers separately
// when it gets CreateTracesReceiver() and CreateMetricsReceiver() but they must not
// create separate objects, they must use one otlpReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
var receivers = map[*Config]*otlpReceiver{}
18 changes: 16 additions & 2 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type otlpReceiver struct {

stopOnce sync.Once
startServerOnce sync.Once
shutdownWG sync.WaitGroup

logger *zap.Logger
}
Expand Down Expand Up @@ -96,8 +97,11 @@ func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host
if err != nil {
return err
}
r.shutdownWG.Add(1)
go func() {
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil {
defer r.shutdownWG.Done()

if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil && errGrpc != grpc.ErrServerStopped {
host.ReportFatalError(errGrpc)
}
}()
Expand All @@ -111,8 +115,11 @@ func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host
if err != nil {
return err
}
r.shutdownWG.Add(1)
go func() {
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil {
defer r.shutdownWG.Done()

if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil && errHTTP != http.ErrServerClosed {
host.ReportFatalError(errHTTP)
}
}()
Expand Down Expand Up @@ -180,6 +187,13 @@ func (r *otlpReceiver) Shutdown(ctx context.Context) error {
if r.serverGRPC != nil {
r.serverGRPC.GracefulStop()
}

r.shutdownWG.Wait()

// delete the receiver from the map so it doesn't leak and it becomes possible to create
// another instance with the same configuration that functions properly. Notice that an
// OTLP object can only be started and shutdown once.
delete(receivers, r.cfg)
})
return err
}
Expand Down
3 changes: 1 addition & 2 deletions service/defaultcomponents/default_receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func TestDefaultReceivers(t *testing.T) {
skipLifecyle: true, // TODO: Usage of CMux doesn't allow proper shutdown.
},
{
receiver: "otlp",
skipLifecyle: true, // TODO: Upcoming PR to fix zipkin lifecycle.
receiver: "otlp",
},
{
receiver: "prometheus",
Expand Down

0 comments on commit 705e996

Please sign in to comment.