@@ -80,10 +80,9 @@ public SocketConnectivitySubchannelTransport(
8080 _socketConnectedTimer = new Timer ( OnCheckSocketConnection , state : null , Timeout . InfiniteTimeSpan , Timeout . InfiniteTimeSpan ) ;
8181 }
8282
83- public object Lock => _subchannel . Lock ;
83+ private object Lock => _subchannel . Lock ;
8484 public BalancerAddress ? CurrentAddress => _currentAddress ;
8585 public TimeSpan ? ConnectTimeout { get ; }
86- public bool HasStream { get ; }
8786
8887 // For testing. Take a copy under lock for thread-safety.
8988 internal IReadOnlyList < ActiveStream > GetActiveStreams ( )
@@ -264,13 +263,21 @@ public async ValueTask<Stream> GetStreamAsync(BalancerAddress address, Cancellat
264263 Socket ? socket = null ;
265264 lock ( Lock )
266265 {
267- if ( _initialSocket != null &&
268- _initialSocketAddress != null &&
269- Equals ( _initialSocketAddress , address ) )
266+ if ( _initialSocket != null )
270267 {
268+ var socketAddressMatch = Equals ( _initialSocketAddress , address ) ;
269+
271270 socket = _initialSocket ;
272271 _initialSocket = null ;
273272 _initialSocketAddress = null ;
273+
274+ // Double check the address matches the socket address and only use socket on match.
275+ // Not sure if this is possible in practice, but better safe than sorry.
276+ if ( ! socketAddressMatch )
277+ {
278+ socket . Dispose ( ) ;
279+ socket = null ;
280+ }
274281 }
275282 }
276283
@@ -288,6 +295,8 @@ public async ValueTask<Stream> GetStreamAsync(BalancerAddress address, Cancellat
288295
289296 if ( socket == null )
290297 {
298+ SocketConnectivitySubchannelTransportLog . ConnectingOnCreateStream ( _logger , _subchannel . Id , address ) ;
299+
291300 socket = new Socket ( SocketType . Stream , ProtocolType . Tcp ) { NoDelay = true } ;
292301 await socket . ConnectAsync ( address . EndPoint , cancellationToken ) . ConfigureAwait ( false ) ;
293302 }
@@ -300,6 +309,7 @@ public async ValueTask<Stream> GetStreamAsync(BalancerAddress address, Cancellat
300309 lock ( Lock )
301310 {
302311 _activeStreams . Add ( new ActiveStream ( address , socket , stream ) ) ;
312+ SocketConnectivitySubchannelTransportLog . StreamCreated ( _logger , _subchannel . Id , address , _activeStreams . Count ) ;
303313 }
304314
305315 return stream ;
@@ -331,7 +341,7 @@ private void OnStreamDisposed(Stream streamWrapper)
331341 if ( t . Stream == streamWrapper )
332342 {
333343 _activeStreams . RemoveAt ( i ) ;
334- SocketConnectivitySubchannelTransportLog . DisposingStream ( _logger , _subchannel . Id , t . Address ) ;
344+ SocketConnectivitySubchannelTransportLog . DisposingStream ( _logger , _subchannel . Id , t . Address , _activeStreams . Count ) ;
335345
336346 // If the last active streams is removed then there is no active connection.
337347 disconnect = _activeStreams . Count == 0 ;
@@ -399,15 +409,21 @@ internal static class SocketConnectivitySubchannelTransportLog
399409 private static readonly Action < ILogger , int , BalancerAddress , Exception ? > _creatingStream =
400410 LoggerMessage . Define < int , BalancerAddress > ( LogLevel . Trace , new EventId ( 7 , "CreatingStream" ) , "Subchannel id '{SubchannelId}' creating stream for {Address}." ) ;
401411
402- private static readonly Action < ILogger , int , BalancerAddress , Exception ? > _disposingStream =
403- LoggerMessage . Define < int , BalancerAddress > ( LogLevel . Trace , new EventId ( 8 , "DisposingStream" ) , "Subchannel id '{SubchannelId}' disposing stream for {Address}." ) ;
412+ private static readonly Action < ILogger , int , BalancerAddress , int , Exception ? > _disposingStream =
413+ LoggerMessage . Define < int , BalancerAddress , int > ( LogLevel . Trace , new EventId ( 8 , "DisposingStream" ) , "Subchannel id '{SubchannelId}' disposing stream for {Address}. Transport has {ActiveStreams} active streams ." ) ;
404414
405415 private static readonly Action < ILogger , int , Exception ? > _disposingTransport =
406416 LoggerMessage . Define < int > ( LogLevel . Trace , new EventId ( 9 , "DisposingTransport" ) , "Subchannel id '{SubchannelId}' disposing transport." ) ;
407417
408418 private static readonly Action < ILogger , int , Exception > _errorOnDisposingStream =
409419 LoggerMessage . Define < int > ( LogLevel . Error , new EventId ( 10 , "ErrorOnDisposingStream" ) , "Subchannel id '{SubchannelId}' unexpected error when reacting to transport stream dispose." ) ;
410420
421+ private static readonly Action < ILogger , int , BalancerAddress , Exception ? > _connectingOnCreateStream =
422+ LoggerMessage . Define < int , BalancerAddress > ( LogLevel . Trace , new EventId ( 11 , "ConnectingOnCreateStream" ) , "Subchannel id '{SubchannelId}' doesn't have a connected socket available. Connecting new stream socket for {Address}." ) ;
423+
424+ private static readonly Action < ILogger , int , BalancerAddress , int , Exception ? > _streamCreated =
425+ LoggerMessage . Define < int , BalancerAddress , int > ( LogLevel . Trace , new EventId ( 12 , "StreamCreated" ) , "Subchannel id '{SubchannelId}' created stream for {Address}. Transport has {ActiveStreams} active streams." ) ;
426+
411427 public static void ConnectingSocket ( ILogger logger , int subchannelId , BalancerAddress address )
412428 {
413429 _connectingSocket ( logger , subchannelId , address , null ) ;
@@ -443,9 +459,9 @@ public static void CreatingStream(ILogger logger, int subchannelId, BalancerAddr
443459 _creatingStream ( logger , subchannelId , address , null ) ;
444460 }
445461
446- public static void DisposingStream ( ILogger logger , int subchannelId , BalancerAddress address )
462+ public static void DisposingStream ( ILogger logger , int subchannelId , BalancerAddress address , int activeStreams )
447463 {
448- _disposingStream ( logger , subchannelId , address , null ) ;
464+ _disposingStream ( logger , subchannelId , address , activeStreams , null ) ;
449465 }
450466
451467 public static void DisposingTransport ( ILogger logger , int subchannelId )
@@ -457,6 +473,16 @@ public static void ErrorOnDisposingStream(ILogger logger, int subchannelId, Exce
457473 {
458474 _errorOnDisposingStream ( logger , subchannelId , ex ) ;
459475 }
476+
477+ public static void ConnectingOnCreateStream ( ILogger logger , int subchannelId , BalancerAddress address )
478+ {
479+ _connectingOnCreateStream ( logger , subchannelId , address , null ) ;
480+ }
481+
482+ public static void StreamCreated ( ILogger logger , int subchannelId , BalancerAddress address , int activeStreams )
483+ {
484+ _streamCreated ( logger , subchannelId , address , activeStreams , null ) ;
485+ }
460486}
461487#endif
462488#endif
0 commit comments