Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ jobs:
--dynamic-config-value system.refreshNexusEndpointsMinWait=1000 \
--dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true \
--dynamic-config-value frontend.activityAPIsEnabled=true \
--dynamic-config-value system.enableDeploymentVersions=true &
sleep 10s

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.client;

import io.temporal.activity.ActivityInfo;

/***
* Indicates that the activity was paused by the user.
*
* <p>Catching this exception directly is discouraged and catching the parent class {@link ActivityCompletionException} is recommended instead.<br>
*/
public final class ActivityPausedException extends ActivityCompletionException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrmm, so I see this is in the client package. I am guessing some other extensions of this activity completion exception are in client package instead of the activity one because they can be the cause of the outer activity failure when caught inside a workflow, correct? My concern with this exception is that it is only for inside activities and never for inside workflows.

Not that it's wrong or anything, but I wonder if it's worth noting in javadoc that even if, say, max attempts were reached as a result of this being bubbled out of the activity, the workflow caller will never see this exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All other ActivityCompletionException are in client , I agree it is not the ideal package but it is consistent with the rest of the SDK.

the workflow caller will never see this exception.

That is just the basic failure conversion rules for activities https://docs.temporal.io/references/failures#errors-in-activities I don't think we should document it for every SDK exception.

public ActivityPausedException(ActivityInfo info) {
super(info);
}

public ActivityPausedException() {
super();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

/**
* Indicates that {@link WorkerFactory#shutdown()} or {@link WorkerFactory#shutdownNow()} was
* called. It is OK to ignore the exception to let activity to complete. It assumes that {@link
* called. It is OK to ignore the exception to let the activity complete. It assumes that {@link
* WorkerFactory#awaitTermination(long, TimeUnit)} is called with a timeout larger than the activity
* execution time.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ private void sendHeartbeatRequest(Object details) {
metricsScope);
if (status.getCancelRequested()) {
lastException = new ActivityCanceledException(info);
} else if (status.getActivityPaused()) {
lastException = new ActivityPausedException(info);
} else {
lastException = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.activity;

import static org.junit.Assume.assumeTrue;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflow.v1.PendingActivityInfo;
import io.temporal.api.workflowservice.v1.PauseActivityRequest;
import io.temporal.client.ActivityPausedException;
import io.temporal.client.WorkflowStub;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Async;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class ActivityPauseTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestWorkflowImpl.class)
.setActivityImplementations(new HeartBeatingActivityImpl())
.build();

@Test
public void activityPause() {
assumeTrue(
"Test Server doesn't support activity pause", SDKTestWorkflowRule.useExternalService);

TestWorkflows.TestWorkflowReturnString workflow =
testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class);
Assert.assertEquals("I am stopped by Pause", workflow.execute());
Assert.assertEquals(
1,
WorkflowStub.fromTyped(workflow)
.describe()
.getRawDescription()
.getPendingActivitiesCount());
PendingActivityInfo activityInfo =
WorkflowStub.fromTyped(workflow).describe().getRawDescription().getPendingActivities(0);
Assert.assertTrue(activityInfo.getPaused());
}

public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturnString {

private final TestActivities.TestActivity1 activities =
Workflow.newActivityStub(
TestActivities.TestActivity1.class,
SDKTestOptions.newActivityOptions20sScheduleToClose());

@Override
public String execute() {
Async.function(activities::execute, "");
Workflow.sleep(Duration.ofSeconds(1));
return activities.execute("CompleteOnPause");
}
}

public static class HeartBeatingActivityImpl implements TestActivities.TestActivity1 {
@Override
public String execute(String arg) {
ActivityInfo info = Activity.getExecutionContext().getInfo();
// Have the activity pause itself
Activity.getExecutionContext()
.getWorkflowClient()
.getWorkflowServiceStubs()
.blockingStub()
.pauseActivity(
PauseActivityRequest.newBuilder()
.setNamespace(info.getNamespace())
.setExecution(
WorkflowExecution.newBuilder().setWorkflowId(info.getWorkflowId()).build())
.setId(info.getActivityId())
.build());
while (true) {
try {
Thread.sleep(1000);
// Heartbeat and verify that the correct exception is thrown
Activity.getExecutionContext().heartbeat("1");
} catch (ActivityPausedException pe) {
if (arg.equals("CompleteOnPause")) {
// An activity should be able to succeed if paused
return "I am stopped by Pause";
}
// This will fail the attempt, and the activity will not be retried if not unpaused
throw pe;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
Loading