Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import com.netflix.maestro.annotations.Nullable;
import com.netflix.maestro.exceptions.MaestroInternalError;
import com.netflix.maestro.exceptions.MaestroUnprocessableEntityException;
import com.netflix.maestro.models.parameter.ParamSource;
Expand Down Expand Up @@ -91,7 +92,7 @@ public int getDepth() {
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder(
value = {"workflow_id", "instance_id", "run_id", "step_id", "step_attempt_id"},
value = {"workflow_id", "instance_id", "run_id", "step_id", "step_attempt_id", "sync"},
alphabetic = true)
@Data
public static class Info {
Expand All @@ -100,6 +101,8 @@ public static class Info {
private long runId;
private String stepId;
private long stepAttemptId;
// this is used to indicate if the downstream should be run in sync mode, default is true
@Nullable private Boolean sync;

@Override
public String toString() {
Expand All @@ -111,6 +114,11 @@ public String toString() {
public boolean isInline() {
return IdHelper.isInlineWorkflowId(workflowId);
}

@JsonIgnore
public boolean isAsync() {
return sync != null && !sync;
}
}

/** Create a concrete upstream initiator based on the given type. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
package com.netflix.maestro.models.initiator;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import com.netflix.maestro.MaestroBaseTest;
import lombok.Data;
Expand Down Expand Up @@ -87,4 +90,20 @@ public void testInitiatorAncestors() throws Exception {
"test-parent",
((TemplateInitiator) initiators.getTemplate()).getNonInlineParent().getWorkflowId());
}

@Test
public void testInitiatorAncestorSyncFlag() throws Exception {
Initiators initiators =
loadObject("fixtures/initiator/sample-initiators.json", Initiators.class);
assertTrue(initiators.getSubworkflow().getParent().isAsync());
assertFalse(initiators.getSubworkflow().getParent().getSync());
assertFalse(((UpstreamInitiator) initiators.getSubworkflow()).getRoot().isAsync());
assertNull(((UpstreamInitiator) initiators.getSubworkflow()).getRoot().getSync());
assertFalse(initiators.getForeach().getParent().isAsync());
assertNull(initiators.getForeach().getParent().getSync());
assertFalse(initiators.getTemplate().getParent().isAsync());
assertTrue(initiators.getTemplate().getParent().getSync());
assertFalse(((UpstreamInitiator) initiators.getTemplate()).getRoot().isAsync());
assertNull(((UpstreamInitiator) initiators.getTemplate()).getRoot().getSync());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
"instance_id": 1,
"run_id": 1,
"step_id": "test-step",
"step_attempt_id": 1
"step_attempt_id": 1,
"sync": false
}
],
"depth": 2,
Expand Down Expand Up @@ -64,7 +65,8 @@
"instance_id": 1,
"run_id": 1,
"step_id": "test-step",
"step_attempt_id": 1
"step_attempt_id": 1,
"sync": true
}],
"type": "TEMPLATE"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,12 +503,23 @@ private Optional<StepAction> getAction(WorkflowSummary summary, String stepId) {
Initiator initiator = summary.getInitiator();
List<UpstreamInitiator.Info> path = new ArrayList<>();
StringBuilder sqlBuilder = new StringBuilder(GET_ACTION_QUERY);
if (initiator instanceof UpstreamInitiator) {
path.addAll(((UpstreamInitiator) initiator).getAncestors());
sqlBuilder
.append(" OR (")
.append(String.join(") OR (", Collections.nCopies(path.size(), CONDITION_POSTFIX)))
.append(')');
if (initiator instanceof UpstreamInitiator upstreamInitiator) {
List<UpstreamInitiator.Info> ancestorPath = new ArrayList<>();
for (var ancestor : upstreamInitiator.getAncestors().reversed()) {
Copy link

Copilot AI Aug 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Using var for the loop variable reduces code readability. Consider using the explicit type UpstreamInitiator.Info ancestor to make the code more self-documenting.

Suggested change
for (var ancestor : upstreamInitiator.getAncestors().reversed()) {
for (UpstreamInitiator.Info ancestor : upstreamInitiator.getAncestors().reversed()) {

Copilot uses AI. Check for mistakes.
// if a (sub)workflow is async, ignore actions from upstream ancestors at this point
if (ancestor.isAsync()) {
break;
} else {
ancestorPath.add(ancestor);
}
}
path.addAll(ancestorPath.reversed());
Copy link

Copilot AI Aug 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling reversed() twice on the same collection is inefficient. Consider reversing ancestorPath once after the loop instead of reversing both during iteration and when adding to path.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that's wrong.

if (!path.isEmpty()) {
sqlBuilder
.append(" OR (")
.append(String.join(") OR (", Collections.nCopies(path.size(), CONDITION_POSTFIX)))
.append(')');
}
}
String sql = sqlBuilder.toString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,8 @@ private Optional<TimelineEvent> takePendingActionIfFeasible(
Collections.singletonList(Tag.create(FOREACH_TAG_NAME)),
createForeachRunParams(
restartIterationId - 1, workflowSummary, foreachStep, runtimeSummary),
generateDedupKey(restartArtifact, restartIterationId - 1))
generateDedupKey(restartArtifact, restartIterationId - 1),
null)
.toBuilder()
.restartConfig(foreachAction.getRestartConfig())
.build();
Expand Down Expand Up @@ -753,7 +754,8 @@ private Optional<Details> launchForeachIterations(
runtimeSummary,
Collections.singletonList(Tag.create(FOREACH_TAG_NAME)),
createForeachRunParams(idx, workflowSummary, step, runtimeSummary),
generateDedupKey(artifact, idx));
generateDedupKey(artifact, idx),
null);

if (instanceStepConcurrencyHandler.addInstance(runRequest)) {
runRequests.add(runRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ private Result runSubworkflowInstance(
runtimeSummary,
Collections.singletonList(Tag.create(SUBWORKFLOW_TAG_NAME)),
createSubworkflowRunParam(workflowSummary, step, runtimeSummary),
workflowSummary.getIdentity() + runtimeSummary.getIdentity());
workflowSummary.getIdentity() + runtimeSummary.getIdentity(),
((SubworkflowStep) step).getSync());

if (!instanceStepConcurrencyHandler.addInstance(runRequest)) {
return new Result(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ public static RunRequest createInternalWorkflowRunRequest(
StepRuntimeSummary runtimeSummary,
List<Tag> tags,
Map<String, ParamDefinition> runParams,
String dedupKey) {
String dedupKey,
Boolean syncMode) {

UpstreamInitiator initiator =
UpstreamInitiator.withType(Initiator.Type.valueOf(runtimeSummary.getType().name()));
Expand All @@ -189,6 +190,7 @@ public static RunRequest createInternalWorkflowRunRequest(
parent.setRunId(workflowSummary.getWorkflowRunId());
parent.setStepId(runtimeSummary.getStepId());
parent.setStepAttemptId(runtimeSummary.getStepAttemptId());
parent.setSync(syncMode);

List<UpstreamInitiator.Info> ancestors = new ArrayList<>();
if (workflowSummary.getInitiator() instanceof UpstreamInitiator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,140 @@ public void testGetActionFromUpstream() throws Exception {
verify(queueSystem, times(8)).notify(any());
}

@Test
public void testGetActionFromUpstreamWithSyncFalse() {
WorkflowSummary summary = setupWorkflowSummaryAndAction(false, false);

// Since parent.sync=false, action lookup should NOT continue to ancestors
// No action should be found from ancestors, and no local action exists
Optional<StepAction> stepAction = actionDao.tryGetAction(summary, "job1");
Assert.assertFalse(stepAction.isPresent());

actionDao.terminate(summary, "job1", user, RESTART, "test-reason");
// Since parent.sync=false, action lookup should NOT continue to ancestors. It picks up its own
// action.
stepAction = actionDao.tryGetAction(summary, "job1");
Assert.assertTrue(stepAction.isPresent());
stepAction.ifPresent(
action -> {
Assert.assertEquals("sample-dag-test-3", action.getWorkflowId());
Assert.assertEquals(1, action.getWorkflowInstanceId());
Assert.assertEquals(1, action.getWorkflowRunId());
Assert.assertEquals("job1", action.getStepId());
Assert.assertEquals(RESTART, action.getAction());
Assert.assertEquals(user, action.getUser());
});
Assert.assertEquals(1, actionDao.cleanUp("sample-dag-test-3", 1, 1));
Assert.assertEquals(1, actionDao.cleanUp("sample-root-wf", 3, 2));
Assert.assertEquals(1, actionDao.cleanUp("sample-subworkflow-wf", 1, 2));
}

@Test
public void testGetActionFromUpstreamWithSyncTrue() {
WorkflowSummary summary = setupWorkflowSummaryAndAction(true, false);

// Since parent.sync=true, action lookup should continue to ancestors
// Should get root action (first usingUpstream action in sorted order)
Optional<StepAction> stepAction = actionDao.tryGetAction(summary, "job1");
Assert.assertTrue(stepAction.isPresent());
stepAction.ifPresent(
action -> {
Assert.assertEquals("sample-root-wf", action.getWorkflowId());
Assert.assertEquals(3, action.getWorkflowInstanceId());
Assert.assertEquals(2, action.getWorkflowRunId());
Assert.assertEquals("root-step1", action.getStepId());
Assert.assertEquals(STOP, action.getAction());
Assert.assertEquals(user, action.getUser());
});

Assert.assertEquals(1, actionDao.cleanUp("sample-root-wf", 3, 2));
Assert.assertEquals(1, actionDao.cleanUp("sample-subworkflow-wf", 1, 2));
}

@Test
public void testGetActionFromUpstreamWithMixedSyncFlags() {
WorkflowSummary summary = setupWorkflowSummaryAndAction(true, true);

// Since parent.sync=true and grandparent.sync=false, action lookup should NOT include
// grandparent or any ancestors
// No action should be found from ancestors, and no local action exists
Optional<StepAction> stepAction = actionDao.tryGetAction(summary, "job1");
Assert.assertTrue(stepAction.isPresent());
stepAction.ifPresent(
action -> {
Assert.assertEquals("sample-subworkflow-wf", action.getWorkflowId());
Assert.assertEquals(1, action.getWorkflowInstanceId());
Assert.assertEquals(2, action.getWorkflowRunId());
Assert.assertEquals("sub-step1", action.getStepId());
Assert.assertEquals(KILL, action.getAction());
Assert.assertEquals(user, action.getUser());
});

Assert.assertEquals(1, actionDao.cleanUp("sample-root-wf", 3, 2));
Assert.assertEquals(1, actionDao.cleanUp("sample-grandparent-wf", 4, 3));
Assert.assertEquals(1, actionDao.cleanUp("sample-subworkflow-wf", 1, 2));
}

private WorkflowSummary setupWorkflowSummaryAndAction(boolean sync, boolean withGrandparent) {
WorkflowSummary summary = new WorkflowSummary();
summary.setWorkflowId("sample-dag-test-3");
summary.setWorkflowInstanceId(1);
summary.setWorkflowRunId(1);

SubworkflowInitiator initiator = new SubworkflowInitiator();

UpstreamInitiator.Info parent = new UpstreamInitiator.Info();
parent.setWorkflowId("sample-subworkflow-wf");
parent.setInstanceId(1);
parent.setRunId(2);
parent.setStepId("sub-step1");
parent.setSync(sync);

UpstreamInitiator.Info root = new UpstreamInitiator.Info();
root.setWorkflowId("sample-root-wf");
root.setInstanceId(3);
root.setRunId(2);
root.setStepId("root-step1");
root.setSync(true); // Set sync to true

if (withGrandparent) {
UpstreamInitiator.Info grandParent = new UpstreamInitiator.Info();
grandParent.setWorkflowId("sample-grandparent-wf");
grandParent.setInstanceId(4);
grandParent.setRunId(3);
grandParent.setStepId("grandparent-step1");
grandParent.setSync(false); // Set sync to false - this should stop the search
initiator.setAncestors(Arrays.asList(root, grandParent, parent));
} else {
initiator.setAncestors(Arrays.asList(root, parent));
}
summary.setInitiator(initiator);

WorkflowSummary rootSummary = new WorkflowSummary();
rootSummary.setWorkflowId("sample-root-wf");
rootSummary.setWorkflowInstanceId(3);
rootSummary.setWorkflowRunId(2);

WorkflowSummary parentSummary = new WorkflowSummary();
parentSummary.setWorkflowId("sample-subworkflow-wf");
parentSummary.setWorkflowInstanceId(1);
parentSummary.setWorkflowRunId(2);

if (withGrandparent) {
WorkflowSummary grandParentSummary = new WorkflowSummary();
grandParentSummary.setWorkflowId("sample-grandparent-wf");
grandParentSummary.setWorkflowInstanceId(4);
grandParentSummary.setWorkflowRunId(3);
// Create actions in grandparent
actionDao.terminate(grandParentSummary, "grandparent-step1", user, RESTART, "test-reason");
}

// Create actions in both root and parent
actionDao.terminate(rootSummary, "root-step1", user, STOP, "test-reason");
actionDao.terminate(parentSummary, "sub-step1", user, KILL, "test-reason");
return summary;
}

@Test
public void testInvalidRestart() {
RunResponse restartStepInfo1 =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.netflix.maestro.engine.execution.WorkflowSummary;
import com.netflix.maestro.engine.handlers.WorkflowActionHandler;
import com.netflix.maestro.engine.handlers.WorkflowInstanceActionHandler;
import com.netflix.maestro.engine.utils.StepHelper;
import com.netflix.maestro.models.artifact.Artifact;
import com.netflix.maestro.models.artifact.SubworkflowArtifact;
import com.netflix.maestro.models.definition.StepType;
Expand All @@ -50,9 +51,6 @@
import org.mockito.Mock;

public class SubworkflowStepRuntimeTest extends MaestroBaseTest {

private SubworkflowStepRuntime subworkflowStepRuntime;

@Mock private WorkflowActionHandler workflowActionHandler;
@Mock private WorkflowInstanceActionHandler instanceActionHandler;
@Mock private InstanceStepConcurrencyHandler concurrencyHandler;
Expand All @@ -61,6 +59,9 @@ public class SubworkflowStepRuntimeTest extends MaestroBaseTest {
@Mock private MaestroQueueSystem queueSystem;
@Mock private SubworkflowStep step;

private SubworkflowStepRuntime subworkflowStepRuntime;
private WorkflowSummary workflowSummary;

@Before
public void setUp() {
subworkflowStepRuntime =
Expand All @@ -72,18 +73,17 @@ public void setUp() {
stepInstanceDao,
queueSystem,
Collections.emptySet());
}

@Test
public void testSubworkflowLaunchDuplicateInstance() {
WorkflowSummary workflowSummary = new WorkflowSummary();
workflowSummary = new WorkflowSummary();
workflowSummary.setWorkflowId("test-workflow");
workflowSummary.setWorkflowInstanceId(1L);
workflowSummary.setWorkflowRunId(1L);
workflowSummary.setRunPolicy(null);
workflowSummary.setParams(Collections.emptyMap());
workflowSummary.setRunPolicy(RunPolicy.START_FRESH_NEW_RUN);
}

@Test
public void testSubworkflowLaunchDuplicateInstance() {
StepRuntimeSummary runtimeSummary =
StepRuntimeSummary.builder()
.stepId("test-step")
Expand Down Expand Up @@ -143,4 +143,26 @@ public void testSubworkflowLaunchDuplicateInstance() {
TimelineLogEvent logEvent = (TimelineLogEvent) result.getTimeline().getFirst();
assertEquals("Started a subworkflow with uuid: " + subworkflowUuid, logEvent.getMessage());
}

@Test
public void testSubworkflowSyncFlagUsedInRunRequest() {
Map.of("sync", true, "async", false)
.forEach(
(k, v) -> {
SubworkflowStep step = new SubworkflowStep();
step.setId(k + "-test-step");
step.setSync(v);
StepRuntimeSummary runtimeSummary =
StepRuntimeSummary.builder()
.stepId("test-step")
.stepAttemptId(1L)
.type(StepType.SUBWORKFLOW)
.stepRetry(StepInstance.StepRetry.from(null))
.build();
RunRequest runRequest =
StepHelper.createInternalWorkflowRunRequest(
workflowSummary, runtimeSummary, null, null, "test-dedup", step.getSync());
assertEquals(!v, runRequest.getInitiator().getParent().isAsync());
});
}
}
Loading