Skip to content

Commit

Permalink
Add support for rewinding orchestrations
Browse files Browse the repository at this point in the history
  • Loading branch information
Greybird committed May 28, 2022
1 parent 9a54563 commit 13ad593
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ public override async Task<int> PurgeHistoryByFilters(DateTime createdTimeFrom,
return purgeResult.DeletedInstanceCount;
}

public override async Task RewindAsync(string instanceId, string reason)
{
await this.service.RewindTaskOrchestrationAsync(instanceId, reason);
}

public override bool TryGetScaleMonitor(
string functionId,
string functionName,
Expand Down
2 changes: 2 additions & 0 deletions src/DurableTask.SqlServer/Scripts/drop-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ DROP PROCEDURE IF EXISTS dt.SetGlobalSetting
DROP PROCEDURE IF EXISTS dt.TerminateInstance
DROP PROCEDURE IF EXISTS dt.PurgeInstanceStateByID
DROP PROCEDURE IF EXISTS dt.PurgeInstanceStateByTime
DROP PROCEDURE IF EXISTS dt.RewindInstance

-- Private sprocs
DROP PROCEDURE IF EXISTS dt._AddOrchestrationEvents
Expand All @@ -31,6 +32,7 @@ DROP PROCEDURE IF EXISTS dt._QueryManyOrchestrations
DROP PROCEDURE IF EXISTS dt._RenewOrchestrationLocks
DROP PROCEDURE IF EXISTS dt._RenewTaskLocks
DROP PROCEDURE IF EXISTS dt._UpdateVersion
DROP PROCEDURE IF EXISTS dt._RewindInstance

-- Tables
DROP TABLE IF EXISTS dt.Versions
Expand Down
149 changes: 149 additions & 0 deletions src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,20 @@ BEGIN
END
GO

CREATE OR ALTER PROCEDURE dt.RewindInstance
@InstanceID varchar(100),
@Reason varchar(max) = NULL
AS
BEGIN
DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub()

BEGIN TRANSACTION

EXEC dt._RewindInstance @InstanceID, @Reason

COMMIT TRANSACTION
END
GO

CREATE OR ALTER PROCEDURE dt.SetGlobalSetting
@Name varchar(300),
Expand Down Expand Up @@ -1252,3 +1266,138 @@ BEGIN
VALUES (@SemanticVersion)
END
GO


CREATE OR ALTER PROCEDURE dt._RewindInstance
@InstanceID varchar(100),
@Reason varchar(max) = NULL
AS
BEGIN
DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub()

-- *** IMPORTANT ***
-- To prevent deadlocks, it is important to maintain consistent table access
-- order across all stored procedures that execute within a transaction.
-- Table order for this sproc: Instances --> (History --> Payloads --> History), NewEvents)

DECLARE @existingStatus varchar(30)
DECLARE @executionID varchar(50)

SELECT TOP 1 @existingStatus = existing.[RuntimeStatus], @executionID = existing.[ExecutionID]
FROM Instances existing WITH (HOLDLOCK)
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID

-- Instance IDs can be overwritten only if the orchestration is in a terminal state
IF @existingStatus NOT IN ('Failed')
BEGIN
DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot rewing instance with ID ''%s'' because it is not in a ''Failed'' state, but in ''%s'' state.', @InstanceID, @existingStatus);
THROW 50001, @msg, 1;
END

DECLARE @eventsInFailure TABLE (
[SequenceNumber] bigint NULL,
[EventType] varchar(40) NULL,
[TaskID] int NULL,
[DataPayloadID] uniqueidentifier NULL)

-- List all events related to failures (ie TaskScheduled/TaskFailed and SubOrchestrationInstanceStarted/SubOrchestrationInstanceFailed couples)
INSERT INTO @eventsInFailure
SELECT h.[SequenceNumber], h.[EventType], h.[TaskID], h.[DataPayloadID]
FROM History h
WHERE h.[TaskHub] = @TaskHub AND h.[InstanceID] = @InstanceID
AND (h.[EventType] IN ('TaskFailed', 'SubOrchestrationInstanceFailed')
OR (h.[EventType] IN ('TaskScheduled', 'SubOrchestrationInstanceStarted') AND EXISTS (SELECT 1
FROM History f
WHERE f.[TaskHub] = @TaskHub AND f.[InstanceID] = @InstanceID AND f.[TaskID] = h.[TaskID] AND f.[EventType] = CASE WHEN h.[EventType] = 'TaskScheduled' THEN 'TaskFailed' ELSE 'SubOrchestrationInstanceFailed' END)))

-- Mark all events related to failure as rewound
UPDATE Payloads
SET [Reason] = CONCAT('Rewound: ', ef.[EventType])
FROM Payloads p
JOIN @eventsInFailure ef ON p.[PayloadID] = ef.[DataPayloadID]
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID AND [SequenceNumber] IN (SELECT [SequenceNumber] FROM @eventsInFailure WHERE [DataPayloadID] IS NOT NULL AND [EventType] IN ('TaskScheduled', 'SubOrchestrationInstanceCreated'))

DECLARE @sequenceNumber bigint
DECLARE @eventType varchar(40)
DECLARE @payloadId uniqueidentifier
DECLARE sequenceNumberCursor CURSOR LOCAL FOR
SELECT [SequenceNumber], [EventType]
FROM @eventsInFailure
WHERE [DataPayloadID] IS NULL

OPEN sequenceNumberCursor
FETCH NEXT FROM sequenceNumberCursor INTO @sequenceNumber, @eventType

WHILE @@FETCH_STATUS = 0 BEGIN
SET @payloadId = NEWID()
INSERT INTO Payloads (
[TaskHub],
[InstanceID],
[PayloadID],
[Reason]
)
VALUES (@TaskHub, @InstanceID, @payloadId, CONCAT('Rewound: ', @eventType))
FETCH NEXT FROM sequenceNumberCursor INTO @sequenceNumber, @eventType
END
CLOSE sequenceNumberCursor
DEALLOCATE sequenceNumberCursor

-- Transform all events related to failure to GenericEvents, except for SubOrchestrationInstanceStarted that can be kept
UPDATE History
SET [EventType] = 'GenericEvent'
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID
AND ([SequenceNumber] IN (SELECT [SequenceNumber]
FROM @eventsInFailure WHERE [EventType] <> 'SubOrchestrationInstanceStarted')
OR [RuntimeStatus] = 'Failed')

-- Enumerate instances of sub orchestrations related to SubOrchestrationInstanceFailed events and rewing them recursively
DECLARE @subOrchestrationInstanceID varchar(100)
DECLARE subOrchestrationCursor CURSOR LOCAL FOR
SELECT i.[InstanceID]
FROM dt.Instances i
JOIN dt.History h ON i.[TaskHub] = h.[TaskHub] AND i.[InstanceID] = h.[InstanceID]
JOIN @eventsInFailure e ON e.[TaskID] = h.[TaskID]
WHERE i.[ParentInstanceID] = @InstanceID
AND h.[EventType] = 'ExecutionStarted'
AND e.[EventType] = 'SubOrchestrationInstanceFailed'


OPEN subOrchestrationCursor
FETCH NEXT FROM subOrchestrationCursor INTO @subOrchestrationInstanceID

WHILE @@FETCH_STATUS = 0 BEGIN
EXECUTE dt._RewindInstance @subOrchestrationInstanceID, @Reason
FETCH NEXT FROM subOrchestrationCursor INTO @subOrchestrationInstanceID
END
CLOSE subOrchestrationCursor
DEALLOCATE subOrchestrationCursor

-- Insert a line in NewEvents to ensure orchestration will start
SET @payloadId = NEWID()
INSERT INTO Payloads (
[TaskHub],
[InstanceID],
[PayloadID],
[Text]
)
VALUES (@TaskHub, @InstanceID, @payloadId, @reason)
INSERT INTO NewEvents (
[TaskHub],
[InstanceID],
[ExecutionID],
[EventType],
[PayloadID]
)
VALUES(
@TaskHub,
@InstanceID,
@executionID,
'GenericEvent',
@payloadId)

-- Set orchestration status to Pending
UPDATE Instances
SET [RuntimeStatus] = 'Pending', [LastUpdatedTime] = SYSUTCDATETIME()
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID
END
GO
2 changes: 2 additions & 0 deletions src/DurableTask.SqlServer/Scripts/permissions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ GRANT EXECUTE ON OBJECT::dt.RaiseEvent TO dt_runtime
GRANT EXECUTE ON OBJECT::dt.TerminateInstance TO dt_runtime
GRANT EXECUTE ON OBJECT::dt.PurgeInstanceStateByID TO dt_runtime
GRANT EXECUTE ON OBJECT::dt.PurgeInstanceStateByTime TO dt_runtime
GRANT EXECUTE ON OBJECT::dt.RewindInstance TO dt_runtime

-- Internal sprocs
GRANT EXECUTE ON OBJECT::dt._AddOrchestrationEvents TO dt_runtime
Expand All @@ -37,6 +38,7 @@ GRANT EXECUTE ON OBJECT::dt._QueryManyOrchestrations TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._RenewOrchestrationLocks TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._RenewTaskLocks TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._UpdateVersion TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._RewindInstance TO dt_runtime

-- Types
GRANT EXECUTE ON TYPE::dt.HistoryEvents TO dt_runtime
Expand Down
16 changes: 16 additions & 0 deletions src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,22 @@ public async Task<IReadOnlyCollection<OrchestrationState>> GetManyOrchestrations
return results;
}

/// <summary>
/// Rewinds an orchestration
/// </summary>
/// <param name="instanceId">Instance id of the orchestration</param>
/// <param name="reason">A reason for rewinding the orchestration.</param>
public async Task RewindTaskOrchestrationAsync(string instanceId, string reason)
{
using SqlConnection connection = await this.GetAndOpenConnectionAsync();
using SqlCommand command = this.GetSprocCommand(connection, "dt.RewindInstance");

command.Parameters.Add("@InstanceID", SqlDbType.VarChar, size: 100).Value = instanceId;
command.Parameters.Add("@Reason", SqlDbType.VarChar).Value = reason;

await SqlUtils.ExecuteNonQueryAsync(command, this.traceHelper);
}

/// <summary>
/// Gets a value that represents the recommended instance count for handling the current event and work-item backlogs.
/// </summary>
Expand Down

0 comments on commit 13ad593

Please sign in to comment.