@@ -211,52 +211,6 @@ protected void TakeOver(ChannelBase other)
211211 _recoveryWrapper . Takeover ( other . _recoveryWrapper ) ;
212212 }
213213
214- public void Close ( ushort replyCode , string replyText , bool abort )
215- {
216- var reason = new ShutdownEventArgs ( ShutdownInitiator . Application , replyCode , replyText ) ;
217- var k = new ShutdownContinuation ( ) ;
218- ChannelShutdown += k . OnConnectionShutdown ;
219-
220- try
221- {
222- ConsumerDispatcher . Quiesce ( ) ;
223-
224- if ( SetCloseReason ( reason ) )
225- {
226- _Private_ChannelClose ( reason . ReplyCode , reason . ReplyText , reason . ClassId , reason . MethodId ) ;
227- }
228-
229- k . Wait ( TimeSpan . FromMilliseconds ( 10000 ) ) ;
230-
231- ConsumerDispatcher . WaitForShutdown ( ) ;
232- }
233- catch ( AlreadyClosedException )
234- {
235- if ( ! abort )
236- {
237- throw ;
238- }
239- }
240- catch ( IOException )
241- {
242- if ( ! abort )
243- {
244- throw ;
245- }
246- }
247- catch ( Exception )
248- {
249- if ( ! abort )
250- {
251- throw ;
252- }
253- }
254- finally
255- {
256- ChannelShutdown -= k . OnConnectionShutdown ;
257- }
258- }
259-
260214 public Task CloseAsync ( ushort replyCode , string replyText , bool abort )
261215 {
262216 var args = new ShutdownEventArgs ( ShutdownInitiator . Application , replyCode , replyText ) ;
@@ -448,63 +402,7 @@ private void HandleCommand(in IncomingCommand cmd)
448402 }
449403 }
450404
451- protected void ChannelRpc < TMethod > ( in TMethod method , ProtocolCommandId returnCommandId )
452- where TMethod : struct , IOutgoingAmqpMethod
453- {
454- var k = new SimpleBlockingRpcContinuation ( ) ;
455- IncomingCommand reply = default ;
456- _rpcSemaphore . Wait ( ) ;
457- try
458- {
459- Enqueue ( k ) ;
460- Session . Transmit ( in method ) ;
461- k . GetReply ( ContinuationTimeout , out reply ) ;
462-
463- if ( reply . CommandId != returnCommandId )
464- {
465- throw new UnexpectedMethodException ( reply . CommandId , returnCommandId ) ;
466- }
467- }
468- finally
469- {
470- reply . ReturnBuffers ( ) ;
471- _rpcSemaphore . Release ( ) ;
472- }
473- }
474-
475- protected TReturn ChannelRpc < TMethod , TReturn > ( in TMethod method , ProtocolCommandId returnCommandId , Func < RentedMemory , TReturn > createFunc )
476- where TMethod : struct , IOutgoingAmqpMethod
477- {
478- IncomingCommand reply = default ;
479- try
480- {
481- var k = new SimpleBlockingRpcContinuation ( ) ;
482-
483- _rpcSemaphore . Wait ( ) ;
484- try
485- {
486- Enqueue ( k ) ;
487- Session . Transmit ( in method ) ;
488- k . GetReply ( ContinuationTimeout , out reply ) ;
489- }
490- finally
491- {
492- _rpcSemaphore . Release ( ) ;
493- }
494-
495- if ( reply . CommandId != returnCommandId )
496- {
497- throw new UnexpectedMethodException ( reply . CommandId , returnCommandId ) ;
498- }
499-
500- return createFunc ( reply . Method ) ;
501- }
502- finally
503- {
504- reply . ReturnBuffers ( ) ;
505- }
506- }
507-
405+ // TODO REMOVE rabbitmq-dotnet-client-1472
508406 [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
509407 protected void ChannelSend < T > ( in T method ) where T : struct , IOutgoingAmqpMethod
510408 {
@@ -517,19 +415,6 @@ protected ValueTask ModelSendAsync<T>(in T method, CancellationToken cancellatio
517415 return Session . TransmitAsync ( in method , cancellationToken ) ;
518416 }
519417
520- [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
521- protected void ChannelSend < TMethod , THeader > ( in TMethod method , in THeader header , ReadOnlyMemory < byte > body )
522- where TMethod : struct , IOutgoingAmqpMethod
523- where THeader : IAmqpHeader
524- {
525- if ( ! _flowControlBlock . IsSet )
526- {
527- _flowControlBlock . Wait ( ) ;
528- }
529-
530- Session . Transmit ( in method , in header , body ) ;
531- }
532-
533418 [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
534419 protected ValueTask ModelSendAsync < TMethod , THeader > ( in TMethod method , in THeader header , ReadOnlyMemory < byte > body , CancellationToken cancellationToken )
535420 where TMethod : struct , IOutgoingAmqpMethod
@@ -620,7 +505,7 @@ protected virtual void Dispose(bool disposing)
620505 // dispose unmanaged resources
621506 }
622507
623- public abstract void ConnectionTuneOk ( ushort channelMax , uint frameMax , ushort heartbeat ) ;
508+ public abstract Task ConnectionTuneOkAsync ( ushort channelMax , uint frameMax , ushort heartbeat , CancellationToken cancellationToken ) ;
624509
625510 protected void HandleBasicAck ( in IncomingCommand cmd )
626511 {
@@ -884,7 +769,8 @@ protected void HandleChannelClose(in IncomingCommand cmd)
884769
885770 Session . Close ( CloseReason , false ) ;
886771
887- _Private_ChannelCloseOk ( ) ;
772+ // TODO async
773+ _Private_ChannelCloseOkAsync ( CancellationToken . None ) . EnsureCompleted ( ) ;
888774 }
889775 finally
890776 {
@@ -1067,12 +953,12 @@ protected bool HandleQueueDeclareOk(in IncomingCommand cmd)
1067953 }
1068954 }
1069955
1070- public abstract void _Private_ChannelClose ( ushort replyCode , string replyText , ushort classId , ushort methodId ) ;
1071-
1072- public abstract void _Private_ChannelCloseOk ( ) ;
956+ public abstract Task _Private_ChannelCloseOkAsync ( CancellationToken cancellationToken ) ;
1073957
958+ // TODO async
1074959 public abstract void _Private_ChannelFlowOk ( bool active ) ;
1075960
961+ // TODO async
1076962 public abstract void _Private_ConnectionCloseOk ( ) ;
1077963
1078964 public abstract ValueTask BasicAckAsync ( ulong deliveryTag , bool multiple ) ;
0 commit comments