Skip to content
This repository was archived by the owner on Dec 21, 2023. It is now read-only.

Commit d9a621b

Browse files
authored
feat: Wait for all event handlers to complete before exiting controlPlane.Register() (#496)
* feat: Provide Shutdown method to run logic that should be executed right before application shutdown Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>
1 parent d00898a commit d9a621b

File tree

6 files changed

+103
-39
lines changed

6 files changed

+103
-39
lines changed

examples/cp-connector/main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func main() {
4848
// If you don't want/need that, you can simply pass nil
4949
logForwarder := logforwarder.New(keptnAPI.LogsV1())
5050

51-
// Create a control plane component that is the main component of cp-connecter and start it
51+
// Create a control plane component that is the main component of cp-connector and start it
5252
// using RunWithGracefulShutdown
5353
controlPlane := controlplane.New(subscriptionSource, eventSource, logForwarder, controlplane.WithLogger(logger))
5454
if err := controlplane.RunWithGracefulShutdown(controlPlane, LocalService{}, time.Second*10); err != nil {
@@ -65,6 +65,10 @@ type LocalService struct{}
6565
//
6666
// Note, that you are responsible for sending corresponding .started and .finished events
6767
// on your own.
68+
// Also note, that if you need to ensure that every incoming event is completely processed before the pod running your
69+
// integration is shut down (e.g., due to an upgrade to a newer version), the OnEvent method should process the incoming events synchronously,
70+
// i.e. not in a separate goroutine. If you need to process events asynchronously, you need to implement your own synchronization mechanism to ensure all
71+
// events have been completely processed before a shutdown.
6872
func (e LocalService) OnEvent(ctx context.Context, event models.KeptnContextExtendedCE) error {
6973
// You can handle the event and grab a sender to send back started / finished events to keptn
7074
// eventSender := ctx.Value(controlplane.EventSenderKeyType{}).(types.EventSender)

pkg/sdk/connector/controlplane/controlplane.go

Lines changed: 48 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,15 @@ type Integration interface {
3838

3939
// ControlPlane can be used to connect to the Keptn Control Plane
4040
type ControlPlane struct {
41-
subscriptionSource subscriptionsource.SubscriptionSource
42-
eventSource eventsource.EventSource
43-
currentSubscriptions []models.EventSubscription
44-
logger logger.Logger
45-
registered bool
46-
integrationID string
47-
logForwarder logforwarder.LogForwarder
48-
mtx *sync.RWMutex
41+
subscriptionSource subscriptionsource.SubscriptionSource
42+
eventSource eventsource.EventSource
43+
currentSubscriptions []models.EventSubscription
44+
logger logger.Logger
45+
registered bool
46+
integrationID string
47+
logForwarder logforwarder.LogForwarder
48+
mtx *sync.RWMutex
49+
eventHandlerWaitGroup *sync.WaitGroup
4950
}
5051

5152
// WithLogger sets the logger to use
@@ -61,7 +62,7 @@ func WithLogger(logger logger.Logger) func(plane *ControlPlane) {
6162
//
6263
// This call is blocking.
6364
//
64-
//If you want to start the controlplane component with an own context you need to call the Regiser(ctx,integration)
65+
// If you want to start the controlPlane component with your own context, you need to call the Register(ctx, integration)
6566
// method on your own
6667
func RunWithGracefulShutdown(controlPlane *ControlPlane, integration Integration, shutdownTimeout time.Duration) error {
6768
ctxShutdown, cancel := context.WithCancel(context.Background())
@@ -76,6 +77,7 @@ func RunWithGracefulShutdown(controlPlane *ControlPlane, integration Integration
7677
}()
7778

7879
return controlPlane.Register(ctxShutdown, integration)
80+
7981
}
8082

8183
// New creates a new ControlPlane
@@ -84,13 +86,14 @@ func RunWithGracefulShutdown(controlPlane *ControlPlane, integration Integration
8486
// and a LogForwarder to forward error logs
8587
func New(subscriptionSource subscriptionsource.SubscriptionSource, eventSource eventsource.EventSource, logForwarder logforwarder.LogForwarder, opts ...func(plane *ControlPlane)) *ControlPlane {
8688
cp := &ControlPlane{
87-
subscriptionSource: subscriptionSource,
88-
eventSource: eventSource,
89-
currentSubscriptions: []models.EventSubscription{},
90-
logger: logger.NewDefaultLogger(),
91-
logForwarder: logForwarder,
92-
registered: false,
93-
mtx: &sync.RWMutex{},
89+
subscriptionSource: subscriptionSource,
90+
eventSource: eventSource,
91+
currentSubscriptions: []models.EventSubscription{},
92+
logger: logger.NewDefaultLogger(),
93+
logForwarder: logForwarder,
94+
registered: false,
95+
mtx: &sync.RWMutex{},
96+
eventHandlerWaitGroup: &sync.WaitGroup{},
9497
}
9598
for _, o := range opts {
9699
o(cp)
@@ -148,30 +151,50 @@ func (cp *ControlPlane) Register(ctx context.Context, integration Integration) e
148151

149152
// control plane cancelled via context
150153
case <-ctx.Done():
151-
cp.logger.Debug("Controlplane cancelled via context. Unregistering...")
154+
cp.logger.Info("ControlPlane cancelled via context. Unregistering...")
152155
wg.Wait()
156+
cp.waitForEventHandlers()
157+
cp.cleanup()
153158
cp.setRegistrationStatus(false)
154159
return nil
155160

156161
// control plane cancelled via error in either one of the sub components
157162
case e := <-errC:
158-
cp.logger.Debugf("Stopping control plane due to error: %v", e)
159-
cp.cleanup()
160-
cp.logger.Debug("Waiting for components to shutdown")
163+
cp.logger.Errorf("Stopping control plane due to error: %v", e)
164+
cp.logger.Info("Waiting for components to shutdown")
161165
wg.Wait()
166+
cp.waitForEventHandlers()
167+
cp.cleanup()
162168
cp.setRegistrationStatus(false)
163169
return nil
164170
}
165171
}
166172
}
167173

174+
func (cp *ControlPlane) waitForEventHandlers() {
175+
cp.logger.Info("Wait for all event handlers to finish")
176+
cp.eventHandlerWaitGroup.Wait()
177+
cp.logger.Info("All event handlers done - ready to shut down")
178+
}
179+
168180
// IsRegistered can be called to detect whether the controlPlane is registered and ready to receive events
169181
func (cp *ControlPlane) IsRegistered() bool {
170182
cp.mtx.RLock()
171183
defer cp.mtx.RUnlock()
172184
return cp.registered
173185
}
174186

187+
func (cp *ControlPlane) cleanup() {
188+
cp.logger.Info("Stopping subscription source...")
189+
if err := cp.subscriptionSource.Stop(); err != nil {
190+
log.Fatalf("Unable to stop subscription source: %v", err)
191+
}
192+
cp.logger.Info("Stopping event source...")
193+
if err := cp.eventSource.Stop(); err != nil {
194+
log.Fatalf("Unable to stop event source: %v", err)
195+
}
196+
}
197+
175198
func (cp *ControlPlane) handle(ctx context.Context, eventUpdate types.EventUpdate, integration Integration) error {
176199
cp.logger.Debugf("Received an event of type: %s", *eventUpdate.KeptnEvent.Type)
177200
for _, subscription := range cp.currentSubscriptions {
@@ -204,6 +227,11 @@ func (cp *ControlPlane) getSender(sender types.EventSender) types.EventSender {
204227
}
205228

206229
func (cp *ControlPlane) forwardMatchedEvent(ctx context.Context, eventUpdate types.EventUpdate, integration Integration, subscription models.EventSubscription) error {
230+
// increase the eventHandler WaitGroup
231+
cp.eventHandlerWaitGroup.Add(1)
232+
// when the event handler is done, decrease the WaitGroup again
233+
defer cp.eventHandlerWaitGroup.Done()
234+
207235
err := eventUpdate.KeptnEvent.AddTemporaryData(
208236
tmpDataDistributorKey,
209237
types.AdditionalSubscriptionData{
@@ -226,17 +254,6 @@ func (cp *ControlPlane) forwardMatchedEvent(ctx context.Context, eventUpdate typ
226254
return nil
227255
}
228256

229-
func (cp *ControlPlane) cleanup() {
230-
cp.logger.Info("Stopping subscription source...")
231-
if err := cp.subscriptionSource.Stop(); err != nil {
232-
log.Fatalf("Unable to stop subscription source: %v", err)
233-
}
234-
cp.logger.Info("Stopping event source...")
235-
if err := cp.eventSource.Stop(); err != nil {
236-
log.Fatalf("Unable to stop event source: %v", err)
237-
}
238-
}
239-
240257
func (cp *ControlPlane) setRegistrationStatus(registered bool) {
241258
cp.mtx.Lock()
242259
defer cp.mtx.Unlock()

pkg/sdk/connector/controlplane/controlplane_test.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ func TestControlPlaneInboundEventIsForwardedToIntegration(t *testing.T) {
108108
var eventChan chan types.EventUpdate
109109
var subsChan chan []models.EventSubscription
110110
var integrationReceivedEvent models.KeptnContextExtendedCE
111+
var subscriptionSourceStopCalled bool
112+
var eventSourceStopCalled bool
111113

112114
mtx := sync.RWMutex{}
113115
eventUpdate := types.EventUpdate{KeptnEvent: models.KeptnContextExtendedCE{ID: "some-id", Type: strutils.Stringp("sh.keptn.event.echo.triggered")}, MetaData: types.EventUpdateMetaData{Subject: "sh.keptn.event.echo.triggered"}}
@@ -119,21 +121,35 @@ func TestControlPlaneInboundEventIsForwardedToIntegration(t *testing.T) {
119121
mtx.Lock()
120122
defer mtx.Unlock()
121123
subsChan = c
124+
wg.Done()
122125
return nil
123126
},
124127
RegisterFn: func(integration models.Integration) (string, error) {
125128
return "some-id", nil
126129
},
130+
StopFn: func() error {
131+
mtx.Lock()
132+
defer mtx.Unlock()
133+
subscriptionSourceStopCalled = true
134+
return nil
135+
},
127136
}
128137
esm := &fake.EventSourceMock{
129138
StartFn: func(ctx context.Context, data types.RegistrationData, ces chan types.EventUpdate, errC chan error, wg *sync.WaitGroup) error {
130139
mtx.Lock()
131140
defer mtx.Unlock()
132141
eventChan = ces
142+
wg.Done()
133143
return nil
134144
},
135145
OnSubscriptionUpdateFn: func(strings []models.EventSubscription) {},
136146
SenderFn: func() types.EventSender { return callBackSender },
147+
StopFn: func() error {
148+
mtx.Lock()
149+
defer mtx.Unlock()
150+
eventSourceStopCalled = true
151+
return nil
152+
},
137153
}
138154
fm := &LogForwarderMock{
139155
ForwardFn: func(keptnEvent models.KeptnContextExtendedCE, integrationID string) error {
@@ -152,7 +168,8 @@ func TestControlPlaneInboundEventIsForwardedToIntegration(t *testing.T) {
152168
return nil
153169
},
154170
}
155-
go controlPlane.Register(context.TODO(), integration)
171+
ctx, cancel := context.WithCancel(context.TODO())
172+
go controlPlane.Register(ctx, integration)
156173
require.Eventually(t, func() bool {
157174
mtx.RLock()
158175
defer mtx.RUnlock()
@@ -185,6 +202,14 @@ func TestControlPlaneInboundEventIsForwardedToIntegration(t *testing.T) {
185202
},
186203
},
187204
}, eventData)
205+
206+
cancel()
207+
208+
require.Eventually(t, func() bool {
209+
mtx.RLock()
210+
defer mtx.RUnlock()
211+
return subscriptionSourceStopCalled && eventSourceStopCalled
212+
}, 5*time.Second, 100*time.Millisecond)
188213
}
189214

190215
func TestControlPlaneInboundEventIsForwardedToIntegrationWithoutLogForwarder(t *testing.T) {
@@ -527,6 +552,9 @@ func TestControlPlane_IsRegistered(t *testing.T) {
527552
RegisterFn: func(integration models.Integration) (string, error) {
528553
return "some-id", nil
529554
},
555+
StopFn: func() error {
556+
return nil
557+
},
530558
}
531559
esm := &fake.EventSourceMock{
532560
StartFn: func(ctx context.Context, data types.RegistrationData, ces chan types.EventUpdate, errC chan error, wg *sync.WaitGroup) error {
@@ -541,6 +569,9 @@ func TestControlPlane_IsRegistered(t *testing.T) {
541569
},
542570
OnSubscriptionUpdateFn: func(subscriptions []models.EventSubscription) {},
543571
SenderFn: func() types.EventSender { return callBackSender },
572+
StopFn: func() error {
573+
return nil
574+
},
544575
}
545576
fm := &LogForwarderMock{
546577
ForwardFn: func(keptnEvent models.KeptnContextExtendedCE, integrationID string) error {
@@ -598,6 +629,7 @@ func TestControlPlane_StoppedByReceivingErrEvent(t *testing.T) {
598629
defer mtx.Unlock()
599630
subsChan = subC
600631
errorC = errC
632+
wg.Done()
601633
return nil
602634
},
603635
RegisterFn: func(integration models.Integration) (string, error) {
@@ -616,6 +648,7 @@ func TestControlPlane_StoppedByReceivingErrEvent(t *testing.T) {
616648
defer mtx.Unlock()
617649
eventChan = evC
618650
errorC = errC
651+
wg.Done()
619652
return nil
620653
},
621654
OnSubscriptionUpdateFn: func(subscriptions []models.EventSubscription) {},

pkg/sdk/connector/nats/nats.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,16 +174,25 @@ func (nc *NatsConnector) Publish(event models.KeptnContextExtendedCE) error {
174174
if err != nil {
175175
return fmt.Errorf("could not connect to NATS to publish event: %w", err)
176176
}
177-
return conn.Publish(*event.Type, serializedEvent)
177+
if err := conn.Publish(*event.Type, serializedEvent); err != nil {
178+
return fmt.Errorf("could not publish message to NATS: %w", err)
179+
}
180+
return nil
178181
}
179182

180183
// Disconnect disconnects/closes the connection to NATS
181184
func (nc *NatsConnector) Disconnect() error {
182-
connection, err := nc.ensureConnection()
185+
// if we are already disconnected, there is no need to do anything
186+
if !nc.connection.IsConnected() {
187+
return nil
188+
}
189+
// call the Flush() method so that any payload still in the buffer is sent and does not get lost if a shutdown happens
190+
nc.logger.Debug("flushing NATS buffer")
191+
err := nc.connection.Flush()
183192
if err != nil {
184-
return fmt.Errorf("could not disconnect from NATS: %w", err)
193+
nc.logger.Errorf("Could not flush connection: %v", err)
185194
}
186-
connection.Close()
195+
nc.connection.Close()
187196
return nil
188197
}
189198

pkg/sdk/connector/nats/nats_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ func TestNewFromEnv(t *testing.T) {
2525
require.NotNil(t, sub)
2626
}
2727

28-
func TestConnectFails(t *testing.T) {
28+
func TestNoConnection(t *testing.T) {
2929
nc := nats2.New("nats://something:3456")
3030
require.NotNil(t, nc)
31-
err := nc.Disconnect()
31+
err := nc.Publish(models.KeptnContextExtendedCE{})
3232
require.NotNil(t, err)
3333
}
3434

pkg/sdk/keptn.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ func (k *Keptn) Start() error {
303303
// add additional waiting time to ensure the waitGroup has been increased for all events that have been received between receiving SIGTERM and this point
304304
<-time.After(5 * time.Second)
305305
wg.Wait()
306+
306307
return err
307308
}
308309

0 commit comments

Comments
 (0)