
Is Consumer::next cancel safe (for both Single and Multi consumers)? #374

@chamons


I have a high-throughput system that needs to process many requests in parallel. Each operation can take tens of seconds, because it involves a synchronous API call.

My system is currently architected as a single worker that owns the Pulsar consumer connection, plus many worker tasks that fetch requests from it and send ack/nack requests back. We found that running hundreds of Pulsar consumers in one process, each listening to many topics, was expensive and wasteful.

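    loop {
        select! {
            // Poll the branches in the order written instead of randomly.
            biased;

            // Highest priority: ack/nack requests coming back from worker tasks.
            ack_nack_request = self.ack_nack_rx.recv() => {
                self.handle_ack_request(ack_nack_request).await;
            }

            // Pull from Pulsar only while a fetch request is outstanding.
            maybe_message = self.consumer.next(), if self.current_fetch_request.is_some() => {
                self.handle_new_pulsar_message(maybe_message, false);
            }

            // Otherwise, accept the next fetch request from a worker task.
            maybe_fetch_request = self.fetch_request_rx.recv(), if self.current_fetch_request.is_none() && !self.fetch_request_rx.is_closed() => {
                self.handle_new_message_request(maybe_fetch_request);
            }
        }
    }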
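In tokio's select! (which the biased; and ", if" precondition syntax above suggests is in use), once one branch completes, the futures of the remaining branches are dropped, and biased; makes it poll branches in the order written. So in the loop above, whenever an ack/nack request arrives, a self.consumer.next() future that was already polled and is still pending can be dropped. The following is a minimal std-only model of that behavior, not tokio itself: Branch and select_biased are hypothetical names, and the busy re-polling loop is a simplification (real select! waits for wakeups).

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A waker that does nothing, so futures can be polled by hand without a runtime.
fn noop_waker() -> Waker {
    unsafe fn noop_clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
    // SAFETY: none of the vtable functions touch the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Hypothetical branch future: ready on its `ready_on_poll`-th poll, and it
// records in `dropped_unfinished` whether it was dropped before completing.
struct Branch<'a> {
    name: &'static str,
    ready_on_poll: u32,
    polls: u32,
    done: bool,
    dropped_unfinished: &'a Cell<bool>,
}

impl Future for Branch<'_> {
    type Output = &'static str;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<&'static str> {
        let this = self.get_mut();
        this.polls += 1;
        if this.polls >= this.ready_on_poll {
            this.done = true;
            Poll::Ready(this.name)
        } else {
            Poll::Pending
        }
    }
}

impl Drop for Branch<'_> {
    fn drop(&mut self) {
        if !self.done {
            self.dropped_unfinished.set(true);
        }
    }
}

// Rough model of `select! { biased; ... }` with two branches: poll them in the
// order written, return the first output, and drop both futures on return.
fn select_biased<A, B>(mut a: A, mut b: B) -> A::Output
where
    A: Future + Unpin,
    B: Future<Output = A::Output> + Unpin,
{
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = Pin::new(&mut a).poll(&mut cx) {
            return v;
        }
        if let Poll::Ready(v) = Pin::new(&mut b).poll(&mut cx) {
            return v;
        }
    }
}
```

With an "ack" branch that is ready on its second poll and a "next" branch that needs five, select_biased returns "ack", and the "next" future, which has already been polled once, is dropped unfinished. That drop is exactly the point at which the cancel safety of consumer.next() matters.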

In scale testing, we have seen some messages get "stuck" until the consumer pods are restarted.

I have verified that most of the calls in my core select! are cancel safe; however, the cancel safety of StreamExt::next from futures-util is ambiguous. The best guidance I can find online (https://rfd.shared.oxide.computer/rfd/400) suggests:

StreamExt::next is not documented to be cancel-safe, and its cancel safety depends on how the underlying Stream behaves. However, a Stream that couldn't handle interruptions while generating its values would clearly be buggy. After all, the entire point of a Stream is to iterate over values in situations where the next item isn't immediately available
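The quoted reasoning can be made concrete. In futures-util, StreamExt::next returns a Next future that only borrows the stream and forwards each poll to poll_next_unpin, so Next holds no state of its own; a message can only be lost on cancellation if the stream's poll_next discards one. The sketch below contrasts that shape with a future that keeps a popped message in its own state. It is a std-only model under stated assumptions: MiniStream, StagingConsumer and PopThenConfirm are hypothetical names, and nothing here describes pulsar-rs's actual Consumer implementation.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A waker that does nothing, so futures can be polled by hand without a runtime.
fn noop_waker() -> Waker {
    unsafe fn noop_clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
    // SAFETY: none of the vtable functions touch the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Poll a future exactly once, like a select! round in which it loses.
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

// Hypothetical stand-in for futures::Stream, to avoid an external crate.
trait MiniStream {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Shaped like futures-util's Next: it only borrows the stream and forwards the
// poll, so dropping it cannot discard anything the stream itself still holds.
struct Next<'a, S>(&'a mut S);

impl<S: MiniStream> Future for Next<'_, S> {
    type Output = Option<S::Item>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.0.poll_next(cx)
    }
}

// Hypothetical consumer: delivering a message takes two polls (think of waiting
// for some permit), and the half-delivered message is parked in `staged`.
struct StagingConsumer {
    inbound: VecDeque<u32>,
    staged: Option<u32>,
    permit: bool,
}

impl MiniStream for StagingConsumer {
    type Item = u32;
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.staged.is_none() {
            self.staged = self.inbound.pop_front();
        }
        if self.staged.is_none() {
            return Poll::Ready(None); // no more messages
        }
        if !self.permit {
            // A real stream would register cx.waker() here; the model just
            // pretends the permit is available by the next poll.
            self.permit = true;
            return Poll::Pending;
        }
        self.permit = false;
        Poll::Ready(self.staged.take())
    }
}

// Hypothetical cancel-UNSAFE receive, shaped like
// `async fn recv(&mut self) -> u32 { let m = self.queue.pop(); confirm().await; m }`:
// the popped message lives only inside the future.
struct PopThenConfirm<'a> {
    inbound: &'a mut VecDeque<u32>,
    taken: Option<u32>,
}

impl Future for PopThenConfirm<'_> {
    type Output = Option<u32>;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let this = self.get_mut();
        if this.taken.is_some() {
            return Poll::Ready(this.taken.take()); // "confirmation" arrived
        }
        this.taken = this.inbound.pop_front();
        if this.taken.is_some() { Poll::Pending } else { Poll::Ready(None) }
    }
}
```

Dropping a Next after a Pending poll leaves message 1 parked in staged, so the following next() still delivers it. Dropping PopThenConfirm after its first poll loses message 1 for good. Which of these two shapes the internals of pulsar-rs's Consumer resemble is what this issue asks; the sketch only shows why the answer depends on the stream implementation rather than on Next.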

I poked around the pulsar-rs source code looking for cancel-unsafe calls, but the call chain is non-trivial to analyze.

Is pulsar-rs documented as cancel safe or cancel unsafe?
