Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronous reads for improved performance for large payloads #134

Merged
merged 7 commits into from
Jan 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v1.1.1

### Updates

* Synchronous reads for improved performance for large payloads ([#134](https://github.com/microsoft/durabletask-mssql/pull/134)) - contributed by [@bhugot](https://github.com/bhugot)

## v1.1.0

### New
Expand Down
42 changes: 27 additions & 15 deletions src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,12 @@ public override Task DeleteAsync(bool deleteInstanceStore)
int longestWaitTime = 0;
var messages = new List<TaskMessage>(capacity: batchSize);
var eventPayloadMappings = new EventPayloadMap(capacity: batchSize);
while (await reader.ReadAsync(cancellationToken))

// Synchronous reads have significantly better performance: https://github.com/dotnet/SqlClient/issues/593
while (reader.Read())
{
cancellationToken.ThrowIfCancellationRequested();

TaskMessage message = reader.GetTaskMessage();
messages.Add(message);
Guid? payloadId = reader.GetPayloadId();
Expand Down Expand Up @@ -187,7 +191,8 @@ public override Task DeleteAsync(bool deleteInstanceStore)
// Result #2: The runtime status of the orchestration instance
if (await reader.NextResultAsync(cancellationToken))
{
bool instanceExists = await reader.ReadAsync(cancellationToken);
// Synchronous reads have significantly better performance: https://github.com/dotnet/SqlClient/issues/593
bool instanceExists = reader.Read();
string instanceId;
OrchestrationStatus? currentStatus;

Expand Down Expand Up @@ -252,7 +257,7 @@ await SqlUtils.ExecuteNonQueryAsync(
IList<HistoryEvent> history;
if (await reader.NextResultAsync(cancellationToken))
{
history = await ReadHistoryEventsAsync(reader, executionIdFilter: null, cancellationToken);
history = ReadHistoryEvents(reader, executionIdFilter: null, cancellationToken);
}
else
{
Expand Down Expand Up @@ -402,11 +407,13 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync(
// removes the need for a DB access and also ensures that a work-item can't spam the error logs in a tight loop.
public override Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem) => Task.CompletedTask;

public override async Task<TaskActivityWorkItem?> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
public override async Task<TaskActivityWorkItem?> LockNextTaskActivityWorkItem(
TimeSpan receiveTimeout,
CancellationToken shutdownCancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
while (!shutdownCancellationToken.IsCancellationRequested)
{
using SqlConnection connection = await this.GetAndOpenConnectionAsync();
using SqlConnection connection = await this.GetAndOpenConnectionAsync(shutdownCancellationToken);
using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}._LockNextTask");

DateTime lockExpiration = DateTime.UtcNow.Add(this.settings.WorkItemLockTimeout);
Expand All @@ -418,10 +425,10 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync(
command,
this.traceHelper,
instanceId: null,
cancellationToken);
if (!await reader.ReadAsync())
shutdownCancellationToken);
if (!await reader.ReadAsync(shutdownCancellationToken))
{
await this.activityBackoffHelper.WaitAsync(cancellationToken);
await this.activityBackoffHelper.WaitAsync(shutdownCancellationToken);
continue;
}

Expand Down Expand Up @@ -596,7 +603,8 @@ public override async Task<OrchestrationState> WaitForOrchestrationAsync(
instanceId,
cancellationToken);

if (await reader.ReadAsync(cancellationToken))
// Synchronous reads have significantly better performance: https://github.com/dotnet/SqlClient/issues/593
if (reader.Read())
{
OrchestrationState state = reader.GetOrchestrationState();
return state;
Expand All @@ -615,18 +623,22 @@ public override async Task<string> GetOrchestrationHistoryAsync(string instanceI

using DbDataReader reader = await SqlUtils.ExecuteReaderAsync(command, this.traceHelper, instanceId);

List<HistoryEvent> history = await ReadHistoryEventsAsync(reader, executionIdFilter);
List<HistoryEvent> history = ReadHistoryEvents(reader, executionIdFilter);
return JsonConvert.SerializeObject(history);
}

static async Task<List<HistoryEvent>> ReadHistoryEventsAsync(
static List<HistoryEvent> ReadHistoryEvents(
DbDataReader reader,
string? executionIdFilter = null,
CancellationToken cancellationToken = default)
{
var history = new List<HistoryEvent>(capacity: 128);
while (await reader.ReadAsync(cancellationToken))

// Synchronous reads have significantly better performance: https://github.com/dotnet/SqlClient/issues/593
while (reader.Read())
{
cancellationToken.ThrowIfCancellationRequested();

string executionId = SqlUtils.GetExecutionId(reader)!;
HistoryEvent e = reader.GetHistoryEvent(isOrchestrationHistory: true);
if (executionIdFilter == null)
Expand Down Expand Up @@ -820,7 +832,7 @@ public async Task<IReadOnlyCollection<OrchestrationState>> GetManyOrchestrations
cancellationToken);

var results = new List<OrchestrationState>(query.PageSize);
while (await reader.ReadAsync(cancellationToken))
while (!cancellationToken.IsCancellationRequested && reader.Read())
{
OrchestrationState state = reader.GetOrchestrationState();
results.Add(state);
Expand Down Expand Up @@ -872,7 +884,7 @@ public async Task<int> GetRecommendedReplicaCountAsync(int? currentReplicaCount
command.Parameters.Add("@MaxConcurrentOrchestrations", SqlDbType.Int).Value = this.MaxConcurrentTaskOrchestrationWorkItems;
command.Parameters.Add("@MaxConcurrentActivities", SqlDbType.Int).Value = this.MaxConcurrentTaskActivityWorkItems;

int recommendedReplicaCount = (int)await command.ExecuteScalarAsync();
int recommendedReplicaCount = (int)await command.ExecuteScalarAsync(cancellationToken);
if (currentReplicaCount != null && currentReplicaCount != recommendedReplicaCount)
{
this.traceHelper.ReplicaCountChangeRecommended(currentReplicaCount.Value, recommendedReplicaCount);
Expand Down
2 changes: 1 addition & 1 deletion src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>1</MinorVersion>
<PatchVersion>0</PatchVersion>
<PatchVersion>1</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).$(MinorVersion).0.0</AssemblyVersion>
Expand Down
2 changes: 1 addition & 1 deletion test/DurableTask.SqlServer.AzureFunctions.Tests/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static async Task<DurableOrchestrationStatus> WaitForStartAsync(
return status;
}

await Task.Delay(TimeSpan.FromMilliseconds(500));
await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken);
}

cancellationToken.ThrowIfCancellationRequested();
Expand Down
44 changes: 42 additions & 2 deletions test/DurableTask.SqlServer.Tests/Integration/StressTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace DurableTask.SqlServer.Tests.Integration
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DurableTask.SqlServer.Tests.Utils;
using Xunit;
Expand Down Expand Up @@ -32,13 +33,13 @@ public StressTests(ITestOutputHelper output)
// This test has previously been used to uncover various deadlock issues by stressing the code paths
// related to foreign keys that point to the Instances and Payloads tables.
// Example: https://github.com/microsoft/durabletask-mssql/issues/45
[Theory]
[Theory(Timeout = 1_000_000)]
[InlineData(10)]
[InlineData(2000)]
public async Task ParallelSubOrchestrations(int subOrchestrationCount)
{
const string SubOrchestrationName = "SubOrchestration";

this.testService.RegisterInlineOrchestration<DateTime, string>(
orchestrationName: SubOrchestrationName,
version: "",
Expand Down Expand Up @@ -72,5 +73,44 @@ public async Task ParallelSubOrchestrations(int subOrchestrationCount)
// On slower CI machines, this test could take several minutes to complete.
await testInstance.WaitForCompletion(TimeSpan.FromMinutes(5));
}

[Theory(Timeout = 100_000)]
[InlineData(10)]
public async Task ParallelWithBigPayload(int subOrchestrationCount)
{
const string SubOrchestrationName = "SubOrchestration";
string bigString = string.Join("", Enumerable.Range(0, 1024 * 1024 * 10).Select(x => "1"));

this.testService.RegisterInlineOrchestration<DateTime, string>(
orchestrationName: SubOrchestrationName,
version: "",
implementation: async (ctx, input) =>
{
await ctx.CreateTimer(DateTime.MinValue, input);
return ctx.CurrentUtcDateTime;
});

TestInstance<int> testInstance = await this.testService.RunOrchestration(
input: 1,
orchestrationName: nameof(ParallelSubOrchestrations),
implementation: async (ctx, input) =>
{
var listInstances = new List<Task<DateTime>>();
for (int i = 0; i < subOrchestrationCount; i++)
{
Task<DateTime> instance = ctx.CreateSubOrchestrationInstance<DateTime>(
name: SubOrchestrationName,
version: "",
instanceId: $"suborchestration[{i}]",
input: $"{i}-{bigString}");
listInstances.Add(instance);
}

DateTime[] results = await Task.WhenAll(listInstances);
return new List<DateTime>(results);
});

await testInstance.WaitForCompletion(TimeSpan.FromMinutes(1));
}
}
}