Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Membership] When indirect probe fails, include intermediary's vote in suspecter list #9302

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -295,15 +295,7 @@ private async Task OnProbeResultInternal(SiloHealthMonitor monitor, ProbeResult
}
else if (probeResult.Status == ProbeResultStatus.Failed)
{
if (this.clusterMembershipOptions.CurrentValue.NumVotesForDeathDeclaration <= 2)
{
// Since both this silo and another silo were unable to probe the target silo, we declare it dead.
await this.membershipService.TryKill(monitor.SiloAddress).ConfigureAwait(false);
}
else
{
await this.membershipService.TryToSuspectOrKill(monitor.SiloAddress).ConfigureAwait(false);
}
await this.membershipService.TryToSuspectOrKill(monitor.SiloAddress, probeResult.Intermediary).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ public async Task<bool> TryKill(SiloAddress silo)
return await DeclareDead(entry, eTag, table.Version, GetDateTimeUtcNow());
}

public async Task<bool> TryToSuspectOrKill(SiloAddress silo)
public async Task<bool> TryToSuspectOrKill(SiloAddress silo, SiloAddress indirectProbingSilo = null)
{
var table = await membershipTableProvider.ReadAll();
var now = GetDateTimeUtcNow();
Expand Down Expand Up @@ -764,6 +764,13 @@ public async Task<bool> TryToSuspectOrKill(SiloAddress silo)
// Try to add our vote to the list and tally the fresh votes again.
var prevList = entry.SuspectTimes?.ToList() ?? new List<Tuple<SiloAddress, DateTime>>();
entry.AddOrUpdateSuspector(myAddress, now, clusterMembershipOptions.NumVotesForDeathDeclaration);

// If an indirect probing silo was supplied, record its vote as a suspector as well.
if (indirectProbingSilo is not null)
{
entry.AddOrUpdateSuspector(indirectProbingSilo, now, clusterMembershipOptions.NumVotesForDeathDeclaration);
}

freshVotes = entry.GetFreshVotes(now, this.clusterMembershipOptions.DeathVoteExpirationTimeout);

// Determine if there are enough votes to evict the silo.
Expand Down
34 changes: 19 additions & 15 deletions src/Orleans.Runtime/MembershipService/SiloHealthMonitor.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#nullable enable
using System;
using System.Linq;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -28,7 +29,7 @@ internal class SiloHealthMonitor : ITestAccessor, IHealthCheckable, IDisposable,
private readonly IAsyncTimer _pingTimer;
private ValueStopwatch _elapsedSinceLastSuccessfulResponse;
private readonly Func<SiloHealthMonitor, ProbeResult, Task> _onProbeResult;
private Task _runTask;
private Task? _runTask;

/// <summary>
/// The id of the next probe.
Expand Down Expand Up @@ -152,8 +153,8 @@ public async ValueTask DisposeAsync()

private async Task Run()
{
ClusterMembershipSnapshot activeMembersSnapshot = default;
SiloAddress[] otherNodes = default;
ClusterMembershipSnapshot? activeMembersSnapshot = default;
SiloAddress[]? otherNodes = default;
TimeSpan? overrideDelay = RandomTimeSpan.Next(_clusterMembershipOptions.CurrentValue.ProbeTimeout);
while (await _pingTimer.NextTick(overrideDelay))
{
Expand All @@ -164,7 +165,7 @@ private async Task Run()
{
// Discover the other active nodes in the cluster, if there are any.
var membershipSnapshot = _membershipService.CurrentSnapshot;
if (otherNodes is null || !object.ReferenceEquals(activeMembersSnapshot, membershipSnapshot))
if (otherNodes is null || !ReferenceEquals(activeMembersSnapshot, membershipSnapshot))
{
activeMembersSnapshot = membershipSnapshot;
otherNodes = membershipSnapshot.Members.Values
Expand All @@ -175,7 +176,7 @@ private async Task Run()

var isDirectProbe = !_clusterMembershipOptions.CurrentValue.EnableIndirectProbes || _failedProbes < _clusterMembershipOptions.CurrentValue.NumMissedProbesLimit - 1 || otherNodes.Length == 0;
var timeout = GetTimeout(isDirectProbe);
var cancellation = new CancellationTokenSource(timeout);
using var cancellation = new CancellationTokenSource(timeout);

if (isDirectProbe)
{
Expand All @@ -196,7 +197,7 @@ private async Task Run()
// Note that all recused silos will be included in the consideration set the next time cluster membership changes.
if (probeResult.Status != ProbeResultStatus.Succeeded && probeResult.IntermediaryHealthDegradationScore > 0)
{
_log.LogInformation("Recusing unhealthy intermediary {Intermediary} and trying again with remaining nodes", intermediary);
_log.LogInformation("Recusing unhealthy intermediary '{Intermediary}' and trying again with remaining nodes", intermediary);
otherNodes = otherNodes.Where(node => !node.Equals(intermediary)).ToArray();
overrideDelay = TimeSpan.FromMilliseconds(250);
}
Expand Down Expand Up @@ -249,7 +250,7 @@ private async Task<ProbeResult> ProbeDirectly(CancellationToken cancellation)

var roundTripTimer = ValueStopwatch.StartNew();
ProbeResult probeResult;
Exception failureException;
Exception? failureException;
try
{
await _prober.Probe(SiloAddress, id, cancellation).WaitAsync(cancellation);
Expand Down Expand Up @@ -347,7 +348,7 @@ private async Task<ProbeResult> ProbeIndirectly(SiloAddress intermediary, TimeSp
MessagingInstruments.OnPingReplyReceived(SiloAddress);

_failedProbes = 0;
probeResult = ProbeResult.CreateIndirect(0, ProbeResultStatus.Succeeded, indirectResult);
probeResult = ProbeResult.CreateIndirect(0, ProbeResultStatus.Succeeded, indirectResult, intermediary);
}
else
{
Expand All @@ -360,7 +361,7 @@ private async Task<ProbeResult> ProbeIndirectly(SiloAddress intermediary, TimeSp
id,
SiloAddress,
indirectResult.IntermediaryHealthScore);
probeResult = ProbeResult.CreateIndirect(_failedProbes, ProbeResultStatus.Unknown, indirectResult);
probeResult = ProbeResult.CreateIndirect(_failedProbes, ProbeResultStatus.Unknown, indirectResult, intermediary);
}
else
{
Expand All @@ -375,15 +376,15 @@ private async Task<ProbeResult> ProbeIndirectly(SiloAddress intermediary, TimeSp
indirectResult.IntermediaryHealthScore);

var missed = ++_failedProbes;
probeResult = ProbeResult.CreateIndirect(missed, ProbeResultStatus.Failed, indirectResult);
probeResult = ProbeResult.CreateIndirect(missed, ProbeResultStatus.Failed, indirectResult, intermediary);
}
}
}
catch (Exception exception)
{
MessagingInstruments.OnPingReplyMissed(SiloAddress);
_log.LogWarning(exception, "Indirect probe request failed.");
probeResult = ProbeResult.CreateIndirect(_failedProbes, ProbeResultStatus.Unknown, default);
probeResult = ProbeResult.CreateIndirect(_failedProbes, ProbeResultStatus.Unknown, default, intermediary);
}

return probeResult;
Expand All @@ -398,19 +399,20 @@ private async Task<ProbeResult> ProbeIndirectly(SiloAddress intermediary, TimeSp
[StructLayout(LayoutKind.Auto)]
public readonly struct ProbeResult
{
private ProbeResult(int failedProbeCount, ProbeResultStatus status, bool isDirectProbe, int intermediaryHealthDegradationScore)
private ProbeResult(int failedProbeCount, ProbeResultStatus status, bool isDirectProbe, int intermediaryHealthDegradationScore, SiloAddress? intermediary)
{
FailedProbeCount = failedProbeCount;
Status = status;
IsDirectProbe = isDirectProbe;
IntermediaryHealthDegradationScore = intermediaryHealthDegradationScore;
Intermediary = intermediary;
}

public static ProbeResult CreateDirect(int failedProbeCount, ProbeResultStatus status)
=> new(failedProbeCount, status, isDirectProbe: true, 0);
=> new(failedProbeCount, status, isDirectProbe: true, 0, null);

public static ProbeResult CreateIndirect(int failedProbeCount, ProbeResultStatus status, IndirectProbeResponse indirectProbeResponse)
=> new(failedProbeCount, status, isDirectProbe: false, indirectProbeResponse.IntermediaryHealthScore);
public static ProbeResult CreateIndirect(int failedProbeCount, ProbeResultStatus status, IndirectProbeResponse indirectProbeResponse, SiloAddress? intermediary)
=> new(failedProbeCount, status, isDirectProbe: false, indirectProbeResponse.IntermediaryHealthScore, intermediary);

public int FailedProbeCount { get; }

Expand All @@ -419,6 +421,8 @@ public static ProbeResult CreateIndirect(int failedProbeCount, ProbeResultStatus
public bool IsDirectProbe { get; }

public int IntermediaryHealthDegradationScore { get; }

public SiloAddress? Intermediary { get; }
}

public enum ProbeResultStatus
Expand Down
Loading