Skip to content

feat(eventrecorder): add Kafka output#5246

Open
siavashs wants to merge 3 commits into
prometheus:mainfrom
siavashs:feat/event-recorder-kafka-integration
Open

feat(eventrecorder): add Kafka output#5246
siavashs wants to merge 3 commits into
prometheus:mainfrom
siavashs:feat/event-recorder-kafka-integration

Conversation

@siavashs
Copy link
Copy Markdown
Contributor

@siavashs siavashs commented May 19, 2026

Add a third event recorder destination alongside file and webhook that
produces serialized events to a Kafka topic via franz-go.

Configuration is per-output under event_recorder.outputs:

  event_recorder:
    outputs:
      - type: kafka
        brokers: ["kafka-1:9093", "kafka-2:9093"]
        topic: alertmanager.events
        format: json          # or "protobuf"
        acks: leader          # "none" | "leader" | "all"
        compression: snappy   # "none" | "gzip" | "snappy" | "lz4" | "zstd"
        buffer_size: 1024
        tls_config: { ... }

Implementation notes:

  • KafkaOutput buffers events in a bounded local channel and forwards them to franz-go's async producer. When the buffer is full, events are dropped (counted via alertmanager_event_output_drops_total) so a slow or unreachable broker cannot block the upstream pipeline.
  • Broker unreachability at startup is logged at warn level and does not prevent Alertmanager from starting; franz-go retries connections in the background.
  • Records use the producing instance's hostname as the message key, keeping per-instance ordering on the same partition.
  • A new optional ProtoDestination interface lets the Kafka output receive protobuf events directly, skipping JSON serialization when no JSON-mode destination is configured.
  • JSON marshalling in marshalAndSend is now lazy: it only happens when at least one non-proto destination needs it.
  • TLS is supported via prometheus/common's TLSConfig (mTLS or server-only). SASL is intentionally out of scope for this change and can be added later via franz-go's kgo.SASL options.
  • Idempotent writes are disabled unless acks=all is explicitly set, to keep the default leader-ack path compatible with franz-go.

Metric changes:

  • Rename alertmanager_event_webhook_drops_total -> alertmanager_event_output_drops_total{output}, shared by webhook and kafka outputs. This is a breaking metric rename; dashboards and alerts referencing the old name need to be updated.
  • Add alertmanager_event_kafka_produce_errors_total{output,error_type} populated from franz-go's produce callback.

Testing:

  • Unit tests use github.com/twmb/franz-go/pkg/kfake to spin an in-process broker for JSON, protobuf, message-key, drop-on-full, close-flush, initial-ping-failure, name-stability, and config validation cases, plus a proto fast-path test against marshalAndSend.

Dependencies added:

  • github.com/twmb/franz-go
  • github.com/twmb/franz-go/pkg/kfake (test)
  • github.com/twmb/franz-go/plugin/kslog

Pull Request Checklist

Please check all the applicable boxes.

  • Please list all open issue(s) discussed with maintainers related to this change
    • Fixes #
  • Is this a new Receiver integration?
  • Is this a bugfix?
    • I have added tests that can reproduce the bug which pass with this bugfix applied
  • Is this a new feature?
    • I have added tests that test the new feature's functionality
  • Does this change affect performance?
    • I have provided benchmarks comparison that shows performance is improved or is not degraded
      • You can use benchstat to compare benchmarks
    • I have added new benchmarks if required or requested by maintainers
  • Is this a breaking change?
    • My changes do not break the existing cluster messages
    • My changes do not break the existing api
  • I have added/updated the required documentation
  • I have signed-off my commits
  • I will follow best practices for contributing to this project

Which user-facing changes does this PR introduce?

[CHANGE] Rename alertmanager_event_webhook_drops_total  to alertmanager_event_output_drops_total{output},

Summary by CodeRabbit

  • New Features

    • Added Kafka output support for event recording alongside file and webhook options.
    • Optimized event delivery with lazy JSON serialization for protobuf-native destinations.
    • New metrics for event drops and Kafka production errors.
  • Documentation

    • Added configuration guide for event recording with file, webhook, and Kafka output types, including timeout, retry, compression, and TLS settings.

Review Change Stack

@siavashs siavashs requested a review from a team as a code owner May 19, 2026 11:37
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 19, 2026

📝 Walkthrough

Walkthrough

This PR adds Kafka output support to Alertmanager's event recorder. It introduces Kafka configuration options, implements a franz-go–backed producer with bounded buffering and async dispatch, adds proto-native fast-path support for outputs, refactors shared metrics, and provides comprehensive tests using an in-process fake Kafka cluster.

Changes

Kafka Event Recorder Output

Layer / File(s) Summary
Configuration documentation and schema
docs/configuration.md, eventrecorder/config.go
Event-recorder documentation added for file/webhook/kafka output types; Output struct extended with Kafka fields (brokers, topic, client ID, format, acks, compression, buffer, TLS); YAML unmarshalling and configEqual refactored to dispatch by output type.
Event proto conversion helpers and tests
eventrecorder/events.go, eventrecorder/events_test.go
Pure-functional converters for Alertmanager domain types (labels, alerts, silence, inhibit rules) to eventrecorderpb protobuf; event-data constructors and event-type extraction; unit tests verify proto shape and matcher type mapping.
Metrics infrastructure and counters
eventrecorder/metrics.go
Shared prometheus CounterVec metrics for events, bytes, drops, serialization errors, output-local drops, and Kafka produce errors with conditional registration for test/production usage.
Recorder core, buildOutputs, and writeLoop
eventrecorder/recorder.go, eventrecorder/recorder_test.go
Recorder handle and background writeLoop with bounded event queue, hot-reload via ApplyConfig, output construction (buildOutputs), graceful shutdown, and RecordEvent/SetClusterPeer; test doubles verify configEqual inequality cases.
Proto fast-path and event marshalling
eventrecorder/eventrecorder.go
ProtoDestination interface for proto-native outputs; OutputKafka constant; shared output-drops and kafka-produce-errors metrics; buildOutputs wiring metrics/instance; marshalAndSend rewritten to conditionally SendProto or lazily JSON-marshal and SendEvent per output type.
Kafka producer implementation
eventrecorder/kafka.go
KafkaOutput using franz-go with exported format/acks/compression constants; config validation/normalization; stable output naming; producer client construction (acks/compression/TLS); startup broker ping (warn-only); bounded enqueue with drop-on-full; single dispatcher producing asynchronously with classified error metrics; graceful Close draining and flushing.
Kafka integration and unit tests
eventrecorder/kafka_test.go
In-process fake Kafka cluster; JSON/protobuf round-trips with keying; drops when buffer full; queue drain on Close; unreachable-broker robustness; config validation; deterministic naming; broker-order independence for equality.
File/webhook validation and equality helpers
eventrecorder/file.go, eventrecorder/file_test.go, eventrecorder/webhook.go, eventrecorder/webhook_test.go
Per-output-type validation (file path required, webhook URL required) and equality comparison; YAML unmarshalling and configEqual tests for each output type.
Go module updates
go.mod
Direct dependencies added: franz-go with kfake/kslog, prometheus client_model; indirect pins updated for compression/lz4/kmsg and transitive tools/crypto/sys.

Sequence Diagram(s)

sequenceDiagram
  participant Caller
  participant Recorder as Recorder
  participant WriteLoop as writeLoop
  participant Output as Destination
  Caller->>Recorder: RecordEvent(ctx, eventData)
  Recorder->>Recorder: wrap with timestamp/instance
  Recorder->>WriteLoop: send on bounded queue
  alt queue full
    Recorder->>Recorder: increment drops, return
  else queue available
    WriteLoop->>Output: marshalAndSend(event)
  end
  Caller->>Recorder: ApplyConfig(cfg)
  Recorder->>WriteLoop: signal config update
  WriteLoop->>WriteLoop: rebuild outputs
  WriteLoop->>Recorder: ack or keep old outputs
  Caller->>Recorder: Close()
  Recorder->>WriteLoop: signal shutdown
  WriteLoop->>WriteLoop: drain queue
  WriteLoop->>Output: Close()
  WriteLoop->>Recorder: done
Loading
sequenceDiagram
  participant KafkaOut as KafkaOutput
  participant Enqueue as enqueue()
  participant Dispatcher as dispatcher goroutine
  participant Producer as franz-go Producer
  participant Kafka
  KafkaOut->>Enqueue: SendEvent/SendProto payload
  alt buffer full
    Enqueue->>Enqueue: warning log + drop increment
    Enqueue-->>KafkaOut: success (dropped)
  else buffer has space
    Enqueue->>Dispatcher: send on channel
    Enqueue-->>KafkaOut: success (queued)
  end
  Dispatcher->>Producer: Produce record keyed by instance
  Producer->>Kafka: send with topic/acks/compression
  Kafka-->>Producer: callback (error/success)
  Producer->>Dispatcher: update error metrics
  KafkaOut->>Dispatcher: Close() signal
  Dispatcher->>Dispatcher: drain remaining buffered
  Dispatcher->>Producer: Flush within budget
  Producer-->>Dispatcher: flush result
  Dispatcher->>KafkaOut: exit
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • prometheus/alertmanager#5190: Refactors the event recorder config and output types that this PR extends with Kafka support.
  • prometheus/alertmanager#5189: Introduces foundational event recorder configuration structures that this PR builds upon for Kafka integration.
  • prometheus/alertmanager#5072: Adds the initial event recorder implementation whose output/config/dispatch logic this PR extends with proto fast-path and Kafka producer support.

Suggested reviewers

  • Spaceman1701
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 59.70% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely summarizes the main change: adding Kafka output support to the event recorder. It follows the required format 'area: description' and directly corresponds to the primary objective of this changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description check ✅ Passed The pull request description is comprehensive, including configuration examples, implementation details, metric changes, testing approach, and dependencies. It clearly documents the feature, breaking changes, and follows most template guidelines.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@docs/configuration.md`:
- Line 2098: Replace the inconsistent placeholder `<secret_url>` with the
documented placeholder `<secret>` in the configuration examples (specifically
the webhook URL entry shown as `url: <secret_url>`); locate the occurrence of
`<secret_url>` in the docs/configuration.md snippet and update it to `url:
<secret>` so it matches the schema placeholder `<secret>` used elsewhere in the
document.

In `@eventrecorder/kafka.go`:
- Around line 151-156: The synchronous startup call to client.Ping using
pingCtx/defaultKafkaPingTimeout blocks initialization; remove the blocking ping
and instead spawn a background goroutine that performs the ping with its own
context.WithTimeout, calls client.Ping inside the goroutine, logs the same
warning (using logger.Warn, "output", name, "err", pingErr) on failure, and
ensures the context cancel is called inside the goroutine to avoid leaks; keep
the main init path non-blocking and do not call cancel() immediately from the
main thread.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: 7d611017-d7d4-4c31-a56b-55a1f37efc7d

📥 Commits

Reviewing files that changed from the base of the PR and between f493986 and 72da8ae.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (7)
  • docs/configuration.md
  • eventrecorder/config.go
  • eventrecorder/eventrecorder.go
  • eventrecorder/eventrecorder_test.go
  • eventrecorder/kafka.go
  • eventrecorder/kafka_test.go
  • go.mod

Comment thread docs/configuration.md Outdated
Comment thread eventrecorder/kafka.go Outdated
@siavashs siavashs force-pushed the feat/event-recorder-kafka-integration branch from 72da8ae to 590738d Compare May 19, 2026 11:47
siavashs added 2 commits May 19, 2026 13:52
Document the event recorder configuration including all supported outputs.

Signed-off-by: Siavash Safi <siavash@cloudflare.com>
Add a third event recorder destination alongside file and webhook that
produces serialized events to a Kafka topic via franz-go.

Configuration is per-output under `event_recorder.outputs`:

  event_recorder:
    outputs:
      - type: kafka
        brokers: ["kafka-1:9093", "kafka-2:9093"]
        topic: alertmanager.events
        format: json          # or "protobuf"
        acks: leader          # "none" | "leader" | "all"
        compression: snappy   # "none" | "gzip" | "snappy" | "lz4" | "zstd"
        buffer_size: 1024
        tls_config: { ... }

Implementation notes:

- KafkaOutput buffers events in a bounded local channel and forwards
  them to franz-go's async producer.  When the buffer is full, events
  are dropped (counted via alertmanager_event_output_drops_total) so a
  slow or unreachable broker cannot block the upstream pipeline.
- Broker unreachability at startup is logged at warn level and does
  not prevent Alertmanager from starting; franz-go retries connections
  in the background.
- Records use the producing instance's hostname as the message key,
  keeping per-instance ordering on the same partition.
- A new optional ProtoDestination interface lets the Kafka output
  receive protobuf events directly, skipping JSON serialization when
  no JSON-mode destination is configured.
- JSON marshalling in marshalAndSend is now lazy: it only happens
  when at least one non-proto destination needs it.
- TLS is supported via prometheus/common's TLSConfig (mTLS or
  server-only).  SASL is intentionally out of scope for this change
  and can be added later via franz-go's kgo.SASL options.
- Idempotent writes are disabled unless acks=all is explicitly set,
  to keep the default leader-ack path compatible with franz-go.

Metric changes:

- Rename alertmanager_event_webhook_drops_total ->
  alertmanager_event_output_drops_total{output}, shared by webhook and
  kafka outputs.  This is a breaking metric rename; dashboards and
  alerts referencing the old name need to be updated.
- Add alertmanager_event_kafka_produce_errors_total{output,error_type}
  populated from franz-go's produce callback.

Testing:

- Unit tests use github.com/twmb/franz-go/pkg/kfake to spin an
  in-process broker for JSON, protobuf, message-key, drop-on-full,
  close-flush, initial-ping-failure, name-stability, and config
  validation cases, plus a proto fast-path test against marshalAndSend.

Dependencies added:
  github.com/twmb/franz-go
  github.com/twmb/franz-go/pkg/kfake (test)
  github.com/twmb/franz-go/plugin/kslog

Signed-off-by: Siavash Safi <siavash@cloudflare.com>
@siavashs siavashs force-pushed the feat/event-recorder-kafka-integration branch from 590738d to 510008b Compare May 19, 2026 11:54
Signed-off-by: Siavash Safi <siavash@cloudflare.com>
Comment thread eventrecorder/kafka.go
ko.logger.Warn("Kafka event recorder flush did not complete within budget; remaining records will be dropped",
"output", ko.name, "err", err)
}
ko.client.Close()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could hang. Call ko.client.LeaveGroupContext() before

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our client is a producer and not a consumer, therefore it is not joining any groups and calling LeaveGroupContext() will be a no-op.
I can push a code doc change to clarify that if you think it is necessary.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
eventrecorder/recorder.go (3)

161-169: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Normalize a nil logger before wiring the recorder.

If logger is nil, later logger.Error(...) calls in output build/error paths can panic. Normalize once at construction time (Line 161) to keep error handling safe.

Proposed fix
 func NewRecorderFromConfig(cfg Config, instance string, logger *slog.Logger, r prometheus.Registerer) Recorder {
+	if logger == nil {
+		logger = slog.Default()
+	}
 	core := &sharedRecorder{
 		instance:  instance,
 		logger:    logger,
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@eventrecorder/recorder.go` around lines 161 - 169, NewRecorderFromConfig may
receive a nil logger which leads to panics when sharedRecorder later calls
logger.Error; normalize the logger at the start of NewRecorderFromConfig by
replacing a nil logger with a safe default logger (e.g. the package's default
slog logger) before constructing the sharedRecorder so all subsequent uses of
logger in sharedRecorder, NewRecorderFromConfig and related methods are
guaranteed non-nil.

364-369: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Stop accepting events after recorder shutdown starts.

After Line 403 closes done, RecordEvent can still enqueue into events; those events may never be drained once writeLoop exits. Add a done check in the enqueue select.

Proposed fix
 	select {
+	case <-r.core.done:
+		return
 	case r.core.events <- writeRequest{event: wrappedEvent, eventType: eventType}:
 	default:
 		// Queue full; drop event to avoid blocking alertmanager.
 		r.core.metrics.eventsDropped.WithLabelValues(eventType).Inc()
 	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@eventrecorder/recorder.go` around lines 364 - 369, RecordEvent can still
enqueue into r.core.events after shutdown begins because there is no check for
r.done; add a non-blocking case on the recorder's done channel in the same
select where you send to r.core.events (the block with r.core.events <-
writeRequest{...}) so that if <-r.done is ready you skip enqueuing (and
increment r.core.metrics.eventsDropped.WithLabelValues(eventType).Inc() or
return immediately) to ensure no events are left stranded when writeLoop exits;
update the select inside RecordEvent to include case <-r.done to stop accepting
events once shutdown starts.

319-324: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don’t abort fan-out after a JSON marshal failure.

Returning at Line 323 makes delivery order-dependent: later proto destinations are skipped if a JSON destination was encountered first and marshal failed. Skip only JSON-path deliveries and continue iterating outputs.

Proposed fix
 func (c *sharedRecorder) marshalAndSend(req writeRequest, outputs []Destination) {
 	var (
 		jsonData       []byte
 		jsonMarshalErr error
 		jsonMarshalled bool
+		jsonErrLogged  bool
 	)
@@
 		if jsonMarshalErr != nil {
-			c.metrics.eventSerializeErrors.WithLabelValues(req.eventType).Inc()
-			c.logger.Error("Failed to marshal event", "event_type", req.eventType, "err", jsonMarshalErr)
-			// Don't try further JSON destinations with bad data.
-			return
+			if !jsonErrLogged {
+				c.metrics.eventSerializeErrors.WithLabelValues(req.eventType).Inc()
+				c.logger.Error("Failed to marshal event", "event_type", req.eventType, "err", jsonMarshalErr)
+				jsonErrLogged = true
+			}
+			// Skip JSON delivery for this output, but keep fan-out alive
+			// for remaining outputs (including proto destinations).
+			continue
 		}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@eventrecorder/recorder.go` around lines 319 - 324, The current JSON-marshal
error path in recorder.go logs and returns, which aborts the entire fan-out and
skips later outputs; change the behavior so that when jsonMarshalErr != nil you
increment c.metrics.eventSerializeErrors, log the error (using c.logger.Error
with "event_type" and the error), and then continue to the next output instead
of returning—i.e., replace the return with a continue (or otherwise skip only
the JSON delivery branch in the loop that handles req/eventType/jsonMarshalErr)
so proto destinations are still attempted.
🧹 Nitpick comments (3)
eventrecorder/webhook_test.go (1)

224-228: ⚡ Quick win

Add malformed-URL cases to webhook YAML validation tests.

The table covers missing url, but not invalid present values (e.g., empty/relative URL). Adding those cases will lock in fail-fast validation behavior.

Suggested test additions
 		{
 			name:    "missing url",
 			yaml:    "type: webhook\n",
 			wantErr: true,
 		},
+		{
+			name:    "empty url",
+			yaml:    "type: webhook\nurl: \"\"\n",
+			wantErr: true,
+		},
+		{
+			name:    "relative url",
+			yaml:    "type: webhook\nurl: /hook\n",
+			wantErr: true,
+		},
 	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@eventrecorder/webhook_test.go` around lines 224 - 228, Add additional
negative test entries to the webhook YAML validation table (the test cases slice
in webhook_test.go that currently includes the "missing url" case) to cover
malformed URL values: include cases where yaml has url: "" (empty string), url:
"/relative/path" (relative URL), and url: "ht!tp://bad" or another syntactically
invalid URL; for each new case set wantErr: true so the parser/validator (the
code exercised by the test harness that parses webhook YAML into the webhook
config struct) fails fast on invalid url values.
eventrecorder/events_test.go (1)

64-75: ⚡ Quick win

Assert deterministic label ordering explicitly.

This test currently validates content but not the sorted order guarantee. Please assert the exact proto.Labels sequence so ordering regressions are caught.

Proposed test update
 func TestLabelSetAsProto(t *testing.T) {
 	ls := model.LabelSet{"foo": "bar", "baz": "qux"}
 	proto := LabelSetAsProto(ls)

 	require.Len(t, proto.Labels, 2)
-	found := map[string]string{}
-	for _, lp := range proto.Labels {
-		found[lp.Key] = lp.Value
-	}
-	require.Equal(t, "bar", found["foo"])
-	require.Equal(t, "qux", found["baz"])
+	require.Equal(t, "baz", proto.Labels[0].Key)
+	require.Equal(t, "qux", proto.Labels[0].Value)
+	require.Equal(t, "foo", proto.Labels[1].Key)
+	require.Equal(t, "bar", proto.Labels[1].Value)
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@eventrecorder/events_test.go` around lines 64 - 75, Test LabelSetAsProto
currently checks contents but not order; update the TestLabelSetAsProto test to
assert the exact sequence of proto.Labels so ordering regressions are caught:
after calling LabelSetAsProto(ls) assert proto.Labels length is 2 and then
assert proto.Labels[0].Key/Value and proto.Labels[1].Key/Value match the
expected deterministic order (use the expected sorted keys/values for the
LabelSet input) rather than only comparing via a map.
eventrecorder/kafka.go (1)

367-376: ⚡ Quick win

Downgrade shutdown enqueue race: recorder design prevents concurrent SendEvent/SendProto with KafkaOutput.Close()

The select in KafkaOutput.enqueue can indeed choose the ko.work <- value case even after ko.done is closed, but in this codebase out.SendEvent/out.SendProto are only called from sharedRecorder.writeLoop, and out.Close() happens later (when writeLoop stops sending). So the “post-close enqueue” scenario needed to strand events shouldn’t occur in normal operation.

Optional: if KafkaOutput is ever used directly with concurrent Send* calls while Close() runs, then the proposed mutex/closed-guard would be appropriate.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@eventrecorder/kafka.go` around lines 367 - 376, The enqueue race can allow
ko.work <- value to succeed after ko.done is closed; add a guard on KafkaOutput
to prevent enqueues after Close by introducing a mutex-protected "closed"
boolean: in Close() acquire lock, set closed=true and close ko.done (and ko.work
if currently closed semantics require), and in enqueue() acquire a read-lock,
return error if closed is true, then proceed to the existing select (ko.work,
ko.done, default) so no send happens once Close set closed; reference
KafkaOutput.enqueue, KafkaOutput.Close, and fields ko.work, ko.done, ko.drops,
ko.logger, ko.name when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@eventrecorder/webhook.go`:
- Around line 34-38: The validateWebhook method currently only checks for nil
URL pointer; update Output.validateWebhook to also reject empty or non-absolute
URLs by dereferencing o.URL, returning an error if the string is empty, and
parsing it with net/url (e.g. url.Parse) to ensure it has a valid scheme and
host (or IsAbs) before returning nil. Use the Output.URL field and
validateWebhook function name to locate the logic and return descriptive errors
for empty and non-absolute URL cases.

---

Outside diff comments:
In `@eventrecorder/recorder.go`:
- Around line 161-169: NewRecorderFromConfig may receive a nil logger which
leads to panics when sharedRecorder later calls logger.Error; normalize the
logger at the start of NewRecorderFromConfig by replacing a nil logger with a
safe default logger (e.g. the package's default slog logger) before constructing
the sharedRecorder so all subsequent uses of logger in sharedRecorder,
NewRecorderFromConfig and related methods are guaranteed non-nil.
- Around line 364-369: RecordEvent can still enqueue into r.core.events after
shutdown begins because there is no check for r.done; add a non-blocking case on
the recorder's done channel in the same select where you send to r.core.events
(the block with r.core.events <- writeRequest{...}) so that if <-r.done is ready
you skip enqueuing (and increment
r.core.metrics.eventsDropped.WithLabelValues(eventType).Inc() or return
immediately) to ensure no events are left stranded when writeLoop exits; update
the select inside RecordEvent to include case <-r.done to stop accepting events
once shutdown starts.
- Around line 319-324: The current JSON-marshal error path in recorder.go logs
and returns, which aborts the entire fan-out and skips later outputs; change the
behavior so that when jsonMarshalErr != nil you increment
c.metrics.eventSerializeErrors, log the error (using c.logger.Error with
"event_type" and the error), and then continue to the next output instead of
returning—i.e., replace the return with a continue (or otherwise skip only the
JSON delivery branch in the loop that handles req/eventType/jsonMarshalErr) so
proto destinations are still attempted.

---

Nitpick comments:
In `@eventrecorder/events_test.go`:
- Around line 64-75: Test LabelSetAsProto currently checks contents but not
order; update the TestLabelSetAsProto test to assert the exact sequence of
proto.Labels so ordering regressions are caught: after calling
LabelSetAsProto(ls) assert proto.Labels length is 2 and then assert
proto.Labels[0].Key/Value and proto.Labels[1].Key/Value match the expected
deterministic order (use the expected sorted keys/values for the LabelSet input)
rather than only comparing via a map.

In `@eventrecorder/kafka.go`:
- Around line 367-376: The enqueue race can allow ko.work <- value to succeed
after ko.done is closed; add a guard on KafkaOutput to prevent enqueues after
Close by introducing a mutex-protected "closed" boolean: in Close() acquire
lock, set closed=true and close ko.done (and ko.work if currently closed
semantics require), and in enqueue() acquire a read-lock, return error if closed
is true, then proceed to the existing select (ko.work, ko.done, default) so no
send happens once Close set closed; reference KafkaOutput.enqueue,
KafkaOutput.Close, and fields ko.work, ko.done, ko.drops, ko.logger, ko.name
when making the change.

In `@eventrecorder/webhook_test.go`:
- Around line 224-228: Add additional negative test entries to the webhook YAML
validation table (the test cases slice in webhook_test.go that currently
includes the "missing url" case) to cover malformed URL values: include cases
where yaml has url: "" (empty string), url: "/relative/path" (relative URL), and
url: "ht!tp://bad" or another syntactically invalid URL; for each new case set
wantErr: true so the parser/validator (the code exercised by the test harness
that parses webhook YAML into the webhook config struct) fails fast on invalid
url values.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: 58f4a560-7735-463f-8b06-f179baa12fc3

📥 Commits

Reviewing files that changed from the base of the PR and between 510008b and 3f8dfd2.

📒 Files selected for processing (12)
  • eventrecorder/config.go
  • eventrecorder/events.go
  • eventrecorder/events_test.go
  • eventrecorder/file.go
  • eventrecorder/file_test.go
  • eventrecorder/kafka.go
  • eventrecorder/kafka_test.go
  • eventrecorder/metrics.go
  • eventrecorder/recorder.go
  • eventrecorder/recorder_test.go
  • eventrecorder/webhook.go
  • eventrecorder/webhook_test.go

Comment thread eventrecorder/webhook.go
Comment on lines +34 to +38
func (o *Output) validateWebhook() error {
if o.URL == nil {
return errors.New("event_recorder webhook output requires a url")
}
return nil
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot May 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject empty/non-absolute webhook URLs at config load time.

validateWebhook only checks for nil, so values like url: "" or relative URLs pass unmarshal and fail later during sends. Validate URL shape here to fail fast.

Proposed fix
 func (o *Output) validateWebhook() error {
 	if o.URL == nil {
 		return errors.New("event_recorder webhook output requires a url")
 	}
+	u, err := url.Parse(o.URL.String())
+	if err != nil || u.Scheme == "" || u.Host == "" {
+		return errors.New("event_recorder webhook output requires an absolute url")
+	}
 	return nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (o *Output) validateWebhook() error {
if o.URL == nil {
return errors.New("event_recorder webhook output requires a url")
}
return nil
func (o *Output) validateWebhook() error {
if o.URL == nil {
return errors.New("event_recorder webhook output requires a url")
}
u, err := url.Parse(o.URL.String())
if err != nil || u.Scheme == "" || u.Host == "" {
return errors.New("event_recorder webhook output requires an absolute url")
}
return nil
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@eventrecorder/webhook.go` around lines 34 - 38, The validateWebhook method
currently only checks for nil URL pointer; update Output.validateWebhook to also
reject empty or non-absolute URLs by dereferencing o.URL, returning an error if
the string is empty, and parsing it with net/url (e.g. url.Parse) to ensure it
has a valid scheme and host (or IsAbs) before returning nil. Use the Output.URL
field and validateWebhook function name to locate the logic and return
descriptive errors for empty and non-absolute URL cases.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pre-existing bug, we only moved the code, I can fix it in a separate PR or here.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@siavashs — thanks for the context! Either approach is reasonable. Since validateWebhook is already being restructured in this PR, fixing it here would be the lowest-friction option and keeps the validation complete from the start. That said, if you'd prefer to keep this PR's scope tight, I'm happy to open a follow-up issue to track it. Just let me know which you'd prefer!

@TheMeier
Copy link
Copy Markdown
Contributor

A docker-compose example under examples/kafka/ provides a single-node Apache Kafka (KRaft) broker for manual end-to-end verification with a matching alertmanager.yml.

This seems to be missing

@siavashs
Copy link
Copy Markdown
Contributor Author

A docker-compose example under examples/kafka/ provides a single-node Apache Kafka (KRaft) broker for manual end-to-end verification with a matching alertmanager.yml.

This seems to be missing

Ah, that was for my local tests but it leaked into the commit message.
Do you want me to push that?

Comment thread docs/configuration.md
`--enable-feature=event-recorder` on the command line to activate it.
When the flag is not set, the recorder silently discards all events.

Event recording is configured under the top-level `event_recorder` key.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this enough? Or should there be a section like:

# Optional event recorder configuration
[ event_recorder: <event_recorder_config> ]

in the top level section?

@TheMeier
Copy link
Copy Markdown
Contributor

Maybe add a note about topic creation to the docs (required if auto.create.topics.enable is not set?)

@TheMeier
Copy link
Copy Markdown
Contributor

A docker-compose example under examples/kafka/ provides a single-node Apache Kafka (KRaft) broker for manual end-to-end verification with a matching alertmanager.yml.

This seems to be missing

Ah, that was for my local tests but it leaked into the commit message. Do you want me to push that?

your call, either add it or remove the text claiming it's there ;)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants