feat(ingestion): adds kafka consumer breadcrumbs #31201

Open · wants to merge 21 commits into master
Conversation

@nickbest-ph (Contributor) commented Apr 14, 2025


Problem

We are potentially duplicating up to 3-4% of the events ingested into ClickHouse. To figure out where in our ingestion/processing pipeline these events are being duplicated, we'd like to add breadcrumbs to help with the analysis.

The portion of our pipeline we need observability into is the following:

[client] -> [capture api] -> [events_plugin_ingestion topic] -> [ingestion-events consumer] -> [events_plugin_ingestion_* topic] -> [ingestion-events-* consumer]

(NOTE: * can be any of dlq, historical, or overflow. These are potential destinations other than the clickhouse topic. We need to ensure we track duplicates across these as well)

Client -> Capture API

A `now` timestamp is already set on the event when it arrives at the server. If this timestamp differs between two events with an otherwise unique identifier, then the client sent the event multiple times.

Capture API -> ingestion-topic, or ingestion-consumer -> ingestion-topic

The offset in the breadcrumb should indicate whether a producer enqueued a single event to a Kafka topic multiple times. The Kafka topic is also written into the breadcrumb, so we know which topic was being produced to when the duplication happened.

ingestion-topic -> ingestion-consumer

The processed_at timestamp that we set in the breadcrumb will let us see whether our consumer dequeued the same event off the ingestion-topic twice. The Kafka topic and consumer id are also written into the breadcrumb, so we know which topic was being consumed from and which consumer group consumed it.
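The analysis described above could be sketched roughly like this (a minimal illustration, not the PR's actual code; the function and label names are hypothetical). Given the breadcrumb trails of two rows that share an event UUID, the last breadcrumb distinguishes a producer-side duplicate (two distinct offsets on the same topic) from a consumer-side re-delivery (the same offset dequeued twice):

```typescript
// Breadcrumb shape as described in the PR; helper names below are illustrative.
type Breadcrumb = {
    topic: string
    offset: number
    partition: number
    processed_at: string
    consumer_id: string
}

// Classify where duplication likely happened for two rows sharing an event UUID.
function classifyDuplicate(a: Breadcrumb[], b: Breadcrumb[]): string {
    const lastA = a[a.length - 1]
    const lastB = b[b.length - 1]
    if (!lastA || !lastB) return 'missing-breadcrumbs'
    if (lastA.topic === lastB.topic && lastA.partition === lastB.partition) {
        if (lastA.offset !== lastB.offset) {
            // Two distinct offsets on the same topic: a producer enqueued the event twice.
            return 'duplicated-by-producer'
        }
        if (lastA.processed_at !== lastB.processed_at) {
            // Same offset processed at different times: the consumer dequeued it twice.
            return 'duplicated-by-consumer'
        }
    }
    return 'unknown'
}
```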

Changes

Adds a kafka_consumer_breadcrumbs array to event.properties while processing events in our ingestion consumers. This array is a list of objects with the following structure: {topic, offset, partition, processed_at, consumer_id}.
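A minimal sketch of the breadcrumb shape and the append step (the interface and helper names here are illustrative assumptions, not the PR's actual identifiers; note that each consumer preserves breadcrumbs added by earlier consumers in the pipeline):

```typescript
// Breadcrumb structure as described above; names are illustrative.
interface KafkaConsumerBreadcrumb {
    topic: string
    offset: number
    partition: number
    processed_at: string // ISO timestamp set when the consumer handles the message
    consumer_id: string // consumer group id
}

interface EventProperties {
    [key: string]: unknown
    kafka_consumer_breadcrumbs?: KafkaConsumerBreadcrumb[]
}

// Append a breadcrumb, preserving any breadcrumbs added by earlier consumers.
function appendBreadcrumb(
    properties: EventProperties,
    crumb: KafkaConsumerBreadcrumb
): EventProperties {
    const existing = Array.isArray(properties.kafka_consumer_breadcrumbs)
        ? properties.kafka_consumer_breadcrumbs
        : []
    return { ...properties, kafka_consumer_breadcrumbs: [...existing, crumb] }
}
```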

How did you test this code?

Added unit tests and updated the snapshots

@greptile-apps bot (Contributor) left a comment:
PR Summary

This PR adds Kafka consumer breadcrumbs to track event flow through the ingestion pipeline, helping identify potential sources of event duplication (currently affecting 3-4% of events).

  • Adds kafka_consumer_breadcrumbs array to event properties containing topic, offset, partition, timestamp, and consumer_id for each processing step
  • Preserves existing breadcrumbs when an event passes through multiple consumers in the pipeline
  • Implements comprehensive test coverage verifying breadcrumb structure and content
  • Enables tracing of events across all potential destinations (main pipeline, DLQ, historical, overflow)
  • Maintains the breadcrumb chain when events are reprocessed, providing a complete audit trail


@nickbest-ph nickbest-ph assigned tkaemming and unassigned tkaemming Apr 14, 2025
@nickbest-ph nickbest-ph requested a review from a team April 14, 2025 19:15
@nickbest-ph nickbest-ph self-assigned this Apr 14, 2025
@nickbest-ph nickbest-ph requested a review from tkaemming April 14, 2025 19:45
@nickbest-ph nickbest-ph changed the title feat(ingestion):adds kafka consumer breadcrumbs feat(ingestion): adds kafka consumer breadcrumbs Apr 14, 2025
```ts
    ? eventProperties.kafka_consumer_breadcrumbs
    : []

// Store the breadcrumb in event properties
```
A Contributor commented:
suggestion: We should store the breadcrumbs outside of the event properties, as we don't really want to expose them to our customers.

IMO they should be stored in a separate column in ClickHouse, so they should be at the top level of the produced Kafka message. That will make it easier to aggregate them later into a single array of breadcrumbs for querying.

@nickbest-ph (Author) replied:
Cool, sounds good!

@nickbest-ph nickbest-ph requested review from pl and a team April 14, 2025 23:51
@nickbest-ph nickbest-ph force-pushed the feat/kafka-offset-breadcrumbs branch from ba41180 to aff712d Compare April 15, 2025 21:49
```ts
kafkaMessages.map((message) =>
    this.kafkaOverflowProducer!.produce({
incomingEvents.map(({ message, event }) => {
    const { data: dataStr, ...rawEvent } = parseJSON(message.value!.toString())
```
A Contributor asked:
question: Is there a way we could do it without adding another pair of JSON parse/stringify calls? JSON serialization in Node is quite expensive and we might see a visible bump in CPU usage if we do this.

@nickbest-ph (Author) commented Apr 15, 2025:

I think I'd have to pass the rawEvent object from parseKafkaBatch to this point in the code. Then I could avoid the `const { data: dataStr, ...rawEvent } = parseJSON(message.value!.toString())` line, but I'm pretty sure there's no way around the two stringify calls, since I need to reserialize the modified data before sending it.

I checked whether I could just reconstruct rawEvent from the event passed into emitToOverflow, but unfortunately we overwrite some of the fields we need (the ip field) in the normalizeEvent step.

Similar stringify calls are made when we do the emitEvent calls, in other cases where we've changed the event.

@nickbest-ph (Author) commented Apr 16, 2025:

Added the breadcrumbs to the message headers in a subsequent commit. We no longer have to deserialize/reserialize the entire message to tack on these breadcrumbs.
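The header-based approach described here could look roughly like this (an illustrative sketch only; the header name and helper are assumptions, not the PR's actual code). The point is that the JSON body is never re-parsed or re-serialized; only a small header entry is appended:

```typescript
// Illustrative: append the breadcrumb as a Kafka message header rather than
// rewriting the JSON payload, avoiding a parse/stringify of the whole event.
type KafkaHeader = { [key: string]: Buffer }

function appendBreadcrumbHeader(
    headers: KafkaHeader[],
    crumb: { topic: string; offset: number; partition: number; processed_at: string; consumer_id: string }
): KafkaHeader[] {
    // Each breadcrumb becomes one header entry; the full chain is the ordered
    // list of 'kafka_consumer_breadcrumb' headers on the message.
    return [...headers, { kafka_consumer_breadcrumb: Buffer.from(JSON.stringify(crumb)) }]
}
```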

@nickbest-ph nickbest-ph requested a review from pl April 16, 2025 21:35
@eli-r-ph (Contributor) left a comment:
Might want to get a 2nd pair of 👁️ from @pl but this LGTM 👍

```diff
@@ -97,6 +106,7 @@ describe('IngestionConsumer', () => {
 beforeEach(async () => {
     fixedTime = DateTime.fromObject({ year: 2025, month: 1, day: 1 }, { zone: 'UTC' })
     jest.spyOn(Date, 'now').mockReturnValue(fixedTime.toMillis())
     jest.spyOn(Date.prototype, 'toISOString').mockReturnValue(fixedTime.toISO()!)
```
A Contributor commented:
nit: Jest has mocks for everything time-related: https://jestjs.io/docs/timer-mocks

@nickbest-ph (Author) replied Apr 16, 2025:

Hmm, my tests were stalling when I tried using useFakeTimers(), not sure why, so I opted for this spyOn.

The Contributor replied:
Ok, let's leave it as it is then – there might be a process.nextTick call or something like that, but it looks like it's not worth the effort.

```ts
if (validatedBreadcrumbs.success) {
    existingBreadcrumbs.push(...validatedBreadcrumbs.data)
} else {
    console.log('yes')
```
@nickbest-ph (Author) commented:
Whoops, just rm'ed.
