Skip to content

Commit

Permalink
Avoid overlapped per-endpoint heartbeats and heartbeat error reports;…
Browse files Browse the repository at this point in the history
… we don't want them backing up
  • Loading branch information
mgravell committed Feb 3, 2017
1 parent d881e28 commit 09f9d8a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
17 changes: 15 additions & 2 deletions StackExchange.Redis/StackExchange/Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,8 @@ private ConnectionMultiplexer(ConfigurationOptions configuration)
{
((ConnectionMultiplexer)state).OnHeartbeat();
};

private int _activeHeartbeatErrors;
private void OnHeartbeat()
{
try
Expand All @@ -982,9 +984,20 @@ private void OnHeartbeat()
var tmp = serverSnapshot;
for (int i = 0; i < tmp.Length; i++)
tmp[i].OnHeartbeat();
} catch(Exception ex)
}
catch (Exception ex)
{
OnInternalError(ex);
if (Interlocked.CompareExchange(ref _activeHeartbeatErrors, 1, 0) == 0)
{
try
{
OnInternalError(ex);
}
finally
{
Interlocked.Exchange(ref _activeHeartbeatErrors, 0);
}
}
}
}

Expand Down
26 changes: 19 additions & 7 deletions StackExchange.Redis/StackExchange/Redis/ServerEndPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

namespace StackExchange.Redis
Expand Down Expand Up @@ -531,17 +532,28 @@ internal bool CheckInfoReplication()
}
private int lastInfoReplicationCheckTicks;

private int _heartBeatActive;
internal void OnHeartbeat()
{
try
{
interactive?.OnHeartbeat(false);
subscription?.OnHeartbeat(false);
} catch(Exception ex)
// don't overlap operations on an endpoint
if (Interlocked.CompareExchange(ref _heartBeatActive, 1, 0) == 0)
{
multiplexer.OnInternalError(ex, EndPoint);
}
try
{


interactive?.OnHeartbeat(false);
subscription?.OnHeartbeat(false);
}
catch (Exception ex)
{
multiplexer.OnInternalError(ex, EndPoint);
}
finally
{
Interlocked.Exchange(ref _heartBeatActive, 0);
}
}
}

internal Task<T> QueueDirectAsync<T>(Message message, ResultProcessor<T> processor, object asyncState = null, PhysicalBridge bridge = null)
Expand Down

0 comments on commit 09f9d8a

Please sign in to comment.