From 7031788acf5db24821af3264859ed10445c54e9b Mon Sep 17 00:00:00 2001 From: Greybird Date: Sat, 28 May 2022 13:39:37 +0200 Subject: [PATCH] Add support for rewinding orchestrations --- .../SqlDurabilityProvider.cs | 5 + .../Scripts/drop-schema.sql | 2 + src/DurableTask.SqlServer/Scripts/logic.sql | 151 +++++++++++++++++- .../Scripts/permissions.sql | 2 + .../SqlOrchestrationService.cs | 16 ++ 5 files changed, 175 insertions(+), 1 deletion(-) diff --git a/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProvider.cs b/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProvider.cs index bfc754d..5522a64 100644 --- a/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProvider.cs +++ b/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProvider.cs @@ -193,6 +193,11 @@ public override async Task PurgeHistoryByFilters(DateTime createdTimeFrom, return purgeResult.DeletedInstanceCount; } + public override async Task RewindAsync(string instanceId, string reason) + { + await this.service.RewindTaskOrchestrationAsync(instanceId, reason); + } + public override bool TryGetScaleMonitor( string functionId, string functionName, diff --git a/src/DurableTask.SqlServer/Scripts/drop-schema.sql b/src/DurableTask.SqlServer/Scripts/drop-schema.sql index 5d50224..5ffae98 100644 --- a/src/DurableTask.SqlServer/Scripts/drop-schema.sql +++ b/src/DurableTask.SqlServer/Scripts/drop-schema.sql @@ -19,6 +19,7 @@ DROP PROCEDURE IF EXISTS dt.SetGlobalSetting DROP PROCEDURE IF EXISTS dt.TerminateInstance DROP PROCEDURE IF EXISTS dt.PurgeInstanceStateByID DROP PROCEDURE IF EXISTS dt.PurgeInstanceStateByTime +DROP PROCEDURE IF EXISTS dt.RewindInstance -- Private sprocs DROP PROCEDURE IF EXISTS dt._AddOrchestrationEvents @@ -31,6 +32,7 @@ DROP PROCEDURE IF EXISTS dt._QueryManyOrchestrations DROP PROCEDURE IF EXISTS dt._RenewOrchestrationLocks DROP PROCEDURE IF EXISTS dt._RenewTaskLocks DROP PROCEDURE IF EXISTS dt._UpdateVersion +DROP PROCEDURE IF EXISTS dt._RewindInstance -- Tables DROP TABLE IF EXISTS dt.Versions diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index 21a18ce..e8e8b2f 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -174,7 +174,7 @@ BEGIN -- Instance IDs can be overwritten only if the orchestration is in a terminal state IF @existingStatus IN ('Pending', 'Running') BEGIN - DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot create instance with ID ''%s'' because a pending or running instance with ID already exists.', @InstanceId); + DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot create instance with ID ''%s'' because a pending or running instance with ID already exists.', @InstanceID); THROW 50001, @msg, 1; END ELSE IF @existingStatus IS NOT NULL @@ -478,6 +478,20 @@ BEGIN END GO +CREATE OR ALTER PROCEDURE dt.RewindInstance + @InstanceID varchar(100), + @Reason varchar(max) = NULL +AS +BEGIN + DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub() + + BEGIN TRANSACTION + + EXEC dt._RewindInstance @InstanceID, @Reason + + COMMIT TRANSACTION +END +GO CREATE OR ALTER PROCEDURE dt.SetGlobalSetting @Name varchar(300), @@ -1252,3 +1266,138 @@ BEGIN VALUES (@SemanticVersion) END GO + + +CREATE OR ALTER PROCEDURE dt._RewindInstance + @InstanceID varchar(100), + @Reason varchar(max) = NULL +AS +BEGIN + DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub() + + -- *** IMPORTANT *** + -- To prevent deadlocks, it is important to maintain consistent table access + -- order across all stored procedures that execute within a transaction. + -- Table order for this sproc: Instances --> (History --> Payloads --> History), NewEvents) + + DECLARE @existingStatus varchar(30) + DECLARE @executionID varchar(50) + + SELECT TOP 1 @existingStatus = existing.[RuntimeStatus], @executionID = existing.[ExecutionID] + FROM Instances existing WITH (HOLDLOCK) + WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID + + -- Instance IDs can be overwritten only if the orchestration is in a terminal state + IF @existingStatus NOT IN ('Failed') + BEGIN + DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot rewing instance with ID ''%s'' because it is not in a ''Failed'' state, but in ''%s'' state.', @InstanceID, @existingStatus); + THROW 50001, @msg, 1; + END + + DECLARE @eventsInFailure TABLE ( + [SequenceNumber] bigint NULL, + [EventType] varchar(40) NULL, + [TaskID] int NULL, + [DataPayloadID] uniqueidentifier NULL) + + -- List all events related to failures (ie TaskScheduled/TaskFailed and SubOrchestrationInstanceStarted/SubOrchestrationInstanceFailed couples) + INSERT INTO @eventsInFailure + SELECT h.[SequenceNumber], h.[EventType], h.[TaskID], h.[DataPayloadID] + FROM History h + WHERE h.[TaskHub] = @TaskHub AND h.[InstanceID] = @InstanceID + AND (h.[EventType] IN ('TaskFailed', 'SubOrchestrationInstanceFailed') + OR (h.[EventType] IN ('TaskScheduled', 'SubOrchestrationInstanceStarted') AND EXISTS (SELECT 1 + FROM History f + WHERE f.[TaskHub] = @TaskHub AND f.[InstanceID] = @InstanceID AND f.[TaskID] = h.[TaskID] AND f.[EventType] = CASE WHEN h.[EventType] = 'TaskScheduled' THEN 'TaskFailed' ELSE 'SubOrchestrationInstanceFailed' END))) + + -- Mark all events related to failure as rewound + UPDATE Payloads + SET [Reason] = CONCAT('Rewound: ', ef.[EventType]) + FROM Payloads p + JOIN @eventsInFailure ef ON p.[PayloadID] = ef.[DataPayloadID] + WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID AND [SequenceNumber] IN (SELECT [SequenceNumber] FROM @eventsInFailure WHERE [DataPayloadID] IS NOT NULL AND [EventType] IN ('TaskScheduled', 'SubOrchestrationInstanceCreated')) + + DECLARE @sequenceNumber bigint + DECLARE @eventType varchar(40) + DECLARE @payloadId uniqueidentifier + DECLARE sequenceNumberCursor CURSOR LOCAL FOR + SELECT [SequenceNumber], [EventType] + FROM @eventsInFailure + WHERE [DataPayloadID] IS NULL + + OPEN sequenceNumberCursor + FETCH NEXT FROM sequenceNumberCursor INTO @sequenceNumber, @eventType + + WHILE @@FETCH_STATUS = 0 BEGIN + SET @payloadId = NEWID() + INSERT INTO Payloads ( + [TaskHub], + [InstanceID], + [PayloadID], + [Reason] + ) + VALUES (@TaskHub, @InstanceID, @payloadId, CONCAT('Rewound: ', @eventType)) + FETCH NEXT FROM sequenceNumberCursor INTO @sequenceNumber, @eventType + END + CLOSE sequenceNumberCursor + DEALLOCATE sequenceNumberCursor + + -- Transform all events related to failure to GenericEvents, except for SubOrchestrationInstanceStarted that can be kept + UPDATE History + SET [EventType] = 'GenericEvent' + WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID + AND ([SequenceNumber] IN (SELECT [SequenceNumber] + FROM @eventsInFailure WHERE [EventType] <> 'SubOrchestrationInstanceStarted') + OR [RuntimeStatus] = 'Failed') + + -- Enumerate instances of sub orchestrations related to SubOrchestrationInstanceFailed events and rewing them recursively + DECLARE @subOrchestrationInstanceID varchar(100) + DECLARE subOrchestrationCursor CURSOR LOCAL FOR + SELECT i.[InstanceID] + FROM dt.Instances i + JOIN dt.History h ON i.[TaskHub] = h.[TaskHub] AND i.[InstanceID] = h.[InstanceID] + JOIN @eventsInFailure e ON e.[TaskID] = h.[TaskID] + WHERE i.[ParentInstanceID] = @InstanceID + AND h.[EventType] = 'ExecutionStarted' + AND e.[EventType] = 'SubOrchestrationInstanceFailed' + + + OPEN subOrchestrationCursor + FETCH NEXT FROM subOrchestrationCursor INTO @subOrchestrationInstanceID + + WHILE @@FETCH_STATUS = 0 BEGIN + EXECUTE dt._RewindInstance @subOrchestrationInstanceID, @Reason + FETCH NEXT FROM subOrchestrationCursor INTO @subOrchestrationInstanceID + END + CLOSE subOrchestrationCursor + DEALLOCATE subOrchestrationCursor + + -- Insert a line in NewEvents to ensure orchestration will start + SET @payloadId = NEWID() + INSERT INTO Payloads ( + [TaskHub], + [InstanceID], + [PayloadID], + [Text] + ) + VALUES (@TaskHub, @InstanceID, @payloadId, @reason) + INSERT INTO NewEvents ( + [TaskHub], + [InstanceID], + [ExecutionID], + [EventType], + [PayloadID] + ) + VALUES( + @TaskHub, + @InstanceID, + @executionID, + 'GenericEvent', + @payloadId) + + -- Set orchestration status to Pending + UPDATE Instances + SET [RuntimeStatus] = 'Pending', [LastUpdatedTime] = SYSUTCDATETIME() + WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID +END +GO \ No newline at end of file diff --git a/src/DurableTask.SqlServer/Scripts/permissions.sql b/src/DurableTask.SqlServer/Scripts/permissions.sql index 4d8252c..436aa93 100644 --- a/src/DurableTask.SqlServer/Scripts/permissions.sql +++ b/src/DurableTask.SqlServer/Scripts/permissions.sql @@ -25,6 +25,7 @@ GRANT EXECUTE ON OBJECT::dt.RaiseEvent TO dt_runtime GRANT EXECUTE ON OBJECT::dt.TerminateInstance TO dt_runtime GRANT EXECUTE ON OBJECT::dt.PurgeInstanceStateByID TO dt_runtime GRANT EXECUTE ON OBJECT::dt.PurgeInstanceStateByTime TO dt_runtime +GRANT EXECUTE ON OBJECT::dt.RewindInstance TO dt_runtime -- Internal sprocs GRANT EXECUTE ON OBJECT::dt._AddOrchestrationEvents TO dt_runtime @@ -37,6 +38,7 @@ GRANT EXECUTE ON OBJECT::dt._QueryManyOrchestrations TO dt_runtime GRANT EXECUTE ON OBJECT::dt._RenewOrchestrationLocks TO dt_runtime GRANT EXECUTE ON OBJECT::dt._RenewTaskLocks TO dt_runtime GRANT EXECUTE ON OBJECT::dt._UpdateVersion TO dt_runtime +GRANT EXECUTE ON OBJECT::dt._RewindInstance TO dt_runtime -- Types GRANT EXECUTE ON TYPE::dt.HistoryEvents TO dt_runtime diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index 66f98dd..66bdd67 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -742,6 +742,22 @@ public async Task> GetManyOrchestrations return results; } + /// + /// Rewinds an orchestration + /// + /// Instance id of the orchestration + /// A reason for rewinding the orchestration. + public async Task RewindTaskOrchestrationAsync(string instanceId, string reason) + { + using SqlConnection connection = await this.GetAndOpenConnectionAsync(); + using SqlCommand command = this.GetSprocCommand(connection, "dt.RewindInstance"); + + command.Parameters.Add("@InstanceID", SqlDbType.VarChar, size: 100).Value = instanceId; + command.Parameters.Add("@Reason", SqlDbType.VarChar).Value = reason; + + await SqlUtils.ExecuteNonQueryAsync(command, this.traceHelper); + } + /// /// Gets a value that represents the recommended instance count for handling the current event and work-item backlogs. ///