-
Notifications
You must be signed in to change notification settings - Fork 192
Closed
Description
Expected Behavior
I have a workflow that schedules activities with 5 elements to process, on every iteration of the 5 elements, I remove the currently processed row and update the heartbeat with the remaining elements, therefore, we call heartbeat with
- hearbeat (remaining = 4)
- hearbeat(remaining = 3)
- heartbeat(remaining=2)
- heartbeat(remaining=1)
If I throw an exception in the 4th iteration, then I would expect a retry that calls me with the remaining 1 elements
Actual Behavior
however, I am called with remaining 4)
Steps to Reproduce the Problem
- define an activity as follows
package io.temporal.samples.springboot.propagation;
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.spring.boot.ActivityImpl;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.*;
import static io.temporal.samples.springboot.propagation.ChunkedWorkflow.*;
@Component
@ActivityImpl(taskQueues = "RowTaskQueue")
public class ChunkProcessingActivityImpl implements ChunkProcessingActivity {
private static final Logger log = LoggerFactory.getLogger(ChunkProcessingActivityImpl.class);
@AllArgsConstructor
@NoArgsConstructor
public static final class HeartbeatData {
List<String> remaining;
ProcessingResult progressSoFar;
}
@Override
public ProcessingResult processRows(List<String> rowId) {
ActivityExecutionContext context = Activity.getExecutionContext();
Optional<HeartbeatData> heartbeatDetails = context.getHeartbeatDetails(HeartbeatData.class);
ProcessingResult result =
new ProcessingResult();
result.rowsRead = 0;
result.rowsWritten = 0;
HeartbeatData heartbeatData = heartbeatDetails.orElse(new HeartbeatData(rowId, result));
List<String> remainingUuids = heartbeatData.remaining;
result = heartbeatData.progressSoFar;
log.info("propagating {}, heartbeat {}, rows {}", remainingUuids.size(), heartbeatDetails, remainingUuids);
int index = 0;
List<String> newRemaining = new ArrayList<>(remainingUuids);
for (String uuid : remainingUuids) {
if (index == 4) {
throw new RuntimeException("foo error for uuid " + uuid);
}
newRemaining.remove(uuid);
ProcessingResult innerResult = processSingleRow(UUID.fromString(uuid));
log.info(" sending heartbeat with remaining {}, {}", newRemaining.size(), newRemaining);
result.rowsRead += innerResult.rowsRead;
result.rowsWritten += innerResult.rowsWritten;
result.rowsProcessed.addAll(innerResult.rowsProcessed);
context.heartbeat(new HeartbeatData(newRemaining, result));
index++;
}
log.info("processing {} rows {} … done", remainingUuids.size(), remainingUuids);
log.info("processing result {}", result);
return result;
}
private ProcessingResult processSingleRow(UUID uuid) {
ProcessingResult result =
new ProcessingResult();
result.rowsRead = 1;
result.rowsWritten = 1;
log.info("processing single row {}", uuid);
result.addRowId(uuid.toString());
return result;
}
}- start the server with
gradle bootRun - on the hello world page, start the job
- watch the spring boot log
2024-01-06T20:17:15.028+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : propagating 5, heartbeat Optional.empty, rows [4ab57ee9-0daf-4e46-a6de-1e0643daeec1, 557d0cb6-e9a9-4a57-a8d7-d261a2bc90fd, 1350507c-4126-4a0a-868d-d0a96e90c744, b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:15.028+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : processing single row 4ab57ee9-0daf-4e46-a6de-1e0643daeec1
2024-01-06T20:17:15.028+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : sending heartbeat with remaining 4, [557d0cb6-e9a9-4a57-a8d7-d261a2bc90fd, 1350507c-4126-4a0a-868d-d0a96e90c744, b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:15.049+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : processing single row 557d0cb6-e9a9-4a57-a8d7-d261a2bc90fd
2024-01-06T20:17:15.049+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : sending heartbeat with remaining 3, [1350507c-4126-4a0a-868d-d0a96e90c744, b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:15.049+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : processing single row 1350507c-4126-4a0a-868d-d0a96e90c744
2024-01-06T20:17:15.049+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : sending heartbeat with remaining 2, [b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:15.049+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : processing single row b61a2443-5b5a-4888-bc37-f3e504340e77
2024-01-06T20:17:15.049+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : sending heartbeat with remaining 1, [56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:15.049+01:00 WARN 4593 --- [temporal-samples] [ce="default": 1] tivityTaskExecutors$ActivityTaskExecutor : Activity failure. ActivityId=646434e9-72c7-313a-8301-214499ef42ea, activityType=ProcessRows, attempt=1
java.lang.RuntimeException: foo error for uuid 56115510-3c5c-4bc8-ae77-e206bdf9f4fb
at io.temporal.samples.springboot.propagation.ChunkProcessingActivityImpl.processRows(ChunkProcessingActivityImpl.java:68) ~[main/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at io.temporal.internal.activity.RootActivityInboundCallsInterceptor$POJOActivityInboundCallsInterceptor.executeActivity(RootActivityInboundCallsInterceptor.java:64) ~[temporal-sdk-1.22.2.jar:na]
at io.temporal.internal.activity.RootActivityInboundCallsInterceptor.execute(RootActivityInboundCallsInterceptor.java:43) ~[temporal-sdk-1.22.2.jar:na]
at io.temporal.internal.activity.ActivityTaskExecutors$BaseActivityTaskExecutor.execute(ActivityTaskExecutors.java:107) ~[temporal-sdk-1.22.2.jar:na]
at io.temporal.internal.activity.ActivityTaskHandlerImpl.handle(ActivityTaskHandlerImpl.java:124) ~[temporal-sdk-1.22.2.jar:na]
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handleActivity(ActivityWorker.java:278) ~[temporal-sdk-1.22.2.jar:na]
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:243) ~[temporal-sdk-1.22.2.jar:na]
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:216) ~[temporal-sdk-1.22.2.jar:na]
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:105) ~[temporal-sdk-1.22.2.jar:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
>>>>> here we are back with the first heartbeat details rather than only the single one
2024-01-06T20:17:16.092+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : propagating 4, heartbeat Optional[io.temporal.samples.springboot.propagation.ChunkProcessingActivityImpl$HeartbeatData@7d0ec774], rows [557d0cb6-e9a9-4a57-a8d7-d261a2bc90fd, 1350507c-4126-4a0a-868d-d0a96e90c744, b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:16.093+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : processing single row 557d0cb6-e9a9-4a57-a8d7-d261a2bc90fd
2024-01-06T20:17:16.093+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : sending heartbeat with remaining 3, [1350507c-4126-4a0a-868d-d0a96e90c744, b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:16.096+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : processing single row 1350507c-4126-4a0a-868d-d0a96e90c744
2024-01-06T20:17:16.096+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : sending heartbeat with remaining 2, [b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:16.096+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : processing single row b61a2443-5b5a-4888-bc37-f3e504340e77
2024-01-06T20:17:16.096+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : sending heartbeat with remaining 1, [56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:16.096+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : processing single row 56115510-3c5c-4bc8-ae77-e206bdf9f4fb
2024-01-06T20:17:16.096+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : sending heartbeat with remaining 0, []
2024-01-06T20:17:16.096+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : processing 4 rows [557d0cb6-e9a9-4a57-a8d7-d261a2bc90fd, 1350507c-4126-4a0a-868d-d0a96e90c744, b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb] … done
2024-01-06T20:17:16.097+01:00 INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl : processing result ChunkedWorkflow.ProcessingResult(rowsRead=5, rowsWritten=5, rowsProcessed=[4ab57ee9-0daf-4e46-a6de-1e0643daeec1, 557d0cb6-e9a9-4a57-a8d7-d261a2bc90fd, 1350507c-4126-4a0a-868d-d0a96e90c744, b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb])
Specifications
- Version:1.22.2
- Platform: mac / java 17 / spring boot 3.2
Sample repo: https://github.com/rburgst/temporal-java-heartbeat-problem
( the springboot sample)
Metadata
Metadata
Assignees
Labels
No labels