Skip to content

Commit 277802d

Browse files
Gate by SDK flag
1 parent 627d5c6 commit 277802d

File tree

7 files changed

+120
-73
lines changed

7 files changed

+120
-73
lines changed

temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ public enum SdkFlag {
1414
* Changes behavior of GetVersion to never yield.
1515
*/
1616
SKIP_YIELD_ON_VERSION(2),
17+
/*
18+
* Changes behavior of CancellationScope to cancel children in a deterministic order.
19+
*/
20+
DETERMINISTIC_CANCELLATION_SCOPE_ORDER(3),
1721
UNKNOWN(Integer.MAX_VALUE);
1822

1923
private final int value;

temporal-sdk/src/main/java/io/temporal/internal/sync/CancellationScopeImpl.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ private static void popCurrent(CancellationScopeImpl expected) {
3333

3434
private final Runnable runnable;
3535
private CancellationScopeImpl parent;
36-
// We use a LinkedHashSet because we will iterate through the children, so we need to keep a deterministic order.
37-
private final Set<CancellationScopeImpl> children = new LinkedHashSet<>();
36+
// We use a LinkedHashSet because we will iterate through the children, so we need to keep a
37+
// deterministic order.
38+
private final Set<CancellationScopeImpl> children;
3839

3940
/**
4041
* When disconnected scope has no parent and thus doesn't receive cancellation requests from it.
@@ -43,20 +44,37 @@ private static void popCurrent(CancellationScopeImpl expected) {
4344

4445
private String reason;
4546

46-
CancellationScopeImpl(boolean ignoreParentCancellation, Runnable runnable) {
47-
this(ignoreParentCancellation, runnable, current());
47+
CancellationScopeImpl(
48+
boolean ignoreParentCancellation, boolean deterministicOrder, Runnable runnable) {
49+
this(ignoreParentCancellation, deterministicOrder, runnable, current());
4850
}
4951

50-
CancellationScopeImpl(boolean detached, Runnable runnable, CancellationScopeImpl parent) {
52+
CancellationScopeImpl(
53+
boolean detached,
54+
boolean deterministicOrder,
55+
Runnable runnable,
56+
CancellationScopeImpl parent) {
5157
this.detached = detached;
5258
this.runnable = runnable;
59+
if (deterministicOrder) {
60+
this.children = new LinkedHashSet<>();
61+
} else {
62+
this.children = new HashSet<>();
63+
}
5364
setParent(parent);
5465
}
5566

5667
public CancellationScopeImpl(
57-
boolean ignoreParentCancellation, Functions.Proc1<CancellationScope> proc) {
68+
boolean ignoreParentCancellation,
69+
boolean deterministicOrder,
70+
Functions.Proc1<CancellationScope> proc) {
5871
this.detached = ignoreParentCancellation;
5972
this.runnable = () -> proc.apply(this);
73+
if (deterministicOrder) {
74+
this.children = new LinkedHashSet<>();
75+
} else {
76+
this.children = new HashSet<>();
77+
}
6078
setParent(current());
6179
}
6280

temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.google.common.primitives.Ints;
55
import io.temporal.common.context.ContextPropagator;
66
import io.temporal.internal.WorkflowThreadMarker;
7+
import io.temporal.internal.common.SdkFlag;
78
import io.temporal.internal.context.ContextThreadLocal;
89
import io.temporal.internal.worker.WorkflowExecutorCache;
910
import io.temporal.serviceclient.CheckedExceptionWrapper;
@@ -157,7 +158,12 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) {
157158
// a bad practice
158159
this.workflowContext.setRunner(this);
159160
this.cache = cache;
160-
this.runnerCancellationScope = new CancellationScopeImpl(true, null, null);
161+
boolean deterministicCancellationScopeOrder =
162+
workflowContext
163+
.getReplayContext()
164+
.checkSdkFlag(SdkFlag.DETERMINISTIC_CANCELLATION_SCOPE_ORDER);
165+
this.runnerCancellationScope =
166+
new CancellationScopeImpl(true, deterministicCancellationScopeOrder, null, null);
161167
this.rootRunnable = root;
162168
}
163169

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.temporal.internal.WorkflowThreadMarker;
2525
import io.temporal.internal.common.ActivityOptionUtils;
2626
import io.temporal.internal.common.NonIdempotentHandle;
27+
import io.temporal.internal.common.SdkFlag;
2728
import io.temporal.internal.common.SearchAttributesUtil;
2829
import io.temporal.internal.logging.ReplayAwareLogger;
2930
import io.temporal.internal.statemachines.UnsupportedContinueAsNewRequest;
@@ -546,12 +547,20 @@ public static Promise<Object> promiseAnyOf(Promise<?>... promises) {
546547
}
547548

548549
public static CancellationScope newCancellationScope(boolean detached, Runnable runnable) {
549-
return new CancellationScopeImpl(detached, runnable);
550+
boolean deterministicCancellationScopeOrder =
551+
getRootWorkflowContext()
552+
.getReplayContext()
553+
.checkSdkFlag(SdkFlag.DETERMINISTIC_CANCELLATION_SCOPE_ORDER);
554+
return new CancellationScopeImpl(detached, deterministicCancellationScopeOrder, runnable);
550555
}
551556

552557
public static CancellationScope newCancellationScope(
553558
boolean detached, Functions.Proc1<CancellationScope> proc) {
554-
return new CancellationScopeImpl(detached, proc);
559+
boolean deterministicCancellationScopeOrder =
560+
getRootWorkflowContext()
561+
.getReplayContext()
562+
.checkSdkFlag(SdkFlag.DETERMINISTIC_CANCELLATION_SCOPE_ORDER);
563+
return new CancellationScopeImpl(detached, deterministicCancellationScopeOrder, proc);
555564
}
556565

557566
public static CancellationScopeImpl currentCancellationScope() {

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.temporal.common.context.ContextPropagator;
55
import io.temporal.failure.CanceledFailure;
66
import io.temporal.internal.common.NonIdempotentHandle;
7+
import io.temporal.internal.common.SdkFlag;
78
import io.temporal.internal.context.ContextThreadLocal;
89
import io.temporal.internal.logging.LoggerTag;
910
import io.temporal.internal.replay.ReplayWorkflowContext;
@@ -55,7 +56,11 @@ class RunnableWrapper implements Runnable {
5556
this.threadContext = threadContext;
5657
this.replayWorkflowContext = replayWorkflowContext;
5758
this.name = name;
58-
this.cancellationScope = new CancellationScopeImpl(detached, runnable, parent);
59+
boolean deterministicCancellationScopeOrder =
60+
replayWorkflowContext.checkSdkFlag(SdkFlag.DETERMINISTIC_CANCELLATION_SCOPE_ORDER);
61+
this.cancellationScope =
62+
new CancellationScopeImpl(
63+
detached, deterministicCancellationScopeOrder, runnable, parent);
5964
Preconditions.checkState(
6065
context.getStatus() == Status.CREATED, "threadContext not in CREATED state");
6166
this.contextPropagators = contextPropagators;

temporal-sdk/src/test/java/io/temporal/workflow/cancellationTests/WorkflowCancellationScopeDeterminism.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,19 @@
55
import io.temporal.client.WorkflowClient;
66
import io.temporal.client.WorkflowStub;
77
import io.temporal.common.WorkflowExecutionHistory;
8+
import io.temporal.internal.common.SdkFlag;
9+
import io.temporal.internal.statemachines.WorkflowStateMachines;
810
import io.temporal.testing.WorkflowReplayer;
911
import io.temporal.testing.internal.SDKTestWorkflowRule;
1012
import io.temporal.workflow.*;
1113
import java.time.Duration;
14+
import java.util.Arrays;
15+
import java.util.Collections;
16+
import org.junit.Before;
1217
import org.junit.Rule;
1318
import org.junit.Test;
14-
import org.slf4j.Logger;
15-
import org.slf4j.LoggerFactory;
1619

1720
public class WorkflowCancellationScopeDeterminism {
18-
private static final Logger log =
19-
LoggerFactory.getLogger(WorkflowCancellationScopeDeterminism.class);
20-
2121
@Rule
2222
public SDKTestWorkflowRule testWorkflowRule =
2323
SDKTestWorkflowRule.newBuilder()
@@ -26,10 +26,15 @@ public class WorkflowCancellationScopeDeterminism {
2626
.setUseExternalService(true)
2727
.build();
2828

29-
@Test(timeout = 1000000)
29+
@Before
30+
public void setUp() {
31+
WorkflowStateMachines.initialFlags =
32+
Collections.unmodifiableList(Arrays.asList(SdkFlag.DETERMINISTIC_CANCELLATION_SCOPE_ORDER));
33+
}
34+
35+
@Test(timeout = 60000)
3036
public void replayCanceledWorkflow() throws Exception {
31-
for (int i = 0; i < 1000; i++) {
32-
log.info("Running test iteration {}", i);
37+
for (int i = 0; i < 100; i++) {
3338
TestWorkflow testWorkflow = testWorkflowRule.newWorkflowStub(TestWorkflow.class);
3439

3540
WorkflowClient.start(testWorkflow::start);

0 commit comments

Comments
 (0)