Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added `otelriver` option `EnableWorkSpanJobKindSuffix` which appends the job kind a suffix to work spans so they look like `river.work/my_job` instead of `river.work`.

## [0.3.0] - 2025-04-14

### Added
Expand Down
10 changes: 6 additions & 4 deletions otelriver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ The middleware supports these options:

``` go
middleware := otelriver.NewMiddleware(&MiddlewareConfig{
DurationUnit: "ms",
EnableSemanticMetrics: true,
MeterProvider: meterProvider,
TracerProvider: tracerProvider,
DurationUnit: "ms",
EnableSemanticMetrics: true,
EnableWorkSpanJobKindSuffix: true,
MeterProvider: meterProvider,
TracerProvider: tracerProvider,
})
```

* `DurationUnit`: The unit which durations are emitted as, either "ms" (milliseconds) or "s" (seconds). Defaults to seconds.
* `EnableSemanticMetrics`: Causes the middleware to emit metrics compliant with OpenTelemetry's ["semantic conventions"](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/) for message clients. This has the effect of having all messaging systems share the same common metric names, with attributes differentiating them.
* `EnableWorkSpanJobKindSuffix`: Appends the job kind a suffix to work spans so they look like `river.work/my_job` instead of `river.work`.
* `MeterProvider`: Injected OpenTelemetry meter provider. The global meter provider is used by default.
* `TracerProvider`: Injected OpenTelemetry tracer provider. The global tracer provider is used by default.

Expand Down
11 changes: 10 additions & 1 deletion otelriver/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ type MiddlewareConfig struct {
// metric names, with attributes differentiating them.
EnableSemanticMetrics bool

// EnableWorkSpanJobKindSuffix appends the job kind a suffix to work spans
// so they look like `river.work/my_job` instead of `river.work`.
EnableWorkSpanJobKindSuffix bool

// MeterProvider is a MeterProvider to base metrics on. May be left as nil
// to use the default global provider.
MeterProvider metric.MeterProvider
Expand Down Expand Up @@ -179,7 +183,12 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job
}

func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error {
ctx, span := m.tracer.Start(ctx, prefix+"work",
spanName := prefix + "work"
if m.config.EnableWorkSpanJobKindSuffix {
spanName += "/" + job.Kind
}

ctx, span := m.tracer.Start(ctx, spanName,
trace.WithSpanKind(trace.SpanKindConsumer))
defer span.End()

Expand Down
24 changes: 24 additions & 0 deletions otelriver/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,30 @@ func TestMiddleware(t *testing.T) {
require.Equal(t, "s", metric.Unit)
}
})

t.Run("WorkEnableWorkSpanJobKindSuffix ", func(t *testing.T) {
t.Parallel()

middleware, bundle := setupConfig(t, &MiddlewareConfig{
EnableWorkSpanJobKindSuffix: true,
})

doInner := func(ctx context.Context) error {
return nil
}

err := middleware.Work(ctx, &rivertype.JobRow{
ID: 123,
Kind: "no_op",
}, doInner)
require.NoError(t, err)

spans := bundle.traceExporter.GetSpans()
require.Len(t, spans, 1)

span := spans[0]
require.Equal(t, "river.work/no_op", span.Name)
})
}

func getAttribute(t *testing.T, attrs []attribute.KeyValue, key string) attribute.Value {
Expand Down
Loading