Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.client.framing;
Expand All @@ -43,19 +42,16 @@ public Channel(ConnectionConfig config, ISession session) : base(config, session
{
}

public override void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat)
public override Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)
{
ChannelSend(new ConnectionTuneOk(channelMax, frameMax, heartbeat));
var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat);
return ModelSendAsync(method, cancellationToken).AsTask();
}

public override void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId)
public override Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken)
{
ChannelSend(new ChannelClose(replyCode, replyText, classId, methodId));
}

public override void _Private_ChannelCloseOk()
{
ChannelSend(new ChannelCloseOk());
var method = new ChannelCloseOk();
return ModelSendAsync(method, cancellationToken).AsTask();
}

public override void _Private_ChannelFlowOk(bool active)
Expand Down
14 changes: 0 additions & 14 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,20 +201,6 @@ await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaph
_innerChannel.RunRecoveryEventHandlers(this);
}

public void Close(ushort replyCode, string replyText, bool abort)
{
ThrowIfDisposed();
try
{
_innerChannel.Close(replyCode, replyText, abort);
}
finally
{
_connection.DeleteRecordedChannel(this,
channelsSemaphoreHeld: false, recordedEntitiesSemaphoreHeld: false);
}
}

public Task CloseAsync(ushort replyCode, string replyText, bool abort)
{
ThrowIfDisposed();
Expand Down
128 changes: 7 additions & 121 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,52 +211,6 @@ protected void TakeOver(ChannelBase other)
_recoveryWrapper.Takeover(other._recoveryWrapper);
}

public void Close(ushort replyCode, string replyText, bool abort)
{
var reason = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText);
var k = new ShutdownContinuation();
ChannelShutdown += k.OnConnectionShutdown;

try
{
ConsumerDispatcher.Quiesce();

if (SetCloseReason(reason))
{
_Private_ChannelClose(reason.ReplyCode, reason.ReplyText, reason.ClassId, reason.MethodId);
}

k.Wait(TimeSpan.FromMilliseconds(10000));

ConsumerDispatcher.WaitForShutdown();
}
catch (AlreadyClosedException)
{
if (!abort)
{
throw;
}
}
catch (IOException)
{
if (!abort)
{
throw;
}
}
catch (Exception)
{
if (!abort)
{
throw;
}
}
finally
{
ChannelShutdown -= k.OnConnectionShutdown;
}
}

public Task CloseAsync(ushort replyCode, string replyText, bool abort)
{
var args = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText);
Expand Down Expand Up @@ -448,63 +402,7 @@ private void HandleCommand(in IncomingCommand cmd)
}
}

protected void ChannelRpc<TMethod>(in TMethod method, ProtocolCommandId returnCommandId)
where TMethod : struct, IOutgoingAmqpMethod
{
var k = new SimpleBlockingRpcContinuation();
IncomingCommand reply = default;
_rpcSemaphore.Wait();
try
{
Enqueue(k);
Session.Transmit(in method);
k.GetReply(ContinuationTimeout, out reply);

if (reply.CommandId != returnCommandId)
{
throw new UnexpectedMethodException(reply.CommandId, returnCommandId);
}
}
finally
{
reply.ReturnBuffers();
_rpcSemaphore.Release();
}
}

protected TReturn ChannelRpc<TMethod, TReturn>(in TMethod method, ProtocolCommandId returnCommandId, Func<RentedMemory, TReturn> createFunc)
where TMethod : struct, IOutgoingAmqpMethod
{
IncomingCommand reply = default;
try
{
var k = new SimpleBlockingRpcContinuation();

_rpcSemaphore.Wait();
try
{
Enqueue(k);
Session.Transmit(in method);
k.GetReply(ContinuationTimeout, out reply);
}
finally
{
_rpcSemaphore.Release();
}

if (reply.CommandId != returnCommandId)
{
throw new UnexpectedMethodException(reply.CommandId, returnCommandId);
}

return createFunc(reply.Method);
}
finally
{
reply.ReturnBuffers();
}
}

// TODO REMOVE rabbitmq-dotnet-client-1472
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected void ChannelSend<T>(in T method) where T : struct, IOutgoingAmqpMethod
{
Expand All @@ -517,19 +415,6 @@ protected ValueTask ModelSendAsync<T>(in T method, CancellationToken cancellatio
return Session.TransmitAsync(in method, cancellationToken);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected void ChannelSend<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlyMemory<byte> body)
where TMethod : struct, IOutgoingAmqpMethod
where THeader : IAmqpHeader
{
if (!_flowControlBlock.IsSet)
{
_flowControlBlock.Wait();
}

Session.Transmit(in method, in header, body);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlyMemory<byte> body, CancellationToken cancellationToken)
where TMethod : struct, IOutgoingAmqpMethod
Expand Down Expand Up @@ -620,7 +505,7 @@ protected virtual void Dispose(bool disposing)
// dispose unmanaged resources
}

public abstract void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat);
public abstract Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken);

protected void HandleBasicAck(in IncomingCommand cmd)
{
Expand Down Expand Up @@ -884,7 +769,8 @@ protected void HandleChannelClose(in IncomingCommand cmd)

Session.Close(CloseReason, false);

_Private_ChannelCloseOk();
// TODO async
_Private_ChannelCloseOkAsync(CancellationToken.None).EnsureCompleted();
}
finally
{
Expand Down Expand Up @@ -1067,12 +953,12 @@ protected bool HandleQueueDeclareOk(in IncomingCommand cmd)
}
}

public abstract void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId);

public abstract void _Private_ChannelCloseOk();
public abstract Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken);

// TODO async
public abstract void _Private_ChannelFlowOk(bool active);

// TODO async
public abstract void _Private_ConnectionCloseOk();

public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple);
Expand Down
3 changes: 1 addition & 2 deletions projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ await _frameHandler.SendProtocolHeaderAsync(cancellationToken)
uint heartbeatInSeconds = NegotiatedMaxValue((uint)_config.HeartbeatInterval.TotalSeconds, (uint)connectionTune.m_heartbeatInSeconds);
Heartbeat = TimeSpan.FromSeconds(heartbeatInSeconds);

// TODO cancellationToken / async
_channel0.ConnectionTuneOk(channelMax, frameMax, (ushort)Heartbeat.TotalSeconds);
await _channel0.ConnectionTuneOkAsync(channelMax, frameMax, (ushort)Heartbeat.TotalSeconds, cancellationToken);

// TODO check for cancellation
MaybeStartCredentialRefresher();
Expand Down
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ private async Task ReceiveLoopAsync(CancellationToken mainLoopCancelllationToken
}
}

// TODO async
private void ProcessFrame(InboundFrame frame)
{
bool shallReturnPayload = true;
Expand Down