Skip to content

Commit

Permalink
use unbounded_send when sending pub/sub message from network handler …
Browse files Browse the repository at this point in the history
…to client

- improved logs
  • Loading branch information
mcatanzariti committed May 25, 2024
1 parent c77bf5f commit 527e6ec
Showing 1 changed file with 38 additions and 20 deletions.
58 changes: 38 additions & 20 deletions src/network/network_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,13 @@ impl NetworkHandler {
}
Status::Subscribed => {
for command in &msg.commands {
if let "UNSUBSCRIBE" | "PUNSUBSCRIBE" | "SUNSUBSCRIBE" = command.name {
let subscription_type = match command.name {
"UNSUBSCRIBE" => SubscriptionType::Channel,
"PUNSUBSCRIBE" => SubscriptionType::Pattern,
"SUNSUBSCRIBE" => SubscriptionType::ShardChannel,
_ => unreachable!(),
};
let subscription_type = match command.name {
"UNSUBSCRIBE" => Some(SubscriptionType::Channel),
"PUNSUBSCRIBE" => Some(SubscriptionType::Pattern),
"SUNSUBSCRIBE" => Some(SubscriptionType::ShardChannel),
_ => None,
};
if let Some(subscription_type) = subscription_type {
self.pending_unsubscriptions.push_back(
command
.args
Expand Down Expand Up @@ -568,16 +568,34 @@ impl NetworkHandler {
| RefPubSubMessage::SMessage(channel_or_pattern, _) => {
match self.subscriptions.get_mut(channel_or_pattern) {
Some((_subscription_type, pub_sub_sender)) => {
if let Err(e) = pub_sub_sender.send(value).await {
warn!(
"[{}] Cannot send pub/sub message to caller: {e}",
self.tag
);
if let Err(e) = pub_sub_sender.unbounded_send(value) {
let error_desc = e.to_string();
if let Ok(ref_value) = &e.into_inner() {
if let Some(pub_sub_message) =
RefPubSubMessage::from_resp(ref_value)
{
if let RefPubSubMessage::Message(
channel_or_pattern,
_,
)
| RefPubSubMessage::SMessage(
channel_or_pattern,
_,
) = pub_sub_message
{
warn!(
"[{}] Cannot send pub/sub message to caller from channel `{}`: {error_desc}",
self.tag,
String::from_utf8_lossy(channel_or_pattern)
);
}
}
}
}
}
None => {
error!(
"[{}] Unexpected message on channel '{:?}'",
"[{}] Unexpected message on channel `{}`",
self.tag,
String::from_utf8_lossy(channel_or_pattern)
);
Expand All @@ -600,7 +618,7 @@ impl NetworkHandler {
{
return Some(Err(Error::Client(
format!(
"There is already a subscription on channel '{}'",
"There is already a subscription on channel `{}`",
String::from_utf8_lossy(channel_or_pattern)
)
.to_string(),
Expand All @@ -612,14 +630,14 @@ impl NetworkHandler {
}
} else {
error!(
"[{}] Unexpected subscription confirmation on channel '{}'",
"[{}] Unexpected subscription confirmation on channel `{}`",
self.tag,
String::from_utf8_lossy(channel_or_pattern)
);
}
} else {
error!(
"[{}] Cannot find pending subscription for channel '{}'",
"[{}] Cannot find pending subscription for channel `{}`",
self.tag,
String::from_utf8_lossy(channel_or_pattern)
);
Expand All @@ -634,7 +652,7 @@ impl NetworkHandler {
if remaining.len() > 1 {
if remaining.remove(channel_or_pattern).is_none() {
error!(
"[{}] Cannot find channel or pattern to remove: {}",
"[{}] Cannot find channel or pattern to remove: `{}`",
self.tag,
String::from_utf8_lossy(channel_or_pattern)
);
Expand All @@ -645,15 +663,15 @@ impl NetworkHandler {
let Some(mut remaining) = self.pending_unsubscriptions.pop_front()
else {
error!(
"[{}] Cannot find channel or pattern to remove: {}",
"[{}] Cannot find channel or pattern to remove: `{}`",
self.tag,
String::from_utf8_lossy(channel_or_pattern)
);
return None;
};
if remaining.remove(channel_or_pattern).is_none() {
error!(
"[{}] Cannot find channel or pattern to remove: {}",
"[{}] Cannot find channel or pattern to remove: `{}`",
self.tag,
String::from_utf8_lossy(channel_or_pattern)
);
Expand All @@ -677,7 +695,7 @@ impl NetworkHandler {
}
None => {
error!(
"[{}] Unexpected message on channel '{}' for pattern '{}'",
"[{}] Unexpected message on channel `{}` for pattern `{}`",
self.tag,
String::from_utf8_lossy(channel),
String::from_utf8_lossy(pattern)
Expand Down

0 comments on commit 527e6ec

Please sign in to comment.