From 891d3eec976e779b094d7ac333ccea811def2bb8 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Tue, 31 May 2022 10:33:06 -0700 Subject: [PATCH] Process events for non-running orchestrations (#97) --- CHANGELOG.md | 4 +- azure-pipelines.yml | 2 +- src/DurableTask.SqlServer/LogHelper.cs | 16 ++ .../Logging/DefaultEventSource.cs | 34 ++- src/DurableTask.SqlServer/Logging/EventIds.cs | 2 + .../Logging/LogEvents.cs | 73 ++++++ .../Scripts/drop-schema.sql | 1 + src/DurableTask.SqlServer/Scripts/logic.sql | 33 ++- .../Scripts/permissions.sql | 2 + .../SqlOrchestrationService.cs | 76 +++++- src/DurableTask.SqlServer/SqlUtils.cs | 12 +- src/common.props | 7 +- .../Integration/DataRetentionTests.cs | 243 ++++++++++++++++++ .../Integration/DatabaseManagement.cs | 1 + .../Integration/PurgeTests.cs | 120 --------- .../Logging/LogAssert.cs | 27 +- .../Logging/LogAssertExtensions.cs | 10 + .../Utils/SharedTestHelpers.cs | 12 +- .../Utils/TestInstance.cs | 13 +- .../Utils/TestService.cs | 9 +- 20 files changed, 551 insertions(+), 146 deletions(-) create mode 100644 test/DurableTask.SqlServer.Tests/Integration/DataRetentionTests.cs delete mode 100644 test/DurableTask.SqlServer.Tests/Integration/PurgeTests.cs diff --git a/CHANGELOG.md b/CHANGELOG.md index 2beba26..735e337 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## v1.0.0 +## v1.0.0-rc2 ### New @@ -12,7 +12,9 @@ * Removed unnecessary .NET Standard 2.1 target ([#82](https://github.com/microsoft/durabletask-mssql/pull/82)) * Fixed problem terminating orchestration with running activity ([#83](https://github.com/microsoft/durabletask-mssql/pull/83)) * Fixed payload data leak for completed activities (same PR as above) +* Fixed NewEvents leak for completed or continued-as-new instances ([#97](https://github.com/microsoft/durabletask-mssql/pull/97)) * Activity payload IDs are now consistently saved to the history table ([#90](https://github.com/microsoft/durabletask-mssql/issues/90)) +* Remove 
Microsoft.SqlServer.SqlManagementObjects dependency ([#92](https://github.com/microsoft/durabletask-mssql/pull/92)) - contributed by [@IGx89](https://github.com/IGx89) ### Breaking changes diff --git a/azure-pipelines.yml b/azure-pipelines.yml index cacca50..9830225 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -38,7 +38,7 @@ steps: vsVersion: 'latest' logFileVerbosity: minimal configuration: Release - msbuildArgs: /p:GITHUB_RUN_NUMBER=$(Build.BuildId) /p:ContinuousIntegrationBuild=true + msbuildArgs: /p:FileVersionRevision=$(Build.BuildId) /p:ContinuousIntegrationBuild=true # Authenticode sign all the DLLs with the Microsoft certificate. # This appears to be an in-place signing job, which is convenient. diff --git a/src/DurableTask.SqlServer/LogHelper.cs b/src/DurableTask.SqlServer/LogHelper.cs index a7048d3..7f9362a 100644 --- a/src/DurableTask.SqlServer/LogHelper.cs +++ b/src/DurableTask.SqlServer/LogHelper.cs @@ -129,6 +129,22 @@ public void CreatedDatabase(string databaseName) this.WriteLog(logEvent); } + public void DiscardingEvent(string instanceId, string eventType, int taskEventId, string details) + { + var logEvent = new LogEvents.DiscardingEventEvent( + instanceId, + eventType, + taskEventId, + details); + this.WriteLog(logEvent); + } + + public void GenericInfoEvent(string details, string? 
instanceId) + { + var logEvent = new LogEvents.GenericInfo(details, instanceId); + this.WriteLog(logEvent); + } + void WriteLog(ILogEvent logEvent) { // LogDurableEvent is an extension method defined in DurableTask.Core diff --git a/src/DurableTask.SqlServer/Logging/DefaultEventSource.cs b/src/DurableTask.SqlServer/Logging/DefaultEventSource.cs index 3c62781..7f9994f 100644 --- a/src/DurableTask.SqlServer/Logging/DefaultEventSource.cs +++ b/src/DurableTask.SqlServer/Logging/DefaultEventSource.cs @@ -234,7 +234,7 @@ internal void PurgedInstances( } [Event(EventIds.CommandCompleted, Level = EventLevel.Verbose)] - public void CommandCompleted( + internal void CommandCompleted( string? InstanceId, string CommandText, long LatencyMs, @@ -266,5 +266,37 @@ internal void CreatedDatabase( AppName, ExtensionVersion); } + + [Event(EventIds.DiscardingEvent, Level = EventLevel.Warning, Version = 1)] + internal void DiscardingEvent( + string InstanceId, + string EventType, + int TaskEventId, + string Details, + string AppName, + string ExtensionVersion) + { + // TODO: Use WriteEventCore for better performance + this.WriteEvent( + EventIds.DiscardingEvent, + InstanceId, + EventType, + TaskEventId, + Details, + AppName, + ExtensionVersion); + } + + [Event(EventIds.GenericInfo, Level = EventLevel.Informational, Version = 1)] + internal void GenericInfo(string Details, string InstanceId, string AppName, string ExtensionVersion) + { + // TODO: Use WriteEventCore for better performance + this.WriteEvent( + EventIds.GenericInfo, + Details, + InstanceId, + AppName, + ExtensionVersion); + } } } diff --git a/src/DurableTask.SqlServer/Logging/EventIds.cs b/src/DurableTask.SqlServer/Logging/EventIds.cs index 6497662..02153db 100644 --- a/src/DurableTask.SqlServer/Logging/EventIds.cs +++ b/src/DurableTask.SqlServer/Logging/EventIds.cs @@ -22,5 +22,7 @@ static class EventIds public const int PurgedInstances = 310; public const int CommandCompleted = 311; public const int CreatedDatabase = 
312; + public const int DiscardingEvent = 313; + public const int GenericInfo = 314; } } diff --git a/src/DurableTask.SqlServer/Logging/LogEvents.cs b/src/DurableTask.SqlServer/Logging/LogEvents.cs index 3626475..17f6e7c 100644 --- a/src/DurableTask.SqlServer/Logging/LogEvents.cs +++ b/src/DurableTask.SqlServer/Logging/LogEvents.cs @@ -481,5 +481,78 @@ void IEventSourceEvent.WriteEventSource() => DTUtils.AppName, DTUtils.ExtensionVersionString); } + + internal class DiscardingEventEvent : StructuredLogEvent, IEventSourceEvent + { + public DiscardingEventEvent(string instanceId, string eventType, int taskEventId, string details) + { + this.InstanceId = instanceId; + this.EventType = eventType; + this.TaskEventId = taskEventId; + this.Details = details; + } + + [StructuredLogField] + public string InstanceId { get; } + + [StructuredLogField] + public string EventType { get; } + + [StructuredLogField] + public int TaskEventId { get; } + + [StructuredLogField] + public string Details { get; } + + public override EventId EventId => new EventId( + EventIds.DiscardingEvent, + nameof(EventIds.DiscardingEvent)); + + public override LogLevel Level => LogLevel.Warning; + + protected override string CreateLogMessage() => + $"{this.InstanceId}: Discarding {GetEventDescription(this.EventType, this.TaskEventId)}: {this.Details}"; + + void IEventSourceEvent.WriteEventSource() => + DefaultEventSource.Log.DiscardingEvent( + this.InstanceId, + this.EventType, + this.TaskEventId, + this.Details, + DTUtils.AppName, + DTUtils.ExtensionVersionString); + } + + internal class GenericInfo : StructuredLogEvent, IEventSourceEvent + { + public GenericInfo(string details, string? instanceId) + { + this.Details = details; + this.InstanceId = instanceId; + } + + [StructuredLogField] + public string Details { get; } + + [StructuredLogField] + public string? 
InstanceId { get; } + + public override EventId EventId => new EventId( + EventIds.GenericInfo, + nameof(EventIds.GenericInfo)); + + public override LogLevel Level => LogLevel.Information; + + protected override string CreateLogMessage() => string.IsNullOrEmpty(this.InstanceId) ? + this.Details : + $"{this.InstanceId}: {this.Details}"; + + void IEventSourceEvent.WriteEventSource() => + DefaultEventSource.Log.GenericInfo( + this.Details, + this.InstanceId ?? string.Empty, + DTUtils.AppName, + DTUtils.ExtensionVersionString); + } } } diff --git a/src/DurableTask.SqlServer/Scripts/drop-schema.sql b/src/DurableTask.SqlServer/Scripts/drop-schema.sql index 5d50224..3439191 100644 --- a/src/DurableTask.SqlServer/Scripts/drop-schema.sql +++ b/src/DurableTask.SqlServer/Scripts/drop-schema.sql @@ -24,6 +24,7 @@ DROP PROCEDURE IF EXISTS dt.PurgeInstanceStateByTime DROP PROCEDURE IF EXISTS dt._AddOrchestrationEvents DROP PROCEDURE IF EXISTS dt._CheckpointOrchestration DROP PROCEDURE IF EXISTS dt._CompleteTasks +DROP PROCEDURE IF EXISTS dt._DiscardEventsAndUnlockInstance DROP PROCEDURE IF EXISTS dt._GetVersions DROP PROCEDURE IF EXISTS dt._LockNextOrchestration DROP PROCEDURE IF EXISTS dt._LockNextTask diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index 21a18ce..975d41e 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -514,6 +514,7 @@ BEGIN DECLARE @instanceID varchar(100) DECLARE @parentInstanceID varchar(100) DECLARE @version varchar(100) + DECLARE @runtimeStatus varchar(30) DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub() BEGIN TRANSACTION @@ -532,6 +533,7 @@ BEGIN [LockExpiration] = @LockExpiration, @instanceID = I.[InstanceID], @parentInstanceID = I.[ParentInstanceID], + @runtimeStatus = I.[RuntimeStatus], @version = I.[Version] FROM dt.Instances I WITH (READPAST) INNER JOIN NewEvents E WITH (READPAST) ON @@ -539,7 +541,7 @@ BEGIN E.[InstanceID] = 
I.[InstanceID] WHERE I.TaskHub = @TaskHub AND - I.[RuntimeStatus] IN ('Pending', 'Running') AND + I.[RuntimeStatus] NOT IN ('Suspended') AND (I.[LockExpiration] IS NULL OR I.[LockExpiration] < @now) AND (E.[VisibleTime] IS NULL OR E.[VisibleTime] < @now) @@ -580,7 +582,10 @@ BEGIN RETURN END - -- Result #2: The full event history for the locked instance + -- Result #2: Basic information about this instance, including its runtime status + SELECT @instanceID AS [InstanceID], @runtimeStatus AS [RuntimeStatus] + + -- Result #3: The full event history for the locked instance -- NOTE: This must be kept consistent with the dt.HistoryEvents custom data type SELECT H.[InstanceID], @@ -901,6 +906,30 @@ END GO +CREATE OR ALTER PROCEDURE dt._DiscardEventsAndUnlockInstance + @InstanceID varchar(100), + @DeletedEvents MessageIDs READONLY +AS +BEGIN + DECLARE @taskHub varchar(50) = dt.CurrentTaskHub() + + -- We return the list of deleted messages so that the caller can issue a + -- warning about missing messages + DELETE E + OUTPUT DELETED.InstanceID, DELETED.SequenceNumber + FROM dt.NewEvents E WITH (FORCESEEK(PK_NewEvents(TaskHub, InstanceID, SequenceNumber))) + INNER JOIN @DeletedEvents D ON + D.InstanceID = E.InstanceID AND + D.SequenceNumber = E.SequenceNumber AND + E.TaskHub = @taskHub + + -- Release the lock on this instance + UPDATE Instances SET [LastUpdatedTime] = SYSUTCDATETIME(), [LockExpiration] = NULL + WHERE [TaskHub] = @taskHub and [InstanceID] = @InstanceID +END +GO + + CREATE OR ALTER PROCEDURE dt._AddOrchestrationEvents @NewOrchestrationEvents OrchestrationEvents READONLY AS diff --git a/src/DurableTask.SqlServer/Scripts/permissions.sql b/src/DurableTask.SqlServer/Scripts/permissions.sql index 4d8252c..6e9b4bf 100644 --- a/src/DurableTask.SqlServer/Scripts/permissions.sql +++ b/src/DurableTask.SqlServer/Scripts/permissions.sql @@ -16,6 +16,7 @@ END -- Functions GRANT EXECUTE ON OBJECT::dt.GetScaleMetric TO dt_runtime GRANT EXECUTE ON 
OBJECT::dt.GetScaleRecommendation TO dt_runtime +GRANT EXECUTE ON OBJECT::dt.CurrentTaskHub TO dt_runtime -- Public sprocs GRANT EXECUTE ON OBJECT::dt.CreateInstance TO dt_runtime @@ -30,6 +31,7 @@ GRANT EXECUTE ON OBJECT::dt.PurgeInstanceStateByTime TO dt_runtime GRANT EXECUTE ON OBJECT::dt._AddOrchestrationEvents TO dt_runtime GRANT EXECUTE ON OBJECT::dt._CheckpointOrchestration TO dt_runtime GRANT EXECUTE ON OBJECT::dt._CompleteTasks TO dt_runtime +GRANT EXECUTE ON OBJECT::dt._DiscardEventsAndUnlockInstance TO dt_runtime GRANT EXECUTE ON OBJECT::dt._GetVersions TO dt_runtime GRANT EXECUTE ON OBJECT::dt._LockNextOrchestration TO dt_runtime GRANT EXECUTE ON OBJECT::dt._LockNextTask TO dt_runtime diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index 66f98dd..1553203 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -113,6 +113,7 @@ public override Task DeleteAsync(bool deleteInstanceStore) TimeSpan receiveTimeout, CancellationToken cancellationToken) { + bool isWaiting = false; Stopwatch stopwatch = Stopwatch.StartNew(); do { @@ -166,14 +167,87 @@ public override Task DeleteAsync(bool deleteInstanceStore) if (messages.Count == 0) { + if (!isWaiting) + { + this.traceHelper.GenericInfoEvent( + "No events were found. Waiting for new events to appear.", + instanceId: null); + isWaiting = true; + } + // TODO: Make this dynamic based on the number of readers await this.orchestrationBackoffHelper.WaitAsync(cancellationToken); continue; } this.orchestrationBackoffHelper.Reset(); + isWaiting = false; + + // Result #2: The runtime status of the orchestration instance + if (await reader.NextResultAsync(cancellationToken)) + { + bool instanceExists = await reader.ReadAsync(cancellationToken); + string instanceId; + OrchestrationStatus? 
currentStatus; + + bool isRunning = false; + if (instanceExists) + { + instanceId = SqlUtils.GetInstanceId(reader); + currentStatus = SqlUtils.GetRuntimeStatus(reader); + isRunning = + currentStatus == OrchestrationStatus.Running || + currentStatus == OrchestrationStatus.Pending; + } + else + { + instanceId = messages.Select(msg => msg.OrchestrationInstance.InstanceId).First(); + currentStatus = null; + } + + // If the instance is in a terminal state, log and discard the new events. + // NOTE: In the future, we may want to allow processing of some events if, for example, they may + // change the state of a completed instance. For example, a rewind command. + if (!isRunning) + { + string warningMessage = instanceExists ? + $"Target is in the {currentStatus} state." : + $"Target doesn't exist (either never existed or continued-as-new)."; + + messages.ForEach(msg => this.traceHelper.DiscardingEvent( + msg.OrchestrationInstance.InstanceId, + msg.Event.EventType.ToString(), + DTUtils.GetTaskEventId(msg.Event), + warningMessage)); + + // Close the already opened reader so that we can execute a new command + reader.Close(); + + // Delete the events and release the orchestration instance lock + using SqlCommand discardCommand = this.GetSprocCommand( + connection, + "dt._DiscardEventsAndUnlockInstance"); + discardCommand.Parameters.Add("@InstanceID", SqlDbType.VarChar, 100).Value = instanceId; + discardCommand.Parameters.AddMessageIdParameter("@DeletedEvents", messages); + try + { + await SqlUtils.ExecuteNonQueryAsync( + discardCommand, + this.traceHelper, + instanceId, + cancellationToken); + } + catch (Exception e) + { + this.traceHelper.ProcessingError(e, new OrchestrationInstance { InstanceId = instanceId }); + throw; + } + + continue; + } + } - // Result #2: The full event history for the locked instance + // Result #3: The full event history for the locked instance IList history; if (await reader.NextResultAsync(cancellationToken)) { diff --git 
a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index aa15a0a..54cecca 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -209,9 +209,7 @@ public static OrchestrationState GetOrchestrationState(this DbDataReader reader) InstanceId = GetInstanceId(reader), ExecutionId = GetExecutionId(reader), }, - OrchestrationStatus = (OrchestrationStatus)Enum.Parse( - typeof(OrchestrationStatus), - GetStringOrNull(reader, reader.GetOrdinal("RuntimeStatus"))), + OrchestrationStatus = GetRuntimeStatus(reader), Output = GetStringOrNull(reader, reader.GetOrdinal("OutputText")), Status = GetStringOrNull(reader, reader.GetOrdinal("CustomStatusText")), ParentInstance = parentInstance @@ -276,6 +274,12 @@ public static SqlString GetParentInstanceId(HistoryEvent historyEvent) return reader.IsDBNull(ordinal) ? null : reader.GetString(ordinal); } + public static OrchestrationStatus GetRuntimeStatus(DbDataReader reader) + { + int ordinal = reader.GetOrdinal("RuntimeStatus"); + return (OrchestrationStatus)Enum.Parse(typeof(OrchestrationStatus), GetStringOrNull(reader, ordinal)); + } + public static Guid? GetPayloadId(this DbDataReader reader) { int ordinal = reader.GetOrdinal("PayloadID"); @@ -327,7 +331,7 @@ internal static SqlString GetPayloadText(HistoryEvent e) return reader.IsDBNull(ordinal) ? 
null : reader.GetString(ordinal); } - static string GetInstanceId(DbDataReader reader) + internal static string GetInstanceId(DbDataReader reader) { int ordinal = reader.GetOrdinal("InstanceID"); return reader.GetString(ordinal); diff --git a/src/common.props b/src/common.props index 0658aba..75cf028 100644 --- a/src/common.props +++ b/src/common.props @@ -19,10 +19,11 @@ 0 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) - rc + rc2 $(MajorVersion).$(MinorVersion).0.0 - .$(GITHUB_RUN_NUMBER) - $(VersionPrefix)$(BuildSuffix) + $(VersionPrefix).0 + + $(VersionPrefix).$(FileVersionRevision) diff --git a/test/DurableTask.SqlServer.Tests/Integration/DataRetentionTests.cs b/test/DurableTask.SqlServer.Tests/Integration/DataRetentionTests.cs new file mode 100644 index 0000000..c136496 --- /dev/null +++ b/test/DurableTask.SqlServer.Tests/Integration/DataRetentionTests.cs @@ -0,0 +1,243 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +namespace DurableTask.SqlServer.Tests.Integration +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using DurableTask.Core; + using DurableTask.Core.History; + using DurableTask.SqlServer.Logging; + using DurableTask.SqlServer.Tests.Logging; + using DurableTask.SqlServer.Tests.Utils; + using Xunit; + using Xunit.Abstractions; + + public class DataRetentionTests : IAsyncLifetime + { + readonly TestService testService; + + public DataRetentionTests(ITestOutputHelper output) + { + this.testService = new TestService(output); + } + + Task IAsyncLifetime.InitializeAsync() => this.testService.InitializeAsync(); + + Task IAsyncLifetime.DisposeAsync() => this.testService.DisposeAsync(); + + /// + /// This test validates that the purge behavior works correctly based on runtime status and time. 
+ /// + [Theory] + [InlineData(OrchestrationStateTimeRangeFilterType.OrchestrationCreatedTimeFilter)] + [InlineData(OrchestrationStateTimeRangeFilterType.OrchestrationCompletedTimeFilter)] + public async Task PurgesInstancesByStatus(OrchestrationStateTimeRangeFilterType filterType) + { + var events = new ConcurrentDictionary>(); + + // Waits for an external event and then either completes or fails depending on that event + IReadOnlyList> instances = await this.testService.RunOrchestrations( + count: 30, // ideally some multiple of 3 + instanceIdGenerator: i => $"InstanceToPurge_{i:00}", + inputGenerator: i => $"Hello, world {i}", + orchestrationName: "SimpleDelay", + version: string.Empty, + implementation: async (ctx, input) => + { + var tcs = new TaskCompletionSource(); + events[ctx.OrchestrationInstance.InstanceId] = tcs; + + bool shouldFail = await tcs.Task; + if (shouldFail) + { + throw new Exception("Kah-BOOOOOM!!!"); + } + + return shouldFail; + }, + onEvent: (ctx, name, value) => + { + events[ctx.OrchestrationInstance.InstanceId].SetResult(bool.Parse(value)); + }); + + await Task.WhenAll(instances.Select(instance => instance.WaitForStart())); + + // Try to purge the instance and check that it still exists + await this.testService.PurgeAsync(DateTime.MaxValue, filterType); + foreach (TestInstance instance in instances) + { + OrchestrationState runningState = await instance.GetStateAsync(); + Assert.Equal(OrchestrationStatus.Running, runningState.OrchestrationStatus); + } + + TimeSpan timeout = TimeSpan.FromSeconds(30); + + // We want to test a mix of completed, failed, and terminated instances to make sure they are all handled correctly. 
+ var tasks = new List(); + for (int i = 0; i < instances.Count; i++) + { + int index = i; + tasks.Add(Task.Run(async () => + { + TestInstance instance = instances[index]; + if (index % 3 == 0) + { + // Complete the instance + await instance.RaiseEventAsync("Action", false); + await instance.WaitForCompletion(timeout, OrchestrationStatus.Completed); + } + else if (index % 3 == 1) + { + // Fail the instance + await instance.RaiseEventAsync("Action", true); + await instance.WaitForCompletion(timeout, OrchestrationStatus.Failed); + } + else + { + // Terminate the instance + await instance.TerminateAsync("Terminated!"); + await instance.WaitForCompletion(timeout, OrchestrationStatus.Terminated); + } + })); + } + + // Wait for all instances to transition into their final state + await Task.WhenAll(tasks); + + // This time-based purge should remove all the instances + await this.testService.PurgeAsync(DateTime.MaxValue, filterType); + foreach (TestInstance instance in instances) + { + OrchestrationState purgedState = await instance.GetStateAsync(); + Assert.Null(purgedState); + } + + // One more purge, just to make sure there are no failures when there is nothing left to purge + await this.testService.PurgeAsync(DateTime.MaxValue, filterType); + } + + /// + /// Validates that external events sent to a completed orchestration are eventually removed from the database + /// and that a log message is emitted for each discarded event. 
+ /// + [Fact] + public async Task EventsToCompletedOrchestrationAreDiscarded() + { + const int EventCount = 50; + + // Does nothing except return the original input + TestInstance instance = await this.testService.RunOrchestration( + input: string.Empty, + orchestrationName: "NoOp", + implementation: (ctx, input) => Task.FromResult(input)); + await instance.WaitForCompletion(); + + // Raise events to the completed instance + await Enumerable.Range(0, EventCount).ParallelForEachAsync(maxConcurrency: 10, action: i => + instance.RaiseEventAsync("BogusEvent", i)); + + // Check the logs to confirm that all the raised events were discarded + static bool IsDeletedExternalEvent(LogEntry log) + { + if (log.EventId == EventIds.DiscardingEvent) + { + LogAssert.FieldEquals(log, "EventType", EventType.EventRaised.ToString()); + return true; + } + + return false; + } + + int discardedEventCount = 0; + + TimeSpan timeout = TimeSpan.FromSeconds(5).AdjustForDebugging(); + Stopwatch sw = Stopwatch.StartNew(); + while (sw.Elapsed < timeout) + { + discardedEventCount = this.testService.GetAndValidateLogs().Count(IsDeletedExternalEvent); + + // Make sure we're getting the exact count and not a larger count + Assert.True(discardedEventCount <= EventCount); + if (discardedEventCount == EventCount) + { + break; + } + + await Task.Delay(TimeSpan.FromMilliseconds(500)); + } + + Assert.Equal(EventCount, discardedEventCount); + + // Last, check the database to confirm there are no outstanding events. + // The logs get written before the actual deletion happens, so wait an additional second to ensure that + // the events had a chance to be deleted. + await Task.Delay(TimeSpan.FromSeconds(1)); + int unprocessedEvents = await this.GetUnprocessedEventCountAsync(); + Assert.Equal(0, unprocessedEvents); + } + + /// + /// Verifies that canceled timers get properly cleaned up from the database. + /// This test is a response to https://github.com/microsoft/durabletask-mssql/issues/93. 
+ /// + [Fact] + public async Task InvalidTimerEventsAreDiscarded() + { + var neverCompletingSource = new TaskCompletionSource(); + + TestInstance instance = await this.testService.RunOrchestration( + input: true, + orchestrationName: "CreateAndInvalidateTimers", + implementation: async (ctx, isFirstIteration) => + { + if (isFirstIteration) + { + using var cts = new CancellationTokenSource(); + + // This timer event will arrive for a different execution ID and is supposed to be discarded + _ = ctx.CreateTimer(ctx.CurrentUtcDateTime, string.Empty, cts.Token); + ctx.ContinueAsNew(false); + cts.Cancel(); + } + else + { + // Prevent the orchestration from completing + await neverCompletingSource.Task; + } + + // Cancel the timer to allow the orchestration to continue + return Task.FromResult(isFirstIteration); + }); + + await instance.WaitForStart(); + + TimeSpan timeout = TimeSpan.FromSeconds(5).AdjustForDebugging(); + Stopwatch sw = Stopwatch.StartNew(); + while (true) + { + int unprocessedEventCount = await this.GetUnprocessedEventCountAsync(); + if (unprocessedEventCount == 0) + { + break; + } + + Assert.True(sw.Elapsed < timeout, "Timeout expired waiting for unprocessed events to clear"); + await Task.Delay(TimeSpan.FromMilliseconds(500)); + } + } + + async Task GetUnprocessedEventCountAsync() + { + string taskHubName = await this.testService.GetTaskHubNameAsync(); + int unprocessedEvents = (int)await SharedTestHelpers.ExecuteSqlAsync( + $"SELECT COUNT(*) FROM dt.[NewEvents] WHERE TaskHub = '{taskHubName}'"); + return unprocessedEvents; + } + } +} diff --git a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs index ff1d03a..4c594ed 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs @@ -279,6 +279,7 @@ static void ValidateDatabaseSchema(TestDatabase database) 
"dt._AddOrchestrationEvents", "dt._CheckpointOrchestration", "dt._CompleteTasks", + "dt._DiscardEventsAndUnlockInstance", "dt._GetVersions", "dt._LockNextOrchestration", "dt._LockNextTask", diff --git a/test/DurableTask.SqlServer.Tests/Integration/PurgeTests.cs b/test/DurableTask.SqlServer.Tests/Integration/PurgeTests.cs deleted file mode 100644 index e3be07a..0000000 --- a/test/DurableTask.SqlServer.Tests/Integration/PurgeTests.cs +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See LICENSE in the project root for license information. - -namespace DurableTask.SqlServer.Tests.Integration -{ - using System; - using System.Collections.Concurrent; - using System.Collections.Generic; - using System.Linq; - using System.Threading.Tasks; - using DurableTask.Core; - using DurableTask.SqlServer.Tests.Utils; - using Xunit; - using Xunit.Abstractions; - - public class PurgeTests : IAsyncLifetime - { - readonly TestService testService; - - public PurgeTests(ITestOutputHelper output) - { - this.testService = new TestService(output); - } - - Task IAsyncLifetime.InitializeAsync() => this.testService.InitializeAsync(); - - Task IAsyncLifetime.DisposeAsync() => this.testService.DisposeAsync(); - - /// - /// This test validates that the purge behavior works correctly based on runtime status and time. 
- /// - [Theory] - [InlineData(OrchestrationStateTimeRangeFilterType.OrchestrationCreatedTimeFilter)] - [InlineData(OrchestrationStateTimeRangeFilterType.OrchestrationCompletedTimeFilter)] - public async Task PurgesInstancesByStatus(OrchestrationStateTimeRangeFilterType filterType) - { - var events = new ConcurrentDictionary>(); - - // Waits for an external event and then either completes or fails depending on that event - IReadOnlyList> instances = await this.testService.RunOrchestrations( - count: 30, // ideally some multiple of 3 - instanceIdGenerator: i => $"InstanceToPurge_{i:00}", - inputGenerator: i=> $"Hello, world {i}", - orchestrationName: "SimpleDelay", - version: string.Empty, - implementation: async (ctx, input) => - { - var tcs = new TaskCompletionSource(); - events[ctx.OrchestrationInstance.InstanceId] = tcs; - - bool shouldFail = await tcs.Task; - if (shouldFail) - { - throw new Exception("Kah-BOOOOOM!!!"); - } - - return shouldFail; - }, - onEvent: (ctx, name, value) => - { - events[ctx.OrchestrationInstance.InstanceId].SetResult(bool.Parse(value)); - }); - - await Task.WhenAll(instances.Select(instance => instance.WaitForStart())); - - // Try to purge the instance and check that it still exists - await this.testService.PurgeAsync(DateTime.MaxValue, filterType); - foreach (TestInstance instance in instances) - { - OrchestrationState runningState = await instance.GetStateAsync(); - Assert.Equal(OrchestrationStatus.Running, runningState.OrchestrationStatus); - } - - TimeSpan timeout = TimeSpan.FromSeconds(30); - - // We want to test a mix of completed, failed, and terminated instances to make sure they are all handled correctly. 
- var tasks = new List(); - for (int i = 0; i < instances.Count; i++) - { - int index = i; - tasks.Add(Task.Run(async () => - { - TestInstance instance = instances[index]; - if (index % 3 == 0) - { - // Complete the instance - await instance.RaiseEventAsync("Action", false); - await instance.WaitForCompletion(timeout, OrchestrationStatus.Completed); - } - else if (index % 3 == 1) - { - // Fail the instance - await instance.RaiseEventAsync("Action", true); - await instance.WaitForCompletion(timeout, OrchestrationStatus.Failed); - } - else - { - // Terminate the instance - await instance.TerminateAsync("Terminated!"); - await instance.WaitForCompletion(timeout, OrchestrationStatus.Terminated); - } - })); - } - - // Wait for all instances to transition into their final state - await Task.WhenAll(tasks); - - // This time-based purge should remove all the instances - await this.testService.PurgeAsync(DateTime.MaxValue, filterType); - foreach (TestInstance instance in instances) - { - OrchestrationState purgedState = await instance.GetStateAsync(); - Assert.Null(purgedState); - } - - // One more purge, just to make sure there are no failures when there is nothing left to purge - await this.testService.PurgeAsync(DateTime.MaxValue, filterType); - } - } -} diff --git a/test/DurableTask.SqlServer.Tests/Logging/LogAssert.cs b/test/DurableTask.SqlServer.Tests/Logging/LogAssert.cs index a6dd608..db91dbc 100644 --- a/test/DurableTask.SqlServer.Tests/Logging/LogAssert.cs +++ b/test/DurableTask.SqlServer.Tests/Logging/LogAssert.cs @@ -115,6 +115,11 @@ internal static void ValidateStructuredLogFields(LogEntry log) Assert.True(fields.ContainsKey("Name")); Assert.True(fields.ContainsKey("LatencyMs")); break; + case EventIds.ProcessingFailure: + Assert.True(fields.ContainsKey("InstanceId")); + Assert.True(fields.ContainsKey("ExecutionId")); + Assert.True(fields.ContainsKey("Details")); + break; case EventIds.CheckpointStarting: Assert.True(fields.ContainsKey("Name")); 
Assert.True(fields.ContainsKey("InstanceId")); @@ -143,20 +148,38 @@ internal static void ValidateStructuredLogFields(LogEntry log) case EventIds.CreatedDatabase: Assert.True(fields.ContainsKey("DatabaseName")); break; + case EventIds.DiscardingEvent: + Assert.True(fields.ContainsKey("InstanceId")); + Assert.True(fields.ContainsKey("EventType")); + Assert.True(fields.ContainsKey("TaskEventId")); + Assert.True(fields.ContainsKey("Details")); + break; + case EventIds.GenericInfo: + Assert.True(fields.ContainsKey("InstanceId")); + Assert.True(fields.ContainsKey("Details")); + break; default: throw new ArgumentException($"Log event {log.EventId} is not known. Does it need to be added to the log validator?", nameof(log)); } } public static T FieldEquals(LogEntry logEntry, string fieldName, T expectedValue) + { + T convertedValue = GetFieldValue(logEntry, fieldName); + Assert.Equal(expectedValue, convertedValue); + return convertedValue; + } + + public static T GetFieldValue(LogEntry logEntry, string fieldName) { var structuredEvent = logEntry.State as IReadOnlyDictionary; Assert.NotNull(structuredEvent); - IReadOnlyDictionary eventData = Assert.IsAssignableFrom>(logEntry.State); + IReadOnlyDictionary eventData = + Assert.IsAssignableFrom>(logEntry.State); + object fieldValue = Assert.Contains(fieldName, eventData); T convertedValue = Assert.IsType(fieldValue); - Assert.Equal(expectedValue, convertedValue); return convertedValue; } } diff --git a/test/DurableTask.SqlServer.Tests/Logging/LogAssertExtensions.cs b/test/DurableTask.SqlServer.Tests/Logging/LogAssertExtensions.cs index 0a18bf6..7d2e8fc 100644 --- a/test/DurableTask.SqlServer.Tests/Logging/LogAssertExtensions.cs +++ b/test/DurableTask.SqlServer.Tests/Logging/LogAssertExtensions.cs @@ -5,6 +5,7 @@ namespace DurableTask.SqlServer.Tests.Logging { using System.Collections.Generic; using System.Linq; + using DurableTask.SqlServer.Logging; using Xunit; static class LogAssertExtensions @@ -29,10 +30,19 @@ public 
static IEnumerable Expect(this IEnumerable logs, para int i = 0; foreach (LogEntry actual in logs) { + if (actual.EventId == EventIds.GenericInfo) + { + // Ignore generic info events, which can be non-deterministic + continue; + } + if (asserts.Length > i) { LogAssert expected = asserts[i++]; + // GenericInfo logs are not supported for validation + Assert.NotEqual(EventIds.GenericInfo, expected.EventId); + Assert.Equal(expected.EventName, actual.EventId.Name); Assert.Equal(expected.EventId, actual.EventId.Id); Assert.Equal(expected.Level, actual.LogLevel); diff --git a/test/DurableTask.SqlServer.Tests/Utils/SharedTestHelpers.cs b/test/DurableTask.SqlServer.Tests/Utils/SharedTestHelpers.cs index 1cfe339..6b0d373 100644 --- a/test/DurableTask.SqlServer.Tests/Utils/SharedTestHelpers.cs +++ b/test/DurableTask.SqlServer.Tests/Utils/SharedTestHelpers.cs @@ -50,30 +50,34 @@ public static string GetDefaultConnectionString(string database = "DurableDB") return builder.ToString(); } - public static async Task ExecuteSqlAsync(string commandText) + public static async Task ExecuteSqlAsync(string commandText, string connectionString = null) { + Exception lastException = null; for (int retry = 0; retry < 3; retry++) { try { - string connectionString = GetDefaultConnectionString(); + connectionString ??= GetDefaultConnectionString(); await using SqlConnection connection = new SqlConnection(connectionString); await using SqlCommand command = connection.CreateCommand(); await command.Connection.OpenAsync(); command.CommandText = commandText; - await command.ExecuteNonQueryAsync(); - break; + return await command.ExecuteScalarAsync(); } catch (SqlException e) when (e.Number == 15434) { // 15434 : Could not drop login 'XXX' as the user is currently logged in. 
+ lastException = e; } catch (SqlException e) when (e.Number == 6106) { // 6106 : Process ID 'XXX' is not an active process ID + lastException = e; } } + + throw lastException; } public static async Task InitializeDatabaseAsync() diff --git a/test/DurableTask.SqlServer.Tests/Utils/TestInstance.cs b/test/DurableTask.SqlServer.Tests/Utils/TestInstance.cs index c87e634..4791e26 100644 --- a/test/DurableTask.SqlServer.Tests/Utils/TestInstance.cs +++ b/test/DurableTask.SqlServer.Tests/Utils/TestInstance.cs @@ -11,7 +11,7 @@ namespace DurableTask.SqlServer.Tests.Utils using Newtonsoft.Json.Linq; using Xunit; - class TestInstance + class TestInstance { readonly TaskHubClient client; readonly OrchestrationInstance instance; @@ -19,7 +19,7 @@ class TestInstance readonly string version; DateTime startTime; - T input; + TInput input; public TestInstance( TaskHubClient client, @@ -27,7 +27,7 @@ public TestInstance( string name, string version, DateTime startTime, - T input) + TInput input) { this.client = client; this.instance = instance; @@ -127,7 +127,10 @@ public async Task WaitForCompletion( internal Task GetStateAsync() { - return this.client.GetOrchestrationStateAsync(this.instance); + return this.client.GetOrchestrationStateAsync(new OrchestrationInstance + { + InstanceId = this.instance.InstanceId, + }); } internal Task RaiseEventAsync(string name, object value) @@ -140,7 +143,7 @@ internal Task TerminateAsync(string reason) return this.client.TerminateInstanceAsync(this.instance, reason); } - internal async Task RestartAsync(T newInput) + internal async Task RestartAsync(TInput newInput) { OrchestrationInstance newInstance = await this.client.CreateOrchestrationInstanceAsync( this.name, diff --git a/test/DurableTask.SqlServer.Tests/Utils/TestService.cs b/test/DurableTask.SqlServer.Tests/Utils/TestService.cs index e415fd7..ec5d763 100644 --- a/test/DurableTask.SqlServer.Tests/Utils/TestService.cs +++ b/test/DurableTask.SqlServer.Tests/Utils/TestService.cs @@ -258,7 
+258,12 @@ public IEnumerable GetAndValidateLogs() } } - static Task ExecuteCommandAsync(string commandText) => SharedTestHelpers.ExecuteSqlAsync(commandText); + public async Task GetTaskHubNameAsync() + { + return (string)await SharedTestHelpers.ExecuteSqlAsync( + "SELECT dt.CurrentTaskHub()", + this.testCredential.ConnectionString); + } class ActivityShim : TaskActivity { @@ -293,7 +298,7 @@ public override Task RunTask(OrchestrationContext context, TInput input => this.Implementation(context, input); public override void RaiseEvent(OrchestrationContext context, string name, string input) - => this.OnEventRaised(context, name, input); + => this.OnEventRaised?.Invoke(context, name, input); } class TestObjectCreator : ObjectCreator