Skip to content

Commit

Permalink
Merge pull request #1176 from ably/fix/1156-incremental-backoff-jitter
Browse files Browse the repository at this point in the history
Add incremental backoff and jitter
  • Loading branch information
sacOO7 authored Jun 28, 2023
2 parents c7e014d + c75b0cd commit 42f9cd2
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/IO.Ably.Shared/IO.Ably.Shared.projitems
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
<Compile Include="$(MSBuildThisFileDirectory)Utils\ActionUtils.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Utils\ConnectionChangeAwaiter.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Utils\ErrorPolicy.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Utils\ReconnectionStrategy.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Utils\StringUtils.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Utils\TaskUtils.cs" />
</ItemGroup>
Expand Down
20 changes: 16 additions & 4 deletions src/IO.Ably.Shared/Realtime/RealtimeChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using IO.Ably.MessageEncoders;
using IO.Ably.Push;
using IO.Ably.Rest;
using IO.Ably.Shared.Utils;
using IO.Ably.Transport;
using IO.Ably.Types;
using IO.Ably.Utils;
Expand All @@ -25,6 +26,7 @@ internal class RealtimeChannel : EventEmitter<ChannelEvent, ChannelStateChange>,
private ChannelOptions _options;
private ChannelState _state;
private readonly PushChannel _pushChannel;
private int _retryCount = 0;

/// <summary>
/// True when the channel moves to the @ATTACHED@ state, and False
Expand Down Expand Up @@ -658,7 +660,7 @@ private void HandleStateChange(ChannelState state, ErrorInfo error, ProtocolMess
Logger.Debug($"HandleStateChange state change from {State} to {state}");
}

var oldState = State;
var previousState = State;
State = state;

switch (state)
Expand All @@ -671,12 +673,13 @@ private void HandleStateChange(ChannelState state, ErrorInfo error, ProtocolMess
AttachResume = false;
break;
case ChannelState.Attached:
_retryCount = 0;
AttachResume = true;
Presence.ChannelAttached(protocolMessage);
break;
case ChannelState.Detached:
/* RTL13a check for unexpected detach */
switch (oldState)
switch (previousState)
{
/* (RTL13a) If the channel is in the @ATTACHED@ or @SUSPENDED@ states,
an attempt to reattach the channel should be made immediately */
Expand All @@ -688,6 +691,8 @@ an attempt to reattach the channel should be made immediately */
break;

case ChannelState.Attaching:
// Since attachtimeout will transition state to suspended, no need to suspend it twice
AttachedAwaiter.Fail(new ErrorInfo("Channel transitioned to suspended", ErrorCodes.InternalError));
/* RTL13b says we need to become suspended, but continue to retry */
Logger.Debug($"Server initiated detach for channel {Name} whilst attaching; moving to suspended");
SetChannelState(ChannelState.Suspended, error, protocolMessage);
Expand All @@ -709,6 +714,7 @@ an attempt to reattach the channel should be made immediately */

break;
case ChannelState.Failed:
_retryCount = 0;
AttachResume = false;
AttachedAwaiter.Fail(error);
DetachedAwaiter.Fail(error);
Expand Down Expand Up @@ -743,15 +749,21 @@ private void Reattach(ErrorInfo error, ProtocolMessage msg)
}

/// <summary>
/// should only be called when the channel is SUSPENDED.
/// should only be called when the channel gets into SUSPENDED.
/// RTL13b.
/// </summary>
private void ReattachAfterTimeout(ErrorInfo error, ProtocolMessage msg)
{
_retryCount++;

var retryTimeout = TimeSpan.FromMilliseconds(ReconnectionStrategy.
GetRetryTime(RealtimeClient.Options.ChannelRetryTimeout.TotalMilliseconds, _retryCount));

// We capture the task but ignore it to make sure an error doesn't take down
// the thread
_ = Task.Run(async () =>
{
await Task.Delay(RealtimeClient.Options.ChannelRetryTimeout);
await Task.Delay(retryTimeout);
// only retry if the connection is connected (RTL13c)
if (Connection.State == ConnectionState.Connected)
Expand Down
5 changes: 3 additions & 2 deletions src/IO.Ably.Shared/Realtime/Workflows/RealtimeState.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using IO.Ably.Shared.Utils;
using IO.Ably.Transport;
using IO.Ably.Transport.States.Connection;
using IO.Ably.Types;
Expand Down Expand Up @@ -136,8 +137,8 @@ public long IncrementSerial()

public readonly List<MessageAndCallback> WaitingForAck = new List<MessageAndCallback>();

public void AddAckMessage(ProtocolMessage message, Action<bool, ErrorInfo> callback)
=> WaitingForAck.Add(new MessageAndCallback(message, callback));
public void AddAckMessage(ProtocolMessage message, Action<bool, ErrorInfo> callback) =>
WaitingForAck.Add(new MessageAndCallback(message, callback));

public RealtimeState()
: this(null)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using IO.Ably.Realtime;
using System;
using IO.Ably.Realtime;
using IO.Ably.Realtime.Workflow;
using IO.Ably.Shared.Utils;

namespace IO.Ably.Transport.States.Connection
{
Expand Down Expand Up @@ -44,11 +46,16 @@ public override void AbortTimer()
_timer.Abort();
}

// RTN14d
public override void StartTimer()
{
var retryInterval = Context.RetryTimeout.TotalMilliseconds;
var noOfAttempts = Context.Connection.RealtimeClient?.State?.AttemptsInfo?.NumberOfAttempts ?? 0 + 1; // First attempt should start with 1 instead of 0.
var retryIn = TimeSpan.FromMilliseconds(ReconnectionStrategy.GetRetryTime(retryInterval, noOfAttempts));

if (RetryInstantly == false)
{
_timer.Start(Context.RetryTimeout, OnTimeOut);
_timer.Start(retryIn, OnTimeOut);
}
}

Expand Down
30 changes: 30 additions & 0 deletions src/IO.Ably.Shared/Utils/ReconnectionStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System;

namespace IO.Ably.Shared.Utils
{
// RTB1
internal class ReconnectionStrategy
{
private static readonly Random Random = new Random();

public static double GetBackoffCoefficient(int count)
{
return Math.Min((count + 2) / 3d, 2d);
}

public static double GetJitterCoefficient()
{
return 1 - (Random.NextDouble() * 0.2);
}

// Generates retryTimeout value for given timeout and retryAttempt.
// If x is the value generated then
// Upper bound = min((retryAttempt + 2) / 3, 2) * initialTimeout
// Lower bound = 0.8 * Upper bound
// Lower bound < x < Upper bound
public static double GetRetryTime(double initialTimeout, int retryAttempt)
{
return initialTimeout * GetBackoffCoefficient(retryAttempt) * GetJitterCoefficient();
}
}
}
1 change: 1 addition & 0 deletions src/IO.Ably.Tests.Shared/IO.Ably.Tests.Shared.projitems
Original file line number Diff line number Diff line change
Expand Up @@ -154,5 +154,6 @@
<Compile Include="$(MSBuildThisFileDirectory)Types\OperatingSystemTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Types\SemanticVersionTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Utils\ErrorPolicyTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Utils\ReconnectionStrategyTest.cs" />
</ItemGroup>
</Project>
116 changes: 116 additions & 0 deletions src/IO.Ably.Tests.Shared/Realtime/ChannelSandboxSpecs.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
Expand Down Expand Up @@ -1122,6 +1123,121 @@ await WaitFor(done =>
client.Close();
}

[Theory]
[ProtocolData]
[Trait("spec", "RTL13b")]
public async Task WhenChannelSuspended_ShouldRetryUsingIncrementalBackoffAfterRetryWhenFirstDetachReceived(Protocol protocol)
{
// reduce timeouts to speed up test
var client = await GetRealtimeClient(protocol, (options, settings) =>
{
options.RealtimeRequestTimeout = TimeSpan.FromSeconds(2);
options.ChannelRetryTimeout = TimeSpan.FromSeconds(5);
});

await client.WaitForState();

var channelName = "RTL13a".AddRandomSuffix();
var channel = client.Channels.Get(channelName);

// block attach messages being sent, keeping channel in attaching state
client.GetTestTransport().BlockSendActions.Add(ProtocolMessage.MessageAction.Attach);

channel.Attach();

// Send first detach message, it will keep retrying until attached state is reached.
var detachedMessage = new ProtocolMessage(ProtocolMessage.MessageAction.Detached, channelName)
{
Error = new ErrorInfo("fake error")
};
client.GetTestTransport().FakeReceivedMessage(detachedMessage);

var stopWatch = new Stopwatch();
var channelRetryTimeouts = new List<double>();
do
{
await channel.WaitForState(ChannelState.Suspended);
stopWatch.Start();

await channel.WaitForState(ChannelState.Attaching, TimeSpan.FromSeconds(10));
channelRetryTimeouts.Add(stopWatch.Elapsed.TotalSeconds);
stopWatch.Reset();
}
while (channelRetryTimeouts.Count < 8);

Output.WriteLine(channelRetryTimeouts.ToJson());

// Upper bound = min((retryAttempt + 2) / 3, 2) * initialTimeout
// Lower bound = 0.8 * Upper bound
// Adding 20ms delay to accomodate start and stop
channelRetryTimeouts[0].Should().BeInRange(4, 5 + 0.20);
channelRetryTimeouts[1].Should().BeInRange(5.33, 6.66 + 0.20);
channelRetryTimeouts[2].Should().BeInRange(6.66, 8.33 + 0.20);
for (var i = 3; i < channelRetryTimeouts.Count; i++)
{
channelRetryTimeouts[i].Should().BeInRange(8, 10 + 0.20);
}

client.Close();
}

[Theory]
[ProtocolData]
[Trait("spec", "RTL13b")]
public async Task WhenChannelSuspended_ShouldRetryUsingIncrementalBackoffForConsistentDetachReceived(Protocol protocol)
{
// reduce timeouts to speed up test
var client = await GetRealtimeClient(protocol, (options, settings) =>
{
options.ChannelRetryTimeout = TimeSpan.FromSeconds(5);
});

await client.WaitForState();

var channelName = "RTL13a".AddRandomSuffix();
var channel = client.Channels.Get(channelName);

// block attach messages being sent, keeping channel in attaching state
client.GetTestTransport().BlockSendActions.Add(ProtocolMessage.MessageAction.Attach);

channel.Attach();

var detachedMessage = new ProtocolMessage(ProtocolMessage.MessageAction.Detached, channelName)
{
Error = new ErrorInfo("fake error")
};

var stopWatch = new Stopwatch();
var channelRetryTimeouts = new List<double>();
do
{
client.GetTestTransport().FakeReceivedMessage(detachedMessage);

await channel.WaitForState(ChannelState.Suspended);
stopWatch.Start();

await channel.WaitForState(ChannelState.Attaching, TimeSpan.FromSeconds(10));
channelRetryTimeouts.Add(stopWatch.Elapsed.TotalSeconds);
stopWatch.Reset();
}
while (channelRetryTimeouts.Count < 8);

Output.WriteLine(channelRetryTimeouts.ToJson());

// Upper bound = min((retryAttempt + 2) / 3, 2) * initialTimeout
// Lower bound = 0.8 * Upper bound
// Adding 20ms delay to accomodate start and stop
channelRetryTimeouts[0].Should().BeInRange(4, 5 + 0.20);
channelRetryTimeouts[1].Should().BeInRange(5.33, 6.66 + 0.20);
channelRetryTimeouts[2].Should().BeInRange(6.66, 8.33 + 0.20);
for (var i = 3; i < channelRetryTimeouts.Count; i++)
{
channelRetryTimeouts[i].Should().BeInRange(8, 10 + 0.20);
}

client.Close();
}

[Theory]
[ProtocolData]
[Trait("spec", "RTL13b")]
Expand Down
8 changes: 6 additions & 2 deletions src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using IO.Ably.Realtime;
using IO.Ably.Realtime.Workflow;
using IO.Ably.Tests.Infrastructure;
using IO.Ably.Tests.Shared.Utils;
using IO.Ably.Transport;
using IO.Ably.Transport.States.Connection;
using IO.Ably.Types;
Expand Down Expand Up @@ -993,8 +994,11 @@ await WaitFor(60000, done =>
client.Workflow.QueueCommand(SetDisconnectedStateCommand.Create(ErrorInfo.ReasonDisconnected));
});

var interval = reconnectedAt - disconnectedAt;
interval.TotalMilliseconds.Should().BeGreaterThan(5000 - 10 /* Allow 10 milliseconds */);
var reconnectedInTime = reconnectedAt - disconnectedAt;

var (lowerBound, _) = ReconnectionStrategyTest.Bounds(1, 5000);
reconnectedInTime.TotalMilliseconds.Should().BeGreaterThan(lowerBound);

initialConnectionId.Should().NotBeNullOrEmpty();
initialConnectionId.Should().NotBe(newConnectionId);
connectionStateTtl.Should().Be(TimeSpan.FromSeconds(1));
Expand Down
Loading

0 comments on commit 42f9cd2

Please sign in to comment.