Fix bug in kafka producer that broke on key serialization
mwylde committed Mar 30, 2024
1 parent 1a2e80f commit 4e57f2d
Showing 2 changed files with 3 additions and 14 deletions.
15 changes: 3 additions & 12 deletions crates/arroyo-connectors/src/kafka/sink/mod.rs
@@ -171,18 +171,9 @@ impl ArrowOperator for KafkaSinkFunc {
     async fn process_batch(&mut self, batch: RecordBatch, ctx: &mut ArrowContext) {
         let values = self.serializer.serialize(&batch);
 
-        if let Some(key_indices) = &ctx.in_schemas[0].key_indices {
-            let k = batch.project(key_indices).unwrap();
-
-            // TODO: we can probably batch this for better performance
-            for (k, v) in self.serializer.serialize(&k).zip(values) {
-                self.publish(Some(k), v, ctx).await;
-            }
-        } else {
-            for v in values {
-                self.publish(None, v, ctx).await;
-            }
-        };
+        for v in values {
+            self.publish(None, v, ctx).await;
+        }
     }
 
     async fn handle_checkpoint(&mut self, _: CheckpointBarrier, ctx: &mut ArrowContext) {
2 changes: 0 additions & 2 deletions crates/arroyo-operator/src/context.rs
@@ -559,7 +559,6 @@ impl ArrowContext {
         if self.buffer.as_ref().unwrap().size() > 0 {
             let buffer = self.buffer.take().unwrap();
             let batch = buffer.finish();
-            println!("{}\t{}", batch.num_rows(), batch.get_array_memory_size());
             self.collector.collect(batch).await;
             self.buffer = Some(ContextBuffer::new(
                 self.out_schema.as_ref().map(|t| t.schema.clone()).unwrap(),
@@ -570,7 +569,6 @@ impl ArrowContext {
         if let Some(buffer) = deserializer.flush_buffer() {
             match buffer {
                 Ok(batch) => {
-                    println!("{}\t{}", batch.num_rows(), batch.get_array_memory_size());
                     self.collector.collect(batch).await;
                 }
                 Err(e) => {
