From c22f71e15cc45e6363f3836ca851a63b616fb361 Mon Sep 17 00:00:00 2001 From: Pouyan Heyratpour Date: Sun, 29 Sep 2024 01:42:51 +0330 Subject: [PATCH] feat(handywares): nats middleware stack * panic recover * otel Signed-off-by: Pouyan Heyratpour --- handywares/go.mod | 5 ++ handywares/go.sum | 10 ++++ handywares/nats.go | 111 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 126 insertions(+) create mode 100644 handywares/nats.go diff --git a/handywares/go.mod b/handywares/go.mod index 700c4fc..c92486b 100644 --- a/handywares/go.mod +++ b/handywares/go.mod @@ -9,6 +9,7 @@ require ( github.com/hibiken/asynq v0.24.1 github.com/janstoon/toolbox/bricks v0.7.2 github.com/janstoon/toolbox/tricks v0.10.0 + github.com/nats-io/nats.go v1.37.0 github.com/rs/cors v1.11.1 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.29.0 @@ -33,14 +34,18 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/josharian/intern v1.0.0 // indirect + github.com/klauspost/compress v1.17.2 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/redis/go-redis/v9 v9.6.1 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/spf13/cast v1.7.0 // indirect go.mongodb.org/mongo-driver v1.16.1 // indirect + golang.org/x/crypto v0.27.0 // indirect golang.org/x/net v0.29.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect diff --git a/handywares/go.sum b/handywares/go.sum index c46d1b8..692e994 100644 --- a/handywares/go.sum +++ b/handywares/go.sum @@ -55,6 +55,8 @@ github.com/janstoon/toolbox/tricks v0.10.0 h1:cz+y7f6OWm7MU5alsd2s+jEtReDSPNbBG2 github.com/janstoon/toolbox/tricks v0.10.0/go.mod h1:G5vKiYk5opiGDy9aYLmIVZ5pZYNwI5UGrw9gCtI7WlU= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= @@ -66,6 +68,12 @@ github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0 github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -98,6 +106,8 @@ go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= diff --git a/handywares/nats.go b/handywares/nats.go new file mode 100644 index 0000000..b717c33 --- /dev/null +++ b/handywares/nats.go @@ -0,0 +1,111 @@ +package handywares + +import ( + "context" + "errors" + "fmt" + "runtime/debug" + "strings" + + "github.com/janstoon/toolbox/bricks" + "github.com/janstoon/toolbox/tricks" + "github.com/nats-io/nats.go" + "go.opentelemetry.io/otel/trace" +) + +type NatsMsgHandler func(ctx context.Context, msg *nats.Msg) error + +type NatsMiddlewareStack = tricks.MiddlewareStack[NatsMsgHandler] + +type PanicRecoverNatsMiddlewareOpt = tricks.InPlaceOption[any] + +func NatsPanicRecoverMiddleware(options ...PanicRecoverAsynqMiddlewareOpt) tricks.Middleware[NatsMsgHandler] { + return func(next NatsMsgHandler) NatsMsgHandler { + return func(ctx context.Context, msg *nats.Msg) error { + defer func() { + if r := recover(); r != nil { + span := trace.SpanFromContext(ctx) + span.AddEvent("panic recovered", trace.WithAttributes( + oaPanicValue.String(fmt.Sprintf("%+v", r)), + oaDebugStack.String(string(debug.Stack())), + )) + } + }() + + return next(ctx, msg) + } + } +} + +type CompensatorNatsMiddlewareOpt = tricks.InPlaceOption[any] + +// NatsCompensatorMiddleware tries to compensate the task if error is not bricks.ErrRetryable. +// It searches for a bricks.Compensator in returned error by msg handler and runs the first one. +func NatsCompensatorMiddleware(options ...CompensatorNatsMiddlewareOpt) tricks.Middleware[NatsMsgHandler] { + return func(next NatsMsgHandler) NatsMsgHandler { + return func(ctx context.Context, msg *nats.Msg) error { + err := next(ctx, msg) + if err == nil { + return nil + } + + if !errors.Is(err, bricks.ErrRetryable) { + var c bricks.Compensator + if errors.As(err, &c) { + err = errors.Join(err, c.Compensate(ctx, err)) + } + } + + return err + } + } +} + +type OtelNmw struct { + tracer trace.Tracer + + namePrefix string +} + +type OpenTelemetryNatsMiddlewareOpt = tricks.Option[OtelNmw] + +func OtelNatsSpanNamePrefix(prefix string) OpenTelemetryNatsMiddlewareOpt { + return tricks.OutOfPlaceOption[OtelNmw](func(nmw OtelNmw) OtelNmw { + nmw.namePrefix = prefix + + return nmw + }) +} + +func NatsOpenTelemetryMiddleware( + tracer trace.Tracer, options ...OpenTelemetryNatsMiddlewareOpt, +) tricks.Middleware[NatsMsgHandler] { + amw := &OtelNmw{ + tracer: tracer, + } + amw = tricks.ApplyOptions(amw, options...) + + return amw.builder +} + +func (nmw OtelNmw) builder(next NatsMsgHandler) NatsMsgHandler { + return func(ctx context.Context, msg *nats.Msg) error { + var span trace.Span + ctx, span = nmw.tracer.Start(ctx, nmw.spanName(msg.Subject), trace.WithSpanKind(trace.SpanKindConsumer)) + defer span.End() + + return next(ctx, msg) + } +} + +func (nmw OtelNmw) spanName(subject string) string { + sb := strings.Builder{} + if len(strings.TrimSpace(nmw.namePrefix)) > 0 { + sb.WriteString(nmw.namePrefix) + sb.WriteRune('/') + } + + sb.WriteString(subject) + + return sb.String() +}