I've drafted a proposal for a new IModel interface that I tried to keep as slim as possible while converting everything relevant to async.
This proposal is in no way finalized and should serve as a basis for discussion. (I'll update the proposal accordingly.)
Please also review the names of the members and methods. Since we are making a breaking change anyway, we should make sure the new names are exactly what we want.
A few things to highlight:
- I renamed the interface to IChannel to match other languages.
- I changed the naming in general to reflect what each member does, moving away from the protocol names used previously (I think they're confusing for users without a background in the protocol).
- I named the methods so that they begin with a verb, and all async methods have the Async suffix. Events are named in the past tense.
- All async methods return ValueTask or ValueTask<T>.
- All passive methods were replaced by an additional parameter "throwOnMismatch"
- All noWait methods were replaced by an additional parameter "waitForConfirmation"
- A few methods were removed (comments suggested deprecation or I felt they are not useful)
- I've added several todos at places where things are still unclear / need discussion
- I've tried to group the members for better visibility.
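To make the naming and flag conventions above concrete, here is a minimal sketch of how one verb-first, Async-suffixed method with a "waitForConfirmation" flag could stand in for a wait/no-wait method pair. ILegacyExchangeOps, ExchangeOpsAdapter and RecordingOps are hypothetical names invented for this illustration, and the wrapper completes synchronously, which a real implementation would not do.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for an existing pair of methods (wait / no-wait).
public interface ILegacyExchangeOps
{
    void ExchangeDelete(string exchange, bool ifUnused);
    void ExchangeDeleteNoWait(string exchange, bool ifUnused);
}

public static class ExchangeOpsAdapter
{
    // One verb-first, Async-suffixed method; the flag selects the old variant.
    // Assumption: this sketch completes synchronously. A real implementation
    // would await the broker's reply when waitForConfirmation is true.
    public static ValueTask DeleteExchangeAsync(
        this ILegacyExchangeOps ops,
        string exchange,
        bool ifUnused = false,
        bool waitForConfirmation = true)
    {
        if (waitForConfirmation)
        {
            ops.ExchangeDelete(exchange, ifUnused);
        }
        else
        {
            ops.ExchangeDeleteNoWait(exchange, ifUnused);
        }

        return default; // default(ValueTask) is an already-completed ValueTask
    }
}

// Recording fake, used only to show which variant was chosen.
public sealed class RecordingOps : ILegacyExchangeOps
{
    public List<string> Calls { get; } = new List<string>();

    public void ExchangeDelete(string exchange, bool ifUnused) =>
        Calls.Add("wait:" + exchange);

    public void ExchangeDeleteNoWait(string exchange, bool ifUnused) =>
        Calls.Add("nowait:" + exchange);
}
```

With this shape, callers write `ops.DeleteExchangeAsync("logs")` for the confirmed variant and `ops.DeleteExchangeAsync("logs", waitForConfirmation: false)` for the fire-and-forget one, so the method list does not double.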
Prerequisites:
- Used ValueTuple => Not available in-box for net461, Reference the System.ValueTuple package
- Used IAsyncDisposable => Not possible for net461, Reference Microsoft.Bcl.AsyncInterfaces or target .net core 3.1
- Used nullable reference types => Not possible for net461, Workaround possible for .netstandard 2.0
For reference, here's the current draft of the interface.
public interface IChannel : IDisposable, IAsyncDisposable
{
    int ChannelNumber { get; }
    bool IsOpen { get; }
    TimeSpan ContinuationTimeout { get; set; }

    /****************************
     * Various methods
     ****************************/
    ValueTask SetQosAsync(uint prefetchSize, ushort prefetchCount, bool global);
    // exception, context-detail
    event Action<Exception, Dictionary<string, object>?>? UnhandledExceptionOccurred;
    // active, TODO ask is this still needed?
    event Action<bool>? FlowControlChanged;

    /****************************
     * Message retrieval methods
     ****************************/
    ValueTask<string> ActivateConsumerAsync(IBasicConsumer consumer, string queue, bool autoAck, string consumerTag = "", bool noLocal = false, bool exclusive = false, IDictionary<string, object>? arguments = null);
    ValueTask CancelConsumerAsync(string consumerTag, bool waitForConfirmation = true);
    // TODO return value depends on how we represent messages
    ValueTask<BasicGetResult?> RetrieveSingleMessageAsync(string queue, bool autoAck);
    ValueTask AckMessageAsync(ulong deliveryTag, bool multiple);
    ValueTask NackMessageAsync(ulong deliveryTag, bool multiple, bool requeue);
    // TODO ask is this still needed?
    ValueTask RejectMessageAsync(ulong deliveryTag, bool requeue);

    /****************************
     * Message publication methods
     ****************************/
    event MessageDeliveryFailedDelegate? MessageDeliveryFailed;
    ValueTask PublishMessageAsync(string exchange, string routingKey, IBasicProperties? basicProperties, ReadOnlyMemory<byte> body, bool mandatory = false);
    // Depends on #972
    ValueTask PublishMessageAsync(ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, IBasicProperties? basicProperties, ReadOnlyMemory<byte> body, bool mandatory = false);
    ValueTask ActivatePublishAcksAsync();
    // delivery tag, multiple, isAck
    event Action<ulong, bool, bool>? PublishAckReceived;
    event Action<ulong>? NewPublishTagUsed;
    ValueTask<ulong> PublishAckableMessageAsync(string exchange, string routingKey, IBasicProperties? basicProperties, ReadOnlyMemory<byte> body, bool mandatory = false);

    /****************************
     * Close methods
     ****************************/
    // CloseReason
    event Action<ShutdownEventArgs>? Shutdown;
    ShutdownEventArgs? CloseReason { get; }
    ValueTask AbortAsync(ushort replyCode, string replyText);
    ValueTask CloseAsync(ushort replyCode, string replyText);

    /****************************
     * Exchange methods
     ****************************/
    ValueTask DeclareExchangeAsync(string exchange, string type, bool durable, bool autoDelete, bool throwOnMismatch = true, IDictionary<string, object>? arguments = null, bool waitForConfirmation = true);
    ValueTask DeleteExchangeAsync(string exchange, bool ifUnused = false, bool waitForConfirmation = true);
    ValueTask BindExchangeAsync(string destination, string source, string routingKey, IDictionary<string, object>? arguments = null, bool waitForConfirmation = true);
    ValueTask UnbindExchangeAsync(string destination, string source, string routingKey, IDictionary<string, object>? arguments = null, bool waitForConfirmation = true);

    /****************************
     * Queue methods
     ****************************/
    ValueTask<(string QueueName, uint MessageCount, uint ConsumerCount)> DeclareQueueAsync(string queue, bool durable, bool exclusive, bool autoDelete, bool throwOnMismatch = true, IDictionary<string, object>? arguments = null);
    ValueTask DeclareQueueWithoutConfirmationAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object>? arguments = null);
    ValueTask<uint> DeleteQueueAsync(string queue, bool ifUnused = false, bool ifEmpty = false);
    ValueTask DeleteQueueWithoutConfirmationAsync(string queue, bool ifUnused = false, bool ifEmpty = false);
    ValueTask BindQueueAsync(string queue, string exchange, string routingKey, IDictionary<string, object>? arguments = null, bool waitForConfirmation = true);
    ValueTask UnbindQueueAsync(string queue, string exchange, string routingKey, IDictionary<string, object>? arguments = null);
    ValueTask<uint> PurgeQueueAsync(string queue);

    /****************************
     * Transaction methods
     ****************************/
    ValueTask ActivateTransactionsAsync();
    ValueTask CommitTransactionAsync();
    ValueTask RollbackTransactionAsync();
}
// Delegate definitions due to "in" modifier
public delegate void MessageDeliveryFailedDelegate(in Message message, string replyText, ushort replyCode);
// Auxiliary types
public readonly struct Message
{
    public readonly string Exchange;
    public readonly string RoutingKey;
    public readonly IBasicProperties Properties;
    public readonly ReadOnlyMemory<byte> Body;

    public Message(string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
    {
        Exchange = exchange;
        RoutingKey = routingKey;
        Properties = properties;
        Body = body;
    }
}
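The custom delegate is needed because Action<T> cannot express an "in" parameter. The following is a minimal sketch of declaring and raising such an event; DemoMessage, DemoDeliveryFailedDelegate and DemoPublisher are hypothetical simplifications (the properties field is left out so the example does not depend on IBasicProperties), and the reply values are only illustrative.

```csharp
#nullable enable
using System;

// Hypothetical, simplified version of Message without IBasicProperties.
public readonly struct DemoMessage
{
    public readonly string Exchange;
    public readonly string RoutingKey;
    public readonly ReadOnlyMemory<byte> Body;

    public DemoMessage(string exchange, string routingKey, ReadOnlyMemory<byte> body)
    {
        Exchange = exchange;
        RoutingKey = routingKey;
        Body = body;
    }
}

// "in" passes the struct by read-only reference, so raising the event
// does not copy the struct for every subscriber.
public delegate void DemoDeliveryFailedDelegate(in DemoMessage message, string replyText, ushort replyCode);

public sealed class DemoPublisher
{
    public event DemoDeliveryFailedDelegate? MessageDeliveryFailed;

    // Simulates the broker returning an unroutable message
    // (AMQP reply code 312, NO_ROUTE).
    public void SimulateReturn(in DemoMessage message) =>
        MessageDeliveryFailed?.Invoke(in message, "NO_ROUTE", 312);
}
```

A subscriber has to spell out the modifier and the parameter types in a lambda, for example `publisher.MessageDeliveryFailed += (in DemoMessage m, string text, ushort code) => Console.WriteLine(m.RoutingKey);`.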
// Extension methods
public static class ChannelExtensions
{
    // Signatures only; bodies are omitted in this draft.
    // Queue methods
    public static ValueTask<(string QueueName, uint MessageCount, uint ConsumerCount)> DeclareQueueAsync(this IChannel channel, string queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, bool throwOnMismatch = true, IDictionary<string, object>? arguments = null);
    public static ValueTask<uint> GetQueueMessageCountAsync(this IChannel channel, string queue);
    public static ValueTask<uint> GetQueueConsumerCountAsync(this IChannel channel, string queue);
    // Close methods
    public static ValueTask CloseAsync(this IChannel channel);
    public static ValueTask AbortAsync(this IChannel channel);
}
I plan to replace the WaitForConfirms methods with a new class that can be wrapped around IChannel, so users can decide whether they want to use our basic implementation or build something of their own.
public sealed class PublishAckAwaiter : IValueTaskSource<bool>
{
    // Bodies are omitted in this draft.
    public PublishAckAwaiter(IChannel channel) { }
    public ValueTask<bool> WaitForAckAsync(ulong publishTag, CancellationToken cancellation) { }
    public ValueTask<bool> WaitForAckOrCloseAsync(ulong publishTag, CancellationToken cancellation) { }
    public ValueTask<bool> WaitForAllAcksAsync(CancellationToken cancellation) { }
    public ValueTask<bool> WaitForAllAcksOrCloseAsync(CancellationToken cancellation) { }
}
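As a rough illustration of how such a wrapper could be built on the two publish events alone, here is a minimal sketch. It is not the planned PublishAckAwaiter: it uses TaskCompletionSource and Task<bool> rather than IValueTaskSource<bool>, ignores cancellation and channel close, and treats unknown tags as already acked. IPublishAckEvents, SimplePublishAckTracker and FakeAckEvents are hypothetical names introduced for this example.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the two IChannel events the tracker needs.
public interface IPublishAckEvents
{
    event Action<ulong, bool, bool>? PublishAckReceived; // delivery tag, multiple, isAck
    event Action<ulong>? NewPublishTagUsed;
}

public sealed class SimplePublishAckTracker
{
    private readonly object _gate = new object();
    private readonly SortedDictionary<ulong, TaskCompletionSource<bool>> _pending =
        new SortedDictionary<ulong, TaskCompletionSource<bool>>();

    public SimplePublishAckTracker(IPublishAckEvents channel)
    {
        channel.NewPublishTagUsed += OnNewTag;
        channel.PublishAckReceived += OnAckReceived;
    }

    // Completes with true on ack and false on nack.
    // Assumption: a tag that is not pending is treated as already acked.
    public Task<bool> WaitForAckAsync(ulong publishTag)
    {
        lock (_gate)
        {
            return _pending.TryGetValue(publishTag, out var tcs) ? tcs.Task : Task.FromResult(true);
        }
    }

    // True only if every tag pending at call time is acked.
    public async Task<bool> WaitForAllAcksAsync()
    {
        Task<bool>[] tasks;
        lock (_gate)
        {
            tasks = _pending.Values.Select(t => t.Task).ToArray();
        }
        bool[] results = await Task.WhenAll(tasks).ConfigureAwait(false);
        return results.All(ok => ok);
    }

    private void OnNewTag(ulong tag)
    {
        lock (_gate)
        {
            _pending[tag] = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        }
    }

    private void OnAckReceived(ulong tag, bool multiple, bool isAck)
    {
        var done = new List<TaskCompletionSource<bool>>();
        lock (_gate)
        {
            // "multiple" confirms every pending tag up to and including this one.
            List<ulong> tags = multiple
                ? _pending.Keys.TakeWhile(t => t <= tag).ToList()
                : new List<ulong> { tag };
            foreach (ulong t in tags)
            {
                if (_pending.TryGetValue(t, out var tcs))
                {
                    _pending.Remove(t);
                    done.Add(tcs);
                }
            }
        }
        // Complete outside the lock so subscribers never run while it is held.
        foreach (var tcs in done)
        {
            tcs.TrySetResult(isAck);
        }
    }
}

// Fake event source used only to drive the sketch.
public sealed class FakeAckEvents : IPublishAckEvents
{
    public event Action<ulong, bool, bool>? PublishAckReceived;
    public event Action<ulong>? NewPublishTagUsed;

    public void RaiseNewTag(ulong tag) => NewPublishTagUsed?.Invoke(tag);
    public void RaiseAck(ulong tag, bool multiple, bool isAck) => PublishAckReceived?.Invoke(tag, multiple, isAck);
}
```

Keeping the pending tags in a sorted map makes the "multiple" case a simple prefix scan. Whether the real class should allocate a task per tag or pool IValueTaskSource instances is a separate design question that this sketch does not try to answer.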
FYI removed methods:
public interface IModel : IDisposable
{
    // Does anyone really use it? Removed because I'm not aware of any usage.
    IBasicConsumer DefaultConsumer { get; set; }
    // not needed if we have IsOpen & CloseReason
    bool IsClosed { get; }
    // replaced by return value on publish method
    ulong NextPublishSeqNo { get; }
    // deprecated according to comments
    event EventHandler<EventArgs> BasicRecoverOk;
    void BasicRecover(bool requeue);
    void BasicRecoverAsync(bool requeue);
    // removed from interface, can be delivered as extension methods
    void Abort();
    void Close();
    // "new BasicProperty()" / "new BasicPublishBatch()" => no need to hide them behind these methods
    IBasicPublishBatch CreateBasicPublishBatch();
    IBasicPublishBatch CreateBasicPublishBatch(int sizeHint);
    IBasicProperties CreateBasicProperties();
}