Skip to content

Commit

Permalink
Update comments in KafkaConsumer (#1354)
Browse files Browse the repository at this point in the history
Follow-up to #1333.
  • Loading branch information
svix-jplatte authored Jul 1, 2024
2 parents 464abe1 + 9621abd commit 8fd6e00
Showing 1 changed file with 7 additions and 13 deletions.
20 changes: 7 additions & 13 deletions bridge/svix-bridge-plugin-kafka/src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,8 @@ impl KafkaConsumer {
.expect("create_consumer task panicked")?;

loop {
// FIXME(jplatte): I don't know if StreamConsumer::recv has an internal buffer.
// Overall, rdkafka seems to be doing a bunch of background magic so maybe it does.
// In that case, it's likely already doing batching reads (which don't seem to be
// a thing in the public API) internally.
// If not, we should likely do some sort of batching ourselves, e.g. have two separate
// tokio tasks, one which pulls messages from Kafka and one that processes them, with
// a bounded channel in between for backpressure.
// It's fine to pull messages one-by-one without any buffering in our own code because
// rdkafka buffers messages internally through a background task / thread.
let msg = consumer.recv().await?;
tracing::debug!("Received a message");

Expand Down Expand Up @@ -178,12 +173,11 @@ impl KafkaConsumer {
}
}

// FIXME(jplatte): Unlike recv above, this seems less likely to be auto-coalesced
// internally in rdkafka so maybe we should introduce our own logic to only commit
// after N messages to reduce unnecessary back and forth on the Kafka connection,
// or unnecessary disk writes inside Kafka (messages in Kafka are not committed
// individually, rather what this call does is update the stored stream position
// for the consumer group).
// FIXME(jplatte): Should we introduce logic to only commit every N messages to reduce
// back and forth on the Kafka connection / disk writes inside Kafka?
//
// Background: messages in Kafka are not committed individually, rather what this call
// does is update the stored stream position for the consumer group.
consumer.commit_message(&msg, CommitMode::Async)?;
}
}
Expand Down

0 comments on commit 8fd6e00

Please sign in to comment.