From 1c476668ada38bbab9121a6bfc48e6340985516f Mon Sep 17 00:00:00 2001
From: iamazy
Date: Thu, 21 Mar 2024 18:25:38 +0800
Subject: [PATCH] Chore(deps): bump `kafka-protocol` to 0.10

---
 Cargo.toml                         | 20 +++++------
 rust-toolchain                     |  2 +-
 rustfmt.toml                       |  1 +
 src/consumer/fetch_session.rs      |  4 +--
 src/consumer/fetcher.rs            | 19 ++++++----
 src/consumer/mod.rs                |  2 +-
 src/consumer/partition_assignor.rs |  2 +-
 src/consumer/subscription_state.rs | 15 +++++---
 src/coordinator/consumer.rs        | 56 +++++++++++++++++-------------
 src/error.rs                       | 24 +++++--------
 src/lib.rs                         |  2 +-
 src/metadata.rs                    | 11 ++++--
 src/producer/mod.rs                |  9 +++--
 src/protocol.rs                    | 39 ++++++++++++++++-----
 14 files changed, 124 insertions(+), 82 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index cf9b225..05264fb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,7 +3,7 @@ name = "kafkas"
 version = "0.1.0"
 edition = "2021"
 authors = ["iamazy "]
-keywords = ["kafka", "message queue", "async", "tokio", "async-std"]
+keywords = ["kafka", "async", "tokio", "async-std"]
 license-file = "LICENSE"
 readme = "README.md"
 repository = "https://github.com/iamazy/kafkas"
@@ -12,10 +12,10 @@ description = "async kafka client for rust"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-asynchronous-codec = { version = "0.6", optional = true }
-async-io = { version = "1", optional = true}
-async-native-tls = { version = "0.4", optional = true }
-async-recursion = "1.0.0"
+asynchronous-codec = { version = "0.7", optional = true }
+async-io = { version = "2", optional = true }
+async-native-tls = { version = "0.5", optional = true }
+async-recursion = "1"
 async-std = { version = "1", features = ["attributes", "unstable"], optional = true }
 async-stream = "0.3"
 bit-vec = "0.6"
@@ -25,19 +25,19 @@ dashmap = "5"
 fnv = "1"
 futures = "0.3"
 fxhash = "0.2"
-indexmap = "1"
-kafka-protocol = { git = "https://github.com/iamazy/kafka-protocol-rs", rev = "d8a289bbdebd71f89d52838810303902a7368773"}
+indexmap = "2"
+kafka-protocol = "0.10"
 native-tls = "0.2"
 pin-project-lite = "0.2"
 rand = "0.8"
-regex = "1.1.7"
+regex = "1"
 thiserror = "1"
 tokio = { version = "1", features = ["full"], optional = true }
 tokio-util = { version = "0.7", features = ["codec"], optional = true }
 tokio-native-tls = { version = "0.3", optional = true }
 tracing = "0.1"
-url = "2.1"
-uuid = "1.3"
+url = "2"
+uuid = "1"
 
 [dev-dependencies]
 rand = "0.8"
diff --git a/rust-toolchain b/rust-toolchain
index 07ade69..870bbe4 100644
--- a/rust-toolchain
+++ b/rust-toolchain
@@ -1 +1 @@
-nightly
\ No newline at end of file
+stable
\ No newline at end of file
diff --git a/rustfmt.toml b/rustfmt.toml
index 532b89b..148229d 100644
--- a/rustfmt.toml
+++ b/rustfmt.toml
@@ -1,3 +1,4 @@
+# cargo +nightly fmt
 comment_width = 100
 edition = "2021"
 format_code_in_doc_comments = true
diff --git a/src/consumer/fetch_session.rs b/src/consumer/fetch_session.rs
index 40cbe11..d007362 100644
--- a/src/consumer/fetch_session.rs
+++ b/src/consumer/fetch_session.rs
@@ -423,7 +423,7 @@ impl FetchRequestDataBuilder {
 
         let mut session_remove = Vec::new();
         for (tp, prev_data) in session.session_partitions.iter_mut() {
-            match self.next.remove(tp) {
+            match self.next.swap_remove(tp) {
                 Some(next_data) => {
                     // We basically check if the new partition had the same topic ID. If not,
                     // we add it to the "replaced" set. If the request is version 13 or higher, the
@@ -470,7 +470,7 @@ impl FetchRequestDataBuilder {
         }
 
         for tp in session_remove.iter() {
-            session.session_partitions.remove(tp);
+            session.session_partitions.swap_remove(tp);
         }
 
         // Add any new partitions to the session.
diff --git a/src/consumer/fetcher.rs b/src/consumer/fetcher.rs
index 97161d8..c803f83 100644
--- a/src/consumer/fetcher.rs
+++ b/src/consumer/fetcher.rs
@@ -467,7 +467,8 @@ impl Fetcher {
                         debug!(
                             "Handing v0 ListOffsetResponse response for [{} - {}]. Fetched \
                              offset {offset}",
-                            &topic.name.0, partition
+                            topic.name.as_str(),
+                            partition
                         );
                         if offset != UNKNOWN_OFFSET {
                             let tp = TopicPartition {
@@ -488,7 +489,7 @@ impl Fetcher {
                         debug!(
                             "Handling ListOffsetResponse response for [{} - {}], Fetched \
                              offset {}, timestamp {}",
-                            topic.name.0,
+                            topic.name.as_str(),
                             partition,
                             partition_response.offset,
                             partition_response.timestamp
@@ -525,7 +526,8 @@ impl Fetcher {
                         debug!(
                             "Cannot search by timestamp for [{} - {}] because the message format \
                              version is before 0.10.0",
-                            topic.name.0, partition
+                            topic.name.as_str(),
+                            partition
                         );
                         break;
                     }
@@ -540,7 +542,9 @@ impl Fetcher {
                     ) => {
                         debug!(
                             "Attempt to fetch offsets for [{} - {}] failed due to {}, retrying.",
-                            topic.name.0, partition, error
+                            topic.name.as_str(),
+                            partition,
+                            error
                         );
                         let tp = TopicPartition {
                             topic: topic.name.clone(),
@@ -552,7 +556,8 @@ impl Fetcher {
                         warn!(
                             "Received unknown topic or partition error in ListOffset request for \
                              partition [{} - {}]",
-                            topic.name.0, partition
+                            topic.name.as_str(),
+                            partition
                         );
                         let tp = TopicPartition {
                             topic: topic.name.clone(),
@@ -567,7 +572,9 @@ impl Fetcher {
                         warn!(
                             "Attempt to fetch offsets for [{} - {}] failed due to unexpected \
                              exception: {}, retrying.",
-                            topic.name.0, partition, error
+                            topic.name.as_str(),
+                            partition,
+                            error
                         );
                         let tp = TopicPartition {
                             topic: topic.name.clone(),
diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs
index c5e2cad..d38231d 100644
--- a/src/consumer/mod.rs
+++ b/src/consumer/mod.rs
@@ -340,7 +340,7 @@ impl Consumer {
 
     pub async fn unsubscribe(&mut self) -> Result<()> {
         self.coordinator
-            .maybe_leave_group(StrBytes::from_str(
+            .maybe_leave_group(StrBytes::from_static_str(
                 "the consumer unsubscribed from all topics",
             ))
             .await?;
diff --git a/src/consumer/partition_assignor.rs b/src/consumer/partition_assignor.rs
index d197bdd..fdacbc4 100644
--- a/src/consumer/partition_assignor.rs
+++ b/src/consumer/partition_assignor.rs
@@ -61,7 +61,7 @@ pub trait PartitionAssigner {
             } else {
                 debug!(
                     "skipping assignment for topic {} since no metadata is available",
-                    topic.0
+                    topic.as_str()
                 );
             }
         }
diff --git a/src/consumer/subscription_state.rs b/src/consumer/subscription_state.rs
index f89e09a..aa8c815 100644
--- a/src/consumer/subscription_state.rs
+++ b/src/consumer/subscription_state.rs
@@ -40,7 +40,7 @@ impl SubscriptionState {
                 OffsetMetadata {
                     committed_offset: tp_state.position.offset,
                     committed_leader_epoch: tp_state.position.current_leader.epoch,
-                    metadata: Some(StrBytes::from_str("")),
+                    metadata: Some(StrBytes::default()),
                 },
             );
         }
@@ -150,25 +150,30 @@ impl SubscriptionState {
             if !matches!(partition_state.fetch_state, FetchState::AwaitReset) {
                 debug!(
                     "Skipping reset of [{} - {}] since it is no longer needed",
-                    partition.topic.0, partition.partition
+                    partition.topic.as_str(),
+                    partition.partition
                 );
             } else if partition_state.offset_strategy != offset_strategy {
                 debug!(
                     "Skipping reset of topic [{} - {}] since an alternative reset has been \
                      requested",
-                    partition.topic.0, partition.partition
+                    partition.topic.as_str(),
+                    partition.partition
                 );
             } else {
                 info!(
                     "Resetting offset for topic [{} - {}] to position {}.",
-                    partition.topic.0, partition.partition, position.offset
+                    partition.topic.as_str(),
+                    partition.partition,
+                    position.offset
                 );
                 partition_state.seek_unvalidated(position)?;
             }
         } else {
             debug!(
                 "Skipping reset of [{} - {}] since it is no longer assigned",
-                partition.topic.0, partition.partition
+                partition.topic.as_str(),
+                partition.partition
             );
         }
diff --git a/src/coordinator/consumer.rs b/src/coordinator/consumer.rs
index 3a69fda..7c69efa 100644
--- a/src/coordinator/consumer.rs
+++ b/src/coordinator/consumer.rs
@@ -331,7 +331,7 @@ impl CoordinatorInner {
         let node = find_coordinator(&client, group_id.clone(), CoordinatorType::Group).await?;
         info!(
             "Find coordinator success, group {}, node: {}",
-            group_id,
+            group_id.as_str(),
             node.address()
         );
 
@@ -399,7 +399,7 @@ impl CoordinatorInner {
             .await?;
         for group in describe_groups_response.groups {
             if group.error_code.is_err() {
-                error!("Describe group [{}] failed", group.group_id.0);
+                error!("Describe group [{}] failed", group.group_id.as_str());
             }
         }
         Ok(())
@@ -424,7 +424,8 @@ impl CoordinatorInner {
                 warn!(
                     "Join group with unknown member id, will rejoin group [{}] with \
                      member id: {}",
-                    self.group_meta.group_id.0, self.group_meta.member_id
+                    self.group_meta.group_id.as_str(),
+                    self.group_meta.member_id.as_str()
                 );
                 self.join_group().await
             }
@@ -443,9 +444,9 @@ impl CoordinatorInner {
                 info!(
                     "Join group [{}] success, leader = {}, member_id = {}, generation_id \
                      = {}",
-                    self.group_meta.group_id.0,
-                    self.group_meta.leader,
-                    self.group_meta.member_id,
+                    self.group_meta.group_id.as_str(),
+                    self.group_meta.leader.as_str(),
+                    self.group_meta.member_id.as_str(),
                     self.group_meta.generation_id
                 );
                 Ok(())
@@ -460,9 +461,10 @@ impl CoordinatorInner {
         match self.client.version_range(ApiKey::LeaveGroupKey) {
             Some(version_range) => {
                 debug!(
-                    "Member {} send LeaveGroup request to coordinator {} due to {reason}",
-                    self.group_meta.member_id,
+                    "Member {} send LeaveGroup request to coordinator {} due to {}",
+                    self.group_meta.member_id.as_str(),
                     self.node.address(),
+                    reason.as_str()
                 );
 
                 let leave_group_request = self.leave_group_builder(version_range.max, reason)?;
@@ -478,25 +480,28 @@ impl CoordinatorInner {
                     if member.error_code.is_ok() {
                         debug!(
                             "Member {} leave group {} success.",
-                            member.member_id, self.group_meta.group_id.0
+                            member.member_id.as_str(),
+                            self.group_meta.group_id.as_str()
                         );
                     } else {
                         error!(
                             "Member {} leave group {} failed.",
-                            member.member_id, self.group_meta.group_id.0
+                            member.member_id.as_str(),
+                            self.group_meta.group_id.as_str()
                         );
                     }
                 }
                 info!(
                     "Leave group [{}] success, member: {}",
-                    self.group_meta.group_id.0, self.group_meta.member_id
+                    self.group_meta.group_id.as_str(),
+                    self.group_meta.member_id.as_str()
                 );
                 Ok(())
             }
             Some(error) => {
                 error!(
                     "Leave group [{}] failed, error: {error}",
-                    self.group_meta.group_id.0
+                    self.group_meta.group_id.as_str()
                 );
                 Err(error.into())
             }
@@ -528,8 +533,8 @@ impl CoordinatorInner {
             error!(
                 "JoinGroup failed: Inconsistent Protocol Type, received {} but \
                  expected {}",
-                sync_group_response.protocol_type.unwrap(),
-                self.group_meta.protocol_type.as_ref().unwrap()
+                sync_group_response.protocol_type.unwrap().as_str(),
+                self.group_meta.protocol_type.as_ref().unwrap().as_str()
             );
             return Err(ResponseError::InconsistentGroupProtocol.into());
         }
@@ -560,12 +565,12 @@ impl CoordinatorInner {
                 info!(
                    "Sync group [{}] success, leader = {}, member_id = {}, generation_id \
                     = {}, protocol_type = {}, protocol_name = {}, assignments = <{}>",
-                    self.group_meta.group_id.0,
-                    self.group_meta.leader,
-                    self.group_meta.member_id,
+                    self.group_meta.group_id.as_str(),
+                    self.group_meta.leader.as_str(),
+                    self.group_meta.member_id.as_str(),
                     self.group_meta.generation_id,
-                    self.group_meta.protocol_type.as_ref().unwrap(),
-                    self.group_meta.protocol_name.as_ref().unwrap(),
+                    self.group_meta.protocol_type.as_ref().unwrap().as_str(),
+                    self.group_meta.protocol_name.as_ref().unwrap().as_str(),
                     crate::array_display(self.subscriptions.assignments.keys()),
                 );
                 Ok(())
@@ -643,7 +648,8 @@ impl CoordinatorInner {
             None => {
                 debug!(
                     "Heartbeat success, group: {}, member: {}",
-                    self.group_meta.group_id.0, self.group_meta.member_id
+                    self.group_meta.group_id.as_str(),
+                    self.group_meta.member_id.as_str()
                 );
                 Ok(())
             }
@@ -663,7 +669,7 @@ impl CoordinatorInner {
                 // this case and ignore the REBALANCE_IN_PROGRESS error
                 warn!(
                     "Group [{}] is rebalance in progress.",
-                    self.group_meta.group_id.0
+                    self.group_meta.group_id.as_str()
                 );
                 if matches!(self.state, MemberState::Stable) {
                     self.rejoin_group().await?;
@@ -780,7 +786,7 @@ impl CoordinatorInner {
         let mut request = JoinGroupRequest::default();
         request.group_id = self.group_meta.group_id.clone();
         request.member_id = self.group_meta.member_id.clone();
-        request.protocol_type = StrBytes::from_str(CONSUMER_PROTOCOL_TYPE);
+        request.protocol_type = StrBytes::from_static_str(CONSUMER_PROTOCOL_TYPE);
         request.protocols = self.join_group_protocol()?;
         request.session_timeout_ms = self.consumer_options.rebalance_options.session_timeout_ms;
         if version >= 1 {
@@ -830,7 +836,7 @@ impl CoordinatorInner {
             None => {
                 return Err(Error::Custom(format!(
                     "Group leader {} has no partition assignor protocol",
-                    self.group_meta.leader
+                    self.group_meta.leader.as_str()
                 )));
             }
         }
@@ -945,11 +951,11 @@ impl CoordinatorInner {
             }
         } else {
             generation = DEFAULT_GENERATION_ID;
-            member = StrBytes::from_str("");
+            member = StrBytes::default();
         }
 
         request.group_id = self.group_meta.group_id.clone();
-        request.generation_id = generation;
+        request.generation_id_or_member_epoch = generation;
         request.member_id = member;
         request.group_instance_id = self.group_meta.group_instance_id.clone();
         request.retention_time_ms = -1;
diff --git a/src/error.rs b/src/error.rs
index 8bad68b..c4f6df7 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -12,7 +12,7 @@ use futures::channel::{
 };
 use kafka_protocol::{
     messages::{ApiKey, TopicName},
-    protocol::{buf::NotEnoughBytesError, DecodeError, EncodeError},
+    protocol::{buf::NotEnoughBytesError, EncodeError},
     records::Record,
     ResponseError,
 };
@@ -108,12 +108,6 @@ impl From for Error {
     }
 }
 
-impl From<DecodeError> for Error {
-    fn from(value: DecodeError) -> Self {
-        Error::Connection(ConnectionError::Decoding(value.to_string()))
-    }
-}
-
 impl<T> From<TrySendError<T>> for Error {
     fn from(value: TrySendError<T>) -> Self {
         Error::Custom(format!("{value}"))
@@ -141,16 +135,20 @@ impl std::fmt::Display for Error {
             Error::Produce(e) => write!(f, "Produce error: {e}"),
             Error::Consume(e) => write!(f, "Consume error: {e}"),
             Error::PartitionNotAvailable { topic, partition } => {
-                write!(f, "Partition {partition} not available, topic: {}", topic.0)
+                write!(
+                    f,
+                    "Partition {partition} not available, topic: {}",
+                    topic.as_str()
+                )
             }
             Error::TopicNotAvailable { topic } => {
-                write!(f, "Topic not available, topic: {}", topic.0)
+                write!(f, "Topic not available, topic: {}", topic.as_str())
             }
             Error::TopicAuthorizationError { topics } => {
                 write!(
                     f,
                    "Topic Authorization Error, topics: <{}>",
-                    array_display(topics.iter().map(|topic| &**topic))
+                    array_display(topics.iter().map(|topic| topic.as_str()))
                 )
             }
             Error::NodeNotAvailable { node } => {
@@ -188,12 +186,6 @@ impl From for ConnectionError {
     }
 }
 
-impl From<DecodeError> for ConnectionError {
-    fn from(value: DecodeError) -> Self {
-        ConnectionError::Decoding(value.to_string())
-    }
-}
-
 impl From<std::io::Error> for ConnectionError {
     fn from(e: std::io::Error) -> Self {
         ConnectionError::Io(e)
diff --git a/src/lib.rs b/src/lib.rs
index 4bfb662..8ac0e0c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -63,7 +63,7 @@ pub trait ToStrBytes {
 
 impl ToStrBytes for String {
     fn to_str_bytes(self) -> StrBytes {
-        unsafe { StrBytes::from_utf8_unchecked(Bytes::from(self)) }
+        StrBytes::from_string(self)
     }
 }
 
diff --git a/src/metadata.rs b/src/metadata.rs
index 9c9137f..80fd713 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -132,7 +132,12 @@ impl Debug for TopicPartition {
 
 impl Display for TopicPartition {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "partition [{} - {}]", self.topic.0, self.partition)
+        write!(
+            f,
+            "partition [{} - {}]",
+            self.topic.as_str(),
+            self.partition
+        )
     }
 }
 
@@ -146,7 +151,7 @@ impl Node {
     pub fn new(id: BrokerId, host: StrBytes, port: i32) -> Self {
         Self {
             id: id.0,
-            address: format!("{host}:{port}"),
+            address: format!("{}:{port}", host.as_str()),
         }
     }
 
@@ -180,7 +185,7 @@ impl Cluster {
         let cluster_id = other.cluster_id;
         {
             let mut lock = self.id.lock()?;
-            if matches!(*lock, None) {
+            if lock.is_none() {
                 *lock = cluster_id;
             } else if *lock != cluster_id {
                 return Err(Error::Custom(format!(
diff --git a/src/producer/mod.rs b/src/producer/mod.rs
index 425f0f5..b292e13 100644
--- a/src/producer/mod.rs
+++ b/src/producer/mod.rs
@@ -272,7 +272,10 @@ impl Producer {
                 }
             }
             Err(err) => {
-                error!("failed to push record, topic: {}, err: {err}", topic.0,);
+                error!(
+                    "failed to push record, topic: {}, err: {err}",
+                    topic.as_str(),
+                );
                 Err(err)
             }
         };
@@ -348,7 +351,9 @@ impl Producer {
             {
                 error!(
                     "failed to flush topic produce data, topic: [{} - {}], error: {}",
-                    partition.topic.0, partition.partition, e
+                    partition.topic.as_str(),
+                    partition.partition,
+                    e
                 );
             }
         }
diff --git a/src/protocol.rs b/src/protocol.rs
index 637ab06..9971d08 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -56,10 +56,10 @@ impl tokio_util::codec::Decoder for KafkaCodec {
 
 #[cfg(feature = "async-std-runtime")]
 impl asynchronous_codec::Encoder for KafkaCodec {
-    type Item = Command;
+    type Item<'a> = Command;
     type Error = ConnectionError;
 
-    fn encode(&mut self, cmd: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
+    fn encode(&mut self, cmd: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
         let mut bytes = BytesMut::new();
         self.encode0(cmd, &mut bytes)?;
         self.length_codec
@@ -184,8 +184,7 @@ impl KafkaCodec {
         header.request_api_key = api_key;
         header.request_api_version = api_version;
 
-        let header_version = Req::header_version(api_version);
-        header.encode(dst, header_version)?;
+        header.encode(dst, Req::header_version(api_version))?;
 
         self.active_requests.insert(header.correlation_id, header);
 
@@ -213,16 +212,14 @@ impl KafkaCodec {
             ConnectionError::UnexpectedResponse(format!("correlation_id: {correlation_id}"))
         })?;
 
-        let response_header_version = self.response_header_version(
-            request_header.request_api_key,
-            request_header.request_api_version,
-        );
+        let api_key = ApiKey::try_from(request_header.request_api_key)?;
+        let response_header_version =
+            api_key.response_header_version(request_header.request_api_version);
 
         // decode response header
         let response_header = ResponseHeader::decode(src, response_header_version)?;
 
         let header_version = request_header.request_api_version;
-        let api_key = ApiKey::try_from(request_header.request_api_key)?;
         let response_kind = match api_key {
             ApiKey::ProduceKey => {
                 let res = ProduceResponse::decode(src, header_version)?;
@@ -505,6 +502,30 @@ impl KafkaCodec {
                 let res = AllocateProducerIdsResponse::decode(src, header_version)?;
                 ResponseKind::AllocateProducerIdsResponse(res)
             }
+            ApiKey::ConsumerGroupHeartbeatKey => {
+                let res = ConsumerGroupHeartbeatResponse::decode(src, header_version)?;
+                ResponseKind::ConsumerGroupHeartbeatResponse(res)
+            }
+            ApiKey::ControllerRegistrationKey => {
+                let res = ControllerRegistrationResponse::decode(src, header_version)?;
+                ResponseKind::ControllerRegistrationResponse(res)
+            }
+            ApiKey::GetTelemetrySubscriptionsKey => {
+                let res = GetTelemetrySubscriptionsResponse::decode(src, header_version)?;
+                ResponseKind::GetTelemetrySubscriptionsResponse(res)
+            }
+            ApiKey::PushTelemetryKey => {
+                let res = PushTelemetryResponse::decode(src, header_version)?;
+                ResponseKind::PushTelemetryResponse(res)
+            }
+            ApiKey::AssignReplicasToDirsKey => {
+                let res = AssignReplicasToDirsResponse::decode(src, header_version)?;
+                ResponseKind::AssignReplicasToDirsResponse(res)
+            }
+            ApiKey::ListClientMetricsResourcesKey => {
+                let res = ListClientMetricsResourcesResponse::decode(src, header_version)?;
+                ResponseKind::ListClientMetricsResourcesResponse(res)
+            }
         };
         let response = KafkaResponse::new(response_header, response_kind);
         Ok(Some(Command::Response(response)))