Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka perf improvements #1964

Merged
merged 6 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/ingress-kafka/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ anyhow = { workspace = true }
base64 = { workspace = true }
bytes = { workspace = true }
derive_builder = { workspace = true }
metrics = { workspace = true }
opentelemetry = { workspace = true }
rdkafka = { version = "0.34", features = ["libz-static", "cmake-build"] }
rdkafka = { version = "0.35", features = ["libz-static", "cmake-build"] }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
thiserror = { workspace = true }
Expand Down
107 changes: 89 additions & 18 deletions crates/ingress-kafka/src/consumer_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;

use base64::Engine;
use bytes::Bytes;
use metrics::counter;
use opentelemetry::trace::TraceContextExt;
use rdkafka::consumer::stream_consumer::StreamPartitionQueue;
use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::message::BorrowedMessage;
Expand All @@ -21,14 +26,16 @@ use tokio::sync::oneshot;
use tracing::{debug, info, info_span, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use restate_core::{cancellation_watcher, task_center, TaskId, TaskKind};
use restate_ingress_dispatcher::{
DeduplicationId, DispatchIngressRequest, IngressDispatcher, IngressDispatcherRequest,
};
use restate_types::identifiers::SubscriptionId;
use restate_types::invocation::{Header, SpanRelation};
use restate_types::message::MessageIndex;
use restate_types::schema::subscriptions::{EventReceiverServiceType, Sink, Subscription};

use crate::metric_definitions::KAFKA_INGRESS_REQUESTS;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Expand All @@ -45,6 +52,8 @@ pub enum Error {
},
#[error("ingress dispatcher channel is closed")]
IngressDispatcherClosed,
#[error("topic {0} partition {1} queue split didn't succeed")]
TopicPartitionSplit(String, i32),
}

type MessageConsumer = StreamConsumer<DefaultConsumerContext>;
Expand Down Expand Up @@ -82,21 +91,25 @@ impl DeduplicationId for KafkaDeduplicationId {
pub struct MessageSender {
subscription: Subscription,
dispatcher: IngressDispatcher,

subscription_id: String,
ingress_request_counter: metrics::Counter,
}

impl MessageSender {
pub fn new(subscription: Subscription, dispatcher: IngressDispatcher) -> Self {
Self {
subscription_id: subscription.id().to_string(),
ingress_request_counter: counter!(
KAFKA_INGRESS_REQUESTS,
"subscription" => subscription.id().to_string()
),
subscription,
dispatcher,
}
}

async fn send(
&mut self,
consumer_group_id: &str,
msg: &BorrowedMessage<'_>,
) -> Result<(), Error> {
async fn send(&self, consumer_group_id: &str, msg: BorrowedMessage<'_>) -> Result<(), Error> {
// Prepare ingress span
let ingress_span = info_span!(
"kafka_ingress_consume",
Expand All @@ -119,14 +132,14 @@ impl MessageSender {
} else {
Bytes::default()
};
let headers = Self::generate_events_attributes(msg, self.subscription.id());
let headers = Self::generate_events_attributes(&msg, &self.subscription_id);

let req = IngressDispatcherRequest::event(
&self.subscription,
key,
payload,
SpanRelation::Parent(ingress_span_context),
Some(Self::generate_deduplication_id(consumer_group_id, msg)),
Some(Self::generate_deduplication_id(consumer_group_id, &msg)),
headers,
)
.map_err(|cause| Error::Event {
Expand All @@ -136,6 +149,8 @@ impl MessageSender {
cause,
})?;

self.ingress_request_counter.increment(1);

self.dispatcher
.dispatch_ingress_request(req)
.instrument(ingress_span)
Expand All @@ -144,10 +159,7 @@ impl MessageSender {
Ok(())
}

fn generate_events_attributes(
msg: &impl Message,
subscription_id: SubscriptionId,
) -> Vec<Header> {
fn generate_events_attributes(msg: &impl Message, subscription_id: &str) -> Vec<Header> {
let mut headers = Vec::with_capacity(6);
headers.push(Header::new("kafka.offset", msg.offset().to_string()));
headers.push(Header::new("kafka.topic", msg.topic()));
Expand All @@ -157,7 +169,7 @@ impl MessageSender {
}
headers.push(Header::new(
"restate.subscription.id".to_string(),
subscription_id.to_string(),
subscription_id,
));

if let Some(key) = msg.key() {
Expand Down Expand Up @@ -201,7 +213,7 @@ impl ConsumerTask {
}
}

pub async fn run(mut self, mut rx: oneshot::Receiver<()>) -> Result<(), Error> {
pub async fn run(self, mut rx: oneshot::Receiver<()>) -> Result<(), Error> {
// Create the consumer and subscribe to the topic
let consumer_group_id = self
.client_config
Expand All @@ -213,25 +225,84 @@ impl ConsumerTask {
self.topics, self.client_config
);

let consumer: MessageConsumer = self.client_config.create()?;
let consumer: Arc<MessageConsumer> = Arc::new(self.client_config.create()?);
let topics: Vec<&str> = self.topics.iter().map(|x| &**x).collect();
consumer.subscribe(&topics)?;

let mut topic_partition_tasks: HashMap<(String, i32), TaskId> = Default::default();
let tc = task_center();
slinkydeveloper marked this conversation as resolved.
Show resolved Hide resolved

loop {
tokio::select! {
res = consumer.recv() => {
let msg = res?;
self.sender.send(&consumer_group_id, &msg).await?;
let topic = msg.topic().to_owned();
let partition = msg.partition();
let offset = msg.offset();

// If we didn't split the queue, let's do it and start the topic partition consumer
if let Entry::Vacant(e) = topic_partition_tasks.entry((topic.clone(), partition)) {
let topic_partition_consumer = consumer
.split_partition_queue(&topic, partition)
.ok_or_else(|| Error::TopicPartitionSplit(topic.clone(), partition))?;

let task = topic_partition_queue_consumption_loop(
self.sender.clone(),
topic.clone(), partition,
topic_partition_consumer,
Arc::clone(&consumer),
consumer_group_id.clone()
);

if let Ok(task_id) = tc.spawn_child(TaskKind::Ingress, "partition-queue", None, task) {
e.insert(task_id);
} else {
break;
}
}

// We got this message, let's send it through
self.sender.send(&consumer_group_id, msg).await?;

// This method tells rdkafka that we have processed this message,
// so its offset can be safely committed.
// rdkafka periodically commits these offsets asynchronously, with a period configurable
// with auto.commit.interval.ms
consumer.store_offset_from_message(&msg)?;
consumer.store_offset(&topic, partition, offset)?;
}
_ = &mut rx => {
return Ok(());
break;
}
}
}
for task_id in topic_partition_tasks.into_values() {
tc.cancel_task(task_id);
}
Ok(())
}
}

async fn topic_partition_queue_consumption_loop(
sender: MessageSender,
topic: String,
partition: i32,
topic_partition_consumer: StreamPartitionQueue<DefaultConsumerContext>,
consumer: Arc<MessageConsumer>,
consumer_group_id: String,
) -> Result<(), anyhow::Error> {
let mut shutdown = std::pin::pin!(cancellation_watcher());

loop {
tokio::select! {
res = topic_partition_consumer.recv() => {
let msg = res?;
let offset = msg.offset();
sender.send(&consumer_group_id, msg).await?;
consumer.store_offset(&topic, partition, offset)?;
}
_ = &mut shutdown => {
return Ok(())
}
}
}
}
1 change: 1 addition & 0 deletions crates/ingress-kafka/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

mod consumer_task;
mod metric_definitions;
mod subscription_controller;

use tokio::sync::mpsc;
Expand Down
21 changes: 21 additions & 0 deletions crates/ingress-kafka/src/metric_definitions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use metrics::{describe_counter, Unit};

pub const KAFKA_INGRESS_REQUESTS: &str = "restate.kafka_ingress.requests.total";

pub(crate) fn describe_metrics() {
describe_counter!(
KAFKA_INGRESS_REQUESTS,
Unit::Count,
"Number of Kafka ingress requests"
);
}
1 change: 1 addition & 0 deletions crates/ingress-kafka/src/subscription_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub struct Service {

impl Service {
pub fn new(dispatcher: IngressDispatcher) -> Service {
metric_definitions::describe_metrics();
let (commands_tx, commands_rx) = mpsc::channel(10);

Service {
Expand Down
Loading