-
Notifications
You must be signed in to change notification settings - Fork 141
Open
Description
We have some pulsar delivery code that roughly looks like this:
let send_future = match producer
.send_non_blocking(
topic,
PulsarPayload {
payload,
deliver_at_time,
},
)
.await
{
Ok(send_future) => send_future,
Err(e) => {
let _ = producer.close_producer(topic).await;
return Err(PulsarDeliveryError::SendError(e));
}
};
match send_future.await {
Ok(_) => Ok(()),
Err(e) => {
let _ = producer.close_producer(topic).await;
Err(PulsarDeliveryError::AwaitConfirmationError(e))
}
}
Under significant load, we receive a small % of errors in that second case (the one that returns AwaitConfirmationError) with either Disconnect or timeout.
Looking into the source code, I see in src/producer.rs that send_inner loops and retries if you get either one of those cases on the initial send.
However, inside the await:
let fut = async move {
let res = fut.await;
res.map_err(|e| {
error!("wait send receipt got error: {:?}", e);
Error::Producer(ProducerError::Connection(e))
})
};
We don't, which means they can bubble up.
Two questions:
- Is this a bug, shoudln't pulsar-rs handle this for us?
- Is it safe for us to retry
Metadata
Metadata
Assignees
Labels
No labels