From 85c1b2bf6d51d4d6d86ce30e39b5a6ffd0a4733f Mon Sep 17 00:00:00 2001 From: Ilya Teplov <47264728+iteplov@users.noreply.github.com> Date: Sat, 15 Oct 2022 16:54:04 -0700 Subject: [PATCH] 2251: Fixes missing activation of the sub leg during initial endpoint discovery (#2268) Fix for #2251 and #2265 ensuring subscription connections are proactively created in all cases. Co-authored-by: Nick Craver --- docs/ReleaseNotes.md | 3 ++- src/StackExchange.Redis/ConnectionMultiplexer.cs | 15 ++++++++++++--- src/StackExchange.Redis/ServerEndPoint.cs | 3 ++- tests/StackExchange.Redis.Tests/Cluster.cs | 16 ++++++++++++++++ .../StackExchange.Redis.Tests/CommandTimeouts.cs | 10 ++++++++-- .../ConnectingFailDetection.cs | 16 ++++++++++++++++ tests/StackExchange.Redis.Tests/Failover.cs | 5 +++-- tests/StackExchange.Redis.Tests/TestBase.cs | 2 +- 8 files changed, 60 insertions(+), 10 deletions(-) diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 9a1cf32f6..5fad07b24 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -2,7 +2,8 @@ ## Unreleased -- Fix: `MOVED` with `NoRedirect` (and other non-reachable errors) should respect the `IncludeDetailInExceptions` setting +- Fix: `MOVED` with `NoRedirect` (and other non-reachable errors) should respect the `IncludeDetailInExceptions` setting ([#2267 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2267)) +- Fix [#2251](https://github.com/StackExchange/StackExchange.Redis/issues/2251) & [#2265](https://github.com/StackExchange/StackExchange.Redis/issues/2265): Cluster endpoint connections weren't proactively connecting subscriptions in all cases and taking the full connection timeout to complete as a result ([#2268 by iteplov](https://github.com/StackExchange/StackExchange.Redis/pull/2268)) ## 2.6.66 diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs index 4d64fa7de..5630b28fe 100644 --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -803,7 +803,15 @@ internal EndPoint[] GetEndPoints() } } // spin up the connection if this is new - if (isNew && activate) server.Activate(ConnectionType.Interactive, log); + if (isNew && activate) + { + server.Activate(ConnectionType.Interactive, log); + if (server.SupportsSubscriptions) + { + // Intentionally not logging the sub connection + server.Activate(ConnectionType.Subscription, null); + } + } } return server; } @@ -1300,9 +1308,10 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP // Log current state after await foreach (var server in servers) { - log?.WriteLine($" {Format.ToString(server.EndPoint)}: Endpoint is {server.ConnectionState}"); + log?.WriteLine($" {Format.ToString(server.EndPoint)}: Endpoint is (Interactive: {server.InteractiveConnectionState}, Subscription: {server.SubscriptionConnectionState})"); } + log?.WriteLine("Task summary:"); EndPointCollection? updatedClusterEndpointCollection = null; for (int i = 0; i < available.Length; i++) { @@ -1388,7 +1397,7 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP else { server.SetUnselectable(UnselectableFlags.DidNotRespond); - log?.WriteLine($" {Format.ToString(server)}: Did not respond"); + log?.WriteLine($" {Format.ToString(server)}: Did not respond (Task.Status: {task.Status})"); } } diff --git a/src/StackExchange.Redis/ServerEndPoint.cs b/src/StackExchange.Redis/ServerEndPoint.cs index 876e21dc4..89ce6a974 100644 --- a/src/StackExchange.Redis/ServerEndPoint.cs +++ b/src/StackExchange.Redis/ServerEndPoint.cs @@ -157,7 +157,8 @@ internal Exception? LastException } } - internal State ConnectionState => interactive?.ConnectionState ?? State.Disconnected; + internal State InteractiveConnectionState => interactive?.ConnectionState ?? State.Disconnected; + internal State SubscriptionConnectionState => subscription?.ConnectionState ?? State.Disconnected; public long OperationCount => interactive?.OperationCount ?? 0 + subscription?.OperationCount ?? 0; diff --git a/tests/StackExchange.Redis.Tests/Cluster.cs b/tests/StackExchange.Redis.Tests/Cluster.cs index f81375c97..d1fe5ef5f 100644 --- a/tests/StackExchange.Redis.Tests/Cluster.cs +++ b/tests/StackExchange.Redis.Tests/Cluster.cs @@ -728,4 +728,20 @@ public void MovedProfiling() } } } + + [Fact] + public void ConnectIncludesSubscriber() + { + using var conn = Create(keepAlive: 1, connectTimeout: 3000, shared: false); + + var db = conn.GetDatabase(); + db.Ping(); + Assert.True(conn.IsConnected); + + foreach (var server in conn.GetServerSnapshot()) + { + Assert.Equal(PhysicalBridge.State.ConnectedEstablished, server.InteractiveConnectionState); + Assert.Equal(PhysicalBridge.State.ConnectedEstablished, server.SubscriptionConnectionState); + } + } } diff --git a/tests/StackExchange.Redis.Tests/CommandTimeouts.cs b/tests/StackExchange.Redis.Tests/CommandTimeouts.cs index c7530018b..bcb4c8754 100644 --- a/tests/StackExchange.Redis.Tests/CommandTimeouts.cs +++ b/tests/StackExchange.Redis.Tests/CommandTimeouts.cs @@ -21,7 +21,7 @@ public async Task DefaultHeartbeatTimeout() using var conn = ConnectionMultiplexer.Connect(options); var pauseServer = GetServer(pauseConn); - _ = pauseServer.ExecuteAsync("CLIENT", "PAUSE", 2000); + var pauseTask = pauseServer.ExecuteAsync("CLIENT", "PAUSE", 2500); var key = Me(); var db = conn.GetDatabase(); @@ -30,6 +30,9 @@ public async Task DefaultHeartbeatTimeout() Log(ex.Message); var duration = sw.GetElapsedTime(); Assert.True(duration < TimeSpan.FromSeconds(2100), $"Duration ({duration.Milliseconds} ms) should be less than 2100ms"); + + // Await as to not bias the next test + await pauseTask; } [Fact] @@ -44,7 +47,7 @@ public async Task DefaultHeartbeatLowTimeout() using var conn = ConnectionMultiplexer.Connect(options); var pauseServer = GetServer(pauseConn); - _ = pauseServer.ExecuteAsync("CLIENT", "PAUSE", 2000); + var pauseTask = pauseServer.ExecuteAsync("CLIENT", "PAUSE", 500); var key = Me(); var db = conn.GetDatabase(); @@ -53,5 +56,8 @@ public async Task DefaultHeartbeatLowTimeout() Log(ex.Message); var duration = sw.GetElapsedTime(); Assert.True(duration < TimeSpan.FromSeconds(250), $"Duration ({duration.Milliseconds} ms) should be less than 250ms"); + + // Await as to not bias the next test + await pauseTask; } } diff --git a/tests/StackExchange.Redis.Tests/ConnectingFailDetection.cs b/tests/StackExchange.Redis.Tests/ConnectingFailDetection.cs index d32066bf3..3fe53187c 100644 --- a/tests/StackExchange.Redis.Tests/ConnectingFailDetection.cs +++ b/tests/StackExchange.Redis.Tests/ConnectingFailDetection.cs @@ -151,4 +151,20 @@ public void ConnectsWhenBeginConnectCompletesSynchronously() ClearAmbientFailures(); } } + + [Fact] + public void ConnectIncludesSubscriber() + { + using var conn = Create(keepAlive: 1, connectTimeout: 3000, shared: false); + + var db = conn.GetDatabase(); + db.Ping(); + Assert.True(conn.IsConnected); + + foreach (var server in conn.GetServerSnapshot()) + { + Assert.Equal(PhysicalBridge.State.ConnectedEstablished, server.InteractiveConnectionState); + Assert.Equal(PhysicalBridge.State.ConnectedEstablished, server.SubscriptionConnectionState); + } + } } diff --git a/tests/StackExchange.Redis.Tests/Failover.cs b/tests/StackExchange.Redis.Tests/Failover.cs index b9b23cd21..16faff432 100644 --- a/tests/StackExchange.Redis.Tests/Failover.cs +++ b/tests/StackExchange.Redis.Tests/Failover.cs @@ -295,8 +295,9 @@ public async Task SubscriptionsSurvivePrimarySwitchAsync() Log("FAILURE: B has not detected the topology change."); foreach (var server in bConn.GetServerSnapshot().ToArray()) { - Log(" Server" + server.EndPoint); - Log(" State: " + server.ConnectionState); + Log(" Server: " + server.EndPoint); + Log(" State (Interactive): " + server.InteractiveConnectionState); + Log(" State (Subscription): " + server.SubscriptionConnectionState); Log(" IsReplica: " + !server.IsReplica); Log(" Type: " + server.ServerType); } diff --git a/tests/StackExchange.Redis.Tests/TestBase.cs b/tests/StackExchange.Redis.Tests/TestBase.cs index c547e5e5e..39ec34229 100644 --- a/tests/StackExchange.Redis.Tests/TestBase.cs +++ b/tests/StackExchange.Redis.Tests/TestBase.cs @@ -268,7 +268,7 @@ internal virtual IInternalConnectionMultiplexer Create( { if (Output == null) { - Assert.True(false, "Failure: Be sure to call the TestBase constuctor like this: BasicOpsTests(ITestOutputHelper output) : base(output) { }"); + Assert.True(false, "Failure: Be sure to call the TestBase constructor like this: BasicOpsTests(ITestOutputHelper output) : base(output) { }"); } // Share a connection if instructed to and we can - many specifics mean no sharing