Skip to content

Commit

Permalink
Versioning support for orchestrations (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
usemam committed Mar 16, 2021
1 parent 3df2541 commit 4ff7b2f
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 20 deletions.
9 changes: 9 additions & 0 deletions src/DurableTask.SqlServer/DTUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,5 +141,14 @@ public static bool TryGetPayloadText(HistoryEvent e, out string? payloadText)
_ => null,
};
}

public static string? GetVersion(HistoryEvent historyEvent)
{
return historyEvent.EventType switch
{
EventType.ExecutionStarted => ((ExecutionStartedEvent)historyEvent).Version,
_ => null,
};
}
}
}
35 changes: 29 additions & 6 deletions src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ AS
I.[InstanceID],
I.[ExecutionID],
I.[Name],
I.[Version],
I.[CreatedTime],
I.[LastUpdatedTime],
I.[CompletedTime],
Expand Down Expand Up @@ -84,6 +85,7 @@ GO

CREATE OR ALTER PROCEDURE dt.CreateInstance
@Name varchar(300),
@Version varchar(100),
@InstanceID varchar(100) = NULL,
@ExecutionID varchar(50) = NULL,
@InputText varchar(MAX) = NULL,
Expand Down Expand Up @@ -135,13 +137,15 @@ BEGIN

INSERT INTO Instances (
[Name],
[Version],
[TaskHub],
[InstanceID],
[ExecutionID],
[RuntimeStatus],
[InputPayloadID])
VALUES (
@Name,
@Version,
@TaskHub,
@InstanceID,
@ExecutionID,
Expand Down Expand Up @@ -179,8 +183,13 @@ CREATE OR ALTER PROCEDURE dt.GetInstanceHistory
AS
BEGIN
DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub()
DECLARE @ParentInstanceID varchar(100) = (
SELECT [ParentInstanceID] FROM Instances WHERE [InstanceID] = @InstanceID)
DECLARE @ParentInstanceID varchar(100)
DECLARE @Version varchar(100)

SELECT
@ParentInstanceID = [ParentInstanceID],
@Version = [Version]
FROM Instances WHERE [InstanceID] = @InstanceID

SELECT
H.[InstanceID],
Expand All @@ -196,7 +205,8 @@ BEGIN
P.[Reason],
(CASE WHEN @GetInputsAndOutputs = 0 THEN NULL ELSE P.[Text] END) AS [PayloadText],
[PayloadID],
@ParentInstanceID as [ParentInstanceID]
@ParentInstanceID as [ParentInstanceID],
@Version as [Version]
FROM History H WITH (INDEX (PK_History))
LEFT OUTER JOIN Payloads P ON
P.[TaskHub] = @TaskHub AND
Expand Down Expand Up @@ -233,12 +243,14 @@ BEGIN
[InstanceID],
[ExecutionID],
[Name],
[Version],
[RuntimeStatus])
SELECT
@TaskHub,
@InstanceID,
NEWID(),
SUBSTRING(@InstanceID, 2, CHARINDEX('@', @InstanceID, 2) - 2),
'',
'Pending'
WHERE LEFT(@InstanceID, 1) = '@' AND CHARINDEX('@', @InstanceID, 2) > 2

Expand Down Expand Up @@ -413,6 +425,7 @@ BEGIN
DECLARE @now datetime2 = SYSUTCDATETIME()
DECLARE @instanceID varchar(100)
DECLARE @parentInstanceID varchar(100)
DECLARE @version varchar(100)
DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub()

BEGIN TRANSACTION
Expand All @@ -430,7 +443,8 @@ BEGIN
[LockedBy] = @LockedBy,
[LockExpiration] = @LockExpiration,
@instanceID = I.[InstanceID],
@parentInstanceID = I.[ParentInstanceID]
@parentInstanceID = I.[ParentInstanceID],
@version = I.[Version]
FROM
dt.Instances I WITH (READPAST) INNER JOIN NewEvents E WITH (READPAST) ON
E.[TaskHub] = @TaskHub AND
Expand Down Expand Up @@ -459,7 +473,8 @@ BEGIN
P.[Text] AS [PayloadText],
P.[PayloadID],
DATEDIFF(millisecond, [Timestamp], @now) AS [WaitTime],
@parentInstanceID as [ParentInstanceID]
@parentInstanceID as [ParentInstanceID],
@version as [Version]
FROM NewEvents N
LEFT OUTER JOIN dt.[Payloads] P ON
P.[TaskHub] = @TaskHub AND
Expand Down Expand Up @@ -491,7 +506,8 @@ BEGIN
-- Optimization: Do not load the data payloads for these history events - they are not needed since they are never replayed
(CASE WHEN [EventType] IN ('TaskScheduled', 'SubOrchestrationInstanceCreated') THEN NULL ELSE P.[Text] END) AS [PayloadText],
[PayloadID],
@parentInstanceID as [ParentInstanceID]
@parentInstanceID as [ParentInstanceID],
@version as [Version]
FROM History H WITH (INDEX (PK_History))
LEFT OUTER JOIN Payloads P ON
P.[TaskHub] = @TaskHub AND
Expand Down Expand Up @@ -639,12 +655,14 @@ BEGIN
[InstanceID],
[ExecutionID],
[Name],
[Version],
[RuntimeStatus])
SELECT DISTINCT
@TaskHub,
E.[InstanceID],
NEWID(),
SUBSTRING(E.[InstanceID], 2, CHARINDEX('@', E.[InstanceID], 2) - 2),
'',
'Pending'
FROM @NewOrchestrationEvents E
WHERE LEFT(E.[InstanceID], 1) = '@'
Expand All @@ -662,13 +680,15 @@ BEGIN
[InstanceID],
[ExecutionID],
[Name],
[Version],
[ParentInstanceID],
[RuntimeStatus])
SELECT DISTINCT
@TaskHub,
E.[InstanceID],
E.[ExecutionID],
E.[Name],
E.[Version],
E.[ParentInstanceID],
'Pending'
FROM @NewOrchestrationEvents E
Expand Down Expand Up @@ -810,12 +830,14 @@ BEGIN
[InstanceID],
[ExecutionID],
[Name],
[Version],
[RuntimeStatus])
SELECT DISTINCT
@TaskHub,
E.[InstanceID],
NEWID(),
SUBSTRING(E.[InstanceID], 2, CHARINDEX('@', E.[InstanceID], 2) - 2),
'',
'Pending'
FROM @NewOrchestrationEvents E
WHERE NOT EXISTS (
Expand Down Expand Up @@ -880,6 +902,7 @@ BEGIN
I.[InstanceID],
I.[ExecutionID],
I.[Name],
I.[Version],
I.[CreatedTime],
I.[LastUpdatedTime],
I.[CompletedTime],
Expand Down
7 changes: 5 additions & 2 deletions src/DurableTask.SqlServer/Scripts/schema-0.2.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ IF TYPE_ID(N'dt.HistoryEvents') IS NULL
[Reason] varchar(max) NULL,
[PayloadText] varchar(max) NULL,
[PayloadID] uniqueidentifier NULL,
[ParentInstanceID] varchar(100) NULL
[ParentInstanceID] varchar(100) NULL,
[Version] varchar(100) NULL
)
GO

Expand All @@ -54,7 +55,8 @@ IF TYPE_ID(N'dt.OrchestrationEvents') IS NULL
[Reason] varchar(max) NULL,
[PayloadText] varchar(max) NULL,
[PayloadID] uniqueidentifier NULL,
[ParentInstanceID] varchar(100) NULL
[ParentInstanceID] varchar(100) NULL,
[Version] varchar(100) NULL
)
GO

Expand Down Expand Up @@ -112,6 +114,7 @@ BEGIN
[InstanceID] varchar(100) NOT NULL,
[ExecutionID] varchar(50) NOT NULL CONSTRAINT DF_Instances_ExecutionID DEFAULT (NEWID()), -- expected to be system generated
[Name] varchar(300) NOT NULL, -- the type name of the orchestration or entity
[Version] varchar(100) NOT NULL, -- the version of the orchestration
[CreatedTime] datetime2 NOT NULL CONSTRAINT DF_Instances_CreatedTime DEFAULT SYSUTCDATETIME(),
[LastUpdatedTime] datetime2 NULL,
[CompletedTime] datetime2 NULL,
Expand Down
2 changes: 1 addition & 1 deletion src/DurableTask.SqlServer/SqlDbManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public async Task CreateOrUpgradeSchemaAsync(bool recreateIfExists)
if (await reader.ReadAsync())
{
// The first result contains the latest version
currentSchemaVersion = SqlUtils.GetVersion(reader);
currentSchemaVersion = SqlUtils.GetSemanticVersion(reader);
if (currentSchemaVersion >= DTUtils.ExtensionVersion)
{
// The schema is already up-to-date.
Expand Down
1 change: 1 addition & 0 deletions src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ public override async Task CreateTaskOrchestrationAsync(TaskMessage creationMess
ExecutionStartedEvent startEvent = (ExecutionStartedEvent)creationMessage.Event;
OrchestrationInstance instance = startEvent.OrchestrationInstance;
command.Parameters.Add("@Name", SqlDbType.VarChar, size: 300).Value = startEvent.Name;
command.Parameters.Add("@Version", SqlDbType.VarChar, size: 100).Value = startEvent.Version;
command.Parameters.Add("@InstanceID", SqlDbType.VarChar, size: 100).Value = instance.InstanceId;
command.Parameters.Add("@ExecutionID", SqlDbType.VarChar, size: 50).Value = instance.ExecutionId;
command.Parameters.Add("@InputText", SqlDbType.VarChar).Value = startEvent.Input;
Expand Down
2 changes: 2 additions & 0 deletions src/DurableTask.SqlServer/SqlTypes/HistoryEventSqlType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ static class HistoryEventSqlType
new SqlMetaData("PayloadText", SqlDbType.VarChar, -1 /* max */),
new SqlMetaData("PayloadID", SqlDbType.UniqueIdentifier),
new SqlMetaData("ParentInstanceID", SqlDbType.VarChar, 100),
new SqlMetaData("Version", SqlDbType.VarChar, 100),
};

static class ColumnOrdinals
Expand All @@ -50,6 +51,7 @@ static class ColumnOrdinals
public const int PayloadText = 11;
public const int PayloadID = 12;
public const int ParentInstanceID = 13;
public const int Version = 14;
};

public static SqlParameter AddHistoryEventsParameter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ static class OrchestrationEventSqlType
new SqlMetaData("PayloadText", SqlDbType.VarChar, -1 /* max */),
new SqlMetaData("PayloadID", SqlDbType.UniqueIdentifier),
new SqlMetaData("ParentInstanceID", SqlDbType.VarChar, 100),
new SqlMetaData("Version", SqlDbType.VarChar, 100),
};

static class ColumnOrdinals
Expand All @@ -48,6 +49,7 @@ static class ColumnOrdinals
public const int PayloadText = 9;
public const int PayloadId = 10;
public const int ParentInstanceID = 11;
public const int Version = 12;
}

public static SqlParameter AddOrchestrationEventsParameter(
Expand Down Expand Up @@ -135,6 +137,7 @@ static SqlDataRecord PopulateOrchestrationMessage(TaskMessage msg, SqlDataRecord
payloadText.IsNull && reason.IsNull ? SqlGuid.Null : new SqlGuid(Guid.NewGuid()));

record.SetSqlString(ColumnOrdinals.ParentInstanceID, SqlUtils.GetParentInstanceId(msg.Event));
record.SetSqlString(ColumnOrdinals.Version, SqlUtils.GetVersion(msg.Event));

return record;
}
Expand Down
16 changes: 14 additions & 2 deletions src/DurableTask.SqlServer/SqlUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public static HistoryEvent GetHistoryEvent(this DbDataReader reader, bool isOrch
ExecutionId = GetExecutionId(reader),
},
Tags = null, // TODO
Version = null, // TODO
Version = GetVersion(reader),
};
string? parentInstanceId = GetParentInstanceId(reader);
if (parentInstanceId != null)
Expand Down Expand Up @@ -190,6 +190,7 @@ public static OrchestrationState GetOrchestrationState(this DbDataReader reader)
Input = reader.GetStringOrNull(reader.GetOrdinal("InputText")),
LastUpdatedTime = reader.GetUtcDateTimeOrNull(reader.GetOrdinal("LastUpdatedTime")) ?? default,
Name = GetName(reader),
Version = GetVersion(reader),
OrchestrationInstance = new OrchestrationInstance
{
InstanceId = GetInstanceId(reader),
Expand Down Expand Up @@ -267,12 +268,23 @@ public static SqlString GetParentInstanceId(HistoryEvent historyEvent)
return reader.IsDBNull(ordinal) ? (Guid?)null : reader.GetGuid(ordinal);
}

public static SemanticVersion GetVersion(DbDataReader reader)
public static SemanticVersion GetSemanticVersion(DbDataReader reader)
{
string versionString = reader.GetString("SemanticVersion");
return SemanticVersion.Parse(versionString);
}

public static SqlString GetVersion(HistoryEvent historyEvent)
{
return DTUtils.GetVersion(historyEvent) ?? SqlString.Null;
}

public static string? GetVersion(DbDataReader reader)
{
int ordinal = reader.GetOrdinal("Version");
return reader.IsDBNull(ordinal) ? null : reader.GetString(ordinal);
}

internal static SqlString GetReason(HistoryEvent historyEvent)
{
return historyEvent.EventType switch
Expand Down
49 changes: 49 additions & 0 deletions test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ public async Task ActivityChain(int parallelCount)
parallelCount,
_ => null,
orchestrationName: "OrchestrationsWithActivityChain",
version: string.Empty,
implementation: async (ctx, _) =>
{
int value = 0;
Expand Down Expand Up @@ -407,5 +408,53 @@ public async Task SubOrchestration()
await testInstance.WaitForCompletion(
timeout: TimeSpan.FromSeconds(15), expectedOutput: 15);
}

[Fact]
public async Task VersionedOrchestration()
{
string orchestrationName = "VersionedOrchestrationTest";
string version1 = "V1";
string version2 = "V2";
var waitTimeout = TimeSpan.FromSeconds(15);

TestInstance<string> v1Instance = await this.testService.RunOrchestration<string, string>(
null,
orchestrationName,
version: version1,
implementation: (ctx, input) => Task.FromResult(version1));
await v1Instance.WaitForCompletion(waitTimeout, expectedOutput: version1);

TestInstance<string> v2Instance = await this.testService.RunOrchestration<string, string>(
null,
orchestrationName,
version: version2,
implementation: (ctx, input) => Task.FromResult(version2));
await v2Instance.WaitForCompletion(waitTimeout, expectedOutput: version2);
}

[Fact]
public async Task VersionedSubOrchestration()
{
string subOrchestrationName = "VersionedSubOrchestrationTest";
string version1 = "V1";
string version2 = "V2";
var waitTimeout = TimeSpan.FromSeconds(30);

this.testService.RegisterInlineOrchestration<string, string>(
subOrchestrationName, version1, implementation: (ctx, input) => Task.FromResult(version1));
this.testService.RegisterInlineOrchestration<string, string>(
subOrchestrationName, version2, implementation: (ctx, input) => Task.FromResult(version2));

TestInstance<string> parentInstance = await this.testService.RunOrchestration<string, string>(
null,
"ParentOrchestration",
implementation: async (ctx, input) =>
{
var result1 = await ctx.CreateSubOrchestrationInstance<string>(subOrchestrationName, version1, null);
var result2 = await ctx.CreateSubOrchestrationInstance<string>(subOrchestrationName, version2, null);
return result1 + result2;
});
await parentInstance.WaitForCompletion(waitTimeout, expectedOutput: version1 + version2);
}
}
}
1 change: 1 addition & 0 deletions test/DurableTask.SqlServer.Tests/Integration/PurgeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public async Task PurgesInstancesByStatus(OrchestrationStateTimeRangeFilterType
count: 30, // ideally some multiple of 3
inputGenerator: i=> $"Hello, world {i}",
orchestrationName: "SimpleDelay",
version: string.Empty,
implementation: async (ctx, input) =>
{
var tcs = new TaskCompletionSource<bool>();
Expand Down
Loading

0 comments on commit 4ff7b2f

Please sign in to comment.