Skip to content

Commit

Permalink
DT.AzureStorage: Runtime de-dupe support for orchestration start even…
Browse files Browse the repository at this point in the history
…ts (#528)

Also fixes some ETW tracing bugs
  • Loading branch information
cgillum authored Mar 31, 2021
1 parent a53b13e commit d89ca58
Show file tree
Hide file tree
Showing 18 changed files with 886 additions and 74 deletions.
121 changes: 121 additions & 0 deletions Test/DurableTask.AzureStorage.Tests/StressTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.AzureStorage.Tests
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json;

[TestClass]
public class StressTests
{
/// <summary>
/// Starts a large'ish number of orchestrations concurrently and verifies correct behavior
/// both in the case where they all share the same instance ID and when they have unique
/// instance IDs.
/// </summary>
[DataTestMethod]
[DataRow(true)]
[DataRow(false)]
public async Task ConcurrentOrchestrationStarts(bool useSameInstanceId)
{
// Set the minimum thread count to 64+ to make this test extra concurrent.
ThreadPool.GetMinThreads(out int minWorkerThreads, out int minIoThreads);
ThreadPool.SetMinThreads(Math.Max(64, minWorkerThreads), Math.Max(64, minIoThreads));
try
{
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(
enableExtendedSessions: false,
modifySettingsAction: settings => settings.ThrowExceptionOnInvalidDedupeStatus = false))
{
await host.StartAsync();

var results = new ConcurrentBag<string>();

// We want a number sufficiently high that it results in multiple message batches
const int MaxConcurrency = 40;

TaskActivity activity = TestOrchestrationHost.MakeActivity(
delegate(TaskContext ctx, string input)
{
string result = $"Hello, {input}!";
results.Add(result);
return result;
});

// Use the same instance name for all instances
Func<int, string> instanceIdGenerator;
Func<int, string> inputGenerator;
if (useSameInstanceId)
{
instanceIdGenerator = _ => $"ConcurrentInstance_SINGLETON";
inputGenerator = _ => "World";
}
else
{
instanceIdGenerator = i => $"ConcurrentInstance_{i:00}";
inputGenerator = i => $"{i:00}";
}

List<TestInstance<string>> instances = await host.StartInlineOrchestrations(
MaxConcurrency,
instanceIdGenerator,
inputGenerator,
orchestrationName: "SayHelloOrchestration",
version: string.Empty,
implementation: (ctx, input) => ctx.ScheduleTask<string>("SayHello", "", input),
activities: ("SayHello", activity));

Assert.AreEqual(MaxConcurrency, instances.Count);

// All returned objects point to the same orchestration instance
OrchestrationState[] finalStates = await Task.WhenAll(instances.Select(
i => i.WaitForCompletion(timeout: TimeSpan.FromMinutes(2), expectedOutputRegex: @"Hello, \w+!")));

if (useSameInstanceId)
{
// Make sure each instance is exactly the same
string firstInstanceJson = JsonConvert.SerializeObject(finalStates[0]);
foreach (OrchestrationState state in finalStates.Skip(1))
{
string json = JsonConvert.SerializeObject(state);
Assert.AreEqual(firstInstanceJson, json, "Expected that all instances have the same data.");
}
}
else
{
// Make sure each instance is different
Assert.AreEqual(MaxConcurrency, finalStates.Select(s => s.OrchestrationInstance.InstanceId).Distinct().Count());
Assert.AreEqual(MaxConcurrency, finalStates.Select(s => s.OrchestrationInstance.ExecutionId).Distinct().Count());
Assert.AreEqual(MaxConcurrency, finalStates.Select(s => s.Input).Distinct().Count());
Assert.AreEqual(MaxConcurrency, finalStates.Select(s => s.Output).Distinct().Count());
}

await host.StopAsync();
}
}
finally
{
// Reset the thread pool configuration
ThreadPool.SetMinThreads(minWorkerThreads, minIoThreads);
}
}
}
}
169 changes: 169 additions & 0 deletions Test/DurableTask.AzureStorage.Tests/TestInstance.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.AzureStorage.Tests
{
using System;
using System.Diagnostics;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using DurableTask.Core;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

class TestInstance<T>
{
readonly TaskHubClient client;
readonly OrchestrationInstance instance;
readonly DateTime startTime;
readonly T input;

public TestInstance(
TaskHubClient client,
OrchestrationInstance instance,
DateTime startTime,
T input)
{
this.client = client;
this.instance = instance;
this.startTime = startTime;
this.input = input;
}

public string InstanceId => this.instance?.InstanceId;

public string ExecutionId => this.instance?.ExecutionId;

OrchestrationInstance GetInstanceForAnyExecution() => new OrchestrationInstance
{
InstanceId = this.instance.InstanceId,
};

public async Task<OrchestrationState> WaitForStart(TimeSpan timeout = default)
{
AdjustTimeout(ref timeout);

Stopwatch sw = Stopwatch.StartNew();
do
{
OrchestrationState state = await this.GetStateAsync();
if (state != null && state.OrchestrationStatus != OrchestrationStatus.Pending)
{
return state;
}

await Task.Delay(TimeSpan.FromMilliseconds(500));

} while (sw.Elapsed < timeout);

throw new TimeoutException($"Orchestration with instance ID '{this.instance.InstanceId}' failed to start.");
}

public async Task<OrchestrationState> WaitForCompletion(
TimeSpan timeout = default,
OrchestrationStatus? expectedStatus = OrchestrationStatus.Completed,
object expectedOutput = null,
string expectedOutputRegex = null,
bool continuedAsNew = false)
{
AdjustTimeout(ref timeout);

OrchestrationState state = await this.client.WaitForOrchestrationAsync(this.GetInstanceForAnyExecution(), timeout);
Assert.IsNotNull(state);
if (expectedStatus != null)
{
Assert.AreEqual(expectedStatus, state.OrchestrationStatus);
}

if (!continuedAsNew)
{
if (this.input != null)
{
Assert.AreEqual(JToken.FromObject(this.input).ToString(), JToken.Parse(state.Input).ToString());
}
else
{
Assert.IsNull(state.Input);
}
}

// For created time, account for potential clock skew
Assert.IsTrue(state.CreatedTime >= this.startTime.AddMinutes(-5));
Assert.IsTrue(state.LastUpdatedTime > state.CreatedTime);
Assert.IsTrue(state.CompletedTime > state.CreatedTime);
Assert.IsNotNull(state.OrchestrationInstance);
Assert.AreEqual(this.instance.InstanceId, state.OrchestrationInstance.InstanceId);

// Make sure there is an ExecutionId, but don't require it to match any particular value
Assert.IsNotNull(state.OrchestrationInstance.ExecutionId);

if (expectedOutput != null)
{
Assert.IsNotNull(state.Output);
try
{
// DTFx usually encodes outputs as JSON values. The exception is error messages.
// If this is an error message, we'll throw here and try the logic in the catch block.
JToken.Parse(state.Output);
Assert.AreEqual(JToken.FromObject(expectedOutput).ToString(Formatting.None), state.Output);
}
catch (JsonReaderException)
{
Assert.AreEqual(expectedOutput, state?.Output);
}
}

if (expectedOutputRegex != null)
{
Assert.IsTrue(
Regex.IsMatch(state.Output, expectedOutputRegex),
$"The output '{state.Output}' doesn't match the regex pattern '{expectedOutputRegex}'.");
}

return state;
}

internal Task<OrchestrationState> GetStateAsync()
{
return this.client.GetOrchestrationStateAsync(this.instance);
}

internal Task RaiseEventAsync(string name, object value)
{
return this.client.RaiseEventAsync(this.instance, name, value);
}

internal Task TerminateAsync(string reason)
{
return this.client.TerminateInstanceAsync(this.instance, reason);
}

static void AdjustTimeout(ref TimeSpan timeout)
{
if (timeout == default)
{
timeout = TimeSpan.FromSeconds(10);
}

if (Debugger.IsAttached)
{
TimeSpan debuggingTimeout = TimeSpan.FromMinutes(5);
if (debuggingTimeout > timeout)
{
timeout = debuggingTimeout;
}
}
}
}
}
24 changes: 18 additions & 6 deletions src/DurableTask.AzureStorage/AnalyticsEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace DurableTask.AzureStorage
{
using System;
using System.Diagnostics.Tracing;
using System.Threading;
using System.Runtime.CompilerServices;
using DurableTask.AzureStorage.Logging;

/// <summary>
Expand All @@ -27,8 +27,6 @@ namespace DurableTask.AzureStorage
[EventSource(Name = "DurableTask-AzureStorage")]
class AnalyticsEventSource : EventSource
{
static readonly AsyncLocal<Guid> ActivityIdState = new AsyncLocal<Guid>();

/// <summary>
/// Singleton instance used for writing events.
/// </summary>
Expand All @@ -37,9 +35,23 @@ class AnalyticsEventSource : EventSource
[NonEvent]
public static void SetLogicalTraceActivityId(Guid activityId)
{
// We use AsyncLocal to preserve activity IDs across async/await boundaries.
ActivityIdState.Value = activityId;
SetCurrentThreadActivityId(activityId);
try
{
SetCoreTraceActivityId(activityId);
}
catch (MissingMethodException)
{
// Best effort. This method is only available starting in DurableTask.Core v2.5.3.
// We catch to maintain backwards compatibility with previous versions.
}
}

// NoInlining ensures we can get a predictable MissingMethodException if the build of DurableTask.Core
// we're using doesn't define the LoggingExtensions.SetLogicalTraceActivityId() method.
[MethodImpl(MethodImplOptions.NoInlining)]
static void SetCoreTraceActivityId(Guid activityId)
{
DurableTask.Core.Logging.LoggingExtensions.SetLogicalTraceActivityId(activityId);
}

[Event(EventIds.SendingMessage, Level = EventLevel.Informational, Opcode = EventOpcode.Send, Task = Tasks.Enqueue, Version = 6)]
Expand Down
Loading

0 comments on commit d89ca58

Please sign in to comment.