feat(ingestion): adds kafka consumer breadcrumbs #31201
Conversation
PR Summary
This PR adds Kafka consumer breadcrumbs to track event flow through the ingestion pipeline, helping identify potential sources of event duplication (currently affecting 3-4% of events).
- Adds a `kafka_consumer_breadcrumbs` array to event properties containing topic, offset, partition, timestamp, and consumer_id for each processing step
- Preserves existing breadcrumbs when an event passes through multiple consumers in the pipeline
- Implements comprehensive test coverage verifying breadcrumb structure and content
- Enables tracing of events across all potential destinations (main pipeline, DLQ, historical, overflow)
- Maintains the breadcrumb chain when events are reprocessed, providing a complete audit trail
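For reference, the breadcrumb shape the summary describes might look like the following sketch (field names follow the PR description below, which uses `processed_at` as the timestamp field; the actual type in the diff may differ):

```ts
// Sketch only: one entry is appended per consumer that processes the event.
interface KafkaConsumerBreadcrumb {
    topic: string        // topic the message was consumed from
    offset: number       // offset of the message within the partition
    partition: number    // partition the message was read from
    processed_at: string // ISO-8601 timestamp set by the consumer
    consumer_id: string  // consumer group id of the processing consumer
}
```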
```ts
    ? eventProperties.kafka_consumer_breadcrumbs
    : []

// Store the breadcrumb in event properties
```
suggestion: We should store the breadcrumbs outside of the event properties, as we don't really want to expose them to our customers.
IMO, they should be stored in a separate column in ClickHouse, so they should be at the top level of the produced Kafka message. It will be easier to aggregate them later into a single array of breadcrumbs for querying.
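A rough sketch of what this suggestion would look like (all names here are illustrative, and the thread below ends up going with message headers instead):

```ts
// Breadcrumbs as a top-level field on the produced message, destined for a
// dedicated ClickHouse column, rather than nested inside event properties.
const producedMessage = {
    ...eventRow, // hypothetical: the existing event payload
    kafka_consumer_breadcrumbs: breadcrumbs, // top level, so it maps cleanly to its own column
}
await producer.produce({
    topic: clickhouseEventsTopic, // hypothetical topic name
    value: Buffer.from(JSON.stringify(producedMessage)),
})
```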
Cool, sounds good!
Force-pushed from `ba41180` to `aff712d`
```diff
-        kafkaMessages.map((message) =>
-            this.kafkaOverflowProducer!.produce({
+        incomingEvents.map(({ message, event }) => {
+            const { data: dataStr, ...rawEvent } = parseJSON(message.value!.toString())
```
question: Is there a way we could do it without adding another pair of JSON parse/stringify calls? JSON serialization in Node is quite expensive and we might see a visible bump in CPU usage if we do this.
I think I'd have to pass the `rawEvent` object from `parseKafkaBatch` to this point in the code. Then I should be able to avoid the `const { data: dataStr, ...rawEvent } = parseJSON(message.value!.toString())` line, but I'm pretty sure there's no way around making the two stringify calls, as I need to reserialize the modified data so I can send it.
I checked to see if I could just reconstruct `rawEvent` from the event passed into `emitToOverflow`, and unfortunately we are overwriting some of the fields that we need in the `normalizeEvent` step... (the `ip` field)
Similar stringify calls are made when we do the `emitEvent` calls in other cases where we've changed the event.
Added the breadcrumbs to the message headers in a subsequent commit. We don't have to deserialize/reserialize the entire message anymore to tack on these breadcrumbs.
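A minimal sketch of the header-based approach (the producer API and variable names here are assumptions, not the exact code from the commit):

```ts
// Attach the breadcrumb as a Kafka message header, so the message body can be
// forwarded byte-for-byte with no extra JSON parse/stringify of the payload.
const breadcrumb = {
    topic: message.topic,
    offset: message.offset,
    partition: message.partition,
    processed_at: new Date().toISOString(),
    consumer_id: consumerGroupId, // hypothetical: the consumer group id
}

await producer.produce({
    topic: overflowTopic, // hypothetical destination topic
    key: message.key,
    value: message.value, // original bytes, untouched
    headers: [{ kafka_consumer_breadcrumbs: Buffer.from(JSON.stringify(breadcrumb)) }],
})
```

Only the small breadcrumb object gets serialized, so the per-message cost stays constant regardless of payload size.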
Might want to get a 2nd pair of 👁️ from @pl but this LGTM 👍
```diff
@@ -97,6 +106,7 @@ describe('IngestionConsumer', () => {
     beforeEach(async () => {
         fixedTime = DateTime.fromObject({ year: 2025, month: 1, day: 1 }, { zone: 'UTC' })
         jest.spyOn(Date, 'now').mockReturnValue(fixedTime.toMillis())
+        jest.spyOn(Date.prototype, 'toISOString').mockReturnValue(fixedTime.toISO()!)
```
nit: Jest has mocks for everything time-related: https://jestjs.io/docs/timer-mocks
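For reference, the pattern the nit is pointing at would be something like this (a sketch, not the code the PR ended up with):

```ts
// Modern fake timers pin Date.now() and new Date() in one place,
// replacing the individual spyOn calls above.
beforeEach(() => {
    jest.useFakeTimers()
    jest.setSystemTime(new Date('2025-01-01T00:00:00Z'))
})

afterEach(() => {
    jest.useRealTimers()
})
```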
hmm, my tests were stalling when I tried using `useFakeTimers()`, not sure why... so I opted to use this `spyOn`.
Ok, let's leave it as it is then – there might be a `process.nextTick` call or something like that, but it looks like it's not worth the effort.
```ts
if (validatedBreadcrumbs.success) {
    existingBreadcrumbs.push(...validatedBreadcrumbs.data)
} else {
    console.log('yes')
```
whoops, just rm'ed
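The `success`/`data` pattern in the snippet above looks like Zod's `safeParse`; a minimal sketch of that validation path, with hypothetical schema and logger names:

```ts
import { z } from 'zod'

// Hypothetical schema matching the breadcrumb structure from the PR description.
const KafkaConsumerBreadcrumbSchema = z.object({
    topic: z.string(),
    offset: z.number(),
    partition: z.number(),
    processed_at: z.string(),
    consumer_id: z.string(),
})

const validatedBreadcrumbs = z.array(KafkaConsumerBreadcrumbSchema).safeParse(rawBreadcrumbs)

if (validatedBreadcrumbs.success) {
    existingBreadcrumbs.push(...validatedBreadcrumbs.data)
} else {
    // A malformed breadcrumb shouldn't fail ingestion: record it and move on.
    logger.warn('invalid kafka_consumer_breadcrumbs payload', validatedBreadcrumbs.error)
}
```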
Problem
We are potentially duplicating up to 3-4% of events being ingested into ClickHouse. To figure out where in our ingestion/processing pipeline these events are being duplicated, we'd like to add some breadcrumbs to help with the analysis.
The portion of our pipeline we need observability into is the following:
[client] -> [capture api] -> [events_plugin_ingestion topic] -> [ingestion-events consumer] -> [events_plugin_ingestion_* topic] -> [ingestion-events-* consumer]
(NOTE: `*` can be any of `dlq`, `historical`, or `overflow`. These are potential destinations other than the ClickHouse topic. We need to ensure we track duplicates across these as well.)
Client -> Capture API
A `now` timestamp is already being set on the event when it arrives at the server. If this timestamp differs between two events with an otherwise unique identifier, then the client sent the event multiple times.
Capture API -> ingestion-topic, or ingestion-consumer -> ingestion-topic
The `offset` in the breadcrumb should indicate whether or not a producer enqueued a single event to a Kafka topic multiple times. The Kafka topic is also written with the breadcrumb, so we know which topic was being produced to when the duplication happens.
ingestion-topic -> ingestion-consumer
The `processed_at` timestamp that we set in the breadcrumb will allow us to understand if our consumer dequeued the same event off the ingestion-topic twice. The Kafka topic and consumer id are also written with the breadcrumb, so we know which topic was being consumed from and the consumer group id.
Changes
Adds a `kafka_consumer_breadcrumbs` array to `event.properties` while processing events in our ingestion consumers. This array is a list of objects with the following structure: `{topic, offset, partition, processed_at, consumer_id}` (an example chain is sketched below).
How did you test this code?
Added unit tests and updated the snapshots
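For illustration, here's what the breadcrumb chain might look like after an event passes through the main consumer and is then routed to overflow (all values below are made up; the consumer ids are hypothetical group ids):

```ts
const exampleBreadcrumbs = [
    {
        topic: 'events_plugin_ingestion',
        offset: 1042,
        partition: 3,
        processed_at: '2025-01-01T00:00:00.000Z',
        consumer_id: 'events-ingestion-consumer',
    },
    {
        // second entry appended by the overflow consumer; the first is preserved
        topic: 'events_plugin_ingestion_overflow',
        offset: 87,
        partition: 0,
        processed_at: '2025-01-01T00:00:05.000Z',
        consumer_id: 'events-ingestion-overflow-consumer',
    },
]
```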