fix(workflow): Fix error when timer is cancelled and immediately fired in the same activation #466

Merged
merged 1 commit into from
Feb 3, 2022
25 changes: 25 additions & 0 deletions packages/test/src/test-workflows.ts
@@ -1706,3 +1706,28 @@ test('conditionWaiter', async (t) => {
     compareCompletion(t, completion, makeSuccess([makeCompleteWorkflowExecution()]));
   }
 });
+
+test('conditionRacer', async (t) => {
+  const { workflowType } = t.context;
+  {
+    const completion = await activate(t, makeStartWorkflow(workflowType));
+    compareCompletion(
+      t,
+      completion,
+      makeSuccess([makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs('1s') })])
+    );
+  }
+  {
+    const completion = await activate(
+      t,
+      makeActivation(
+        Date.now(),
+        {
+          signalWorkflow: { signalName: 'unblock', input: [] },
+        },
+        makeFireTimerJob(1)
+      )
+    );
+    compareCompletion(t, completion, makeSuccess([{ cancelTimer: { seq: 1 } }]));
Comment on lines +1723 to +1731
Member

How did you make this repro w/o the extra activation?

Contributor

"extra"? I think w/ these two jobs:

        {
          signalWorkflow: { signalName: 'unblock', input: [] },
        },
        makeFireTimerJob(1)

It processes the first job, which results in a cancel (and immediate cleanup/removal of the timer completion). Then, when it tries to process the second job, it drops it because it can't find the completion and assumes the timer must have been canceled (or, in the repro case, throws).
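
The two-job activation described above can be modelled in isolation. The following is a minimal sketch, not the SDK's code: `ToyActivator`, `Job`, and the log strings are hypothetical names invented for illustration. It assumes only what the thread states, namely that the signal job cancels the timer and removes its completion right away, so the later fire-timer job finds nothing to resolve.

```typescript
// Hypothetical, simplified model of one activation; not the SDK's real data structures.
type Job =
  | { kind: 'signal'; name: string }
  | { kind: 'fireTimer'; seq: number };

class ToyActivator {
  // Pending timer completions keyed by sequence number.
  private timers = new Map<number, () => void>();
  private strict: boolean;
  public log: string[] = [];

  constructor(strict: boolean) {
    this.strict = strict;
  }

  startTimer(seq: number): void {
    this.timers.set(seq, () => {
      this.log.push(`timer ${seq} fired`);
    });
  }

  // Assumption: the signal unblocks the workflow, which cancels every pending
  // timer and removes its completion immediately.
  private handleSignal(name: string): void {
    this.log.push(`signal ${name}`);
    for (const seq of [...this.timers.keys()]) {
      this.timers.delete(seq);
      this.log.push(`cancel timer ${seq}`);
    }
  }

  private fireTimer(seq: number): void {
    const resolve = this.timers.get(seq);
    if (resolve === undefined) {
      // Strict lookup throws; lenient lookup treats the timer as already cancelled.
      if (this.strict) throw new Error(`No completion for taskSeq ${seq}`);
      this.log.push(`drop fireTimer ${seq}`);
      return;
    }
    this.timers.delete(seq);
    resolve();
  }

  // Jobs are processed in order within a single activation.
  activate(jobs: Job[]): void {
    for (const job of jobs) {
      if (job.kind === 'signal') this.handleSignal(job.name);
      else this.fireTimer(job.seq);
    }
  }
}

const jobs: Job[] = [
  { kind: 'signal', name: 'unblock' },
  { kind: 'fireTimer', seq: 1 },
];

const lenient = new ToyActivator(false);
lenient.startTimer(1);
lenient.activate(jobs);

const strict = new ToyActivator(true);
strict.startTimer(1);
let strictError: string | undefined;
try {
  strict.activate(jobs);
} catch (err) {
  strictError = (err as Error).message;
}
```

In this model the lenient activator records the cancel and silently drops the fire-timer job, while the strict one fails on the same input.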

Member Author

The issue I had initially is that the fireTimer job doesn't get delivered if the workflow is already complete.
Adding another condition at the end of the workflow keeps the workflow from completing, so the job is delivered.

+  }
+});
14 changes: 14 additions & 0 deletions packages/test/src/workflows/condition-completion-race.ts
@@ -0,0 +1,14 @@
+/**
+ * Tests that the workflow doesn't throw if condition resolves and expires in the same activation
+ *
+ * @module
+ */
+import { condition, setHandler } from '@temporalio/workflow';
+import { unblockSignal } from './definitions';
+
+export async function conditionRacer(): Promise<void> {
+  let blocked = true;
+  setHandler(unblockSignal, () => void (blocked = false));
+  await condition(() => !blocked, '1s');
+  await condition(() => blocked);
+}
1 change: 1 addition & 0 deletions packages/test/src/workflows/index.ts
@@ -70,6 +70,7 @@ export * from './try-to-continue-after-completion';
 export * from './fail-unless-signaled-before-start';
 export * from './smorgasbord';
 export * from './condition';
+export * from './condition-completion-race';
 export * from './sleep-invalid-duration';
 export * from './signals-are-always-processed';
 export * from './async-activity-completion-tester';
19 changes: 15 additions & 4 deletions packages/workflow/src/internals.ts
@@ -104,8 +104,10 @@ export class Activator implements ActivationHandler {
   }
 
   public fireTimer(activation: coresdk.workflow_activation.IFireTimer): void {
-    const { resolve } = consumeCompletion('timer', getSeq(activation));
-    resolve(undefined);
+    // Timers are a special case where their completion might not be in Workflow state,
+    // this is due to immediate timer cancellation that doesn't go wait for Core.
+    const completion = maybeConsumeCompletion('timer', getSeq(activation));
+    completion?.resolve(undefined);
   }
 
   public async resolveActivity(activation: coresdk.workflow_activation.IResolveActivity): Promise<void> {
@@ -525,12 +527,21 @@ async function failQuery(queryId: string, error: any) {
   });
 }
 
-export function consumeCompletion(type: keyof State['completions'], taskSeq: number): Completion {
+/** Consume a completion if it exists in Workflow state */
+export function maybeConsumeCompletion(type: keyof State['completions'], taskSeq: number): Completion | undefined {
   const completion = state.completions[type].get(taskSeq);
+  if (completion !== undefined) {
+    state.completions[type].delete(taskSeq);
+  }
+  return completion;
+}
+
+/** Consume a completion if it exists in Workflow state, throws if it doesn't */
+export function consumeCompletion(type: keyof State['completions'], taskSeq: number): Completion {
+  const completion = maybeConsumeCompletion(type, taskSeq);
   if (completion === undefined) {
     throw new IllegalStateError(`No completion for taskSeq ${taskSeq}`);
Contributor

I think we should change that message too. "Completion" is a confusing word there. It's more like "No pending awaitable for...", no?

Now, are there any cases in which a user would see this error?

Member Author

A user could see this error if new workflow code processes history generated by old code.

   }
-  state.completions[type].delete(taskSeq);
   return completion;
 }
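
The reply in the review thread above says the strict error can still surface when new workflow code processes history generated by old code. The sketch below is one way to picture that case under stated assumptions: `completions`, `maybeConsume`, and `consume` are hypothetical stand-ins for the SDK's internal state and helpers, and the "old code scheduled activity 1, new code did not" scenario is invented for illustration.

```typescript
// Hypothetical stand-ins for the SDK's internal types; names are illustrative only.
interface Completion {
  resolve(value: unknown): void;
  reject(reason: unknown): void;
}
type CompletionType = 'timer' | 'activity';

const completions: Record<CompletionType, Map<number, Completion>> = {
  timer: new Map(),
  activity: new Map(),
};

// Lenient lookup: remove and return the completion if present.
function maybeConsume(type: CompletionType, seq: number): Completion | undefined {
  const completion = completions[type].get(seq);
  if (completion !== undefined) {
    completions[type].delete(seq);
  }
  return completion;
}

// Strict lookup: a missing completion is treated as an illegal state.
function consume(type: CompletionType, seq: number): Completion {
  const completion = maybeConsume(type, seq);
  if (completion === undefined) {
    throw new Error(`No completion for taskSeq ${seq}`);
  }
  return completion;
}

// Assumed scenario: history (written by old code) resolves activity seq 1,
// but the new code only registered a completion for seq 2.
const resolved: unknown[] = [];
completions.activity.set(2, {
  resolve: (value) => {
    resolved.push(value);
  },
  reject: () => {},
});

let replayError: string | undefined;
try {
  consume('activity', 1).resolve('result from history');
} catch (err) {
  replayError = (err as Error).message;
}

// A resolution that matches what the new code scheduled still goes through.
consume('activity', 2).resolve('ok');
```

Keeping the strict variant for non-timer completions means a mismatch like this fails loudly instead of being dropped, which is the trade-off the thread is discussing.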
