Skip to content

Commit

Permalink
Chore(deps): bump kafka-protocol to 0.10
Browse files Browse the repository at this point in the history
  • Loading branch information
iamazy committed Mar 21, 2024
1 parent 4aaf42b commit 1c47666
Show file tree
Hide file tree
Showing 14 changed files with 124 additions and 82 deletions.
20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "kafkas"
version = "0.1.0"
edition = "2021"
authors = ["iamazy <iamazy.me@outlook.com>"]
keywords = ["kafka", "message queue", "async", "tokio", "async-std"]
keywords = ["kafka", "async", "tokio", "async-std"]
license-file = "LICENSE"
readme = "README.md"
repository = "https://github.com/iamazy/kafkas"
Expand All @@ -12,10 +12,10 @@ description = "async kafka client for rust"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
asynchronous-codec = { version = "0.6", optional = true }
async-io = { version = "1", optional = true}
async-native-tls = { version = "0.4", optional = true }
async-recursion = "1.0.0"
asynchronous-codec = { version = "0.7", optional = true }
async-io = { version = "2", optional = true }
async-native-tls = { version = "0.5", optional = true }
async-recursion = "1"
async-std = { version = "1", features = ["attributes", "unstable"], optional = true }
async-stream = "0.3"
bit-vec = "0.6"
Expand All @@ -25,19 +25,19 @@ dashmap = "5"
fnv = "1"
futures = "0.3"
fxhash = "0.2"
indexmap = "1"
kafka-protocol = { git = "https://github.com/iamazy/kafka-protocol-rs", rev = "d8a289bbdebd71f89d52838810303902a7368773"}
indexmap = "2"
kafka-protocol = "0.10"
native-tls = "0.2"
pin-project-lite = "0.2"
rand = "0.8"
regex = "1.1.7"
regex = "1"
thiserror = "1"
tokio = { version = "1", features = ["full"], optional = true }
tokio-util = { version = "0.7", features = ["codec"], optional = true }
tokio-native-tls = { version = "0.3", optional = true }
tracing = "0.1"
url = "2.1"
uuid = "1.3"
url = "2"
uuid = "1"

[dev-dependencies]
rand = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nightly
stable
1 change: 1 addition & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# cargo +nightly fmt
comment_width = 100
edition = "2021"
format_code_in_doc_comments = true
Expand Down
4 changes: 2 additions & 2 deletions src/consumer/fetch_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ impl FetchRequestDataBuilder {

let mut session_remove = Vec::new();
for (tp, prev_data) in session.session_partitions.iter_mut() {
match self.next.remove(tp) {
match self.next.swap_remove(tp) {
Some(next_data) => {
// We basically check if the new partition had the same topic ID. If not,
// we add it to the "replaced" set. If the request is version 13 or higher, the
Expand Down Expand Up @@ -470,7 +470,7 @@ impl FetchRequestDataBuilder {
}

for tp in session_remove.iter() {
session.session_partitions.remove(tp);
session.session_partitions.swap_remove(tp);
}

// Add any new partitions to the session.
Expand Down
19 changes: 13 additions & 6 deletions src/consumer/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,8 @@ impl<Exe: Executor> Fetcher<Exe> {
debug!(
"Handing v0 ListOffsetResponse response for [{} - {}]. Fetched \
offset {offset}",
&topic.name.0, partition
topic.name.as_str(),
partition
);
if offset != UNKNOWN_OFFSET {
let tp = TopicPartition {
Expand All @@ -488,7 +489,7 @@ impl<Exe: Executor> Fetcher<Exe> {
debug!(
"Handling ListOffsetResponse response for [{} - {}], Fetched \
offset {}, timestamp {}",
topic.name.0,
topic.name.as_str(),
partition,
partition_response.offset,
partition_response.timestamp
Expand Down Expand Up @@ -525,7 +526,8 @@ impl<Exe: Executor> Fetcher<Exe> {
debug!(
"Cannot search by timestamp for [{} - {}] because the message format \
version is before 0.10.0",
topic.name.0, partition
topic.name.as_str(),
partition
);
break;
}
Expand All @@ -540,7 +542,9 @@ impl<Exe: Executor> Fetcher<Exe> {
) => {
debug!(
"Attempt to fetch offsets for [{} - {}] failed due to {}, retrying.",
topic.name.0, partition, error
topic.name.as_str(),
partition,
error
);
let tp = TopicPartition {
topic: topic.name.clone(),
Expand All @@ -552,7 +556,8 @@ impl<Exe: Executor> Fetcher<Exe> {
warn!(
"Received unknown topic or partition error in ListOffset request for \
partition [{} - {}]",
topic.name.0, partition
topic.name.as_str(),
partition
);
let tp = TopicPartition {
topic: topic.name.clone(),
Expand All @@ -567,7 +572,9 @@ impl<Exe: Executor> Fetcher<Exe> {
warn!(
"Attempt to fetch offsets for [{} - {}] failed due to unexpected \
exception: {}, retrying.",
topic.name.0, partition, error
topic.name.as_str(),
partition,
error
);
let tp = TopicPartition {
topic: topic.name.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl<Exe: Executor> Consumer<Exe> {

pub async fn unsubscribe(&mut self) -> Result<()> {
self.coordinator
.maybe_leave_group(StrBytes::from_str(
.maybe_leave_group(StrBytes::from_static_str(
"the consumer unsubscribed from all topics",
))
.await?;
Expand Down
2 changes: 1 addition & 1 deletion src/consumer/partition_assignor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub trait PartitionAssigner {
} else {
debug!(
"skipping assignment for topic {} since no metadata is available",
topic.0
topic.as_str()
);
}
}
Expand Down
15 changes: 10 additions & 5 deletions src/consumer/subscription_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl SubscriptionState {
OffsetMetadata {
committed_offset: tp_state.position.offset,
committed_leader_epoch: tp_state.position.current_leader.epoch,
metadata: Some(StrBytes::from_str("")),
metadata: Some(StrBytes::default()),
},
);
}
Expand Down Expand Up @@ -150,25 +150,30 @@ impl SubscriptionState {
if !matches!(partition_state.fetch_state, FetchState::AwaitReset) {
debug!(
"Skipping reset of [{} - {}] since it is no longer needed",
partition.topic.0, partition.partition
partition.topic.as_str(),
partition.partition
);
} else if partition_state.offset_strategy != offset_strategy {
debug!(
"Skipping reset of topic [{} - {}] since an alternative reset has been \
requested",
partition.topic.0, partition.partition
partition.topic.as_str(),
partition.partition
);
} else {
info!(
"Resetting offset for topic [{} - {}] to position {}.",
partition.topic.0, partition.partition, position.offset
partition.topic.as_str(),
partition.partition,
position.offset
);
partition_state.seek_unvalidated(position)?;
}
} else {
debug!(
"Skipping reset of [{} - {}] since it is no longer assigned",
partition.topic.0, partition.partition
partition.topic.as_str(),
partition.partition
);
}

Expand Down
56 changes: 31 additions & 25 deletions src/coordinator/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
let node = find_coordinator(&client, group_id.clone(), CoordinatorType::Group).await?;
info!(
"Find coordinator success, group {}, node: {}",
group_id,
group_id.as_str(),
node.address()
);

Expand Down Expand Up @@ -399,7 +399,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
.await?;
for group in describe_groups_response.groups {
if group.error_code.is_err() {
error!("Describe group [{}] failed", group.group_id.0);
error!("Describe group [{}] failed", group.group_id.as_str());
}
}
Ok(())
Expand All @@ -424,7 +424,8 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
warn!(
"Join group with unknown member id, will rejoin group [{}] with \
member id: {}",
self.group_meta.group_id.0, self.group_meta.member_id
self.group_meta.group_id.as_str(),
self.group_meta.member_id.as_str()
);
self.join_group().await
}
Expand All @@ -443,9 +444,9 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
info!(
"Join group [{}] success, leader = {}, member_id = {}, generation_id \
= {}",
self.group_meta.group_id.0,
self.group_meta.leader,
self.group_meta.member_id,
self.group_meta.group_id.as_str(),
self.group_meta.leader.as_str(),
self.group_meta.member_id.as_str(),
self.group_meta.generation_id
);
Ok(())
Expand All @@ -460,9 +461,10 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
match self.client.version_range(ApiKey::LeaveGroupKey) {
Some(version_range) => {
debug!(
"Member {} send LeaveGroup request to coordinator {} due to {reason}",
self.group_meta.member_id,
"Member {} send LeaveGroup request to coordinator {} due to {}",
self.group_meta.member_id.as_str(),
self.node.address(),
reason.as_str()
);

let leave_group_request = self.leave_group_builder(version_range.max, reason)?;
Expand All @@ -478,25 +480,28 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
if member.error_code.is_ok() {
debug!(
"Member {} leave group {} success.",
member.member_id, self.group_meta.group_id.0
member.member_id.as_str(),
self.group_meta.group_id.as_str()
);
} else {
error!(
"Member {} leave group {} failed.",
member.member_id, self.group_meta.group_id.0
member.member_id.as_str(),
self.group_meta.group_id.as_str()
);
}
}
info!(
"Leave group [{}] success, member: {}",
self.group_meta.group_id.0, self.group_meta.member_id
self.group_meta.group_id.as_str(),
self.group_meta.member_id.as_str()
);
Ok(())
}
Some(error) => {
error!(
"Leave group [{}] failed, error: {error}",
self.group_meta.group_id.0
self.group_meta.group_id.as_str()
);
Err(error.into())
}
Expand Down Expand Up @@ -528,8 +533,8 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
error!(
"JoinGroup failed: Inconsistent Protocol Type, received {} but \
expected {}",
sync_group_response.protocol_type.unwrap(),
self.group_meta.protocol_type.as_ref().unwrap()
sync_group_response.protocol_type.unwrap().as_str(),
self.group_meta.protocol_type.as_ref().unwrap().as_str()
);
return Err(ResponseError::InconsistentGroupProtocol.into());
}
Expand Down Expand Up @@ -560,12 +565,12 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
info!(
"Sync group [{}] success, leader = {}, member_id = {}, generation_id \
= {}, protocol_type = {}, protocol_name = {}, assignments = <{}>",
self.group_meta.group_id.0,
self.group_meta.leader,
self.group_meta.member_id,
self.group_meta.group_id.as_str(),
self.group_meta.leader.as_str(),
self.group_meta.member_id.as_str(),
self.group_meta.generation_id,
self.group_meta.protocol_type.as_ref().unwrap(),
self.group_meta.protocol_name.as_ref().unwrap(),
self.group_meta.protocol_type.as_ref().unwrap().as_str(),
self.group_meta.protocol_name.as_ref().unwrap().as_str(),
crate::array_display(self.subscriptions.assignments.keys()),
);
Ok(())
Expand Down Expand Up @@ -643,7 +648,8 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
None => {
debug!(
"Heartbeat success, group: {}, member: {}",
self.group_meta.group_id.0, self.group_meta.member_id
self.group_meta.group_id.as_str(),
self.group_meta.member_id.as_str()
);
Ok(())
}
Expand All @@ -663,7 +669,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
// this case and ignore the REBALANCE_IN_PROGRESS error
warn!(
"Group [{}] is rebalance in progress.",
self.group_meta.group_id.0
self.group_meta.group_id.as_str()
);
if matches!(self.state, MemberState::Stable) {
self.rejoin_group().await?;
Expand Down Expand Up @@ -780,7 +786,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
let mut request = JoinGroupRequest::default();
request.group_id = self.group_meta.group_id.clone();
request.member_id = self.group_meta.member_id.clone();
request.protocol_type = StrBytes::from_str(CONSUMER_PROTOCOL_TYPE);
request.protocol_type = StrBytes::from_static_str(CONSUMER_PROTOCOL_TYPE);
request.protocols = self.join_group_protocol()?;
request.session_timeout_ms = self.consumer_options.rebalance_options.session_timeout_ms;
if version >= 1 {
Expand Down Expand Up @@ -830,7 +836,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
None => {
return Err(Error::Custom(format!(
"Group leader {} has no partition assignor protocol",
self.group_meta.leader
self.group_meta.leader.as_str()
)));
}
}
Expand Down Expand Up @@ -945,11 +951,11 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
}
} else {
generation = DEFAULT_GENERATION_ID;
member = StrBytes::from_str("");
member = StrBytes::default();
}

request.group_id = self.group_meta.group_id.clone();
request.generation_id = generation;
request.generation_id_or_member_epoch = generation;
request.member_id = member;
request.group_instance_id = self.group_meta.group_instance_id.clone();
request.retention_time_ms = -1;
Expand Down
Loading

0 comments on commit 1c47666

Please sign in to comment.