From 8762e9da1d946315ac07346953586ba09088d66a Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 17 Sep 2024 13:47:07 +0200 Subject: [PATCH] Feedback --- crates/ingress-kafka/src/consumer_task.rs | 33 +++++++++++++---------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/crates/ingress-kafka/src/consumer_task.rs b/crates/ingress-kafka/src/consumer_task.rs index ba51a77167..f12ff48a75 100644 --- a/crates/ingress-kafka/src/consumer_task.rs +++ b/crates/ingress-kafka/src/consumer_task.rs @@ -8,7 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::metric_definitions::KAFKA_INGRESS_REQUESTS; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; + use base64::Engine; use bytes::Bytes; use metrics::counter; @@ -18,6 +22,10 @@ use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer}; use rdkafka::error::KafkaError; use rdkafka::message::BorrowedMessage; use rdkafka::{ClientConfig, Message}; +use tokio::sync::oneshot; +use tracing::{debug, info, info_span, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; + use restate_core::{cancellation_watcher, task_center, TaskId, TaskKind}; use restate_ingress_dispatcher::{ DeduplicationId, DispatchIngressRequest, IngressDispatcher, IngressDispatcherRequest, @@ -25,13 +33,8 @@ use restate_ingress_dispatcher::{ use restate_types::invocation::{Header, SpanRelation}; use restate_types::message::MessageIndex; use restate_types::schema::subscriptions::{EventReceiverServiceType, Sink, Subscription}; -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::fmt; -use std::sync::Arc; -use tokio::sync::oneshot; -use tracing::{debug, info, info_span, Instrument}; -use tracing_opentelemetry::OpenTelemetrySpanExt; + +use crate::metric_definitions::KAFKA_INGRESS_REQUESTS; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -86,15 +89,21 @@ impl DeduplicationId for KafkaDeduplicationId { #[derive(Clone)] pub struct MessageSender { - subscription_id: String, subscription: Subscription, dispatcher: IngressDispatcher, + + subscription_id: String, + ingress_request_counter: metrics::Counter, } impl MessageSender { pub fn new(subscription: Subscription, dispatcher: IngressDispatcher) -> Self { Self { subscription_id: subscription.id().to_string(), + ingress_request_counter: counter!( + KAFKA_INGRESS_REQUESTS, + "subscription" => subscription.id().to_string() + ), subscription, dispatcher, } @@ -140,11 +149,7 @@ impl MessageSender { cause, })?; - counter!( - KAFKA_INGRESS_REQUESTS, - "subscription" => self.subscription_id.clone() - ) - .increment(1); + self.ingress_request_counter.increment(1); self.dispatcher .dispatch_ingress_request(req)