Skip to content

Fix nondeterminism on Activity/Suborchestration failures #145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jan 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/durableerror.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/**
* A specfic error thrown when a scheduled activity or suborchestrator has failed.
* This error can be checked for via `instanceof` guards to catch only exceptions thrown
* by the DurableJS library.
*/
export class DurableError extends Error {
constructor(message: string | undefined) {
super(message);
}
}
29 changes: 29 additions & 0 deletions src/orchestrationfailureerror.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { OrchestratorState } from "./classes";

const outOfProcDataLabel = "\n\n$OutOfProcData$:";

/**
* A wrapper for all errors thrown within an orchestrator function. This exception will embed
* the orchestrator state in a way that the C# extension knows how to read so that it can replay the
* actions scheduled before throwing an exception. This prevents non-determinism errors in Durable Task.
*
* Note that making any changes to the following schema to OrchestrationFailureError.message could be considered a breaking change:
*
* "<error message as a string>\n\n$OutOfProcData$<json representation of state>"
*/
export class OrchestrationFailureError extends Error {
constructor(error: any, state: OrchestratorState) {
let errorMessage : String;
if (error instanceof Error) {
errorMessage= error.message;
} else if (error instanceof String) {
errorMessage = error;
} else {
errorMessage = JSON.stringify(error);
}

const message = `${errorMessage}${outOfProcDataLabel}${JSON.stringify(state)}`;
super(message);
this.stack = error.stack;
}
}
86 changes: 50 additions & 36 deletions src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import { CallActivityAction, CallActivityWithRetryAction, CallEntityAction, Call
TaskScheduledEvent, TaskSet, TimerCreatedEvent, TimerFiredEvent, TimerTask,
Utils, WaitForExternalEventAction,
} from "./classes";
import { DurableError } from "./durableerror";
import { OrchestrationFailureError } from "./orchestrationfailureerror";
import { FailedSingleTask } from "./tasks/taskinterfaces";
import { TokenSource } from "./tokensource";

Expand Down Expand Up @@ -98,7 +100,7 @@ export class Orchestrator {
log("Iterator is done");
// The customer returned an absolute type.
context.done(
null,
undefined,
new OrchestratorState({
isDone: true,
output: g.value,
Expand Down Expand Up @@ -127,7 +129,7 @@ export class Orchestrator {
// Return continue as new events as completed, as the execution itself is now completed.
if (TaskFilter.isSingleTask(partialResult) && partialResult.action instanceof ContinueAsNewAction) {
context.done(
null,
undefined,
new OrchestratorState({
isDone: true,
output: undefined,
Expand All @@ -140,7 +142,7 @@ export class Orchestrator {

if (!TaskFilter.isCompletedTask(partialResult)) {
context.done(
null,
undefined,
new OrchestratorState({
isDone: false,
output: undefined,
Expand All @@ -164,7 +166,7 @@ export class Orchestrator {
// would yield the task before returning out of the generator function.
if (g.done) {
log("Iterator is done");
context.done(null,
context.done(undefined,
new OrchestratorState({
isDone: true,
actions,
Expand All @@ -185,17 +187,16 @@ export class Orchestrator {
g = gen.next(partialResult.result);
}
} catch (error) {
log(`Error: ${error}`);
context.done(
error,
new OrchestratorState({
isDone: false,
output: undefined,
actions,
error: error.stack,
customStatus: this.customStatus,
}),
);
// Wrap orchestration state in OutOfProcErrorWrapper to ensure data
// gets embedded in error message received by C# extension.
const errorState = new OrchestratorState({
isDone: false,
output: undefined,
actions,
error: error.message,
customStatus: this.customStatus,
});
context.done(new OrchestrationFailureError(error, errorState), undefined);
return;
}
}
Expand Down Expand Up @@ -225,7 +226,7 @@ export class Orchestrator {
taskFailed.Timestamp,
taskFailed.TaskScheduledId,
state.indexOf(taskFailed),
new Error(taskFailed.Reason),
new DurableError(taskFailed.Reason),
);
} else {
return TaskFactory.UncompletedTask(
Expand Down Expand Up @@ -256,14 +257,16 @@ export class Orchestrator {
const result = this.parseHistoryEvent(taskCompleted);

return TaskFactory.SuccessfulTask(newAction, result, taskCompleted.Timestamp, taskCompleted.TaskScheduledId, state.indexOf(taskCompleted));
} else if (taskFailed && taskRetryTimer && attempt >= retryOptions.maxNumberOfAttempts) {
} else if (taskFailed
&& taskRetryTimer
&& attempt >= retryOptions.maxNumberOfAttempts) {
return TaskFactory.FailedTask(
newAction,
taskFailed.Reason,
taskFailed.Timestamp,
taskFailed.TaskScheduledId,
state.indexOf(taskFailed),
new Error(taskFailed.Reason),
new DurableError(taskFailed.Reason),
);
}
}
Expand All @@ -278,7 +281,7 @@ export class Orchestrator {
const eventSent = this.findEventSent(state, schedulerId, "op");
let eventRaised;
if (eventSent) {
const eventSentInput = eventSent && eventSent.Input !== undefined ? JSON.parse(eventSent.Input) as RequestMessage : undefined;
const eventSentInput = eventSent && eventSent.Input ? JSON.parse(eventSent.Input) as RequestMessage : undefined;
eventRaised = eventSentInput ? this.findEventRaised(state, eventSentInput.id) : undefined;
}
this.setProcessed([ eventSent, eventRaised ]);
Expand Down Expand Up @@ -325,7 +328,7 @@ export class Orchestrator {
subOrchestratorFailed.Timestamp,
subOrchestratorFailed.TaskScheduledId,
state.indexOf(subOrchestratorFailed),
new Error(subOrchestratorFailed.Reason),
new DurableError(subOrchestratorFailed.Reason),
);
} else {
return TaskFactory.UncompletedTask(
Expand Down Expand Up @@ -373,14 +376,16 @@ export class Orchestrator {
subOrchestratorCompleted.TaskScheduledId,
state.indexOf(subOrchestratorCompleted),
);
} else if (subOrchestratorFailed && retryTimer && attempt >= retryOptions.maxNumberOfAttempts) {
} else if (subOrchestratorFailed
&& retryTimer
&& attempt >= retryOptions.maxNumberOfAttempts) {
return TaskFactory.FailedTask(
newAction,
subOrchestratorFailed.Reason,
subOrchestratorFailed.Timestamp,
subOrchestratorFailed.TaskScheduledId,
state.indexOf(subOrchestratorFailed),
new Error(subOrchestratorFailed.Reason),
new DurableError(subOrchestratorFailed.Reason),
);
}
}
Expand Down Expand Up @@ -427,7 +432,7 @@ export class Orchestrator {
httpFailed.Timestamp,
httpFailed.TaskScheduledId,
state.indexOf(httpFailed),
new Error(httpFailed.Reason),
new DurableError(httpFailed.Reason),
);
} else {
return TaskFactory.UncompletedTask(
Expand Down Expand Up @@ -464,7 +469,7 @@ export class Orchestrator {

private isLocked(contextLocks: EntityId[]): LockState {
return new LockState(
contextLocks !== undefined && contextLocks !== null,
contextLocks && contextLocks !== null,
contextLocks,
);
}
Expand Down Expand Up @@ -587,7 +592,8 @@ export class Orchestrator {
switch (directiveResult.EventType) {
case (HistoryEventType.EventRaised):
const eventRaised = directiveResult as EventRaisedEvent;
parsedDirectiveResult = (eventRaised && eventRaised.Input !== undefined) ? JSON.parse(eventRaised.Input) : undefined;
parsedDirectiveResult = (eventRaised && eventRaised.Input)
? JSON.parse(eventRaised.Input) : undefined;
break;
case (HistoryEventType.SubOrchestrationInstanceCompleted):
parsedDirectiveResult = JSON.parse((directiveResult as SubOrchestrationInstanceCompletedEvent).Result);
Expand Down Expand Up @@ -708,37 +714,45 @@ export class Orchestrator {
}

/* Returns undefined if not found. */
private findTaskScheduled(state: HistoryEvent[], name: string): TaskScheduledEvent {
private findTaskScheduled(state: HistoryEvent[], name: string): TaskScheduledEvent | undefined {
const returnValue = name
? state.filter((val: HistoryEvent) => {
return val.EventType === HistoryEventType.TaskScheduled
&& (val as TaskScheduledEvent).Name === name
&& !val.IsProcessed;
})[0]
})[0] as TaskScheduledEvent
: undefined;
return returnValue as TaskScheduledEvent;
return returnValue;
}

/* Returns undefined if not found. */
private findTaskCompleted(state: HistoryEvent[], scheduledTask: TaskScheduledEvent): TaskCompletedEvent {
private findTaskCompleted(state: HistoryEvent[], scheduledTask: TaskScheduledEvent | undefined): TaskCompletedEvent | undefined {
if (scheduledTask === undefined) {
return undefined;
}

const returnValue = scheduledTask
? state.filter((val: HistoryEvent) => {
return val.EventType === HistoryEventType.TaskCompleted
&& (val as TaskCompletedEvent).TaskScheduledId === scheduledTask.EventId;
})[0]
})[0] as TaskCompletedEvent
: undefined;
return returnValue as TaskCompletedEvent;
return returnValue;
}

/* Returns undefined if not found. */
private findTaskFailed(state: HistoryEvent[], scheduledTask: HistoryEvent): TaskFailedEvent {
private findTaskFailed(state: HistoryEvent[], scheduledTask: TaskScheduledEvent | undefined): TaskFailedEvent | undefined {
if (scheduledTask === undefined) {
return undefined;
}

const returnValue = scheduledTask
? state.filter((val: HistoryEvent) => {
return val.EventType === HistoryEventType.TaskFailed
&& (val as TaskFailedEvent).TaskScheduledId === scheduledTask.EventId;
})[0]
})[0] as TaskFailedEvent
: undefined;
return returnValue as TaskFailedEvent;
return returnValue;
}

/* Returns undefined if not found. */
Expand All @@ -758,9 +772,9 @@ export class Orchestrator {
? state.filter((val: HistoryEvent) => {
return val.EventType === HistoryEventType.TimerFired
&& (val as TimerFiredEvent).TimerId === createdTimer.EventId;
})[0]
})[0] as TimerFiredEvent
: undefined;
return returnValue as TimerFiredEvent;
return returnValue;
}

private setProcessed(events: Array<HistoryEvent | undefined>): void {
Expand Down
2 changes: 1 addition & 1 deletion src/tasks/taskfactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export class TaskFactory {
);
}

public static FailedTaskSet(tasks: Task[], exception: unknown): TaskSet {
public static FailedTaskSet(tasks: Task[], exception: Error): TaskSet {
return new TaskSet(
true,
true,
Expand Down
4 changes: 2 additions & 2 deletions src/tasks/taskinterfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export interface SuccessfulSingleTask extends CompletedSingleTask {

export interface FailedSingleTask extends CompletedSingleTask {
readonly isFaulted: true;
readonly exception: unknown;
readonly exception: Error;
readonly result: undefined;
}

Expand All @@ -51,7 +51,7 @@ export interface UncompletedTaskSet extends TaskCollection {

export interface FailedTaskSet extends CompletedTaskSet {
readonly isFaulted: true;
readonly exception: unknown;
readonly exception: Error;
readonly result: undefined;
}

Expand Down
2 changes: 1 addition & 1 deletion src/tasks/taskset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ export class TaskSet {
public readonly isFaulted: boolean,
public readonly tasks: SingleTask[],
public result?: unknown,
public exception?: unknown,
public exception?: Error,
) { }
}
Loading