Skip to content

Fix the issue with retries not happening correctly for Activities and… #1374

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,29 @@

import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryPolicy;

import java.time.Duration;

public class DemoChildWorkflow implements Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
ctx.getLogger().info("Starting ChildWorkflow: " + ctx.getName());

WorkflowTaskRetryPolicy policy = WorkflowTaskRetryPolicy.newBuilder()
.setFirstRetryInterval(Duration.ofSeconds(1))
.setMaxNumberOfAttempts(10)
.build();

WorkflowTaskOptions options = new WorkflowTaskOptions(policy);

var childWorkflowInput = ctx.getInput(String.class);
ctx.getLogger().info("ChildWorkflow received input: " + childWorkflowInput);

ctx.getLogger().info("ChildWorkflow is calling Activity: " + ReverseActivity.class.getName());
String result = ctx.callActivity(ReverseActivity.class.getName(), childWorkflowInput, String.class).await();
String result = ctx.callActivity(ReverseActivity.class.getName(), childWorkflowInput, options, String.class).await();

ctx.getLogger().info("ChildWorkflow finished with: " + result);
ctx.complete(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public static void main(String[] args) throws Exception {

// Build and then start the workflow runtime pulling and executing tasks
WorkflowRuntime runtime = builder.build();
runtime.start();
System.out.println("Start workflow runtime");
}
}
2 changes: 1 addition & 1 deletion sdk-workflows/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<dependency>
<groupId>io.dapr</groupId>
<artifactId>durabletask-client</artifactId>
<version>1.5.2</version>
<version>1.5.3</version>
</dependency>
<!--
manually declare durabletask-client's jackson dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ public Builder setMaxRetryInterval(@Nullable Duration maxRetryInterval) {
* @return This builder
*/
public Builder setRetryTimeout(Duration retryTimeout) {
if (retryTimeout != null && retryTimeout.compareTo(this.firstRetryInterval) < 0) {
if (retryTimeout == null || retryTimeout.compareTo(this.firstRetryInterval) < 0) {
throw new IllegalArgumentException(
"The value for retryTimeout must be greater than or equal to the value for firstRetryInterval.");
"The value for retryTimeout cannot be null and"
+ " must be greater than or equal to the value for firstRetryInterval.");
}

this.retryTimeout = retryTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ private static TaskOptions toTaskOptions(WorkflowTaskOptions options) {
);

retryPolicy.setBackoffCoefficient(workflowTaskRetryPolicy.getBackoffCoefficient());
retryPolicy.setRetryTimeout(workflowTaskRetryPolicy.getRetryTimeout());
if (workflowTaskRetryPolicy.getRetryTimeout() != null) {
retryPolicy.setRetryTimeout(workflowTaskRetryPolicy.getRetryTimeout());
}

return new TaskOptions(retryPolicy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ public void callChildWorkflowWithOptions() {

assertEquals(retryPolicy.getMaxNumberOfAttempts(), taskOptions.getRetryPolicy().getMaxNumberOfAttempts());
assertEquals(retryPolicy.getFirstRetryInterval(), taskOptions.getRetryPolicy().getFirstRetryInterval());
assertEquals(Duration.ZERO, taskOptions.getRetryPolicy().getRetryTimeout());
}

@Test
Expand All @@ -327,4 +328,52 @@ public void newUuidTestNoImplementationExceptionTest() {
String expectedMessage = "No implementation found.";
assertEquals(expectedMessage, runtimeException.getMessage());
}

@Test
public void workflowRetryPolicyRetryTimeoutValueShouldHaveRightValueWhenBeingSet() {
String expectedName = "TestActivity";
String expectedInput = "TestInput";
String expectedInstanceId = "TestInstanceId";
WorkflowTaskRetryPolicy retryPolicy = WorkflowTaskRetryPolicy.newBuilder()
.setMaxNumberOfAttempts(1)
.setFirstRetryInterval(Duration.ofSeconds(10))
.setRetryTimeout(Duration.ofSeconds(10))
.build();
WorkflowTaskOptions executionOptions = new WorkflowTaskOptions(retryPolicy);
ArgumentCaptor<TaskOptions> captor = ArgumentCaptor.forClass(TaskOptions.class);

context.callChildWorkflow(expectedName, expectedInput, expectedInstanceId, executionOptions, String.class);

verify(mockInnerContext, times(1))
.callSubOrchestrator(
eq(expectedName),
eq(expectedInput),
eq(expectedInstanceId),
captor.capture(),
eq(String.class)
);

TaskOptions taskOptions = captor.getValue();

assertEquals(Duration.ofSeconds(10), taskOptions.getRetryPolicy().getRetryTimeout());
}

@Test
public void workflowRetryPolicyRetryThrowIllegalArgumentWhenNullRetryTimeoutIsSet() {
assertThrows(IllegalArgumentException.class, () ->
WorkflowTaskRetryPolicy.newBuilder()
.setMaxNumberOfAttempts(1)
.setFirstRetryInterval(Duration.ofSeconds(10))
.setRetryTimeout(null)
.build());
}

@Test
public void workflowRetryPolicyRetryThrowIllegalArgumentWhenRetryTimeoutIsLessThanMaxRetryInterval() {
assertThrows(IllegalArgumentException.class, () -> WorkflowTaskRetryPolicy.newBuilder()
.setMaxNumberOfAttempts(1)
.setFirstRetryInterval(Duration.ofSeconds(10))
.setRetryTimeout(Duration.ofSeconds(9))
.build());
}
}
Loading