Skip to content

Commit

Permalink
Fix flaky `Should_ConvertAllRemoteCalls_ToLocalCalls_WhileRespectingT…
Browse files Browse the repository at this point in the history
…olerance` test (#9058)
  • Loading branch information
ReubenBond authored Jul 9, 2024
1 parent ca85ace commit 6ff7edc
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ static IActivationRepartitionerSystemTarget GetReference(IGrainFactory grainFact
/// For diagnostics only.
/// </summary>
ValueTask<ImmutableArray<(Edge, ulong)>> GetGrainCallFrequencies();

/// <summary>
/// For use in testing only! Flushes buffered messages.
/// </summary>
ValueTask FlushBuffers();
}

// We use a readonly struct so that we can fully decouple the message-passing from potential modifications to the Silo fields.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,19 @@ public void ValidateConfiguration()
ThrowMustBeGreaterThanZero(nameof(ActivationRepartitionerOptions.MaxUnprocessedEdges));
}

if (_options.MinRoundPeriod <= TimeSpan.Zero)
if (_options.MinRoundPeriod < TimeSpan.Zero)
{
ThrowMustBeGreaterThanZero(nameof(ActivationRepartitionerOptions.MinRoundPeriod));
ThrowMustBeGreaterThanOrEqualToZero(nameof(ActivationRepartitionerOptions.MinRoundPeriod));
}

if (_options.MaxRoundPeriod <= TimeSpan.Zero)
{
ThrowMustBeGreaterThanZero(nameof(ActivationRepartitionerOptions.MaxRoundPeriod));
}

if (_options.RecoveryPeriod <= TimeSpan.Zero)
if (_options.RecoveryPeriod < TimeSpan.Zero)
{
ThrowMustBeGreaterThanZero(nameof(ActivationRepartitionerOptions.RecoveryPeriod));
ThrowMustBeGreaterThanOrEqualToZero(nameof(ActivationRepartitionerOptions.RecoveryPeriod));
}

if (_options.MaxRoundPeriod < _options.MinRoundPeriod)
Expand All @@ -147,9 +147,12 @@ public void ValidateConfiguration()
}
}

/// <summary>
/// Throws an <see cref="OrleansConfigurationException"/> reporting that the named option must be greater than or equal to zero.
/// </summary>
/// <param name="propertyName">Name of the offending option; it is embedded at the start of the exception message.</param>
private static void ThrowMustBeGreaterThanOrEqualToZero(string propertyName)
{
    var message = $"{propertyName} must be greater than or equal to 0.";
    throw new OrleansConfigurationException(message);
}

private static void ThrowMustBeGreaterThanZero(string propertyName)
=> throw new OrleansConfigurationException($"{propertyName} must be greater than 0");
=> throw new OrleansConfigurationException($"{propertyName} must be greater than 0.");

private static void ThrowMustBeGreaterThanOrEqualTo(string name1, string name2)
=> throw new OrleansConfigurationException($"{name1} must be greater than or equal to {name2}");
=> throw new OrleansConfigurationException($"{name1} must be greater than or equal to {name2}.");
}
5 changes: 4 additions & 1 deletion src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,9 @@ public async Task<AddressAndTag> RegisterAsync(GrainAddress address, GrainAddres
DirectoryInstruments.RegistrationsSingleActLocal.Add(1);

var result = DirectoryPartition.AddSingleActivation(address, previousAddress);

// update the cache so next local lookup will find this ActivationAddress in the cache and we will save full lookup.
DirectoryCache.AddOrUpdate(result.Address, result.VersionTag);
return result;
}
else
Expand All @@ -469,7 +472,7 @@ public async Task<AddressAndTag> RegisterAsync(GrainAddress address, GrainAddres
if (!address.Equals(result.Address) || !IsValidSilo(address.SiloAddress)) return result;

// update the cache so next local lookup will find this ActivationAddress in the cache and we will save full lookup.
DirectoryCache.AddOrUpdate(address, result.VersionTag);
DirectoryCache.AddOrUpdate(result.Address, result.VersionTag);

return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ internal partial class ActivationRepartitioner
[LoggerMessage(Level = LogLevel.Debug, Message = "Finalized exchange protocol: migrating {GivingActivationCount} activations, receiving {TakingActivationCount} activations.")]
private partial void LogProtocolFinalized(int givingActivationCount, int takingActivationCount);

[LoggerMessage(Level = LogLevel.Trace, Message = "Finalized exchange protocol: migrating [{GivingActivations}], receiving [{TakingActivations}].")]
private partial void LogProtocolFinalizedTrace(string givingActivations, string takingActivations);

[LoggerMessage(Level = LogLevel.Warning, Message = "Error performing exchange request from {ThisSilo} to {CandidateSilo}. I will try the next best candidate (if one is available), otherwise I will wait for my next period to come.")]
private partial void LogErrorOnProtocolExecution(Exception exception, SiloAddress thisSilo, SiloAddress candidateSilo);

Expand All @@ -49,4 +52,16 @@ internal partial class ActivationRepartitioner

[LoggerMessage(Level = LogLevel.Warning, Message = "Error accepting exchange request from {SendingSilo}.")]
private partial void LogErrorAcceptingExchangeRequest(Exception exception, SiloAddress sendingSilo);

/// <summary>
/// Debug-level log entry reporting the additional cool-down wait before the exchange protocol is initiated.
/// </summary>
/// <param name="coolDown">The additional time to wait, rendered as <c>{CoolDown}</c> in the message.</param>
[LoggerMessage(Level = LogLevel.Debug, Message = "Waiting an additional {CoolDown} to cool down before initiating the exchange protocol.")]
private partial void LogCoolingDown(TimeSpan coolDown);

/// <summary>
/// Debug-level log entry reporting how many newly anchored grains are being added on a host, along with the current edge-weight element count.
/// </summary>
/// <param name="newlyAnchoredGrains">Number of newly anchored grains, rendered as <c>{NewlyAnchoredGrains}</c>.</param>
/// <param name="silo">The host silo, rendered as <c>{Silo}</c>.</param>
/// <param name="edgeWeightCount">Number of edge-weight elements, rendered as <c>{EdgeWeightCount}</c>.</param>
[LoggerMessage(Level = LogLevel.Debug, Message = "Adding {NewlyAnchoredGrains} newly anchored grains to set on host {Silo}. EdgeWeights contains {EdgeWeightCount} elements.")]
private partial void LogAddingAnchoredGrains(int newlyAnchoredGrains, SiloAddress silo, int edgeWeightCount);

/// <summary>
/// Trace-level log entry reporting how long it took to compute the candidate sets.
/// </summary>
/// <param name="elapsed">The elapsed time, rendered as <c>{Elapsed}</c>.</param>
[LoggerMessage(Level = LogLevel.Trace, Message = "Candidate sets computed in {Elapsed}.")]
private partial void LogComputedCandidateSets(TimeSpan elapsed);

/// <summary>
/// Trace-level log entry reporting how long it took to create the candidate heaps.
/// </summary>
/// <param name="elapsed">The elapsed time, rendered as <c>{Elapsed}</c>.</param>
[LoggerMessage(Level = LogLevel.Trace, Message = "Candidate heaps created in {Elapsed}.")]
private partial void LogComputedCandidateHeaps(TimeSpan elapsed);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#nullable enable
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -89,7 +90,7 @@ private async Task ProcessPendingEdges(CancellationToken cancellationToken)
// Ignore edges between two non-migratable grains.
continue;
}

Edge edge = new(sourceVertex, destinationVertex);
_edgeWeights.Add(edge);
}
Expand Down Expand Up @@ -132,4 +133,12 @@ public void RecordMessage(Message message)
_pendingMessageEvent.Signal();
}
}

/// <summary>
/// Waits until <c>_pendingMessages</c> reports no remaining items.
/// </summary>
/// <remarks>
/// This method does not process messages itself; it only re-checks the pending count,
/// pausing 30 milliseconds between checks, and returns once the count is no longer positive.
/// </remarks>
/// <returns>A <see cref="ValueTask"/> that completes when the pending message count is zero.</returns>
async ValueTask IActivationRepartitionerSystemTarget.FlushBuffers()
{
    var pollInterval = TimeSpan.FromMilliseconds(30);
    while (true)
    {
        if (_pendingMessages.Count <= 0)
        {
            return;
        }

        await Task.Delay(pollInterval);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public async ValueTask TriggerExchangeRequest()
var coolDown = _options.RecoveryPeriod - _lastExchangedStopwatch.Elapsed;
if (coolDown > TimeSpan.Zero)
{
_logger.LogDebug("Waiting an additional {CoolDown} to cool down before initiating the exchange protocol.", coolDown);
LogCoolingDown(coolDown);
await Task.Delay(coolDown, _timeProvider);
}

Expand Down Expand Up @@ -143,7 +143,7 @@ public async ValueTask TriggerExchangeRequest()
var migrationCandidates = GetMigrationCandidates();
var sets = CreateCandidateSets(migrationCandidates, silos);
var anchoredSet = ComputeAnchoredGrains(migrationCandidates);
_logger.LogInformation("Candidate sets computed in {Elapsed} ms.", sw.Elapsed.TotalMilliseconds);
LogComputedCandidateSets(sw.Elapsed);
foreach ((var candidateSilo, var offeredGrains, var _) in sets)
{
if (offeredGrains.Count == 0)
Expand Down Expand Up @@ -241,7 +241,7 @@ public async ValueTask<AcceptExchangeResponse> AcceptExchangeRequest(AcceptExcha

var stopwatch = ValueStopwatch.StartNew();
var (localHeap, remoteHeap) = CreateCandidateHeaps(localSet, remoteSet);
_logger.LogInformation("Candidate heaps created in {Elapsed} ms.", stopwatch.Elapsed.TotalMilliseconds);
LogComputedCandidateHeaps(stopwatch.Elapsed);
stopwatch.Restart();

var iterations = 0;
Expand Down Expand Up @@ -464,13 +464,17 @@ private async Task FinalizeProtocol(ImmutableArray<GrainId> giving, ImmutableArr
try
{
Dictionary<string, object> migrationRequestContext = new() { [IPlacementDirector.PlacementHintKey] = targetSilo };
var deactivationTasks = new List<Task>();
foreach (var grainId in giving)
{
if (_activationDirectory.FindTarget(grainId) is { } localActivation)
{
localActivation.Migrate(migrationRequestContext);
deactivationTasks.Add(localActivation.Deactivated);
}
}

await Task.WhenAll(deactivationTasks);
}
catch (Exception exception)
{
Expand All @@ -484,9 +488,9 @@ private async Task FinalizeProtocol(ImmutableArray<GrainId> giving, ImmutableArr
var toRemove = new List<Edge>();
var affected = new HashSet<GrainId>(giving.Length + accepting.Length);

_logger.LogInformation("Adding {NewlyAnchoredGrains} newly anchored grains to set on host {Silo}. EdgeWeights contains {EdgeWeightCount} elements.", newlyAnchoredGrains.Count, Silo, _edgeWeights.Count);
if (_anchoredFilter is { } filter)
{
LogAddingAnchoredGrains(newlyAnchoredGrains.Count, Silo, _edgeWeights.Count);
foreach (var id in newlyAnchoredGrains)
{
filter.Add(id);
Expand Down Expand Up @@ -531,7 +535,14 @@ private async Task FinalizeProtocol(ImmutableArray<GrainId> giving, ImmutableArr

// Stamp this silo's exchange for a potential next pair exchange request.
_lastExchangedStopwatch.Restart();
LogProtocolFinalized(giving.Length, accepting.Length);
if (_logger.IsEnabled(LogLevel.Trace))
{
LogProtocolFinalizedTrace(string.Join(", ", giving), string.Join(", ", accepting));
}
else if (_logger.IsEnabled(LogLevel.Debug))
{
LogProtocolFinalized(giving.Length, accepting.Length);
}
}

private List<(SiloAddress Silo, List<CandidateVertex> Candidates, long TransferScore)> CreateCandidateSets(List<IGrouping<GrainId, VertexEdge>> migrationCandidates, ImmutableArray<SiloAddress> silos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Orleans.Runtime.Placement.Repartitioning;

internal sealed class FrequentEdgeCounter(int capacity) : FrequentItemCollection<ulong, Edge>(capacity)
{
protected override ulong GetKey(in Edge element) => (ulong)element.Source.Id.GetUniformHashCode() << 32 | element.Target.Id.GetUniformHashCode();
protected override ulong GetKey(in Edge element) => ((ulong)element.Source.Id.GetUniformHashCode()) << 32 | element.Target.Id.GetUniformHashCode();
public void Clear() => ClearCore();
public void Remove(in Edge element) => RemoveCore(GetKey(element));
}
Expand Down
Loading

0 comments on commit 6ff7edc

Please sign in to comment.