feat(eventrecorder): add Kafka output #5246
📝 Walkthrough
This PR adds Kafka output support to Alertmanager's event recorder. It introduces Kafka configuration options, implements a franz-go–backed producer with bounded buffering and async dispatch, adds proto-native fast-path support for outputs, refactors shared metrics, and provides comprehensive tests using an in-process fake Kafka cluster.
Changes
Kafka Event Recorder Output
Sequence Diagram(s)
sequenceDiagram
participant Caller
participant Recorder as Recorder
participant WriteLoop as writeLoop
participant Output as Destination
Caller->>Recorder: RecordEvent(ctx, eventData)
Recorder->>Recorder: wrap with timestamp/instance
Recorder->>WriteLoop: send on bounded queue
alt queue full
Recorder->>Recorder: increment drops, return
else queue available
WriteLoop->>Output: marshalAndSend(event)
end
Caller->>Recorder: ApplyConfig(cfg)
Recorder->>WriteLoop: signal config update
WriteLoop->>WriteLoop: rebuild outputs
WriteLoop->>Recorder: ack or keep old outputs
Caller->>Recorder: Close()
Recorder->>WriteLoop: signal shutdown
WriteLoop->>WriteLoop: drain queue
WriteLoop->>Output: Close()
WriteLoop->>Recorder: done
sequenceDiagram
participant KafkaOut as KafkaOutput
participant Enqueue as enqueue()
participant Dispatcher as dispatcher goroutine
participant Producer as franz-go Producer
participant Kafka
KafkaOut->>Enqueue: SendEvent/SendProto payload
alt buffer full
Enqueue->>Enqueue: warning log + drop increment
Enqueue-->>KafkaOut: success (dropped)
else buffer has space
Enqueue->>Dispatcher: send on channel
Enqueue-->>KafkaOut: success (queued)
end
Dispatcher->>Producer: Produce record keyed by instance
Producer->>Kafka: send with topic/acks/compression
Kafka-->>Producer: callback (error/success)
Producer->>Dispatcher: update error metrics
KafkaOut->>Dispatcher: Close() signal
Dispatcher->>Dispatcher: drain remaining buffered
Dispatcher->>Producer: Flush within budget
Producer-->>Dispatcher: flush result
Dispatcher->>KafkaOut: exit
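The enqueue/dispatcher flow in the second diagram can be sketched with the standard library alone. This is a minimal illustration, not the PR's code: `boundedOutput`, `newBoundedOutput`, and the `send` callback (a stand-in for the producer's produce call) are hypothetical names, and the flush budget is left out.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// boundedOutput is a hypothetical stand-in for the Kafka output: enqueue never
// blocks, and a single dispatcher goroutine forwards payloads to send.
type boundedOutput struct {
	work  chan []byte
	done  chan struct{}
	wg    sync.WaitGroup
	drops atomic.Int64
	send  func([]byte) // stand-in for the producer's produce call
}

func newBoundedOutput(size int, send func([]byte)) *boundedOutput {
	o := &boundedOutput{
		work: make(chan []byte, size),
		done: make(chan struct{}),
		send: send,
	}
	o.wg.Add(1)
	go o.dispatch()
	return o
}

// enqueue reports whether the payload was queued; a full buffer counts a drop.
func (o *boundedOutput) enqueue(p []byte) bool {
	select {
	case o.work <- p:
		return true
	default:
		o.drops.Add(1)
		return false
	}
}

// dispatch forwards payloads until done is closed, then drains what is left.
func (o *boundedOutput) dispatch() {
	defer o.wg.Done()
	for {
		select {
		case p := <-o.work:
			o.send(p)
		case <-o.done:
			for {
				select {
				case p := <-o.work:
					o.send(p)
				default:
					return
				}
			}
		}
	}
}

// Close signals shutdown and waits for the drain to finish.
func (o *boundedOutput) Close() {
	close(o.done)
	o.wg.Wait()
}

func main() {
	var sent atomic.Int64
	o := newBoundedOutput(2, func([]byte) { sent.Add(1) })
	o.enqueue([]byte("a"))
	o.enqueue([]byte("b"))
	o.Close()
	fmt.Println("sent:", sent.Load(), "dropped:", o.drops.Load())
}
```

The sketch assumes nothing calls `enqueue` after `Close` has returned; anything queued after the drain finishes would stay in the channel.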
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/configuration.md`:
- Line 2098: Replace the inconsistent placeholder `<secret_url>` with the
documented placeholder `<secret>` in the configuration examples (specifically
the webhook URL entry shown as `url: <secret_url>`); locate the occurrence of
`<secret_url>` in the docs/configuration.md snippet and update it to `url:
<secret>` so it matches the schema placeholder `<secret>` used elsewhere in the
document.
In `@eventrecorder/kafka.go`:
- Around line 151-156: The synchronous startup call to client.Ping using
pingCtx/defaultKafkaPingTimeout blocks initialization; remove the blocking ping
and instead spawn a background goroutine that performs the ping with its own
context.WithTimeout, calls client.Ping inside the goroutine, logs the same
warning (using logger.Warn, "output", name, "err", pingErr) on failure, and
ensures the context cancel is called inside the goroutine to avoid leaks; keep
the main init path non-blocking and do not call cancel() immediately from the
main thread.
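A rough sketch of the non-blocking ping suggested above, assuming a hypothetical one-method `pinger` interface rather than the real client type. `pingInBackground`, `slowBroker`, and the `warn` callback are illustration-only names; the returned channel exists only so callers and tests can wait for completion.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pinger is a hypothetical one-method view of a client that can be pinged.
type pinger interface {
	Ping(ctx context.Context) error
}

// pingInBackground starts the ping in its own goroutine and returns at once.
// The returned channel is closed when the ping has finished.
func pingInBackground(p pinger, timeout time.Duration, warn func(err error)) <-chan struct{} {
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel() // released inside the goroutine, never by the caller
		if err := p.Ping(ctx); err != nil {
			warn(err)
		}
	}()
	return finished
}

// slowBroker simulates an unreachable broker: Ping blocks until ctx expires.
type slowBroker struct{}

func (slowBroker) Ping(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	finished := pingInBackground(slowBroker{}, 50*time.Millisecond, func(err error) {
		fmt.Println("ping failed:", err, errors.Is(err, context.DeadlineExceeded))
	})
	fmt.Println("init continues without waiting")
	<-finished
}
```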
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: 7d611017-d7d4-4c31-a56b-55a1f37efc7d
⛔ Files ignored due to path filters (1)
- go.sum is excluded by !**/*.sum
📒 Files selected for processing (7)
- docs/configuration.md
- eventrecorder/config.go
- eventrecorder/eventrecorder.go
- eventrecorder/eventrecorder_test.go
- eventrecorder/kafka.go
- eventrecorder/kafka_test.go
- go.mod
72da8ae to 590738d
Document the event recorder configuration including all supported outputs. Signed-off-by: Siavash Safi <siavash@cloudflare.com>
Add a third event recorder destination alongside file and webhook that
produces serialized events to a Kafka topic via franz-go.
Configuration is per-output under `event_recorder.outputs`:
    event_recorder:
      outputs:
        - type: kafka
          brokers: ["kafka-1:9093", "kafka-2:9093"]
          topic: alertmanager.events
          format: json           # or "protobuf"
          acks: leader           # "none" | "leader" | "all"
          compression: snappy    # "none" | "gzip" | "snappy" | "lz4" | "zstd"
          buffer_size: 1024
          tls_config: { ... }
Implementation notes:
- KafkaOutput buffers events in a bounded local channel and forwards
them to franz-go's async producer. When the buffer is full, events
are dropped (counted via alertmanager_event_output_drops_total) so a
slow or unreachable broker cannot block the upstream pipeline.
- Broker unreachability at startup is logged at warn level and does
not prevent Alertmanager from starting; franz-go retries connections
in the background.
- Records use the producing instance's hostname as the message key,
keeping per-instance ordering on the same partition.
- A new optional ProtoDestination interface lets the Kafka output
receive protobuf events directly, skipping JSON serialization when
no JSON-mode destination is configured.
- JSON marshalling in marshalAndSend is now lazy: it only happens
when at least one non-proto destination needs it.
- TLS is supported via prometheus/common's TLSConfig (mTLS or
server-only). SASL is intentionally out of scope for this change
and can be added later via franz-go's kgo.SASL options.
- Idempotent writes are disabled unless acks=all is explicitly set,
to keep the default leader-ack path compatible with franz-go.
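To make the acks/idempotence note concrete, here is a small stdlib-only sketch of how the option strings listed above might be validated. `resolveAcks` and `validCompression` are hypothetical helpers, not functions from this change; the `leader` default follows the "default leader-ack path" wording above, and the real code may structure this differently.

```go
package main

import "fmt"

// resolveAcks validates the acks option and reports whether idempotent writes
// can stay enabled. Hypothetical helper: only acks=all keeps idempotence on,
// and an empty value falls back to "leader".
func resolveAcks(acks string) (resolved string, idempotent bool, err error) {
	if acks == "" {
		acks = "leader"
	}
	switch acks {
	case "none", "leader":
		return acks, false, nil
	case "all":
		return acks, true, nil
	default:
		return "", false, fmt.Errorf("invalid acks %q: want none, leader, or all", acks)
	}
}

// validCompression reports whether c is one of the codecs listed in the
// configuration example.
func validCompression(c string) bool {
	switch c {
	case "none", "gzip", "snappy", "lz4", "zstd":
		return true
	}
	return false
}

func main() {
	for _, a := range []string{"", "leader", "all", "quorum"} {
		acks, idem, err := resolveAcks(a)
		fmt.Printf("input=%q acks=%q idempotent=%v err=%v\n", a, acks, idem, err)
	}
	fmt.Println(validCompression("snappy"), validCompression("brotli"))
}
```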
Metric changes:
- Rename alertmanager_event_webhook_drops_total ->
alertmanager_event_output_drops_total{output}, shared by webhook and
kafka outputs. This is a breaking metric rename; dashboards and
alerts referencing the old name need to be updated.
- Add alertmanager_event_kafka_produce_errors_total{output,error_type}
populated from franz-go's produce callback.
Testing:
- Unit tests use github.com/twmb/franz-go/pkg/kfake to spin an
in-process broker for JSON, protobuf, message-key, drop-on-full,
close-flush, initial-ping-failure, name-stability, and config
validation cases, plus a proto fast-path test against marshalAndSend.
Dependencies added:
github.com/twmb/franz-go
github.com/twmb/franz-go/pkg/kfake (test)
github.com/twmb/franz-go/plugin/kslog
Signed-off-by: Siavash Safi <siavash@cloudflare.com>
590738d to 510008b
Signed-off-by: Siavash Safi <siavash@cloudflare.com>
		ko.logger.Warn("Kafka event recorder flush did not complete within budget; remaining records will be dropped",
			"output", ko.name, "err", err)
	}
	ko.client.Close()
This could hang. Call ko.client.LeaveGroupContext() before
Our client is a producer and not a consumer, therefore it is not joining any groups and calling LeaveGroupContext() will be a no-op.
I can push a code doc change to clarify that if you think it is necessary.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
eventrecorder/recorder.go (3)
161-169: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Normalize a nil logger before wiring the recorder.
If `logger` is nil, later `logger.Error(...)` calls in output build/error paths can panic. Normalize once at construction time (Line 161) to keep error handling safe.
Proposed fix
 func NewRecorderFromConfig(cfg Config, instance string, logger *slog.Logger, r prometheus.Registerer) Recorder {
+	if logger == nil {
+		logger = slog.Default()
+	}
 	core := &sharedRecorder{
 		instance: instance,
 		logger:   logger,
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@eventrecorder/recorder.go` around lines 161 - 169, NewRecorderFromConfig may receive a nil logger which leads to panics when sharedRecorder later calls logger.Error; normalize the logger at the start of NewRecorderFromConfig by replacing a nil logger with a safe default logger (e.g. the package's default slog logger) before constructing the sharedRecorder so all subsequent uses of logger in sharedRecorder, NewRecorderFromConfig and related methods are guaranteed non-nil.
364-369: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Stop accepting events after recorder shutdown starts.
After Line 403 closes `done`, `RecordEvent` can still enqueue into `events`; those events may never be drained once `writeLoop` exits. Add a `done` check in the enqueue select.
Proposed fix
 	select {
+	case <-r.core.done:
+		return
 	case r.core.events <- writeRequest{event: wrappedEvent, eventType: eventType}:
 	default:
 		// Queue full; drop event to avoid blocking alertmanager.
 		r.core.metrics.eventsDropped.WithLabelValues(eventType).Inc()
 	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@eventrecorder/recorder.go` around lines 364 - 369, RecordEvent can still enqueue into r.core.events after shutdown begins because there is no check for r.done; add a non-blocking case on the recorder's done channel in the same select where you send to r.core.events (the block with r.core.events <- writeRequest{...}) so that if <-r.done is ready you skip enqueuing (and increment r.core.metrics.eventsDropped.WithLabelValues(eventType).Inc() or return immediately) to ensure no events are left stranded when writeLoop exits; update the select inside RecordEvent to include case <-r.done to stop accepting events once shutdown starts.
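One detail worth keeping in mind for this finding: when several cases of a Go `select` are ready, one is chosen at random, so a single `select` with both a `done` case and a send case can still enqueue after shutdown. The stdlib-only sketch below checks `done` first. It uses a hypothetical `recorder` type, not the PR's `sharedRecorder`, and it narrows the window rather than closing it, since shutdown can still start between the two selects.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// recorder is a hypothetical, trimmed-down stand-in for the event recorder.
type recorder struct {
	events  chan string
	done    chan struct{}
	dropped atomic.Int64
}

func newRecorder(size int) *recorder {
	return &recorder{events: make(chan string, size), done: make(chan struct{})}
}

// record reports whether the event was queued.
func (r *recorder) record(ev string) bool {
	// Check done on its own first, so a closed done always wins over a
	// channel that still has free space.
	select {
	case <-r.done:
		return false
	default:
	}
	select {
	case r.events <- ev:
		return true
	default:
		// Queue full; drop instead of blocking the caller.
		r.dropped.Add(1)
		return false
	}
}

// close marks the start of shutdown; it must be called at most once.
func (r *recorder) close() { close(r.done) }

func main() {
	r := newRecorder(1)
	fmt.Println("before close:", r.record("a"))
	<-r.events // free the slot so only the done check can reject the next event
	r.close()
	fmt.Println("after close:", r.record("b"))
}
```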
319-324: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Don’t abort fan-out after a JSON marshal failure.
Returning at Line 323 makes delivery order-dependent: later proto destinations are skipped if a JSON destination was encountered first and marshal failed. Skip only JSON-path deliveries and continue iterating outputs.
Proposed fix
 func (c *sharedRecorder) marshalAndSend(req writeRequest, outputs []Destination) {
 	var (
 		jsonData       []byte
 		jsonMarshalErr error
 		jsonMarshalled bool
+		jsonErrLogged  bool
 	)
@@
 		if jsonMarshalErr != nil {
-			c.metrics.eventSerializeErrors.WithLabelValues(req.eventType).Inc()
-			c.logger.Error("Failed to marshal event", "event_type", req.eventType, "err", jsonMarshalErr)
-			// Don't try further JSON destinations with bad data.
-			return
+			if !jsonErrLogged {
+				c.metrics.eventSerializeErrors.WithLabelValues(req.eventType).Inc()
+				c.logger.Error("Failed to marshal event", "event_type", req.eventType, "err", jsonMarshalErr)
+				jsonErrLogged = true
+			}
+			// Skip JSON delivery for this output, but keep fan-out alive
+			// for remaining outputs (including proto destinations).
+			continue
 		}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@eventrecorder/recorder.go` around lines 319 - 324, The current JSON-marshal error path in recorder.go logs and returns, which aborts the entire fan-out and skips later outputs; change the behavior so that when jsonMarshalErr != nil you increment c.metrics.eventSerializeErrors, log the error (using c.logger.Error with "event_type" and the error), and then continue to the next output instead of returning, i.e., replace the return with a continue (or otherwise skip only the JSON delivery branch in the loop that handles req/eventType/jsonMarshalErr) so proto destinations are still attempted.
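The intended behavior (marshal JSON lazily, at most once, and skip only the JSON outputs when it fails) can be sketched without any of the PR's types. Everything here is hypothetical: `destination`, `protoDestination`, `fanOut`, and the two sinks are illustration names, and the event is passed as a plain Go value instead of a protobuf message. As a simplification, any output that implements `protoDestination` takes the proto path.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// destination receives JSON-encoded events (hypothetical interface).
type destination interface {
	SendEvent(data []byte) error
}

// protoDestination can additionally take the event without JSON encoding.
type protoDestination interface {
	destination
	SendProto(ev any) error
}

// fanOut delivers ev to every output. JSON is marshalled lazily and at most
// once; a marshal failure skips JSON outputs but not proto outputs.
func fanOut(ev any, outputs []destination) (jsonErr error) {
	var (
		data       []byte
		marshalled bool
	)
	for _, out := range outputs {
		if pd, ok := out.(protoDestination); ok {
			_ = pd.SendProto(ev) // send errors are ignored in this sketch
			continue
		}
		if !marshalled {
			data, jsonErr = json.Marshal(ev)
			marshalled = true
		}
		if jsonErr != nil {
			continue // skip this JSON output, keep iterating
		}
		_ = out.SendEvent(data)
	}
	return jsonErr
}

// jsonSink records the JSON payloads it receives.
type jsonSink struct{ got []string }

func (s *jsonSink) SendEvent(data []byte) error {
	s.got = append(s.got, string(data))
	return nil
}

// protoSink counts events received on the proto path.
type protoSink struct{ events int }

func (s *protoSink) SendEvent([]byte) error { return nil }
func (s *protoSink) SendProto(any) error    { s.events++; return nil }

func main() {
	js, ps := &jsonSink{}, &protoSink{}
	outs := []destination{js, ps}
	// A channel cannot be JSON-encoded, so this event fails to marshal.
	err := fanOut(map[string]any{"bad": make(chan int)}, outs)
	fmt.Println("marshal failed:", err != nil, "json sends:", len(js.got), "proto sends:", ps.events)
}
```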
🧹 Nitpick comments (3)
eventrecorder/webhook_test.go (1)
224-228: ⚡ Quick win
Add malformed-URL cases to webhook YAML validation tests.
The table covers missing `url`, but not invalid present values (e.g., empty/relative URL). Adding those cases will lock in fail-fast validation behavior.
Suggested test additions
 		{
 			name:    "missing url",
 			yaml:    "type: webhook\n",
 			wantErr: true,
 		},
+		{
+			name:    "empty url",
+			yaml:    "type: webhook\nurl: \"\"\n",
+			wantErr: true,
+		},
+		{
+			name:    "relative url",
+			yaml:    "type: webhook\nurl: /hook\n",
+			wantErr: true,
+		},
 	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@eventrecorder/webhook_test.go` around lines 224 - 228, Add additional negative test entries to the webhook YAML validation table (the test cases slice in webhook_test.go that currently includes the "missing url" case) to cover malformed URL values: include cases where yaml has url: "" (empty string), url: "/relative/path" (relative URL), and url: "ht!tp://bad" or another syntactically invalid URL; for each new case set wantErr: true so the parser/validator (the code exercised by the test harness that parses webhook YAML into the webhook config struct) fails fast on invalid url values.
eventrecorder/events_test.go (1)
64-75: ⚡ Quick win
Assert deterministic label ordering explicitly.
This test currently validates content but not the sorted order guarantee. Please assert the exact `proto.Labels` sequence so ordering regressions are caught.
Proposed test update
 func TestLabelSetAsProto(t *testing.T) {
 	ls := model.LabelSet{"foo": "bar", "baz": "qux"}
 	proto := LabelSetAsProto(ls)
 	require.Len(t, proto.Labels, 2)
-	found := map[string]string{}
-	for _, lp := range proto.Labels {
-		found[lp.Key] = lp.Value
-	}
-	require.Equal(t, "bar", found["foo"])
-	require.Equal(t, "qux", found["baz"])
+	require.Equal(t, "baz", proto.Labels[0].Key)
+	require.Equal(t, "qux", proto.Labels[0].Value)
+	require.Equal(t, "foo", proto.Labels[1].Key)
+	require.Equal(t, "bar", proto.Labels[1].Value)
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@eventrecorder/events_test.go` around lines 64 - 75, Test LabelSetAsProto currently checks contents but not order; update the TestLabelSetAsProto test to assert the exact sequence of proto.Labels so ordering regressions are caught: after calling LabelSetAsProto(ls) assert proto.Labels length is 2 and then assert proto.Labels[0].Key/Value and proto.Labels[1].Key/Value match the expected deterministic order (use the expected sorted keys/values for the LabelSet input) rather than only comparing via a map.
eventrecorder/kafka.go (1)
367-376: ⚡ Quick win
Downgrade shutdown enqueue race: recorder design prevents concurrent `SendEvent`/`SendProto` with `KafkaOutput.Close()`
The `select` in `KafkaOutput.enqueue` can indeed choose the `ko.work <- value` case even after `ko.done` is closed, but in this codebase `out.SendEvent`/`out.SendProto` are only called from `sharedRecorder.writeLoop`, and `out.Close()` happens later (when `writeLoop` stops sending). So the “post-close enqueue” scenario needed to strand events shouldn’t occur in normal operation.
Optional: if `KafkaOutput` is ever used directly with concurrent `Send*` calls while `Close()` runs, then the proposed mutex/closed-guard would be appropriate.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@eventrecorder/kafka.go` around lines 367 - 376, The enqueue race can allow ko.work <- value to succeed after ko.done is closed; add a guard on KafkaOutput to prevent enqueues after Close by introducing a mutex-protected "closed" boolean: in Close() acquire lock, set closed=true and close ko.done (and ko.work if currently closed semantics require), and in enqueue() acquire a read-lock, return error if closed is true, then proceed to the existing select (ko.work, ko.done, default) so no send happens once Close set closed; reference KafkaOutput.enqueue, KafkaOutput.Close, and fields ko.work, ko.done, ko.drops, ko.logger, ko.name when making the change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: 58f4a560-7735-463f-8b06-f179baa12fc3
📒 Files selected for processing (12)
- eventrecorder/config.go
- eventrecorder/events.go
- eventrecorder/events_test.go
- eventrecorder/file.go
- eventrecorder/file_test.go
- eventrecorder/kafka.go
- eventrecorder/kafka_test.go
- eventrecorder/metrics.go
- eventrecorder/recorder.go
- eventrecorder/recorder_test.go
- eventrecorder/webhook.go
- eventrecorder/webhook_test.go
func (o *Output) validateWebhook() error {
	if o.URL == nil {
		return errors.New("event_recorder webhook output requires a url")
	}
	return nil
Reject empty/non-absolute webhook URLs at config load time.
validateWebhook only checks for nil, so values like url: "" or relative URLs pass unmarshal and fail later during sends. Validate URL shape here to fail fast.
Proposed fix
 func (o *Output) validateWebhook() error {
 	if o.URL == nil {
 		return errors.New("event_recorder webhook output requires a url")
 	}
+	u, err := url.Parse(o.URL.String())
+	if err != nil || u.Scheme == "" || u.Host == "" {
+		return errors.New("event_recorder webhook output requires an absolute url")
+	}
 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@eventrecorder/webhook.go` around lines 34 - 38, The validateWebhook method
currently only checks for nil URL pointer; update Output.validateWebhook to also
reject empty or non-absolute URLs by dereferencing o.URL, returning an error if
the string is empty, and parsing it with net/url (e.g. url.Parse) to ensure it
has a valid scheme and host (or IsAbs) before returning nil. Use the Output.URL
field and validateWebhook function name to locate the logic and return
descriptive errors for empty and non-absolute URL cases.
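As a standalone illustration of the check being discussed, the sketch below validates a raw string with `net/url`. `validateAbsoluteURL` is a hypothetical helper that takes a plain string rather than the `Output.URL` field, so it shows the shape of the validation and not the actual patch.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// validateAbsoluteURL rejects empty, malformed, and non-absolute URLs.
// Hypothetical helper: the real validateWebhook works on the Output struct.
func validateAbsoluteURL(raw string) error {
	if raw == "" {
		return errors.New("url is empty")
	}
	u, err := url.Parse(raw)
	if err != nil {
		return fmt.Errorf("url is malformed: %w", err)
	}
	if u.Scheme == "" || u.Host == "" {
		return errors.New("url must be absolute (scheme and host required)")
	}
	return nil
}

func main() {
	for _, raw := range []string{"", "/hook", "ht!tp://bad", "https://example.com/hook"} {
		fmt.Printf("%q -> %v\n", raw, validateAbsoluteURL(raw))
	}
}
```

These are the same three negative cases the review asks the YAML validation tests to cover (empty, relative, and syntactically invalid), plus one absolute URL that passes.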
This is a pre-existing bug; we only moved the code. I can fix it in a separate PR or here.
@siavashs — thanks for the context! Either approach is reasonable. Since validateWebhook is already being restructured in this PR, fixing it here would be the lowest-friction option and keeps the validation complete from the start. That said, if you'd prefer to keep this PR's scope tight, I'm happy to open a follow-up issue to track it. Just let me know which you'd prefer!
This seems to be missing
Ah, that was for my local tests but it leaked into the commit message.
`--enable-feature=event-recorder` on the command line to activate it.
When the flag is not set, the recorder silently discards all events.

Event recording is configured under the top-level `event_recorder` key.
Is this enough? Or should there be a section like:
# Optional event recorder configuration
[ event_recorder: <event_recorder_config> ]
in the top level section?
Maybe add a note about topic creation to the docs (required if
your call, either add it or remove the text claiming it's there ;)