Conversation

@brandur brandur commented Mar 8, 2025

Here, add OpenTelemetry support for River with a middleware that will do
instrumentation on job insertion and job working.

The impetus is that we've had multiple requests for DataDog support now.
I wrote an equivalent middleware using DataDog's Go APIs, but found them
to be fairly unwieldy, with lots and lots of dependencies [1], and
DataDog appears to have first class OpenTelemetry support anyway.

There's also some potential for more sophisticated functionality here
like being able to trace a job in a distributed sense from when it's
inserted to when it's worked. I've opted to punt on that for now so we
can get a basic version of this together for testing sooner rather than
later, and under the principle of not developing too many features
prospectively that we don't know whether people will actually find
useful or not (let's see what people say about it).

[1] https://x.com/brandur/status/1898147112837357762

@brandur brandur force-pushed the brandur-open-telemetry branch 3 times, most recently from eaa1a8b to da59664 on March 8, 2025 03:16
@brandur brandur requested a review from bgentry March 8, 2025 03:18
brandur commented Mar 8, 2025

@bgentry Thoughts on this? I'm kind of thinking minimal feature set for now, and then we can expand on it more as we go.

setup := func(t *testing.T) (*Middleware, *testBundle) {
	t.Helper()

	exporter := tracetest.NewInMemoryExporter()
Contributor Author

Oddly, they include tracetest for testing traces, but there's no equivalent metrictest for testing metrics. Sigh. It probably doesn't matter that much, though, since all the code paths still get well exercised.
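
For context, this is roughly how the in-memory exporter ends up backing trace assertions. A minimal, self-contained sketch rather than the package's actual test bundle; the span name used here is purely illustrative:

package riveropentelemetry_test

import (
	"context"
	"testing"

	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

func TestInMemoryExporterSketch(t *testing.T) {
	exporter := tracetest.NewInMemoryExporter()

	// WithSyncer exports spans synchronously, so they're visible to
	// assertions as soon as they end.
	tracerProvider := sdktrace.NewTracerProvider(sdktrace.WithSyncer(exporter))

	tracer := tracerProvider.Tracer("test")
	_, span := tracer.Start(context.Background(), "river.work")
	span.End()

	spans := exporter.GetSpans()
	if len(spans) != 1 || spans[0].Name != "river.work" {
		t.Fatalf("unexpected spans: %+v", spans)
	}
}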

Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
TestOnly: true, // suitable only for use in tests; remove for live environments
WorkerMiddleware: []rivertype.WorkerMiddleware{
	middleware,
Contributor Author

Still want to tool around with this API a bit before releasing because it's fairly unsightly to need to install the middleware in multiple places like this.
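
For reference, the "install in multiple places" shape looks roughly like this under the pre-unified config API. This is a sketch only: it assumes River's separate JobInsertMiddleware and WorkerMiddleware config fields and that the middleware value satisfies both interfaces.

package example

import (
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivertype"

	"github.com/riverqueue/rivercontrib/riveropentelemetry"
)

func newInstrumentedClient(dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) {
	middleware := riveropentelemetry.NewMiddleware(nil)

	// The same middleware value has to be registered twice: once for job
	// inserts and once for job working.
	return river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		JobInsertMiddleware: []rivertype.JobInsertMiddleware{middleware},
		WorkerMiddleware:    []rivertype.WorkerMiddleware{middleware},
	})
}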

"github.com/riverqueue/rivercontrib/riveropentelemetry"
)

func Example_middleware() {
Contributor Author

Really simple example so I don't need to bring in any database helpers right now. Just demonstrate basic installation and that's it.

Contributor

Trying to think of what else might be useful to show in here. You could execute a job and show that there's an OTEL trace ID, but you can't output it while having predictable output for the test assertion 🤔

Contributor Author

Yeahhh ... I kinda figured that if I were the one reading the docs, the main thing I'd want to know is just how to bootstrap this stuff with respect to River specifically. It might be semi-useful to show a job being worked, but that'd be a distant secondary nicety. The trivial example isn't great, but it's better than no example.

I also added a separate package for DataDog that basically just shows you how to use this OTEL middleware but with a DataDog provider initialized. I think that's the kind of thing people will be most interested to know how to do.
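
As a rough sketch of that DataDog wiring (assumptions: dd-trace-go's OpenTelemetry-compatible tracer provider is registered globally, NewMiddleware(nil) falls back to the global provider, and the config field names match the later snippets in this thread; the actual subpackage may differ):

package main

import (
	"go.opentelemetry.io/otel"
	ddotel "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/opentelemetry"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivertype"
	"github.com/riverqueue/rivercontrib/riveropentelemetry"
)

func main() {
	// dd-trace-go ships a tracer provider that implements the OpenTelemetry
	// TracerProvider interface; registering it globally routes spans to DataDog.
	tracerProvider := ddotel.NewTracerProvider()
	defer func() { _ = tracerProvider.Shutdown() }()
	otel.SetTracerProvider(tracerProvider)

	_, err := river.NewClient(riverpgxv5.New(nil), &river.Config{
		Middleware: []rivertype.Middleware{
			riveropentelemetry.NewMiddleware(nil),
		},
		TestOnly: true, // as in the package example; remove for live environments
	})
	if err != nil {
		panic(err)
	}
}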

brandur commented Mar 8, 2025

I guess we could just put this in the main repo too instead of starting a separate rivercontrib. Hopefully the individual packages would insulate us from dependency hypergrowth, and it would make releases a bit easier.

Middleware: []rivertype.Middleware{
	// Install the OpenTelemetry middleware to run for all jobs inserted
	// or worked by this River client.
	riveropentelemetry.NewMiddleware(nil),
Contributor Author

@bgentry CI's going to crash and burn, but I rebuilt this one based on the API in riverqueue/river#804 to show the code improvement for middlewares like this one.

Comment on lines 41 to 49
replace github.com/riverqueue/river => ../../river

replace github.com/riverqueue/river/riverdriver => ../../river/riverdriver

replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../river/riverdriver/riverpgxv5

replace github.com/riverqueue/river/rivershared => ../../river/rivershared

replace github.com/riverqueue/river/rivertype => ../../river/rivertype
Contributor

Just leaving a note to not merge these

Contributor Author

Ugh, yep, good point. I'll change them to @latest for now, but we should cut a release relatively soon.
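
For what it's worth, swapping the local replace directives for published modules could look something like this (illustrative commands only; the actual pinned versions would come from whatever release gets cut):

go mod edit \
  -dropreplace=github.com/riverqueue/river \
  -dropreplace=github.com/riverqueue/river/riverdriver \
  -dropreplace=github.com/riverqueue/river/riverdriver/riverpgxv5 \
  -dropreplace=github.com/riverqueue/river/rivershared \
  -dropreplace=github.com/riverqueue/river/rivertype
go get github.com/riverqueue/river@latest github.com/riverqueue/river/riverdriver/riverpgxv5@latest
go mod tidy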

Comment on lines 1 to 3
// Package riveropentelemetry provides OpenTelemetry utilities for the River job
// queue.
package riveropentelemetry
Contributor

I think we should definitely call this otelriver to be idiomatic. Had ChatGPT do a search for some of the most popular Go OpenTelemetry instrumentation package names:

  • otelhttp – Instrumentation for Go’s standard net/http package.
  • otelgrpc – Instrumentation for gRPC.
  • otelgin – Instrumentation for the Gin web framework.
  • otelmux – Instrumentation for the Gorilla Mux router.
  • otelecho – Instrumentation for the Echo web framework.
  • otelsql – Instrumentation for database/sql operations.
  • otelredis – Instrumentation for the go-redis client.
  • otelsarama – Instrumentation for Shopify’s Sarama Kafka client.
  • otelelasticsearch – Instrumentation for the Elasticsearch Go client.
  • otelpgx – Instrumentation for the pgx PostgreSQL driver.
  • otelmongo – Instrumentation for the official MongoDB Go driver.
  • otelnats – Instrumentation for the NATS messaging client.
  • otelchi – Instrumentation for the Chi HTTP router.
  • otelaws – Instrumentation for the AWS SDK for Go.
  • otelzap – Integration that connects Zap logging with OpenTelemetry.
  • otelfiber – Instrumentation for the Fiber web framework.
  • otelruntime – Instrumentation for capturing Go runtime metrics.
  • otellogrus – Integration for the Logrus logging library.
  • otelcasbin – Instrumentation for the Casbin authorization library.
  • otelxorm – Instrumentation for the XORM ORM.
  • otelkit – Instrumentation support for the go-kit toolkit.
  • otelsqlx – Instrumentation for the sqlx library built on top of database/sql.
  • otelsqlite – (If available) Instrumentation for SQLite drivers.
  • otelgocql – Instrumentation for the gocql Cassandra driver.
  • otelsqlmysql – (If available) Instrumentation tailored for MySQL drivers.

Also there's otelpgx.

Contributor Author

Hah, man I kinda hate "OTEL" because it's just such an unnecessary acronym to obfuscate "OpenTelemetry" for no particularly good reason, but yeah, if the convention is so widespread, it makes sense. I'll change this around.

Comment on lines 71 to 77
mustInt64Counter := func(name string, options ...metric.Int64CounterOption) metric.Int64Counter {
	metric, err := meter.Int64Counter(name, options...)
	if err != nil {
		panic(err)
	}
	return metric
}
Contributor

Is a panic in this context appropriate vs propagating the error? I'm not sure what kind of Meter interface implementations there are out there, but it wouldn't surprise me if some of them could error.

Contributor Author
@brandur brandur Mar 15, 2025

So, I strongly suspect this is one of those theoretical errors that doesn't really manifest in reality. I took a look at the OTEL implementation to see under what conditions it errors, and unsurprisingly, it doesn't:

func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	if m.delegate != nil {
		return m.delegate.Int64Counter(name, options...)
	}

	cfg := metric.NewInt64CounterConfig(options...)
	id := instID{
		name:        name,
		kind:        reflect.TypeOf((*siCounter)(nil)),
		description: cfg.Description(),
		unit:        cfg.Unit(),
	}
	if f, ok := m.instruments[id]; ok {
		return f.(metric.Int64Counter), nil
	}
	i := &siCounter{name: name, opts: options}
	m.instruments[id] = i
	return i, nil
}

I tend to think that panics are better under conditions like these because:

  1. It's a huge boost to UX.
  2. Even if there were an error, it'd likely be configuration-related, and you'd want to know right away instead of having to wait for some runtime problem to happen.

When I say it's a UX improvement, I'm talking about places like this:

	_, err := river.NewClient(riverpgxv5.New(nil), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Middleware: []rivertype.Middleware{
			// Install the OpenTelemetry middleware to run for all jobs inserted
			// or worked by this River client.
			otelriver.NewMiddleware(nil),
		},
		TestOnly: true, // suitable only for use in tests; remove for live environments
	})
	if err != nil {
		panic(err)
	}

So you can inline your new instance instead of having to initialize it separately and reference it.

I tend to think Go has made a number of mistakes in the area of constructors-returning-errors over the years, with a notable one being http.NewRequest — it could theoretically return an error maybe, but when was the last time that happened? Even the Go core guys became annoyed enough with it that they introduced httptest.NewRequest so they could have a variant that doesn't return an error that has to be checked all the time.
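
To make the comparison concrete, here are the two standard-library constructors side by side in a tiny sketch:

package main

import (
	"net/http"
	"net/http/httptest"
)

func main() {
	// http.NewRequest returns (*http.Request, error) and forces an error
	// check even though a literal URL like this essentially never fails.
	req1, err := http.NewRequest(http.MethodGet, "https://example.com/jobs", nil)
	if err != nil {
		panic(err)
	}

	// httptest.NewRequest panics on a malformed target instead of returning
	// an error, so it can be used inline (intended for tests only).
	req2 := httptest.NewRequest(http.MethodGet, "/jobs", nil)

	_, _ = req1, req2
}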

I'm kind of thinking that in case this were to ever be a problem (and I'm guessing it never will be), what we could do is maybe introduce a NewMiddlewareSafely variant or something, kinda like we do for the worker bundle. Thoughts?
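
Purely hypothetical sketch of what that split could look like, assuming the package's existing Middleware type and a MiddlewareConfig parameter; none of this exists in the package today and the names are invented for illustration:

// Inside the riveropentelemetry package (fragment; relies on the existing
// Middleware and a hypothetical MiddlewareConfig type).

// NewMiddlewareSafely builds the middleware's tracer and meter instruments,
// propagating any error instead of panicking.
func NewMiddlewareSafely(config *MiddlewareConfig) (*Middleware, error) {
	// ... create counters/histograms, returning any error from the meter ...
	return &Middleware{}, nil
}

// NewMiddleware keeps the ergonomic signature by wrapping the error-returning
// variant and panicking, so it can still be inlined into a river.Config.
func NewMiddleware(config *MiddlewareConfig) *Middleware {
	middleware, err := NewMiddlewareSafely(config)
	if err != nil {
		panic(err)
	}
	return middleware
}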

Comment on lines 109 to 113
attributes := []attribute.KeyValue{
	attribute.Int("attempt", job.Attempt),
	attribute.String("kind", job.Kind),
	attribute.String("queue", job.Queue),
}
Contributor

I take it you intentionally omitted the job.ID from here due to cardinality concerns? I think it's the right thing to do by default, though I wonder if we want to make it an option to add it. Could easily do that later I guess!

Contributor Author

Yeah exactly. I admit I was waffling a bit on this one.

At Stripe, I'm 100% sure that DataDog told us we had to change some of our metrics because we were using tags whose cardinality was too great and it was causing big grief for their systems. However, as I was replying to DataDog/dd-trace-go#3245, I searched for a reference to this in their public docs and couldn't find one.

So it's possible it's not in the docs, but it's also possible this was a legacy problem (this was 5+ years ago at least), and it just isn't one anymore.

I figured I'd play it a bit safer and leave out ID, but I don't know, I could see it either way.
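
If the opt-in ever happens, it could be as small as something like this; the config field, helper function, and attribute key are all invented here and don't exist in the middleware today:

package riveropentelemetry

import (
	"github.com/riverqueue/river/rivertype"
	"go.opentelemetry.io/otel/attribute"
)

// MiddlewareConfig is sketched here only to show the shape of the option.
type MiddlewareConfig struct {
	// IncludeJobID adds the job's ID as an attribute. Off by default because
	// per-job IDs are effectively unbounded cardinality.
	IncludeJobID bool
}

func jobAttributes(config *MiddlewareConfig, job *rivertype.JobRow) []attribute.KeyValue {
	attributes := []attribute.KeyValue{
		attribute.Int("attempt", job.Attempt),
		attribute.String("kind", job.Kind),
		attribute.String("queue", job.Queue),
	}
	if config != nil && config.IncludeJobID {
		attributes = append(attributes, attribute.Int64("id", job.ID))
	}
	return attributes
}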

"github.com/riverqueue/rivercontrib/riveropentelemetry"
)

func Example_middleware() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to think of what else might be useful to show in here. You could execute a job and show that there's an OTEL trace ID, but you can't output it while having predictable output for the test assertion 🤔

@brandur brandur force-pushed the brandur-open-telemetry branch from 0a709ab to 282b383 on March 15, 2025 20:16
@brandur brandur requested a review from bgentry March 15, 2025 21:14
brandur commented Mar 15, 2025

@bgentry Arg, alright this one probably still isn't perfect, but I think we've gotten the broad strokes right, and given this isn't a super core part of the product, I figure it might be kinda nice to at least get an early version of it out in front of people, and then adjust things from there. I've added a datadogriver subpackage that shows some examples of use of DataDog + OTEL as well. Mind taking a look at this one again?

Contributor
@bgentry bgentry left a comment

LGTM aside from the possible filename issue and the linter not running; agreed on getting this in early and iterating. Maybe we want to add a note about its early status to the readme or something?

Contributor

Filename with a , instead of a . in it: is that intentional because this isn't a passing executable test?

Contributor Author

Ah whoops. No that's an accident. Good catch.

@brandur brandur force-pushed the brandur-open-telemetry branch from 282b383 to 6d2f554 on March 16, 2025 18:27
@brandur brandur force-pushed the brandur-open-telemetry branch from 6d2f554 to 16db9df on March 16, 2025 18:42
brandur commented Mar 16, 2025

Thanks! Alright, going to merge this and flip this repo public. It still points to River master unfortunately, but we'll get that updated before being too loud about the addition.

@brandur brandur merged commit 6dc7d18 into master Mar 16, 2025
2 checks passed
@brandur brandur deleted the brandur-open-telemetry branch March 16, 2025 18:54