From 4ff7b2f5364aadf0b8d322a4eeb86df153ff746e Mon Sep 17 00:00:00 2001 From: Usein Mambediiev Date: Tue, 16 Mar 2021 16:19:51 -0700 Subject: [PATCH] Versioning support for orchestrations (#11) --- src/DurableTask.SqlServer/DTUtils.cs | 9 ++++ src/DurableTask.SqlServer/Scripts/logic.sql | 35 ++++++++++--- .../Scripts/schema-0.2.0.sql | 7 ++- src/DurableTask.SqlServer/SqlDbManager.cs | 2 +- .../SqlOrchestrationService.cs | 1 + .../SqlTypes/HistoryEventSqlType.cs | 2 + .../SqlTypes/OrchestrationEventSqlType.cs | 3 ++ src/DurableTask.SqlServer/SqlUtils.cs | 16 +++++- .../Integration/Orchestrations.cs | 49 +++++++++++++++++++ .../Integration/PurgeTests.cs | 1 + .../Utils/TestService.cs | 41 ++++++++++++---- 11 files changed, 146 insertions(+), 20 deletions(-) diff --git a/src/DurableTask.SqlServer/DTUtils.cs b/src/DurableTask.SqlServer/DTUtils.cs index 18e6d42..e84a892 100644 --- a/src/DurableTask.SqlServer/DTUtils.cs +++ b/src/DurableTask.SqlServer/DTUtils.cs @@ -141,5 +141,14 @@ public static bool TryGetPayloadText(HistoryEvent e, out string? payloadText) _ => null, }; } + + public static string? GetVersion(HistoryEvent historyEvent) + { + return historyEvent.EventType switch + { + EventType.ExecutionStarted => ((ExecutionStartedEvent)historyEvent).Version, + _ => null, + }; + } } } diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index 966de14..1d86308 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -37,6 +37,7 @@ AS I.[InstanceID], I.[ExecutionID], I.[Name], + I.[Version], I.[CreatedTime], I.[LastUpdatedTime], I.[CompletedTime], @@ -84,6 +85,7 @@ GO CREATE OR ALTER PROCEDURE dt.CreateInstance @Name varchar(300), + @Version varchar(100), @InstanceID varchar(100) = NULL, @ExecutionID varchar(50) = NULL, @InputText varchar(MAX) = NULL, @@ -135,6 +137,7 @@ BEGIN INSERT INTO Instances ( [Name], + [Version], [TaskHub], [InstanceID], [ExecutionID], @@ -142,6 +145,7 @@ BEGIN [InputPayloadID]) VALUES ( @Name, + @Version, @TaskHub, @InstanceID, @ExecutionID, @@ -179,8 +183,13 @@ CREATE OR ALTER PROCEDURE dt.GetInstanceHistory AS BEGIN DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub() - DECLARE @ParentInstanceID varchar(100) = ( - SELECT [ParentInstanceID] FROM Instances WHERE [InstanceID] = @InstanceID) + DECLARE @ParentInstanceID varchar(100) + DECLARE @Version varchar(100) + + SELECT + @ParentInstanceID = [ParentInstanceID], + @Version = [Version] + FROM Instances WHERE [InstanceID] = @InstanceID SELECT H.[InstanceID], @@ -196,7 +205,8 @@ BEGIN P.[Reason], (CASE WHEN @GetInputsAndOutputs = 0 THEN NULL ELSE P.[Text] END) AS [PayloadText], [PayloadID], - @ParentInstanceID as [ParentInstanceID] + @ParentInstanceID as [ParentInstanceID], + @Version as [Version] FROM History H WITH (INDEX (PK_History)) LEFT OUTER JOIN Payloads P ON P.[TaskHub] = @TaskHub AND @@ -233,12 +243,14 @@ BEGIN [InstanceID], [ExecutionID], [Name], + [Version], [RuntimeStatus]) SELECT @TaskHub, @InstanceID, NEWID(), SUBSTRING(@InstanceID, 2, CHARINDEX('@', @InstanceID, 2) - 2), + '', 'Pending' WHERE LEFT(@InstanceID, 1) = '@' AND CHARINDEX('@', @InstanceID, 2) > 2 @@ -413,6 +425,7 @@ BEGIN DECLARE @now datetime2 = SYSUTCDATETIME() DECLARE @instanceID varchar(100) DECLARE @parentInstanceID varchar(100) + DECLARE @version varchar(100) DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub() BEGIN TRANSACTION @@ -430,7 +443,8 @@ BEGIN [LockedBy] = @LockedBy, [LockExpiration] = @LockExpiration, @instanceID = I.[InstanceID], - @parentInstanceID = I.[ParentInstanceID] + @parentInstanceID = I.[ParentInstanceID], + @version = I.[Version] FROM dt.Instances I WITH (READPAST) INNER JOIN NewEvents E WITH (READPAST) ON E.[TaskHub] = @TaskHub AND @@ -459,7 +473,8 @@ BEGIN P.[Text] AS [PayloadText], P.[PayloadID], DATEDIFF(millisecond, [Timestamp], @now) AS [WaitTime], - @parentInstanceID as [ParentInstanceID] + @parentInstanceID as [ParentInstanceID], + @version as [Version] FROM NewEvents N LEFT OUTER JOIN dt.[Payloads] P ON P.[TaskHub] = @TaskHub AND @@ -491,7 +506,8 @@ BEGIN -- Optimization: Do not load the data payloads for these history events - they are not needed since they are never replayed (CASE WHEN [EventType] IN ('TaskScheduled', 'SubOrchestrationInstanceCreated') THEN NULL ELSE P.[Text] END) AS [PayloadText], [PayloadID], - @parentInstanceID as [ParentInstanceID] + @parentInstanceID as [ParentInstanceID], + @version as [Version] FROM History H WITH (INDEX (PK_History)) LEFT OUTER JOIN Payloads P ON P.[TaskHub] = @TaskHub AND @@ -639,12 +655,14 @@ BEGIN [InstanceID], [ExecutionID], [Name], + [Version], [RuntimeStatus]) SELECT DISTINCT @TaskHub, E.[InstanceID], NEWID(), SUBSTRING(E.[InstanceID], 2, CHARINDEX('@', E.[InstanceID], 2) - 2), + '', 'Pending' FROM @NewOrchestrationEvents E WHERE LEFT(E.[InstanceID], 1) = '@' @@ -662,6 +680,7 @@ BEGIN [InstanceID], [ExecutionID], [Name], + [Version], [ParentInstanceID], [RuntimeStatus]) SELECT DISTINCT @@ -669,6 +688,7 @@ BEGIN E.[InstanceID], E.[ExecutionID], E.[Name], + E.[Version], E.[ParentInstanceID], 'Pending' FROM @NewOrchestrationEvents E @@ -810,12 +830,14 @@ BEGIN [InstanceID], [ExecutionID], [Name], + [Version], [RuntimeStatus]) SELECT DISTINCT @TaskHub, E.[InstanceID], NEWID(), SUBSTRING(E.[InstanceID], 2, CHARINDEX('@', E.[InstanceID], 2) - 2), + '', 'Pending' FROM @NewOrchestrationEvents E WHERE NOT EXISTS ( @@ -880,6 +902,7 @@ BEGIN I.[InstanceID], I.[ExecutionID], I.[Name], + I.[Version], I.[CreatedTime], I.[LastUpdatedTime], I.[CompletedTime], diff --git a/src/DurableTask.SqlServer/Scripts/schema-0.2.0.sql b/src/DurableTask.SqlServer/Scripts/schema-0.2.0.sql index 87fd5df..a47d33a 100644 --- a/src/DurableTask.SqlServer/Scripts/schema-0.2.0.sql +++ b/src/DurableTask.SqlServer/Scripts/schema-0.2.0.sql @@ -36,7 +36,8 @@ IF TYPE_ID(N'dt.HistoryEvents') IS NULL [Reason] varchar(max) NULL, [PayloadText] varchar(max) NULL, [PayloadID] uniqueidentifier NULL, - [ParentInstanceID] varchar(100) NULL + [ParentInstanceID] varchar(100) NULL, + [Version] varchar(100) NULL ) GO @@ -54,7 +55,8 @@ IF TYPE_ID(N'dt.OrchestrationEvents') IS NULL [Reason] varchar(max) NULL, [PayloadText] varchar(max) NULL, [PayloadID] uniqueidentifier NULL, - [ParentInstanceID] varchar(100) NULL + [ParentInstanceID] varchar(100) NULL, + [Version] varchar(100) NULL ) GO @@ -112,6 +114,7 @@ BEGIN [InstanceID] varchar(100) NOT NULL, [ExecutionID] varchar(50) NOT NULL CONSTRAINT DF_Instances_ExecutionID DEFAULT (NEWID()), -- expected to be system generated [Name] varchar(300) NOT NULL, -- the type name of the orchestration or entity + [Version] varchar(100) NOT NULL, -- the version of the orchestration [CreatedTime] datetime2 NOT NULL CONSTRAINT DF_Instances_CreatedTime DEFAULT SYSUTCDATETIME(), [LastUpdatedTime] datetime2 NULL, [CompletedTime] datetime2 NULL, diff --git a/src/DurableTask.SqlServer/SqlDbManager.cs b/src/DurableTask.SqlServer/SqlDbManager.cs index 0f19cf2..a977173 100644 --- a/src/DurableTask.SqlServer/SqlDbManager.cs +++ b/src/DurableTask.SqlServer/SqlDbManager.cs @@ -51,7 +51,7 @@ public async Task CreateOrUpgradeSchemaAsync(bool recreateIfExists) if (await reader.ReadAsync()) { // The first result contains the latest version - currentSchemaVersion = SqlUtils.GetVersion(reader); + currentSchemaVersion = SqlUtils.GetSemanticVersion(reader); if (currentSchemaVersion >= DTUtils.ExtensionVersion) { // The schema is already up-to-date. diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index 03fda84..984cb48 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -398,6 +398,7 @@ public override async Task CreateTaskOrchestrationAsync(TaskMessage creationMess ExecutionStartedEvent startEvent = (ExecutionStartedEvent)creationMessage.Event; OrchestrationInstance instance = startEvent.OrchestrationInstance; command.Parameters.Add("@Name", SqlDbType.VarChar, size: 300).Value = startEvent.Name; + command.Parameters.Add("@Version", SqlDbType.VarChar, size: 100).Value = startEvent.Version; command.Parameters.Add("@InstanceID", SqlDbType.VarChar, size: 100).Value = instance.InstanceId; command.Parameters.Add("@ExecutionID", SqlDbType.VarChar, size: 50).Value = instance.ExecutionId; command.Parameters.Add("@InputText", SqlDbType.VarChar).Value = startEvent.Input; diff --git a/src/DurableTask.SqlServer/SqlTypes/HistoryEventSqlType.cs b/src/DurableTask.SqlServer/SqlTypes/HistoryEventSqlType.cs index bec22fc..e18a17d 100644 --- a/src/DurableTask.SqlServer/SqlTypes/HistoryEventSqlType.cs +++ b/src/DurableTask.SqlServer/SqlTypes/HistoryEventSqlType.cs @@ -31,6 +31,7 @@ static class HistoryEventSqlType new SqlMetaData("PayloadText", SqlDbType.VarChar, -1 /* max */), new SqlMetaData("PayloadID", SqlDbType.UniqueIdentifier), new SqlMetaData("ParentInstanceID", SqlDbType.VarChar, 100), + new SqlMetaData("Version", SqlDbType.VarChar, 100), }; static class ColumnOrdinals @@ -50,6 +51,7 @@ static class ColumnOrdinals public const int PayloadText = 11; public const int PayloadID = 12; public const int ParentInstanceID = 13; + public const int Version = 14; }; public static SqlParameter AddHistoryEventsParameter( diff --git a/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs b/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs index aa69319..1bf7aa2 100644 --- a/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs +++ b/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs @@ -31,6 +31,7 @@ static class OrchestrationEventSqlType new SqlMetaData("PayloadText", SqlDbType.VarChar, -1 /* max */), new SqlMetaData("PayloadID", SqlDbType.UniqueIdentifier), new SqlMetaData("ParentInstanceID", SqlDbType.VarChar, 100), + new SqlMetaData("Version", SqlDbType.VarChar, 100), }; static class ColumnOrdinals @@ -48,6 +49,7 @@ static class ColumnOrdinals public const int PayloadText = 9; public const int PayloadId = 10; public const int ParentInstanceID = 11; + public const int Version = 12; } public static SqlParameter AddOrchestrationEventsParameter( @@ -135,6 +137,7 @@ static SqlDataRecord PopulateOrchestrationMessage(TaskMessage msg, SqlDataRecord payloadText.IsNull && reason.IsNull ? SqlGuid.Null : new SqlGuid(Guid.NewGuid())); record.SetSqlString(ColumnOrdinals.ParentInstanceID, SqlUtils.GetParentInstanceId(msg.Event)); + record.SetSqlString(ColumnOrdinals.Version, SqlUtils.GetVersion(msg.Event)); return record; } diff --git a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index 2ffadf5..2178933 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -92,7 +92,7 @@ public static HistoryEvent GetHistoryEvent(this DbDataReader reader, bool isOrch ExecutionId = GetExecutionId(reader), }, Tags = null, // TODO - Version = null, // TODO + Version = GetVersion(reader), }; string? parentInstanceId = GetParentInstanceId(reader); if (parentInstanceId != null) @@ -190,6 +190,7 @@ public static OrchestrationState GetOrchestrationState(this DbDataReader reader) Input = reader.GetStringOrNull(reader.GetOrdinal("InputText")), LastUpdatedTime = reader.GetUtcDateTimeOrNull(reader.GetOrdinal("LastUpdatedTime")) ?? default, Name = GetName(reader), + Version = GetVersion(reader), OrchestrationInstance = new OrchestrationInstance { InstanceId = GetInstanceId(reader), @@ -267,12 +268,23 @@ public static SqlString GetParentInstanceId(HistoryEvent historyEvent) return reader.IsDBNull(ordinal) ? (Guid?)null : reader.GetGuid(ordinal); } - public static SemanticVersion GetVersion(DbDataReader reader) + public static SemanticVersion GetSemanticVersion(DbDataReader reader) { string versionString = reader.GetString("SemanticVersion"); return SemanticVersion.Parse(versionString); } + public static SqlString GetVersion(HistoryEvent historyEvent) + { + return DTUtils.GetVersion(historyEvent) ?? SqlString.Null; + } + + public static string? GetVersion(DbDataReader reader) + { + int ordinal = reader.GetOrdinal("Version"); + return reader.IsDBNull(ordinal) ? null : reader.GetString(ordinal); + } + internal static SqlString GetReason(HistoryEvent historyEvent) { return historyEvent.EventType switch diff --git a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs index a3759ed..c565af1 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs @@ -169,6 +169,7 @@ public async Task ActivityChain(int parallelCount) parallelCount, _ => null, orchestrationName: "OrchestrationsWithActivityChain", + version: string.Empty, implementation: async (ctx, _) => { int value = 0; @@ -407,5 +408,53 @@ public async Task SubOrchestration() await testInstance.WaitForCompletion( timeout: TimeSpan.FromSeconds(15), expectedOutput: 15); } + + [Fact] + public async Task VersionedOrchestration() + { + string orchestrationName = "VersionedOrchestrationTest"; + string version1 = "V1"; + string version2 = "V2"; + var waitTimeout = TimeSpan.FromSeconds(15); + + TestInstance v1Instance = await this.testService.RunOrchestration( + null, + orchestrationName, + version: version1, + implementation: (ctx, input) => Task.FromResult(version1)); + await v1Instance.WaitForCompletion(waitTimeout, expectedOutput: version1); + + TestInstance v2Instance = await this.testService.RunOrchestration( + null, + orchestrationName, + version: version2, + implementation: (ctx, input) => Task.FromResult(version2)); + await v2Instance.WaitForCompletion(waitTimeout, expectedOutput: version2); + } + + [Fact] + public async Task VersionedSubOrchestration() + { + string subOrchestrationName = "VersionedSubOrchestrationTest"; + string version1 = "V1"; + string version2 = "V2"; + var waitTimeout = TimeSpan.FromSeconds(30); + + this.testService.RegisterInlineOrchestration( + subOrchestrationName, version1, implementation: (ctx, input) => Task.FromResult(version1)); + this.testService.RegisterInlineOrchestration( + subOrchestrationName, version2, implementation: (ctx, input) => Task.FromResult(version2)); + + TestInstance parentInstance = await this.testService.RunOrchestration( + null, + "ParentOrchestration", + implementation: async (ctx, input) => + { + var result1 = await ctx.CreateSubOrchestrationInstance(subOrchestrationName, version1, null); + var result2 = await ctx.CreateSubOrchestrationInstance(subOrchestrationName, version2, null); + return result1 + result2; + }); + await parentInstance.WaitForCompletion(waitTimeout, expectedOutput: version1 + version2); + } } } diff --git a/test/DurableTask.SqlServer.Tests/Integration/PurgeTests.cs b/test/DurableTask.SqlServer.Tests/Integration/PurgeTests.cs index 0f8cf3c..87dd1d9 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/PurgeTests.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/PurgeTests.cs @@ -43,6 +43,7 @@ public async Task PurgesInstancesByStatus(OrchestrationStateTimeRangeFilterType count: 30, // ideally some multiple of 3 inputGenerator: i=> $"Hello, world {i}", orchestrationName: "SimpleDelay", + version: string.Empty, implementation: async (ctx, input) => { var tcs = new TaskCompletionSource(); diff --git a/test/DurableTask.SqlServer.Tests/Utils/TestService.cs b/test/DurableTask.SqlServer.Tests/Utils/TestService.cs index aee9ddd..884d621 100644 --- a/test/DurableTask.SqlServer.Tests/Utils/TestService.cs +++ b/test/DurableTask.SqlServer.Tests/Utils/TestService.cs @@ -88,9 +88,18 @@ public async Task DisposeAsync() await this.DropTaskHubLoginAsync(); } + public Task> RunOrchestration( + TInput input, + string orchestrationName, + Func> implementation, + Action onEvent = null, + params (string name, TaskActivity activity)[] activities) => + RunOrchestration(input, orchestrationName, string.Empty, implementation, onEvent, activities); + public async Task> RunOrchestration( TInput input, string orchestrationName, + string version, Func> implementation, Action onEvent = null, params (string name, TaskActivity activity)[] activities) @@ -98,7 +107,8 @@ public async Task> RunOrchestration( var instances = await this.RunOrchestrations( count: 1, inputGenerator: i => input, - orchestrationName, + orchestrationName: orchestrationName, + version: version, implementation, onEvent, activities); @@ -110,16 +120,13 @@ public async Task>> RunOrchestrations int count, Func inputGenerator, string orchestrationName, + string version, Func> implementation, Action onEvent = null, params (string name, TaskActivity activity)[] activities) { // Register the inline orchestration - note that this will only work once per test - Type orchestrationType = typeof(OrchestrationShim); - - this.worker.AddTaskOrchestrations(new TestObjectCreator( - orchestrationName, - MakeOrchestration(implementation, onEvent))); + RegisterInlineOrchestration(orchestrationName, version, implementation, onEvent); foreach ((string name, TaskActivity activity) in activities) { @@ -134,7 +141,7 @@ public async Task>> RunOrchestrations TInput input = inputGenerator(i); OrchestrationInstance instance = await this.client.CreateOrchestrationInstanceAsync( orchestrationName, - string.Empty /* version */, + version, input); // Verify that the CreateOrchestrationInstanceAsync implementation set the InstanceID and ExecutionID fields @@ -147,6 +154,18 @@ public async Task>> RunOrchestrations return instances; } + public void RegisterInlineOrchestration( + string orchestrationName, + string version, + Func> implementation, + Action onEvent = null) + { + this.worker.AddTaskOrchestrations(new TestObjectCreator( + orchestrationName, + version, + MakeOrchestration(implementation, onEvent))); + } + public static TaskOrchestration MakeOrchestration( Func> implementation, Action onEvent = null) @@ -345,10 +364,14 @@ class TestObjectCreator : ObjectCreator { readonly T obj; - public TestObjectCreator(string name, T obj) + public TestObjectCreator(string name, T obj) : this(name, string.Empty, obj) + { + } + + public TestObjectCreator(string name, string version, T obj) { this.Name = name; - this.Version = string.Empty; + this.Version = version; this.obj = obj; }