Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved Azure Functions support: instance purge, long timers, Linux telemetry #22

Merged
merged 4 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,14 @@ public SqlDurabilityProvider(
this.durabilityOptions = durabilityOptions;
}

/// <summary>
/// Always reports <c>true</c> for this provider.
/// </summary>
public override bool GuaranteesOrderedDelivery
{
    get { return true; }
}

/// <summary>
/// Gets a JSON object built from the durability options; a new object is created on every read.
/// </summary>
public override JObject ConfigurationJson
{
    get { return JObject.FromObject(this.durabilityOptions); }
}

/// <summary>
/// Gets or sets the maximum delay time. Initialized to <see cref="TimeSpan.MaxValue"/>.
/// </summary>
public override TimeSpan MaximumDelayTime { get; set; } = TimeSpan.MaxValue;

/// <summary>
/// Gets the event source name reported by this provider.
/// </summary>
public override string EventSourceName
{
    get { return "DurableTask-SqlServer"; }
}

public override async Task<IList<OrchestrationState>> GetOrchestrationStateWithInputsAsync(string instanceId, bool showInput = true)
{
OrchestrationState? state = await this.service.GetOrchestrationStateAsync(instanceId, executionId: null);
Expand Down Expand Up @@ -163,6 +169,50 @@ public override async Task<OrchestrationStatusQueryResult> GetOrchestrationState
};
}

/// <summary>
/// Purges the history of a single instance.
/// </summary>
/// <param name="instanceId">The ID of the instance to purge.</param>
/// <returns>
/// A <see cref="PurgeHistoryResult"/> created from the number of deleted instances
/// returned by the underlying service.
/// </returns>
public override async Task<PurgeHistoryResult> PurgeInstanceHistoryByInstanceId(string instanceId)
{
    string[] singleInstance = { instanceId };
    return new PurgeHistoryResult(await this.service.PurgeOrchestrationHistoryAsync(singleInstance));
}

/// <summary>
/// Deletes the stored history of instances that match the supplied creation-time and status filters.
/// </summary>
/// <remarks>
/// A single call purges no more than 1000 instances (the query page size). To purge more, call this
/// method repeatedly until it returns zero.
/// </remarks>
/// <param name="createdTimeFrom">Lower bound of the creation-time filter.</param>
/// <param name="createdTimeTo">Optional upper bound of the creation-time filter; not applied when <c>null</c>.</param>
/// <param name="runtimeStatus">Status values to match; a <c>null</c> or empty sequence applies no status filter.</param>
/// <returns>The number of purged instances.</returns>
public override async Task<int> PurgeHistoryByFilters(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus)
{
    // Only instance IDs are needed, so inputs and outputs are not fetched.
    var filter = new SqlOrchestrationQuery
    {
        PageSize = 1000,
        CreatedTimeFrom = createdTimeFrom,
        FetchInput = false,
        FetchOutput = false,
    };

    if (createdTimeTo.HasValue)
    {
        filter.CreatedTimeTo = createdTimeTo.Value;
    }

    if (runtimeStatus != null && runtimeStatus.Any())
    {
        filter.StatusFilter = new HashSet<OrchestrationStatus>(runtimeStatus);
    }

    IReadOnlyCollection<OrchestrationState> matches =
        await this.service.GetManyOrchestrationsAsync(filter, CancellationToken.None);

    return await this.service.PurgeOrchestrationHistoryAsync(
        matches.Select(match => match.OrchestrationInstance.InstanceId));
}

public override bool TryGetScaleMonitor(
string functionId,
string functionName,
Expand Down
9 changes: 9 additions & 0 deletions src/DurableTask.SqlServer/LogHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,15 @@ public void ReplicaCountChangeRecommended(int currentCount, int recommendedCount
this.WriteLog(logEvent);
}

/// <summary>
/// Writes a <see cref="LogEvents.PurgedInstances"/> log event.
/// </summary>
/// <param name="userId">The user ID to record in the event.</param>
/// <param name="purgedInstanceCount">The number of purged instances to record.</param>
/// <param name="latencyStopwatch">Stopwatch whose elapsed milliseconds are recorded as the latency.</param>
public void PurgedInstances(string userId, int purgedInstanceCount, Stopwatch latencyStopwatch)
{
    long elapsedMs = latencyStopwatch.ElapsedMilliseconds;
    this.WriteLog(new LogEvents.PurgedInstances(userId, purgedInstanceCount, elapsedMs));
}

void WriteLog(ILogEvent logEvent)
{
// LogDurableEvent is an extension method defined in DurableTask.Core
Expand Down
18 changes: 18 additions & 0 deletions src/DurableTask.SqlServer/Logging/DefaultEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,5 +214,23 @@ internal void ReplicaCountChangeRecommended(
AppName,
ExtensionVersion);
}

/// <summary>
/// Writes the <c>PurgedInstances</c> event at informational level.
/// </summary>
/// <param name="UserId">The user ID associated with the purge.</param>
/// <param name="InstanceCount">The number of instances reported by the event.</param>
/// <param name="LatencyMs">The latency of the purge, in milliseconds.</param>
/// <param name="AppName">The application name to include in the event.</param>
/// <param name="ExtensionVersion">The extension version to include in the event.</param>
[Event(EventIds.PurgedInstances, Level = EventLevel.Informational)]
internal void PurgedInstances(
string UserId,
int InstanceCount,
long LatencyMs,
string AppName,
string ExtensionVersion)
{
// TODO: Use WriteEventCore for better performance
// The event ID passed here matches the ID declared in the [Event] attribute above.
this.WriteEvent(
EventIds.PurgedInstances,
UserId,
InstanceCount,
LatencyMs,
AppName,
ExtensionVersion);
}
}
}
1 change: 1 addition & 0 deletions src/DurableTask.SqlServer/Logging/EventIds.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ static class EventIds
public const int DuplicateExecutionDetected = 307;
public const int TransientDatabaseFailure = 308;
public const int ReplicaCountChangeRecommended = 309;
public const int PurgedInstances = 310;
}
}
36 changes: 36 additions & 0 deletions src/DurableTask.SqlServer/Logging/LogEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,5 +379,41 @@ void IEventSourceEvent.WriteEventSource() =>
DTUtils.AppName,
DTUtils.ExtensionVersionString);
}

/// <summary>
/// Log event describing a purge: who purged, how many instances, and how long it took.
/// </summary>
internal class PurgedInstances : StructuredLogEvent, IEventSourceEvent
{
    /// <summary>
    /// Creates the event from the purge details.
    /// </summary>
    /// <param name="userId">The user ID to report.</param>
    /// <param name="purgedInstanceCount">The number of purged instances.</param>
    /// <param name="latencyMs">The purge latency in milliseconds.</param>
    public PurgedInstances(string userId, int purgedInstanceCount, long latencyMs)
    {
        this.LatencyMs = latencyMs;
        this.InstanceCount = purgedInstanceCount;
        this.UserId = userId;
    }

    /// <summary>Gets the number of purged instances.</summary>
    [StructuredLogField]
    public int InstanceCount { get; }

    /// <summary>Gets the user ID associated with the purge.</summary>
    [StructuredLogField]
    public string UserId { get; }

    /// <summary>Gets the purge latency in milliseconds.</summary>
    [StructuredLogField]
    public long LatencyMs { get; }

    /// <inheritdoc/>
    public override EventId EventId
    {
        get
        {
            return new EventId(
                EventIds.PurgedInstances,
                nameof(EventIds.PurgedInstances));
        }
    }

    /// <inheritdoc/>
    public override LogLevel Level
    {
        get { return LogLevel.Information; }
    }

    /// <inheritdoc/>
    protected override string CreateLogMessage()
    {
        return $"User '{this.UserId}' purged {this.InstanceCount} orchestration instances. Latency = {this.LatencyMs}ms.";
    }

    /// <summary>
    /// Forwards this event's fields, plus the app name and extension version, to the default event source.
    /// </summary>
    void IEventSourceEvent.WriteEventSource()
    {
        DefaultEventSource.Log.PurgedInstances(
            this.UserId,
            this.InstanceCount,
            this.LatencyMs,
            DTUtils.AppName,
            DTUtils.ExtensionVersionString);
    }
}
}
}
6 changes: 4 additions & 2 deletions src/DurableTask.SqlServer/Scripts/drop-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ DROP PROCEDURE IF EXISTS dt.QuerySingleOrchestration
DROP PROCEDURE IF EXISTS dt.RaiseEvent
DROP PROCEDURE IF EXISTS dt.SetGlobalSetting
DROP PROCEDURE IF EXISTS dt.TerminateInstance
DROP PROCEDURE IF EXISTS dt.PurgeInstanceState
DROP PROCEDURE IF EXISTS dt.PurgeInstanceStateByID
DROP PROCEDURE IF EXISTS dt.PurgeInstanceStateByTime

-- Private sprocs
DROP PROCEDURE IF EXISTS dt._AddOrchestrationEvents
Expand All @@ -41,8 +42,9 @@ DROP TABLE IF EXISTS dt.Payloads
DROP TABLE IF EXISTS dt.GlobalSettings

-- Custom types
DROP TYPE IF EXISTS dt.MessageIDs
DROP TYPE IF EXISTS dt.HistoryEvents
DROP TYPE IF EXISTS dt.InstanceIDs
DROP TYPE IF EXISTS dt.MessageIDs
DROP TYPE IF EXISTS dt.OrchestrationEvents
DROP TYPE IF EXISTS dt.TaskEvents

Expand Down
44 changes: 29 additions & 15 deletions src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -407,25 +407,47 @@ END
GO


CREATE OR ALTER PROCEDURE dt.PurgeInstanceState
-- Purges the state of the given instances within the current task hub.
-- Parameters:
--   @InstanceIDs  table-valued list of instance IDs to purge (read-only)
-- Returns (as the procedure return value) the number of rows deleted from Instances.
CREATE OR ALTER PROCEDURE dt.PurgeInstanceStateByID
@InstanceIDs InstanceIDs READONLY
AS
BEGIN
DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub()

-- All deletes below run in a single transaction
BEGIN TRANSACTION

DELETE FROM NewEvents WHERE [TaskHub] = @TaskHub AND [InstanceID] IN (SELECT [InstanceID] FROM @InstanceIDs)
DELETE FROM Instances WHERE [TaskHub] = @TaskHub AND [InstanceID] IN (SELECT [InstanceID] FROM @InstanceIDs)
-- @@ROWCOUNT reflects only the most recent statement, so it must be captured
-- immediately after the Instances delete and before the Payloads delete.
DECLARE @deletedInstances int = @@ROWCOUNT
DELETE FROM Payloads WHERE [TaskHub] = @TaskHub AND [InstanceID] IN (SELECT [InstanceID] FROM @InstanceIDs)
-- Other relevant tables are expected to be cleaned up via cascade deletes

COMMIT TRANSACTION

-- return the number of deleted instances
RETURN @deletedInstances
END
GO


CREATE OR ALTER PROCEDURE dt.PurgeInstanceStateByTime
@ThresholdTime datetime2,
@FilterType tinyint = 0
AS
BEGIN
DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub()

DECLARE @InstanceIDs TABLE (InstanceID varchar(100))
DECLARE @instanceIDs InstanceIDs

IF @FilterType = 0 -- created time
BEGIN
INSERT INTO @InstanceIDs
INSERT INTO @instanceIDs
SELECT [InstanceID] FROM Instances
WHERE [TaskHub] = @TaskHub AND [RuntimeStatus] IN ('Completed', 'Terminated', 'Failed')
AND [CreatedTime] >= @ThresholdTime
END
ELSE IF @FilterType = 1 -- completed time
BEGIN
INSERT INTO @InstanceIDs
INSERT INTO @instanceIDs
SELECT [InstanceID] FROM Instances
WHERE [TaskHub] = @TaskHub AND [RuntimeStatus] IN ('Completed', 'Terminated', 'Failed')
AND [CompletedTime] >= @ThresholdTime
Expand All @@ -436,17 +458,9 @@ BEGIN
THROW 50000, @msg, 1;
END

BEGIN TRANSACTION

DELETE FROM NewEvents WHERE [TaskHub] = @TaskHub AND [InstanceID] IN (SELECT [InstanceID] FROM @InstanceIDs)
DELETE FROM Instances WHERE [TaskHub] = @TaskHub AND [InstanceID] IN (SELECT [InstanceID] FROM @InstanceIDs)
DELETE FROM Payloads WHERE [TaskHub] = @TaskHub AND [InstanceID] IN (SELECT [InstanceID] FROM @InstanceIDs)
-- Other relevant tables are expected to be cleaned up via cascade deletes

COMMIT TRANSACTION

-- return the number of deleted instances
RETURN (SELECT COUNT(*) FROM @InstanceIDs)
DECLARE @deletedInstances int
EXECUTE @deletedInstances = dt.PurgeInstanceStateByID @instanceIDs
RETURN @deletedInstances
END
GO

Expand Down
4 changes: 3 additions & 1 deletion src/DurableTask.SqlServer/Scripts/permissions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ GRANT EXECUTE ON OBJECT::dt.GetInstanceHistory TO dt_runtime
GRANT EXECUTE ON OBJECT::dt.QuerySingleOrchestration TO dt_runtime
GRANT EXECUTE ON OBJECT::dt.RaiseEvent TO dt_runtime
GRANT EXECUTE ON OBJECT::dt.TerminateInstance TO dt_runtime
GRANT EXECUTE ON OBJECT::dt.PurgeInstanceState TO dt_runtime
GRANT EXECUTE ON OBJECT::dt.PurgeInstanceStateByID TO dt_runtime
GRANT EXECUTE ON OBJECT::dt.PurgeInstanceStateByTime TO dt_runtime

-- Internal sprocs
GRANT EXECUTE ON OBJECT::dt._AddOrchestrationEvents TO dt_runtime
Expand All @@ -40,6 +41,7 @@ GRANT EXECUTE ON OBJECT::dt._UpdateVersion TO dt_runtime
-- Types
GRANT EXECUTE ON TYPE::dt.HistoryEvents TO dt_runtime
GRANT EXECUTE ON TYPE::dt.MessageIDs TO dt_runtime
GRANT EXECUTE ON TYPE::dt.InstanceIDs TO dt_runtime
GRANT EXECUTE ON TYPE::dt.OrchestrationEvents TO dt_runtime
GRANT EXECUTE ON TYPE::dt.TaskEvents TO dt_runtime

Expand Down
8 changes: 7 additions & 1 deletion src/DurableTask.SqlServer/Scripts/schema-0.2.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE name = 'dt')
EXEC('CREATE SCHEMA dt');

-- Create custom types
-- Table type holding a list of instance IDs; used as a table-valued parameter
-- (for example by dt.PurgeInstanceStateByID). Created only if it does not already exist.
IF TYPE_ID(N'dt.InstanceIDs') IS NULL
CREATE TYPE dt.InstanceIDs AS TABLE (
[InstanceID] varchar(100) NOT NULL
)
GO

IF TYPE_ID(N'dt.MessageIDs') IS NULL
-- WARNING: Reordering fields is a breaking change!
CREATE TYPE dt.MessageIDs AS TABLE (
Expand Down Expand Up @@ -82,7 +88,7 @@ GO

-- Rule #1: Use varchar instead of nvarchar
-- Rule #2: Do not use varchar(MAX) except in the Payloads table
-- Rule #3: Try to follow existing nameing and ordering conventions
-- Rule #3: Try to follow existing naming and ordering conventions

IF OBJECT_ID(N'dt.Versions', 'U') IS NULL
BEGIN
Expand Down
33 changes: 32 additions & 1 deletion src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ public class SqlOrchestrationService : OrchestrationServiceBase
readonly LogHelper traceHelper;
readonly SqlDbManager dbManager;
readonly string lockedByValue;
readonly string userId;

/// <summary>
/// Initializes a new instance of the <see cref="SqlOrchestrationService"/> class.
/// </summary>
/// <param name="settings">The service settings. They are passed through <c>ValidateSettings</c> before use.</param>
/// <exception cref="ArgumentNullException">Thrown when <c>ValidateSettings</c> yields <c>null</c>.</exception>
public SqlOrchestrationService(SqlOrchestrationServiceSettings? settings)
{
    this.settings = ValidateSettings(settings) ?? throw new ArgumentNullException(nameof(settings));
    this.traceHelper = new LogHelper(this.settings.LoggerFactory.CreateLogger("DurableTask.SqlServer"));
    this.dbManager = new SqlDbManager(this.settings, this.traceHelper);

    // Process is IDisposable; release it as soon as the process ID has been read
    // instead of leaving the component for the finalizer.
    using (Process currentProcess = Process.GetCurrentProcess())
    {
        this.lockedByValue = $"{this.settings.AppName}|{currentProcess.Id}";
    }

    // User ID taken from the task hub connection string; falls back to an empty string.
    this.userId = new SqlConnectionStringBuilder(this.settings.TaskHubConnectionString).UserID ?? string.Empty;
}

public override int MaxConcurrentTaskOrchestrationWorkItems => this.settings.MaxActiveOrchestrations;
Expand Down Expand Up @@ -549,12 +551,41 @@ public override async Task ForceTerminateTaskOrchestrationAsync(string instanceI
await SqlUtils.ExecuteNonQueryAsync(command, this.traceHelper, instanceId);
}

/// <summary>
/// Purges the specified instances by executing the <c>dt.PurgeInstanceStateByID</c> stored procedure.
/// </summary>
/// <param name="instanceIds">
/// The IDs of the instances to purge. A <c>null</c> or empty sequence results in no database call.
/// </param>
/// <returns>The number of purged instances, taken from the stored procedure's return value.</returns>
public async Task<int> PurgeOrchestrationHistoryAsync(IEnumerable<string> instanceIds)
{
    if (instanceIds == null)
    {
        return 0;
    }

    // Snapshot the IDs once. Callers may pass a lazily-evaluated sequence, and the IDs are
    // inspected more than once (emptiness check here, then again when the table-valued
    // parameter is built and streamed). Re-enumerating a deferred or one-shot sequence
    // could repeat work or yield different results each time.
    IReadOnlyCollection<string> ids = instanceIds as IReadOnlyCollection<string> ?? instanceIds.ToList();
    if (ids.Count == 0)
    {
        return 0;
    }

    using SqlConnection connection = await this.GetAndOpenConnectionAsync();
    using SqlCommand command = this.GetSprocCommand(connection, "dt.PurgeInstanceStateByID");

    // The stored procedure reports the number of deleted instances through its return value.
    SqlParameter instancesDeletedReturnValue = command.Parameters.Add("@InstancesDeleted", SqlDbType.Int);
    instancesDeletedReturnValue.Direction = ParameterDirection.ReturnValue;

    command.Parameters.AddInstanceIDsParameter("@InstanceIDs", ids);

    Stopwatch latencyStopwatch = Stopwatch.StartNew();
    await SqlUtils.ExecuteNonQueryAsync(command, this.traceHelper);
    int purgedInstanceCount = (int)instancesDeletedReturnValue.Value;

    // Only log when something was actually purged.
    if (purgedInstanceCount > 0)
    {
        this.traceHelper.PurgedInstances(
            this.userId,
            purgedInstanceCount,
            latencyStopwatch);
    }

    return purgedInstanceCount;
}

public override async Task PurgeOrchestrationHistoryAsync(
DateTime thresholdDateTimeUtc,
OrchestrationStateTimeRangeFilterType timeRangeFilterType)
{
using SqlConnection connection = await this.GetAndOpenConnectionAsync();
using SqlCommand command = this.GetSprocCommand(connection, "dt.PurgeInstanceState");
using SqlCommand command = this.GetSprocCommand(connection, "dt.PurgeInstanceStateByTime");

command.Parameters.Add("@ThresholdTime", SqlDbType.DateTime2).Value = thresholdDateTimeUtc;
command.Parameters.Add("@FilterType", SqlDbType.TinyInt).Value = (int)timeRangeFilterType;
Expand Down
22 changes: 22 additions & 0 deletions src/DurableTask.SqlServer/SqlUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DurableTask.SqlServer
{
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Data.SqlTypes;
Expand Down Expand Up @@ -348,6 +349,27 @@ static DateTime GetUtcDateTime(DbDataReader reader, int ordinal)
return DateTime.SpecifyKind(reader.GetDateTime(ordinal), DateTimeKind.Utc);
}

/// <summary>
/// Adds a structured (<c>dt.InstanceIDs</c>) parameter holding the given instance IDs to a parameter collection.
/// </summary>
/// <param name="commandParameters">The parameter collection to add to.</param>
/// <param name="paramName">The name of the parameter to add.</param>
/// <param name="instanceIds">The instance IDs to send. When empty, the parameter value is set to <c>null</c>.</param>
/// <returns>The parameter that was added.</returns>
public static SqlParameter AddInstanceIDsParameter(
    this SqlParameterCollection commandParameters,
    string paramName,
    IEnumerable<string> instanceIds)
{
    SqlParameter tvp = commandParameters.Add(paramName, SqlDbType.Structured);
    tvp.TypeName = "dt.InstanceIDs";

    // An empty sequence is sent as null instead of an empty record stream.
    tvp.Value = instanceIds.Any() ? StreamRows(instanceIds) : null;
    return tvp;

    // Lazily yields one row per ID, reusing a single record object for every row.
    static IEnumerable<SqlDataRecord> StreamRows(IEnumerable<string> ids)
    {
        var row = new SqlDataRecord(new SqlMetaData("InstanceID", SqlDbType.VarChar, maxLength: 100));
        foreach (string id in ids)
        {
            row.SetString(0, id);
            yield return row;
        }
    }
}

public static Task<DbDataReader> ExecuteReaderAsync(
DbCommand command,
LogHelper traceHelper,
Expand Down
Loading