Skip to content

Proposal: Full async channel interface (IModel) #970

Closed
@bollhals

Description

@bollhals

I've drafted a proposal for a new IModel interface that I tried to keep as slim as possible while changing everything relevant to Async.
This proposal is in no way finalized and should serve as a discussion base. (I'll update the proposal accordingly)
Please also verify the naming of the members and methods, I think with the breaking change that we do anyway, we should make sure the new names are exactly what we want.

A few things to highlight:

  • I renamed the interface to IChannel to match other languages.
  • I changed the naming generally to reflect what it is doing away from the protocol names used previously (I think they're confusing for a user without the background of the protocol).
  • I named the methods in a style where it begins with a verb and all async methods have the Async Postfix. Events in past tense.
  • All Async methods return ValueTask or ValueTask
  • All passive methods were replaced by an additional parameter "throwOnMismatch"
  • All noWait methods were replaced by an additional parameter "waitForConfirmation"
  • A few methods were removed (comments suggested deprecation or I felt they are not useful)
  • I've added several todos at places where things are still unclear / need discussion
  • I've tried to group them for better visibility.

Prerequisites:

  • Used ValueTuple => Not possible for net461
  • Used IAsyncDisposable => Not possible for net461, Reference Microsoft.Bcl.AsyncInterfaces or target .net core 3.1
  • Used nullable reference types => Not possible for net461, Workaround possible for .netstandard 2.0

As a reference, here's the current interface.

    public interface IChannel : IDisposable, IAsyncDisposable
    {
        int ChannelNumber { get; }
        bool IsOpen { get; }
        TimeSpan ContinuationTimeout { get; set; }

        /****************************
         * Various methods
         ****************************/
        ValueTask SetQosAsync(uint prefetchSize, ushort prefetchCount, bool global);
        // exception, context-detail
        event Action<Exception, Dictionary<string, object>?>? UnhandledExceptionOccurred;
        // active, TODO ask is this still needed?
        event Action<bool>? FlowControlChanged;

        /****************************
         * Message retrieval methods
         ****************************/
        ValueTask<string> ActivateConsumerAsync(IBasicConsumer consumer, string queue, bool autoAck, string consumerTag = "", bool noLocal = false, bool exclusive = false, IDictionary<string, object>? arguments = null);
        ValueTask CancelConsumerAsync(string consumerTag, bool waitForConfirmation = true);
        // TODO return value depends on how we represent messages
        ValueTask<BasicGetResult?> RetrieveSingleMessageAsync(string queue, bool autoAck);
        ValueTask AckMessageAsync(ulong deliveryTag, bool multiple);
        ValueTask NackMessageAsync(ulong deliveryTag, bool multiple, bool requeue);
        // TODO ask is this still needed?
        ValueTask RejectMessageAsync(ulong deliveryTag, bool requeue);

        /****************************
         * Message publication methods
         ****************************/
        event MessageDeliveryFailedDelegate? MessageDeliveryFailed;
        ValueTask PublishMessageAsync(string exchange, string routingKey, IBasicProperties? basicProperties, ReadOnlyMemory<byte> body, bool mandatory = false);
        // Depends on #972 
        ValueTask PublishMessageAsync(ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, IBasicProperties? basicProperties, ReadOnlyMemory<byte> body, bool mandatory = false);
        ValueTask ActivatePublishAcksAsync();
        // delivery tag, multiple, isAck
        event Action<ulong, bool, bool>? PublishAckReceived;
        event Action<ulong>? NewPublishTagUsed;
        ValueTask<ulong> PublishAckableMessageAsync(string exchange, string routingKey, IBasicProperties? basicProperties, ReadOnlyMemory<byte> body, bool mandatory = false);

        /****************************
         * Close methods
         ****************************/
        // CloseReason
        event Action<ShutdownEventArgs>? Shutdown;
        ShutdownEventArgs? CloseReason { get; }
        ValueTask AbortAsync(ushort replyCode, string replyText);
        ValueTask CloseAsync(ushort replyCode, string replyText);

        /****************************
         * Exchange methods
         ****************************/
        ValueTask DeclareExchangeAsync(string exchange, string type, bool durable, bool autoDelete, bool throwOnMismatch = true, IDictionary<string, object>? arguments = null, bool waitForConfirmation = true);
        ValueTask DeleteExchangeAsync(string exchange, bool ifUnused = false, bool waitForConfirmation = true);
        ValueTask BindExchangeAsync(string destination, string source, string routingKey, IDictionary<string, object>? arguments = null, bool waitForConfirmation = true);
        ValueTask UnbindExchangeAsync(string destination, string source, string routingKey, IDictionary<string, object>? arguments = null, bool waitForConfirmation = true);

        /****************************
         * Queue methods
         ****************************/
        ValueTask<(string QueueName, uint MessageCount, uint ConsumerCount)> DeclareQueueAsync(string queue, bool durable, bool exclusive, bool autoDelete, bool throwOnMismatch = true, IDictionary<string, object>? arguments = null);
        ValueTask DeclareQueueWithoutConfirmationAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object>? arguments = null);
        ValueTask<uint> DeleteQueueAsync(string queue, bool ifUnused = false, bool ifEmpty = false);
        ValueTask DeleteQueueWithoutConfirmationAsync(string queue, bool ifUnused = false, bool ifEmpty = false);
        ValueTask BindQueueAsync(string queue, string exchange, string routingKey, IDictionary<string, object>? arguments = null, bool waitForConfirmation = true);
        ValueTask UnbindQueueAsync(string queue, string exchange, string routingKey, IDictionary<string, object>? arguments = null);
        ValueTask<uint> PurgeQueueAsync(string queue);

        /****************************
         * Transaction methods
         ****************************/
        ValueTask ActivateTransactionsAsync();
        ValueTask CommitTransactionAsync();
        ValueTask RollbackTransactionAsync();
    }

    // Delegate definitions due to "in" modifier
    public delegate void MessageDeliveryFailedDelegate(in Message message, string replyText, ushort replyCode);
 
    // Auxiliary types
    public readonly struct Message
    {
        public readonly string Exchange;
        public readonly string RoutingKey;
        public readonly IBasicProperties Properties;
        public readonly ReadOnlyMemory<byte> Body;

        public Message(string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
        {
            Exchange = exchange;
            RoutingKey = routingKey;
            Properties = properties;
            Body = body;
        }
    }

    // Extension methods
    public static class ChannelExtensions
    {
        // Queue methods
        ValueTask<(string QueueName, uint MessageCount, uint ConsumerCount)> DeclareQueueAsync(this Channel channel, string queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, bool throwOnMismatch = true, IDictionary<string, object>? arguments = null);
        ValueTask<uint> GetQueueMessageCountAsync(this Channel channel, string queue);
        ValueTask<uint> GetQueueConsumerCountAsync(this Channel channel, string queue);

        // Close methods
        ValueTask CloseAsync(this Channel channel);
        ValueTask AbortAsync(this Channel channel);
    }

The WaitForConfirms methods I plan to replace with a new class that can be wrapped around IChannel, so customer can decide if they want to use our basic implementation or build something on their own.

    public sealed class PublishAckAwaiter : IValueTaskSource<bool>
    {
        public PublishAckAwaiter(IChannel channel) { }

        ValueTask<bool> WaitForAckAsync(ulong publishTag, CancellationToken cancellation) { }
        ValueTask<bool> WaitForAckOrCloseAsync(ulong publishTag, CancellationToken cancellation) { }
        ValueTask<bool> WaitForAllAcksAsync(CancellationToken cancellation) {}
        ValueTask<bool> WaitForAllAcksOrCloseAsync(CancellationToken cancellation) {}
    }

FYI removed methods:

    public interface IModel : IDisposable
    {
        // Does anyone really use it? Removed due to not known by me.
        IBasicConsumer DefaultConsumer { get; set; }
        // not needed if we have IsOpen & ShutdownReason
        bool IsClosed { get; }
        // replaced by return value on publish method
        ulong NextPublishSeqNo { get; }

        // deprecated according to comments
        event EventHandler<EventArgs> BasicRecoverOk;
        void BasicRecover(bool requeue);
        void BasicRecoverAsync(bool requeue);

        // removed from interface, can be delivered as extension methods
        void Abort();
        void Close();

        // "new BasicProperty()" / "new BasicPublishBatch()" => no need to hide them behind these methods
        IBasicPublishBatch CreateBasicPublishBatch();
        IBasicPublishBatch CreateBasicPublishBatch(int sizeHint);
        IBasicProperties CreateBasicProperties();
    }

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions