Skip to content

Commit

Permalink
2251: Fixes missing activation of the sub leg during initial endpoint…
Browse files Browse the repository at this point in the history
… discovery (#2268)

Fix for #2251 and #2265 ensuring subscription connections are proactively created in all cases.

Co-authored-by: Nick Craver <nrcraver@gmail.com>
  • Loading branch information
iteplov and NickCraver authored Oct 15, 2022
1 parent bff0811 commit 85c1b2b
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 10 deletions.
3 changes: 2 additions & 1 deletion docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

## Unreleased

- Fix: `MOVED` with `NoRedirect` (and other non-reachable errors) should respect the `IncludeDetailInExceptions` setting
- Fix: `MOVED` with `NoRedirect` (and other non-reachable errors) should respect the `IncludeDetailInExceptions` setting ([#2267 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2267))
- Fix [#2251](https://github.com/StackExchange/StackExchange.Redis/issues/2251) & [#2265](https://github.com/StackExchange/StackExchange.Redis/issues/2265): Cluster endpoint connections weren't proactively connecting subscriptions in all cases and taking the full connection timeout to complete as a result ([#2268 by iteplov](https://github.com/StackExchange/StackExchange.Redis/pull/2268))


## 2.6.66
Expand Down
15 changes: 12 additions & 3 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,15 @@ internal EndPoint[] GetEndPoints()
}
}
// spin up the connection if this is new
if (isNew && activate) server.Activate(ConnectionType.Interactive, log);
if (isNew && activate)
{
server.Activate(ConnectionType.Interactive, log);
if (server.SupportsSubscriptions)
{
// Intentionally not logging the sub connection
server.Activate(ConnectionType.Subscription, null);
}
}
}
return server;
}
Expand Down Expand Up @@ -1300,9 +1308,10 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
// Log current state after await
foreach (var server in servers)
{
log?.WriteLine($" {Format.ToString(server.EndPoint)}: Endpoint is {server.ConnectionState}");
log?.WriteLine($" {Format.ToString(server.EndPoint)}: Endpoint is (Interactive: {server.InteractiveConnectionState}, Subscription: {server.SubscriptionConnectionState})");
}

log?.WriteLine("Task summary:");
EndPointCollection? updatedClusterEndpointCollection = null;
for (int i = 0; i < available.Length; i++)
{
Expand Down Expand Up @@ -1388,7 +1397,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
else
{
server.SetUnselectable(UnselectableFlags.DidNotRespond);
log?.WriteLine($" {Format.ToString(server)}: Did not respond");
log?.WriteLine($" {Format.ToString(server)}: Did not respond (Task.Status: {task.Status})");
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/StackExchange.Redis/ServerEndPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ internal Exception? LastException
}
}

internal State ConnectionState => interactive?.ConnectionState ?? State.Disconnected;
internal State InteractiveConnectionState => interactive?.ConnectionState ?? State.Disconnected;
internal State SubscriptionConnectionState => subscription?.ConnectionState ?? State.Disconnected;

public long OperationCount => interactive?.OperationCount ?? 0 + subscription?.OperationCount ?? 0;

Expand Down
16 changes: 16 additions & 0 deletions tests/StackExchange.Redis.Tests/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -728,4 +728,20 @@ public void MovedProfiling()
}
}
}

[Fact]
public void ConnectIncludesSubscriber()
{
using var conn = Create(keepAlive: 1, connectTimeout: 3000, shared: false);

var db = conn.GetDatabase();
db.Ping();
Assert.True(conn.IsConnected);

foreach (var server in conn.GetServerSnapshot())
{
Assert.Equal(PhysicalBridge.State.ConnectedEstablished, server.InteractiveConnectionState);
Assert.Equal(PhysicalBridge.State.ConnectedEstablished, server.SubscriptionConnectionState);
}
}
}
10 changes: 8 additions & 2 deletions tests/StackExchange.Redis.Tests/CommandTimeouts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task DefaultHeartbeatTimeout()
using var conn = ConnectionMultiplexer.Connect(options);

var pauseServer = GetServer(pauseConn);
_ = pauseServer.ExecuteAsync("CLIENT", "PAUSE", 2000);
var pauseTask = pauseServer.ExecuteAsync("CLIENT", "PAUSE", 2500);

var key = Me();
var db = conn.GetDatabase();
Expand All @@ -30,6 +30,9 @@ public async Task DefaultHeartbeatTimeout()
Log(ex.Message);
var duration = sw.GetElapsedTime();
Assert.True(duration < TimeSpan.FromSeconds(2100), $"Duration ({duration.Milliseconds} ms) should be less than 2100ms");

// Await as to not bias the next test
await pauseTask;
}

[Fact]
Expand All @@ -44,7 +47,7 @@ public async Task DefaultHeartbeatLowTimeout()
using var conn = ConnectionMultiplexer.Connect(options);

var pauseServer = GetServer(pauseConn);
_ = pauseServer.ExecuteAsync("CLIENT", "PAUSE", 2000);
var pauseTask = pauseServer.ExecuteAsync("CLIENT", "PAUSE", 500);

var key = Me();
var db = conn.GetDatabase();
Expand All @@ -53,5 +56,8 @@ public async Task DefaultHeartbeatLowTimeout()
Log(ex.Message);
var duration = sw.GetElapsedTime();
Assert.True(duration < TimeSpan.FromSeconds(250), $"Duration ({duration.Milliseconds} ms) should be less than 250ms");

// Await as to not bias the next test
await pauseTask;
}
}
16 changes: 16 additions & 0 deletions tests/StackExchange.Redis.Tests/ConnectingFailDetection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,20 @@ public void ConnectsWhenBeginConnectCompletesSynchronously()
ClearAmbientFailures();
}
}

[Fact]
public void ConnectIncludesSubscriber()
{
using var conn = Create(keepAlive: 1, connectTimeout: 3000, shared: false);

var db = conn.GetDatabase();
db.Ping();
Assert.True(conn.IsConnected);

foreach (var server in conn.GetServerSnapshot())
{
Assert.Equal(PhysicalBridge.State.ConnectedEstablished, server.InteractiveConnectionState);
Assert.Equal(PhysicalBridge.State.ConnectedEstablished, server.SubscriptionConnectionState);
}
}
}
5 changes: 3 additions & 2 deletions tests/StackExchange.Redis.Tests/Failover.cs
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,9 @@ public async Task SubscriptionsSurvivePrimarySwitchAsync()
Log("FAILURE: B has not detected the topology change.");
foreach (var server in bConn.GetServerSnapshot().ToArray())
{
Log(" Server" + server.EndPoint);
Log(" State: " + server.ConnectionState);
Log(" Server: " + server.EndPoint);
Log(" State (Interactive): " + server.InteractiveConnectionState);
Log(" State (Subscription): " + server.SubscriptionConnectionState);
Log(" IsReplica: " + !server.IsReplica);
Log(" Type: " + server.ServerType);
}
Expand Down
2 changes: 1 addition & 1 deletion tests/StackExchange.Redis.Tests/TestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ internal virtual IInternalConnectionMultiplexer Create(
{
if (Output == null)
{
Assert.True(false, "Failure: Be sure to call the TestBase constuctor like this: BasicOpsTests(ITestOutputHelper output) : base(output) { }");
Assert.True(false, "Failure: Be sure to call the TestBase constructor like this: BasicOpsTests(ITestOutputHelper output) : base(output) { }");
}

// Share a connection if instructed to and we can - many specifics mean no sharing
Expand Down

0 comments on commit 85c1b2b

Please sign in to comment.