Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process events for non-running orchestrations #97

Merged
merged 1 commit into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Changelog

## v1.0.0
## v1.0.0-rc2

### New

Expand All @@ -12,7 +12,9 @@
* Removed unnecessary .NET Standard 2.1 target ([#82](https://github.com/microsoft/durabletask-mssql/pull/82))
* Fixed problem terminating orchestration with running activity ([#83](https://github.com/microsoft/durabletask-mssql/pull/83))
* Fixed payload data leak for completed activities (same PR as above)
* Fixed NewEvents leak for completed or continued-as-new instances ([#97](https://github.com/microsoft/durabletask-mssql/pull/97))
* Activity payload IDs are now consistently saved to the history table ([#90](https://github.com/microsoft/durabletask-mssql/issues/90))
* Remove Microsoft.SqlServer.SqlManagementObjects dependency ([#92](https://github.com/microsoft/durabletask-mssql/pull/92)) - contributed by [@IGx89](https://github.com/IGx89)

### Breaking changes

Expand Down
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ steps:
vsVersion: 'latest'
logFileVerbosity: minimal
configuration: Release
msbuildArgs: /p:GITHUB_RUN_NUMBER=$(Build.BuildId) /p:ContinuousIntegrationBuild=true
msbuildArgs: /p:FileVersionRevision=$(Build.BuildId) /p:ContinuousIntegrationBuild=true

# Authenticode sign all the DLLs with the Microsoft certificate.
# This appears to be an in-place signing job, which is convenient.
Expand Down
16 changes: 16 additions & 0 deletions src/DurableTask.SqlServer/LogHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,22 @@ public void CreatedDatabase(string databaseName)
this.WriteLog(logEvent);
}

public void DiscardingEvent(string instanceId, string eventType, int taskEventId, string details)
{
var logEvent = new LogEvents.DiscardingEventEvent(
instanceId,
eventType,
taskEventId,
details);
this.WriteLog(logEvent);
}

public void GenericInfoEvent(string details, string? instanceId)
{
var logEvent = new LogEvents.GenericInfo(details, instanceId);
this.WriteLog(logEvent);
}

void WriteLog(ILogEvent logEvent)
{
// LogDurableEvent is an extension method defined in DurableTask.Core
Expand Down
34 changes: 33 additions & 1 deletion src/DurableTask.SqlServer/Logging/DefaultEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ internal void PurgedInstances(
}

[Event(EventIds.CommandCompleted, Level = EventLevel.Verbose)]
public void CommandCompleted(
internal void CommandCompleted(
string? InstanceId,
string CommandText,
long LatencyMs,
Expand Down Expand Up @@ -266,5 +266,37 @@ internal void CreatedDatabase(
AppName,
ExtensionVersion);
}

[Event(EventIds.DiscardingEvent, Level = EventLevel.Warning, Version = 1)]
internal void DiscardingEvent(
string InstanceId,
string EventType,
int TaskEventId,
string Details,
string AppName,
string ExtensionVersion)
{
// TODO: Use WriteEventCore for better performance
this.WriteEvent(
EventIds.DiscardingEvent,
InstanceId,
EventType,
TaskEventId,
Details,
AppName,
ExtensionVersion);
}

[Event(EventIds.GenericInfo, Level = EventLevel.Informational, Version = 1)]
internal void GenericInfo(string Details, string InstanceId, string AppName, string ExtensionVersion)
{
// TODO: Use WriteEventCore for better performance
this.WriteEvent(
EventIds.GenericInfo,
InstanceId,
Details,
AppName,
ExtensionVersion);
}
}
}
2 changes: 2 additions & 0 deletions src/DurableTask.SqlServer/Logging/EventIds.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ static class EventIds
public const int PurgedInstances = 310;
public const int CommandCompleted = 311;
public const int CreatedDatabase = 312;
public const int DiscardingEvent = 313;
public const int GenericInfo = 314;
}
}
73 changes: 73 additions & 0 deletions src/DurableTask.SqlServer/Logging/LogEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -481,5 +481,78 @@ void IEventSourceEvent.WriteEventSource() =>
DTUtils.AppName,
DTUtils.ExtensionVersionString);
}

internal class DiscardingEventEvent : StructuredLogEvent, IEventSourceEvent
{
public DiscardingEventEvent(string instanceId, string eventType, int taskEventId, string details)
{
this.InstanceId = instanceId;
this.EventType = eventType;
this.TaskEventId = taskEventId;
this.Details = details;
}

[StructuredLogField]
public string InstanceId { get; }

[StructuredLogField]
public string EventType { get; }

[StructuredLogField]
public int TaskEventId { get; }

[StructuredLogField]
public string Details { get; }

public override EventId EventId => new EventId(
EventIds.DiscardingEvent,
nameof(EventIds.DiscardingEvent));

public override LogLevel Level => LogLevel.Warning;

protected override string CreateLogMessage() =>
$"{this.InstanceId}: Discarding {GetEventDescription(this.EventType, this.TaskEventId)}: {this.Details}";

void IEventSourceEvent.WriteEventSource() =>
DefaultEventSource.Log.DiscardingEvent(
this.InstanceId,
this.EventType,
this.TaskEventId,
this.Details,
DTUtils.AppName,
DTUtils.ExtensionVersionString);
}

internal class GenericInfo : StructuredLogEvent, IEventSourceEvent
{
public GenericInfo(string details, string? instanceId)
{
this.Details = details;
this.InstanceId = instanceId;
}

[StructuredLogField]
public string Details { get; }

[StructuredLogField]
public string? InstanceId { get; }

public override EventId EventId => new EventId(
EventIds.GenericInfo,
nameof(EventIds.GenericInfo));

public override LogLevel Level => LogLevel.Information;

protected override string CreateLogMessage() => string.IsNullOrEmpty(this.InstanceId) ?
this.Details :
$"{this.InstanceId}: {this.Details}";

void IEventSourceEvent.WriteEventSource() =>
DefaultEventSource.Log.GenericInfo(
this.Details,
this.InstanceId ?? string.Empty,
DTUtils.AppName,
DTUtils.ExtensionVersionString);
}
}
}
1 change: 1 addition & 0 deletions src/DurableTask.SqlServer/Scripts/drop-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ DROP PROCEDURE IF EXISTS dt.PurgeInstanceStateByTime
DROP PROCEDURE IF EXISTS dt._AddOrchestrationEvents
DROP PROCEDURE IF EXISTS dt._CheckpointOrchestration
DROP PROCEDURE IF EXISTS dt._CompleteTasks
DROP PROCEDURE IF EXISTS dt._DiscardEventsAndUnlockInstance
DROP PROCEDURE IF EXISTS dt._GetVersions
DROP PROCEDURE IF EXISTS dt._LockNextOrchestration
DROP PROCEDURE IF EXISTS dt._LockNextTask
Expand Down
33 changes: 31 additions & 2 deletions src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ BEGIN
DECLARE @instanceID varchar(100)
DECLARE @parentInstanceID varchar(100)
DECLARE @version varchar(100)
DECLARE @runtimeStatus varchar(30)
DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub()

BEGIN TRANSACTION
Expand All @@ -532,14 +533,15 @@ BEGIN
[LockExpiration] = @LockExpiration,
@instanceID = I.[InstanceID],
@parentInstanceID = I.[ParentInstanceID],
@runtimeStatus = I.[RuntimeStatus],
@version = I.[Version]
FROM
dt.Instances I WITH (READPAST) INNER JOIN NewEvents E WITH (READPAST) ON
E.[TaskHub] = @TaskHub AND
E.[InstanceID] = I.[InstanceID]
WHERE
I.TaskHub = @TaskHub AND
I.[RuntimeStatus] IN ('Pending', 'Running') AND
I.[RuntimeStatus] NOT IN ('Suspended') AND
(I.[LockExpiration] IS NULL OR I.[LockExpiration] < @now) AND
(E.[VisibleTime] IS NULL OR E.[VisibleTime] < @now)

Expand Down Expand Up @@ -580,7 +582,10 @@ BEGIN
RETURN
END

-- Result #2: The full event history for the locked instance
-- Result #2: Basic information about this instance, including its runtime status
SELECT @instanceID AS [InstanceID], @runtimeStatus AS [RuntimeStatus]

-- Result #3: The full event history for the locked instance
-- NOTE: This must be kept consistent with the dt.HistoryEvents custom data type
SELECT
H.[InstanceID],
Expand Down Expand Up @@ -901,6 +906,30 @@ END
GO


CREATE OR ALTER PROCEDURE dt._DiscardEventsAndUnlockInstance
@InstanceID varchar(100),
@DeletedEvents MessageIDs READONLY
AS
BEGIN
DECLARE @taskHub varchar(50) = dt.CurrentTaskHub()

-- We return the list of deleted messages so that the caller can issue a
-- warning about missing messages
DELETE E
OUTPUT DELETED.InstanceID, DELETED.SequenceNumber
FROM dt.NewEvents E WITH (FORCESEEK(PK_NewEvents(TaskHub, InstanceID, SequenceNumber)))
INNER JOIN @DeletedEvents D ON
D.InstanceID = E.InstanceID AND
D.SequenceNumber = E.SequenceNumber AND
E.TaskHub = @taskHub

-- Release the lock on this instance
UPDATE Instances SET [LastUpdatedTime] = SYSUTCDATETIME(), [LockExpiration] = NULL
WHERE [TaskHub] = @taskHub and [InstanceID] = @InstanceID
END
GO


CREATE OR ALTER PROCEDURE dt._AddOrchestrationEvents
@NewOrchestrationEvents OrchestrationEvents READONLY
AS
Expand Down
2 changes: 2 additions & 0 deletions src/DurableTask.SqlServer/Scripts/permissions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ END
-- Functions
GRANT EXECUTE ON OBJECT::dt.GetScaleMetric TO dt_runtime
GRANT EXECUTE ON OBJECT::dt.GetScaleRecommendation TO dt_runtime
GRANT EXECUTE ON OBJECT::dt.CurrentTaskHub TO dt_runtime

-- Public sprocs
GRANT EXECUTE ON OBJECT::dt.CreateInstance TO dt_runtime
Expand All @@ -30,6 +31,7 @@ GRANT EXECUTE ON OBJECT::dt.PurgeInstanceStateByTime TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._AddOrchestrationEvents TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._CheckpointOrchestration TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._CompleteTasks TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._DiscardEventsAndUnlockInstance TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._GetVersions TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._LockNextOrchestration TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._LockNextTask TO dt_runtime
Expand Down
76 changes: 75 additions & 1 deletion src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public override Task DeleteAsync(bool deleteInstanceStore)
TimeSpan receiveTimeout,
CancellationToken cancellationToken)
{
bool isWaiting = false;
Stopwatch stopwatch = Stopwatch.StartNew();
do
{
Expand Down Expand Up @@ -166,14 +167,87 @@ public override Task DeleteAsync(bool deleteInstanceStore)

if (messages.Count == 0)
{
if (!isWaiting)
{
this.traceHelper.GenericInfoEvent(
"No events were found. Waiting for new events to appear.",
instanceId: null);
isWaiting = true;
}

// TODO: Make this dynamic based on the number of readers
await this.orchestrationBackoffHelper.WaitAsync(cancellationToken);
continue;
}

this.orchestrationBackoffHelper.Reset();
isWaiting = false;

// Result #2: The runtime status of the orchestration instance
if (await reader.NextResultAsync(cancellationToken))
{
bool instanceExists = await reader.ReadAsync(cancellationToken);
string instanceId;
OrchestrationStatus? currentStatus;

bool isRunning = false;
if (instanceExists)
{
instanceId = SqlUtils.GetInstanceId(reader);
currentStatus = SqlUtils.GetRuntimeStatus(reader);
isRunning =
currentStatus == OrchestrationStatus.Running ||
currentStatus == OrchestrationStatus.Pending;
}
else
{
instanceId = messages.Select(msg => msg.OrchestrationInstance.InstanceId).First();
currentStatus = null;
}

// If the instance is in a terminal state, log and discard the new events.
// NOTE: In the future, we may want to allow processing of some events if, for example, they may
// change the state of a completed instance. For example, a rewind command.
if (!isRunning)
{
string warningMessage = instanceExists ?
$"Target is in the {currentStatus} state." :
$"Target doesn't exist (either never existed or continued-as-new).";

messages.ForEach(msg => this.traceHelper.DiscardingEvent(
msg.OrchestrationInstance.InstanceId,
msg.Event.EventType.ToString(),
DTUtils.GetTaskEventId(msg.Event),
warningMessage));

// Close the already opened reader so that we can execute a new command
reader.Close();

// Delete the events and release the orchestration instance lock
using SqlCommand discardCommand = this.GetSprocCommand(
connection,
"dt._DiscardEventsAndUnlockInstance");
discardCommand.Parameters.Add("@InstanceID", SqlDbType.VarChar, 100).Value = instanceId;
discardCommand.Parameters.AddMessageIdParameter("@DeletedEvents", messages);
try
{
await SqlUtils.ExecuteNonQueryAsync(
discardCommand,
this.traceHelper,
instanceId,
cancellationToken);
}
catch (Exception e)
{
this.traceHelper.ProcessingError(e, new OrchestrationInstance { InstanceId = instanceId });
throw;
}

continue;
}
}

// Result #2: The full event history for the locked instance
// Result #3: The full event history for the locked instance
IList<HistoryEvent> history;
if (await reader.NextResultAsync(cancellationToken))
{
Expand Down
Loading