Skip to content

Add support for activity reset #2546

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.temporal.client;

import io.temporal.activity.ActivityInfo;
import io.temporal.common.Experimental;

/***
* Indicates that the activity was paused by the user.
*
* <p>Catching this exception directly is discouraged and catching the parent class {@link ActivityCompletionException} is recommended instead.<br>
*/
@Experimental
public final class ActivityPausedException extends ActivityCompletionException {
public ActivityPausedException(ActivityInfo info) {
super(info);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.temporal.client;

import io.temporal.activity.ActivityInfo;
import io.temporal.common.Experimental;

/***
* Indicates that the activity attempt was reset by the user.
*
* <p>Catching this exception directly is discouraged and catching the parent class {@link ActivityCompletionException} is recommended instead.<br>
*/
@Experimental
public final class ActivityResetException extends ActivityCompletionException {
public ActivityResetException(ActivityInfo info) {
super(info);
}

public ActivityResetException() {
super();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ private void sendHeartbeatRequest(Object details) {
metricsScope);
if (status.getCancelRequested()) {
lastException = new ActivityCanceledException(info);
} else if (status.getActivityReset()) {
lastException = new ActivityResetException(info);
} else if (status.getActivityPaused()) {
lastException = new ActivityPausedException(info);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.client.ActivityCanceledException;
import io.temporal.client.ActivityCompletionFailureException;
import io.temporal.client.ActivityNotExistsException;
import io.temporal.client.*;
import io.temporal.common.converter.DataConverter;
import io.temporal.failure.CanceledFailure;
import io.temporal.internal.client.ActivityClientHelper;
Expand Down Expand Up @@ -190,6 +188,10 @@ public void recordHeartbeat(@Nullable Object details) throws CanceledFailure {
metricsScope);
if (status.getCancelRequested()) {
throw new ActivityCanceledException();
} else if (status.getActivityReset()) {
throw new ActivityResetException();
} else if (status.getActivityPaused()) {
throw new ActivityPausedException();
}
} else {
RecordActivityTaskHeartbeatByIdResponse status =
Expand All @@ -203,6 +205,10 @@ public void recordHeartbeat(@Nullable Object details) throws CanceledFailure {
metricsScope);
if (status.getCancelRequested()) {
throw new ActivityCanceledException();
} else if (status.getActivityReset()) {
throw new ActivityResetException();
} else if (status.getActivityPaused()) {
throw new ActivityPausedException();
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package io.temporal.activity;

import static org.junit.Assume.assumeTrue;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflow.v1.PendingActivityInfo;
import io.temporal.api.workflowservice.v1.ResetActivityRequest;
import io.temporal.client.ActivityResetException;
import io.temporal.client.WorkflowStub;
import io.temporal.common.converter.GlobalDataConverter;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Async;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class ActivityResetTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestWorkflowImpl.class)
.setActivityImplementations(new HeartBeatingActivityImpl())
.build();

@Test
public void activityReset() {
assumeTrue(
"Test Server doesn't support activity pause", SDKTestWorkflowRule.useExternalService);

TestWorkflows.TestWorkflowReturnString workflow =
testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class);
Assert.assertEquals("I am stopped after reset", workflow.execute());
Assert.assertEquals(
1,
WorkflowStub.fromTyped(workflow)
.describe()
.getRawDescription()
.getPendingActivitiesCount());
PendingActivityInfo activityInfo =
WorkflowStub.fromTyped(workflow).describe().getRawDescription().getPendingActivities(0);
Assert.assertEquals(
"1",
GlobalDataConverter.get()
.fromPayload(
activityInfo.getHeartbeatDetails().getPayloads(0), String.class, String.class));
}

public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturnString {

private final TestActivities.TestActivity1 activities =
Workflow.newActivityStub(
TestActivities.TestActivity1.class,
SDKTestOptions.newActivityOptions20sScheduleToClose());

@Override
public String execute() {
Async.function(activities::execute, "");
Workflow.sleep(Duration.ofSeconds(1));
return activities.execute("CompleteOnPause");
}
}

public static class HeartBeatingActivityImpl implements TestActivities.TestActivity1 {
private final AtomicInteger resetCounter = new AtomicInteger(0);

@Override
public String execute(String arg) {
ActivityInfo info = Activity.getExecutionContext().getInfo();
// Have the activity pause itself
Activity.getExecutionContext()
.getWorkflowClient()
.getWorkflowServiceStubs()
.blockingStub()
.resetActivity(
ResetActivityRequest.newBuilder()
.setNamespace(info.getNamespace())
.setExecution(
WorkflowExecution.newBuilder()
.setWorkflowId(info.getWorkflowId())
.setRunId(info.getRunId())
.build())
.setId(info.getActivityId())
.build());
while (true) {
try {
Thread.sleep(1000);
// Check if the activity has been reset, and the activity info shows we are on the 1st
// attempt.
if (resetCounter.get() >= 1
&& Activity.getExecutionContext().getInfo().getAttempt() == 1) {
return "I am stopped after reset";
}
// Heartbeat and verify that the correct exception is thrown
Activity.getExecutionContext().heartbeat("1");
} catch (ActivityResetException pe) {
// Counter is incremented to track the number of resets
resetCounter.addAndGet(1);
// This will fail the attempt, and the activity will be retried.
throw pe;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
Loading