Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2046,11 +2046,15 @@ private static State failActivityTask(
RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
if (request instanceof RespondActivityTaskFailedRequest) {
RespondActivityTaskFailedRequest req = (RespondActivityTaskFailedRequest) request;
data.heartbeatDetails = req.getLastHeartbeatDetails();
if (req.hasLastHeartbeatDetails()) {
data.heartbeatDetails = req.getLastHeartbeatDetails();
}
return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
} else if (request instanceof RespondActivityTaskFailedByIdRequest) {
RespondActivityTaskFailedByIdRequest req = (RespondActivityTaskFailedByIdRequest) request;
data.heartbeatDetails = req.getLastHeartbeatDetails();
if (req.hasLastHeartbeatDetails()) {
data.heartbeatDetails = req.getLastHeartbeatDetails();
}
return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
} else {
throw new IllegalArgumentException("Unknown request: " + request);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.testserver.functional.activity;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import com.google.protobuf.ByteString;
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityInfo;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatRequest;
import io.temporal.common.RetryOptions;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.failure.ActivityFailure;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.testserver.functional.common.TestActivities;
import io.temporal.testserver.functional.common.TestWorkflows;
import io.temporal.workflow.Workflow;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.Rule;
import org.junit.Test;

public class ActivityHeartbeat {
private static final ConcurrentLinkedQueue<Optional<Payloads>> activityHeartbeats =
new ConcurrentLinkedQueue<>();

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestWorkflow.class)
.setActivityImplementations(new TestActivity())
.build();

@Test
public void testActivityHeartbeatNoLastHeartbeatDetails() {
// Test that when last heartbeat details are not set on failure, the test server
// clear the heartbeat details.
String result =
testWorkflowRule.newWorkflowStub(TestWorkflows.WorkflowReturnsString.class).execute();
assertEquals("", result);
assertEquals(2, activityHeartbeats.size());
assertFalse(activityHeartbeats.poll().isPresent());
assertEquals(
"heartbeat details",
DefaultDataConverter.STANDARD_INSTANCE.fromPayloads(
0, activityHeartbeats.poll(), String.class, String.class));
Comment on lines +65 to +68
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is the assertion that would have failed without the change because it'd still have 2 in the queue, but even the second would not be present due to overwrite (just kinda proving I read it and understand it heh)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this fails without the test server change

}

public static class TestActivity implements TestActivities.ActivityReturnsString {
@Override
public String execute() {
ActivityInfo info = Activity.getExecutionContext().getInfo();
activityHeartbeats.add(info.getHeartbeatDetails());
// Heartbeat with the raw service stub to avoid the SDK keeping track of the heartbeat
Activity.getExecutionContext()
.getWorkflowClient()
.getWorkflowServiceStubs()
.blockingStub()
.recordActivityTaskHeartbeat(
RecordActivityTaskHeartbeatRequest.newBuilder()
.setNamespace(info.getNamespace())
.setTaskToken(ByteString.copyFrom(info.getTaskToken()))
.setDetails(
DefaultDataConverter.STANDARD_INSTANCE.toPayloads("heartbeat details").get())
.build());
throw new IllegalStateException("simulated failure");
}
}

public static class TestWorkflow implements TestWorkflows.WorkflowReturnsString {
@Override
public String execute() {
ActivityOptions options =
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(10))
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(2).build())
.build();

try {
Workflow.newActivityStub(TestActivities.ActivityReturnsString.class, options).execute();
} catch (ActivityFailure e) {
// Expected
}
return "";
}
}
}
Loading