Timeout/Disconnect errors while awaiting the future returned by send: safe to retry, or a bug since they are handled elsewhere? #358

@chamons

We have some pulsar delivery code that roughly looks like this:

        let send_future = match producer
            .send_non_blocking(
                topic,
                PulsarPayload {
                    payload,
                    deliver_at_time,
                },
            )
            .await
        {
            Ok(send_future) => send_future,
            Err(e) => {
                let _ = producer.close_producer(topic).await;
                return Err(PulsarDeliveryError::SendError(e));
            }
        };

        match send_future.await {
            Ok(_) => Ok(()),
            Err(e) => {
                let _ = producer.close_producer(topic).await;
                Err(PulsarDeliveryError::AwaitConfirmationError(e))
            }
        }

Under significant load, a small percentage of sends fail in the second case (the one that returns AwaitConfirmationError) with either a Disconnect or a timeout error.

Looking into the source code, I see in src/producer.rs that send_inner loops and retries if you get either one of those cases on the initial send.

However, inside the await:

                    let fut = async move {
                        let res = fut.await;
                        res.map_err(|e| {
                            error!("wait send receipt got error: {:?}", e);
                            Error::Producer(ProducerError::Connection(e))
                        })
                    };

No retry happens here, which means those errors can bubble up to the caller.

Two questions:

  1. Is this a bug? Shouldn't pulsar-rs handle this for us?
  2. Is it safe for us to retry?
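
For context on question 2, here is a minimal sketch of what an application-level retry could look like, assuming a retry is acceptable at all. One caveat: a message whose receipt was lost may already have been persisted, so a retry might produce a duplicate. Everything in the block is hypothetical: `DeliveryError`, `is_retryable`, and `send_with_retry` are illustrative names, not pulsar-rs types or APIs. The sketch is synchronous for brevity; in our code the closure would stand in for "call send_non_blocking, then await the returned future".

```rust
// Hypothetical stand-ins for the errors seen while awaiting the receipt.
// These are NOT pulsar-rs types.
#[derive(Debug, Clone, PartialEq)]
enum DeliveryError {
    Disconnected,
    Timeout,
    Other(String),
}

/// Only connection-level failures are treated as transient in this sketch.
fn is_retryable(err: &DeliveryError) -> bool {
    matches!(err, DeliveryError::Disconnected | DeliveryError::Timeout)
}

/// Calls `attempt` up to `max_attempts` times. Stops at the first success
/// (returning how many attempts were used) or at the first non-retryable
/// error. If every attempt fails with a retryable error, the last one is
/// returned.
fn send_with_retry<F>(max_attempts: u32, mut attempt: F) -> Result<u32, DeliveryError>
where
    F: FnMut() -> Result<(), DeliveryError>,
{
    let mut last_err = DeliveryError::Other("max_attempts was zero".to_string());
    for n in 1..=max_attempts {
        match attempt() {
            Ok(()) => return Ok(n),
            Err(e) if is_retryable(&e) => last_err = e,
            Err(e) => return Err(e),
        }
    }
    Err(last_err)
}
```

The sketch deliberately leaves out backoff between attempts and any deduplication, both of which would probably matter under the load described above.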
