Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions src/Grpc.Net.Client/Balancer/Subchannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#if SUPPORT_LOAD_BALANCING
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
Expand Down Expand Up @@ -56,12 +57,13 @@ public sealed class Subchannel : IDisposable
/// </summary>
internal ConnectivityState State => _state;

private readonly ConnectionManager _manager;
internal readonly ConnectionManager _manager;
private readonly ILogger _logger;

private ConnectContext? _connectContext;
private ConnectivityState _state;
private TaskCompletionSource<object?>? _delayInterruptTcs;
private int _currentRegistrationId;

/// <summary>
/// Gets the current connected address.
Expand Down Expand Up @@ -101,25 +103,40 @@ public IDisposable OnStateChanged(Action<SubchannelState> callback)
return registration;
}

private string GetNextRegistrationId()
{
var registrationId = Interlocked.Increment(ref _currentRegistrationId);
return Id + "-" + registrationId;
}

private sealed class StateChangedRegistration : IDisposable
{
private readonly Subchannel _subchannel;
private readonly Action<SubchannelState> _callback;

public string RegistrationId { get; }

public StateChangedRegistration(Subchannel subchannel, Action<SubchannelState> callback)
{
_subchannel = subchannel;
_callback = callback;
RegistrationId = subchannel.GetNextRegistrationId();

SubchannelLog.StateChangedRegistrationCreated(_subchannel._logger, _subchannel.Id, RegistrationId);
}

public void Invoke(SubchannelState state)
{
SubchannelLog.ExecutingStateChangedRegistration(_subchannel._logger, _subchannel.Id, RegistrationId);
_callback(state);
}

public void Dispose()
{
_subchannel._stateChangedRegistrations.Remove(this);
if (_subchannel._stateChangedRegistrations.Remove(this))
{
SubchannelLog.StateChangedRegistrationRemoved(_subchannel._logger, _subchannel.Id, RegistrationId);
}
}
}

Expand Down Expand Up @@ -348,6 +365,10 @@ internal void RaiseStateChanged(ConnectivityState state, Status status)
registration.Invoke(subchannelState);
}
}
else
{
SubchannelLog.NoStateChangedRegistrations(_logger, Id);
}
}

/// <inheritdocs />
Expand Down Expand Up @@ -379,6 +400,11 @@ public IReadOnlyList<BalancerAddress> GetAddresses()
public void Dispose()
{
UpdateConnectivityState(ConnectivityState.Shutdown, "Subchannel disposed.");

foreach (var registration in _stateChangedRegistrations)
{
SubchannelLog.StateChangedRegistrationRemoved(_logger, Id, registration.RegistrationId);
}
_stateChangedRegistrations.Clear();

CancelInProgressConnect();
Expand Down Expand Up @@ -421,6 +447,21 @@ internal static class SubchannelLog
private static readonly Action<ILogger, int, ConnectivityState, string, Exception?> _subchannelStateChanged =
LoggerMessage.Define<int, ConnectivityState, string>(LogLevel.Debug, new EventId(11, "SubchannelStateChanged"), "Subchannel id '{SubchannelId}' state changed to {State}. Detail: '{Detail}'.");

private static readonly Action<ILogger, int, string, Exception?> _stateChangedRegistrationCreated =
LoggerMessage.Define<int, string>(LogLevel.Trace, new EventId(12, "StateChangedRegistrationCreated"), "Subchannel id '{SubchannelId}' state changed registration '{RegistrationId}' created.");

private static readonly Action<ILogger, int, string, Exception?> _stateChangedRegistrationRemoved =
LoggerMessage.Define<int, string>(LogLevel.Trace, new EventId(13, "StateChangedRegistrationRemoved"), "Subchannel id '{SubchannelId}' state changed registration '{RegistrationId}' removed.");

private static readonly Action<ILogger, int, string, Exception?> _executingStateChangedRegistration =
LoggerMessage.Define<int, string>(LogLevel.Trace, new EventId(14, "ExecutingStateChangedRegistration"), "Subchannel id '{SubchannelId}' executing state changed registration '{RegistrationId}'.");

private static readonly Action<ILogger, int, Exception?> _noStateChangedRegistrations =
LoggerMessage.Define<int>(LogLevel.Trace, new EventId(15, "NoStateChangedRegistrations"), "Subchannel id '{SubchannelId}' has no state changed registrations.");

private static readonly Action<ILogger, int, BalancerAddress, Exception?> _subchannelPreserved =
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Trace, new EventId(16, "SubchannelPreserved"), "Subchannel id '{SubchannelId}' matches address '{Address}' and is preserved.");

public static void SubchannelCreated(ILogger logger, int subchannelId, IReadOnlyList<BalancerAddress> addresses)
{
if (logger.IsEnabled(LogLevel.Debug))
Expand Down Expand Up @@ -479,6 +520,31 @@ public static void SubchannelStateChanged(ILogger logger, int subchannelId, Conn
{
_subchannelStateChanged(logger, subchannelId, state, status.Detail, status.DebugException);
}

public static void ExecutingStateChangedRegistration(ILogger logger, int subchannelId, string registrationId)
{
_executingStateChangedRegistration(logger, subchannelId, registrationId, null);
}

public static void NoStateChangedRegistrations(ILogger logger, int subchannelId)
{
_noStateChangedRegistrations(logger, subchannelId, null);
}

public static void StateChangedRegistrationCreated(ILogger logger, int subchannelId, string registrationId)
{
_stateChangedRegistrationCreated(logger, subchannelId, registrationId, null);
}

public static void StateChangedRegistrationRemoved(ILogger logger, int subchannelId, string registrationId)
{
_stateChangedRegistrationRemoved(logger, subchannelId, registrationId, null);
}

public static void SubchannelPreserved(ILogger logger, int subchannelId, BalancerAddress address)
{
_subchannelPreserved(logger, subchannelId, address, null);
}
}
}
#endif
25 changes: 20 additions & 5 deletions src/Grpc.Net.Client/Balancer/SubchannelsLoadBalancer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#if SUPPORT_LOAD_BALANCING
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Net;
using Grpc.Core;
Expand Down Expand Up @@ -133,15 +134,17 @@ public override void UpdateChannelState(ChannelState state)
// Check existing subchannels for a match.
var i = FindSubchannelByAddress(currentSubchannels, address);

AddressSubchannel newOrCurrentSubConnection;
AddressSubchannel newOrCurrentSubchannel;
if (i != null)
{
// There is a match so take current subchannel.
newOrCurrentSubConnection = currentSubchannels[i.GetValueOrDefault()];
newOrCurrentSubchannel = currentSubchannels[i.GetValueOrDefault()];

// Remove from current collection because any subchannels
// remaining in this collection at the end will be disposed.
currentSubchannels.RemoveAt(i.GetValueOrDefault());

SubchannelLog.SubchannelPreserved(_logger, newOrCurrentSubchannel.Subchannel.Id, address);
}
else
{
Expand All @@ -150,10 +153,10 @@ public override void UpdateChannelState(ChannelState state)
c.OnStateChanged(s => UpdateSubchannelState(c, s));

newSubchannels.Add(c);
newOrCurrentSubConnection = new AddressSubchannel(c, address);
newOrCurrentSubchannel = new AddressSubchannel(c, address);
}

allUpdatedSubchannels.Add(newOrCurrentSubConnection);
allUpdatedSubchannels.Add(newOrCurrentSubchannel);
}

// Any sub-connections still in this collection are no longer returned by the resolver.
Expand Down Expand Up @@ -227,6 +230,7 @@ private void UpdateBalancingState(Status status)
}
else
{
SubchannelsLoadBalancerLog.CreatingReadyPicker(_logger, readySubchannels);
UpdateChannelState(ConnectivityState.Ready, CreatePicker(readySubchannels));
}
}
Expand Down Expand Up @@ -298,7 +302,7 @@ protected override void Dispose(bool disposing)
/// <returns>A subchannel picker.</returns>
protected abstract SubchannelPicker CreatePicker(IReadOnlyList<Subchannel> readySubchannels);

private class AddressSubchannel
private sealed class AddressSubchannel
{
private ConnectivityState _lastKnownState;

Expand Down Expand Up @@ -340,6 +344,9 @@ internal static class SubchannelsLoadBalancerLog
private static readonly Action<ILogger, int, ConnectivityState, Exception?> _requestingConnectionForSubchannel =
LoggerMessage.Define<int, ConnectivityState>(LogLevel.Trace, new EventId(5, "RequestingConnectionForSubchannel"), "Requesting connection for subchannel id '{SubchannelId}' because it is in state {State}.");

private static readonly Action<ILogger, int, string, Exception?> _creatingReadyPicker =
LoggerMessage.Define<int, string>(LogLevel.Trace, new EventId(6, "CreatingReadyPicker"), "Creating ready picker with {SubchannelCount} subchannels: {Subchannels}");

public static void ProcessingSubchannelStateChanged(ILogger logger, int subchannelId, ConnectivityState state, Status status)
{
_processingSubchannelStateChanged(logger, subchannelId, state, status.Detail, status.DebugException);
Expand All @@ -364,6 +371,14 @@ public static void RequestingConnectionForSubchannel(ILogger logger, int subchan
{
_requestingConnectionForSubchannel(logger, subchannelId, state, null);
}

public static void CreatingReadyPicker(ILogger logger, List<Subchannel> readySubchannels)
{
if (logger.IsEnabled(LogLevel.Trace))
{
_creatingReadyPicker(logger, readySubchannels.Count, string.Join(", ", readySubchannels.Select(s => $"id '{s.Id}' ({string.Join(",", s.GetAddresses())})")), null);
}
}
}
}
#endif
2 changes: 1 addition & 1 deletion test/FunctionalTests/Balancer/BalancerHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public TestSubchannelTransportFactory(TimeSpan socketPingInterval, TimeSpan? con
public ISubchannelTransport Create(Subchannel subchannel)
{
#if NET5_0_OR_GREATER
return new SocketConnectivitySubchannelTransport(subchannel, _socketPingInterval, _connectTimeout, NullLoggerFactory.Instance);
return new SocketConnectivitySubchannelTransport(subchannel, _socketPingInterval, _connectTimeout, subchannel._manager.LoggerFactory);
#else
return new PassiveSubchannelTransport(subchannel);
#endif
Expand Down