Skip to content

Commit

Permalink
Versioning support for activity tasks (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
usemam authored Apr 6, 2021
1 parent def9b32 commit 6257830
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/DurableTask.SqlServer/DTUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public static bool TryGetPayloadText(HistoryEvent e, out string? payloadText)
return historyEvent.EventType switch
{
EventType.ExecutionStarted => ((ExecutionStartedEvent)historyEvent).Version,
EventType.TaskScheduled => ((TaskScheduledEvent)historyEvent).Version,
_ => null,
};
}
Expand Down
7 changes: 5 additions & 2 deletions src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,8 @@ BEGIN
[VisibleTime],
[LockedBy],
[LockExpiration],
[PayloadID]
[PayloadID],
[Version]
)
OUTPUT
INSERTED.[SequenceNumber],
Expand All @@ -827,7 +828,8 @@ BEGIN
[VisibleTime],
[LockedBy],
[LockExpiration],
[PayloadID]
[PayloadID],
[Version]
FROM @NewTaskEvents

COMMIT TRANSACTION
Expand Down Expand Up @@ -999,6 +1001,7 @@ BEGIN
[VisibleTime],
[Timestamp],
[DequeueCount],
[Version],
(SELECT TOP 1 [Text] FROM Payloads P WHERE
P.[TaskHub] = @TaskHub AND
P.[InstanceID] = N.[InstanceID] AND
Expand Down
4 changes: 3 additions & 1 deletion src/DurableTask.SqlServer/Scripts/schema-0.2.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ IF TYPE_ID(N'dt.TaskEvents') IS NULL
[LockExpiration] datetime2 NULL,
[Reason] varchar(max) NULL,
[PayloadText] varchar(max) NULL,
[PayloadID] uniqueidentifier NULL
[PayloadID] uniqueidentifier NULL,
[Version] varchar(100) NULL
)
GO

Expand Down Expand Up @@ -193,6 +194,7 @@ IF OBJECT_ID(N'dt.NewTasks', 'U') IS NULL
[LockedBy] varchar(100) NULL,
[LockExpiration] datetime2 NULL,
[PayloadID] uniqueidentifier NULL,
[Version] varchar(100) NULL,

CONSTRAINT PK_NewTasks PRIMARY KEY (TaskHub, SequenceNumber),
CONSTRAINT FK_NewTasks_Instances FOREIGN KEY (TaskHub, InstanceID) REFERENCES dt.Instances(TaskHub, InstanceID) ON DELETE CASCADE,
Expand Down
4 changes: 4 additions & 0 deletions src/DurableTask.SqlServer/SqlTypes/TaskEventSqlType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ static class TaskEventSqlType
new SqlMetaData("Reason", SqlDbType.VarChar, -1 /* max */),
new SqlMetaData("PayloadText", SqlDbType.VarChar, -1 /* max */),
new SqlMetaData("PayloadID", SqlDbType.UniqueIdentifier),
new SqlMetaData("Version", SqlDbType.VarChar, 100),
};

static class ColumnOrdinals
Expand All @@ -46,6 +47,7 @@ static class ColumnOrdinals
public const int Reason = 8;
public const int PayloadText = 9;
public const int PayloadId = 10;
public const int Version = 11;
}

public static SqlParameter AddTaskEventsParameter(
Expand Down Expand Up @@ -119,6 +121,8 @@ static SqlDataRecord PopulateTaskMessageRecord(TaskMessage msg, SqlDataRecord re
// Optionally, the LockedBy and LockExpiration fields can be specified
// to pre-lock task work items for this particular node.

record.SetSqlString(ColumnOrdinals.Version, SqlUtils.GetVersion(msg.Event));

return record;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/DurableTask.SqlServer/SqlUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public static HistoryEvent GetHistoryEvent(this DbDataReader reader, bool isOrch
{
Input = GetPayloadText(reader),
Name = GetName(reader),
Version = null, // TODO
Version = GetVersion(reader),
};
break;
case EventType.TimerCreated:
Expand Down
25 changes: 25 additions & 0 deletions test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -456,5 +456,30 @@ public async Task VersionedSubOrchestration()
});
await parentInstance.WaitForCompletion(waitTimeout, expectedOutput: version1 + version2);
}

[Fact]
public async Task VersionedActivity()
{
string activityName = "VersionedActivityTest";
string version1 = "V1";
string version2 = "V2";
var waitTimeout = TimeSpan.FromSeconds(30);

this.testService.RegisterInlineActivity(
activityName, version1, TestService.MakeActivity<string, string>((ctx, input) => version1));
this.testService.RegisterInlineActivity(
activityName, version2, TestService.MakeActivity<string, string>((ctx, input) => version2));

var instance = await this.testService.RunOrchestration<string, string>(
null,
"OrchestrationWithVersionedActivities",
implementation: async (ctx, input) =>
{
var result1 = await ctx.ScheduleTask<string>(activityName, version1, input);
var result2 = await ctx.ScheduleTask<string>(activityName, version2, input);
return result1 + result2;
});
await instance.WaitForCompletion(waitTimeout, expectedOutput: version1 + version2);
}
}
}
11 changes: 6 additions & 5 deletions test/DurableTask.SqlServer.Tests/Utils/TestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public async Task<List<TestInstance<TInput>>> RunOrchestrations<TOutput, TInput>

foreach ((string name, TaskActivity activity) in activities)
{
this.worker.AddTaskActivities(new TestObjectCreator<TaskActivity>(name, activity));
RegisterInlineActivity(name, string.Empty, activity);
}

DateTime utcNow = DateTime.UtcNow;
Expand All @@ -154,6 +154,11 @@ public async Task<List<TestInstance<TInput>>> RunOrchestrations<TOutput, TInput>
return instances;
}

public void RegisterInlineActivity(string name, string version, TaskActivity activity)
{
this.worker.AddTaskActivities(new TestObjectCreator<TaskActivity>(name, version, activity));
}

public void RegisterInlineOrchestration<TOutput, TInput>(
string orchestrationName,
string version,
Expand Down Expand Up @@ -364,10 +369,6 @@ class TestObjectCreator<T> : ObjectCreator<T>
{
readonly T obj;

public TestObjectCreator(string name, T obj) : this(name, string.Empty, obj)
{
}

public TestObjectCreator(string name, string version, T obj)
{
this.Name = name;
Expand Down

0 comments on commit 6257830

Please sign in to comment.