Skip to content

Commit

Permalink
feat: add NATS reconnect handling (#1419)
Browse files (browse the repository at this point in the history)
  • Loading branch information
Aenimus authored Dec 5, 2024
1 parent d6f3ac5 commit babb47c
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 7 deletions.
2 changes: 1 addition & 1 deletion router-tests/structured_logging_test.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestRouterStartLogs(t *testing.T) {
},
}, func(t *testing.T, xEnv *testenv.Environment) {
logEntries := xEnv.Observer().All()
require.Len(t, logEntries, 11)
require.Len(t, logEntries, 13)
natsLogs := xEnv.Observer().FilterMessageSnippet("Nats Event source enabled").All()
require.Len(t, natsLogs, 4)
providerIDFields := xEnv.Observer().FilterField(zap.String("provider_id", "default")).All()
Expand Down
25 changes: 21 additions & 4 deletions router/core/executor.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -189,19 +189,36 @@ func (b *ExecutorConfigurationBuilder) Build(ctx context.Context, opts *Executor
}

func buildNatsOptions(eventSource config.NatsEventSource, logger *zap.Logger) ([]nats.Option, error) {

opts := []nats.Option{
nats.Name(fmt.Sprintf("cosmo.router.edfs.nats.%s", eventSource.ID)),
nats.ReconnectJitter(500*time.Millisecond, 2*time.Second),
nats.ClosedHandler(func(conn *nats.Conn) {
logger.Info("NATS connection closed", zap.String("provider_id", eventSource.ID), zap.Error(conn.LastError()))
}),
nats.ConnectHandler(func(nc *nats.Conn) {
logger.Info("url", zap.String("url", nc.ConnectedUrlRedacted()))
}),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
if err != nil {
logger.Error("NATS disconnected; will attempt to reconnect", zap.Error(err), zap.String("provider_id", eventSource.ID))
} else {
logger.Info("NATS disconnected", zap.String("provider_id", eventSource.ID))
}
}),
nats.ErrorHandler(func(conn *nats.Conn, subscription *nats.Subscription, err error) {

if errors.Is(err, nats.ErrSlowConsumer) {
logger.Warn(
"Nats slow consumer detected. Events are being dropped. Please consider increasing the buffer size or reducing the number of messages being sent.",
"NATS slow consumer detected. Events are being dropped. Please consider increasing the buffer size or reducing the number of messages being sent.",
zap.Error(err),
zap.String("provider_id", eventSource.ID),
)
} else {
logger.Error("nats error", zap.Error(err))
logger.Error("NATS error", zap.Error(err))
}
}),
nats.ReconnectHandler(func(conn *nats.Conn) {
logger.Info("NATS reconnected", zap.String("provider_id", eventSource.ID), zap.String("url", conn.ConnectedUrlRedacted()))
}),
}

if eventSource.Authentication != nil {
Expand Down
2 changes: 1 addition & 1 deletion router/core/router.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -512,7 +512,7 @@ func NewRouter(opts ...Option) (*Router, error) {
}

for _, source := range r.eventsConfig.Providers.Nats {
r.logger.Info("Nats Event source enabled", zap.String("provider_id", source.ID), zap.String("url", source.URL))
r.logger.Info("Nats Event source enabled", zap.String("provider_id", source.ID))
}
for _, source := range r.eventsConfig.Providers.Kafka {
r.logger.Info("Kafka Event source enabled", zap.String("provider_id", source.ID), zap.Strings("brokers", source.Brokers))
Expand Down
5 changes: 4 additions & 1 deletion router/pkg/pubsub/nats/nats.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -220,7 +220,10 @@ func (p *natsPubSub) Shutdown(ctx context.Context) error {
err = errors.Join(err, fErr)
}

p.conn.Close()
drainErr := p.conn.Drain()
if drainErr != nil {
p.logger.Error("error draining NATS connection", zap.Error(drainErr))
}

// Wait for all subscriptions to be closed
p.closeWg.Wait()
Expand Down

0 comments on commit babb47c

Please sign in to comment.