Add support for setting the key and timestamp for messages written to kafka (ArroyoSystems#698)
mwylde authored Jul 31, 2024
1 parent e8af9f5 commit c076bfe
Showing 5 changed files with 133 additions and 25 deletions.
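The two new options surface as Kafka sink table properties (see the table.json change below), so they are set from SQL when the sink table is created. The following is a rough sketch only: the table, topic, and column names are hypothetical, and the exact WITH-clause spelling should be checked against the Arroyo docs; only the two sink.* option keys come from this commit.

    CREATE TABLE events_sink (
        user_id TEXT,
        event_time TIMESTAMP,
        payload TEXT
    ) WITH (
        connector = 'kafka',
        bootstrap_servers = 'localhost:9092',
        topic = 'events',
        type = 'sink',
        format = 'json',
        'sink.key_field' = 'user_id',          -- written as the Kafka message key (must be TEXT)
        'sink.timestamp_field' = 'event_time'  -- written as the Kafka message timestamp (must be TIMESTAMP)
    );

If 'sink.timestamp_field' is omitted or does not resolve to a nanosecond TIMESTAMP column, the sink falls back to the record's event time; if 'sink.key_field' is omitted, messages are written without a key, as before.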
34 changes: 21 additions & 13 deletions crates/arroyo-connectors/src/kafka/mod.rs
@@ -125,6 +125,8 @@ impl KafkaConnector {
                     Some("exactly_once") => SinkCommitMode::ExactlyOnce,
                     Some(other) => bail!("invalid value for commit_mode '{}'", other),
                 },
+                timestamp_field: options.remove("sink.timestamp_field"),
+                key_field: options.remove("sink.key_field"),
             }
         }
         _ => {
@@ -382,19 +384,25 @@ impl Connector for KafkaConnector {
                     .unwrap(),
                 })))
             }
-            TableType::Sink { commit_mode } => {
-                Ok(OperatorNode::from_operator(Box::new(KafkaSinkFunc {
-                    bootstrap_servers: profile.bootstrap_servers.to_string(),
-                    producer: None,
-                    consistency_mode: (*commit_mode).into(),
-                    write_futures: vec![],
-                    client_config: client_configs(&profile, &table),
-                    topic: table.topic,
-                    serializer: ArrowSerializer::new(
-                        config.format.expect("Format must be defined for KafkaSink"),
-                    ),
-                })))
-            }
+            TableType::Sink {
+                commit_mode,
+                key_field,
+                timestamp_field,
+            } => Ok(OperatorNode::from_operator(Box::new(KafkaSinkFunc {
+                bootstrap_servers: profile.bootstrap_servers.to_string(),
+                producer: None,
+                consistency_mode: (*commit_mode).into(),
+                timestamp_field: timestamp_field.clone(),
+                timestamp_col: None,
+                key_field: key_field.clone(),
+                key_col: None,
+                write_futures: vec![],
+                client_config: client_configs(&profile, &table),
+                topic: table.topic,
+                serializer: ArrowSerializer::new(
+                    config.format.expect("Format must be defined for KafkaSink"),
+                ),
+            }))),
         }
     }
 }
105 changes: 96 additions & 9 deletions crates/arroyo-connectors/src/kafka/sink/mod.rs
@@ -12,25 +12,30 @@ use rdkafka::util::Timeout;
 
 use rdkafka::ClientConfig;
 
-use arrow::array::RecordBatch;
+use super::SinkCommitMode;
+use arrow::array::{Array, AsArray, RecordBatch};
+use arrow::datatypes::{DataType, TimeUnit};
 use arroyo_formats::ser::ArrowSerializer;
 use arroyo_operator::context::ArrowContext;
 use arroyo_operator::operator::ArrowOperator;
+use arroyo_rpc::df::ArroyoSchema;
 use arroyo_types::CheckpointBarrier;
 use async_trait::async_trait;
 use prost::Message;
 use rdkafka::error::{KafkaError, RDKafkaErrorCode};
 use std::time::{Duration, SystemTime};
 
-use super::SinkCommitMode;
-
 #[cfg(test)]
 mod test;
 
 pub struct KafkaSinkFunc {
     pub topic: String,
     pub bootstrap_servers: String,
     pub consistency_mode: ConsistencyMode,
+    pub timestamp_field: Option<String>,
+    pub timestamp_col: Option<usize>,
+    pub key_field: Option<String>,
+    pub key_col: Option<usize>,
     pub producer: Option<FutureProducer>,
     pub write_futures: Vec<DeliveryFuture>,
     pub client_config: HashMap<String, String>,
@@ -62,6 +67,54 @@ impl KafkaSinkFunc {
         matches!(self.consistency_mode, ConsistencyMode::ExactlyOnce { .. })
     }
 
+    fn set_timestamp_col(&mut self, schema: &ArroyoSchema) {
+        if let Some(f) = &self.timestamp_field {
+            if let Ok(f) = schema.schema.field_with_name(f) {
+                match f.data_type() {
+                    DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+                        self.timestamp_col = Some(schema.schema.index_of(f.name()).unwrap());
+                        return;
+                    }
+                    _ => {
+                        warn!(
+                            "Kafka sink configured with timestamp_field '{f}', but it has type \
+                            {}, not TIMESTAMP... ignoring",
+                            f.data_type()
+                        );
+                    }
+                }
+            } else {
+                warn!(
+                    "Kafka sink configured with timestamp_field '{f}', but that \
+                    does not appear in the schema... ignoring"
+                );
+            }
+        }
+
+        self.timestamp_col = Some(schema.timestamp_index);
+    }
+
+    fn set_key_col(&mut self, schema: &ArroyoSchema) {
+        if let Some(f) = &self.key_field {
+            if let Ok(f) = schema.schema.field_with_name(f) {
+                if matches!(f.data_type(), DataType::Utf8) {
+                    self.key_col = Some(schema.schema.index_of(f.name()).unwrap());
+                } else {
+                    warn!(
+                        "Kafka sink configured with key_field '{f}', but it has type \
+                        {}, not TEXT... ignoring",
+                        f.data_type()
+                    );
+                }
+            } else {
+                warn!(
+                    "Kafka sink configured with key_field '{f}', but that \
+                    does not appear in the schema... ignoring"
+                );
+            }
+        }
+    }
+
     fn init_producer(&mut self, task_info: &TaskInfo) -> Result<()> {
         let mut client_config = ClientConfig::new();
         client_config.set("bootstrap.servers", &self.bootstrap_servers);
@@ -117,13 +170,23 @@ impl KafkaSinkFunc {
         }
     }
 
-    async fn publish(&mut self, k: Option<Vec<u8>>, v: Vec<u8>, ctx: &mut ArrowContext) {
+    async fn publish(
+        &mut self,
+        ts: Option<i64>,
+        k: Option<Vec<u8>>,
+        v: Vec<u8>,
+        ctx: &mut ArrowContext,
+    ) {
         let mut rec = {
+            let mut rec = FutureRecord::<Vec<u8>, Vec<u8>>::to(&self.topic);
+            if let Some(ts) = ts {
+                rec = rec.timestamp(ts);
+            }
             if let Some(k) = k.as_ref() {
-                FutureRecord::to(&self.topic).key(k).payload(&v)
-            } else {
-                FutureRecord::to(&self.topic).payload(&v)
+                rec = rec.key(&k);
             }
+
+            rec.payload(&v)
         };
 
         loop {
@@ -176,15 +239,39 @@ impl ArrowOperator for KafkaSinkFunc {
     }
 
     async fn on_start(&mut self, ctx: &mut ArrowContext) {
+        self.set_timestamp_col(&ctx.in_schemas[0]);
+        self.set_key_col(&ctx.in_schemas[0]);
+
         self.init_producer(&ctx.task_info)
             .expect("Producer creation failed");
     }
 
     async fn process_batch(&mut self, batch: RecordBatch, ctx: &mut ArrowContext) {
         let values = self.serializer.serialize(&batch);
+        let timestamps = batch
+            .column(
+                self.timestamp_col
+                    .expect("timestamp column not initialized!"),
+            )
+            .as_any()
+            .downcast_ref::<arrow::array::TimestampNanosecondArray>();
+
+        let keys = self.key_col.map(|i| batch.column(i).as_string::<i32>());
 
-        for v in values {
-            self.publish(None, v, ctx).await;
+        for (i, v) in values.enumerate() {
+            // kafka timestamp as unix millis
+            let timestamp = if let Some(ts) = timestamps {
+                Some(if ts.is_null(i) {
+                    0
+                } else {
+                    ts.value(i) / 1_000_000
+                })
+            } else {
+                None
+            };
+            // TODO: this copy should be unnecessary but likely needs a custom trait impl
+            let key = keys.map(|k| k.value(i).as_bytes().to_vec());
+            self.publish(timestamp, key, v, ctx).await;
         }
     }
 
4 changes: 4 additions & 0 deletions crates/arroyo-connectors/src/kafka/sink/test.rs
@@ -75,9 +75,13 @@ impl KafkaTopicTester {
             bootstrap_servers: self.server.to_string(),
             producer: None,
             consistency_mode: ConsistencyMode::AtLeastOnce,
+            timestamp_field: None,
+            timestamp_col: None,
+            key_field: None,
             write_futures: vec![],
             client_config: HashMap::new(),
             serializer: ArrowSerializer::new(Format::Json(JsonFormat::default())),
+            key_col: None,
         };
 
         let (_, control_rx) = channel(128);
10 changes: 10 additions & 0 deletions crates/arroyo-connectors/src/kafka/table.json
@@ -61,6 +61,16 @@
           "at_least_once",
           "exactly_once"
         ]
       },
+      "key_field": {
+        "type": "string",
+        "title": "key field",
+        "description": "Field to use to set the key of the message written to Kafka"
+      },
+      "timestamp_field": {
+        "type": "string",
+        "title": "timestamp field",
+        "description": "Field to use to set the timestamp of the message written to Kafka; defaults to the event time"
+      }
     },
     "additionalProperties": false,
5 changes: 2 additions & 3 deletions crates/arroyo-state/src/tables/table_manager.rs
@@ -2,7 +2,7 @@ use std::any::Any;
 
 use std::{collections::HashMap, sync::Arc, time::SystemTime};
 
-use anyhow::{anyhow, bail, Context, Result};
+use anyhow::{anyhow, bail, Result};
 use arroyo_rpc::CompactionResult;
 use arroyo_rpc::{
     grpc::rpc::{
@@ -11,14 +11,13 @@ use arroyo_rpc::{
     },
     CheckpointCompleted, ControlResp,
 };
-use arroyo_storage::{StorageProvider, StorageProviderRef};
+use arroyo_storage::StorageProviderRef;
 use arroyo_types::{to_micros, CheckpointBarrier, Data, Key, TaskInfoRef};
 use tokio::sync::{
     mpsc::{self, Receiver, Sender},
     oneshot,
 };
 
-use arroyo_rpc::config::config;
 use tracing::{debug, error, info, warn};
 
 use crate::{get_storage_provider, tables::global_keyed_map::GlobalKeyedTable, StateMessage};
