Heartbeat activity retries first heartbeat data instead of last #1965

@rburgst

Expected Behavior

I have a workflow that schedules an activity with 5 elements to process. On each iteration, I remove the row that was just processed and send a heartbeat containing the remaining elements. The activity therefore calls heartbeat with:

  1. heartbeat(remaining = 4)
  2. heartbeat(remaining = 3)
  3. heartbeat(remaining = 2)
  4. heartbeat(remaining = 1)

If I then throw an exception after the 4th heartbeat (while processing the 5th element), I would expect the retry to be called with the details of the last heartbeat, i.e. the 1 remaining element.
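To make the expectation concrete, here is a minimal sketch that does not use the Temporal SDK at all. The class `HeartbeatResumeSketch`, its in-memory `lastHeartbeat` field, and the `runAttempt` method are hypothetical names used only for illustration; they simply model "a retry resumes from the most recent heartbeat details".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class HeartbeatResumeSketch {
    // Hypothetical in-memory stand-in for the heartbeat details kept between attempts.
    private Optional<List<String>> lastHeartbeat = Optional.empty();

    /**
     * Runs one attempt. Resumes from the last recorded heartbeat if there is one,
     * throws when it reaches the row equal to failOn, and returns the rows the
     * attempt started with.
     */
    public List<String> runAttempt(List<String> rows, String failOn) {
        List<String> start = lastHeartbeat.orElse(rows);
        List<String> remaining = new ArrayList<>(start);
        for (String row : start) {
            if (row.equals(failOn)) {
                throw new IllegalStateException("simulated failure on " + row);
            }
            remaining.remove(row);
            // Record a copy so that later mutation cannot change the checkpoint.
            lastHeartbeat = Optional.of(new ArrayList<>(remaining));
        }
        return start;
    }

    public static void main(String[] args) {
        HeartbeatResumeSketch sketch = new HeartbeatResumeSketch();
        List<String> rows = List.of("a", "b", "c", "d", "e");
        try {
            sketch.runAttempt(rows, "e"); // four heartbeats, then failure on the fifth row
        } catch (IllegalStateException ex) {
            System.out.println("attempt 1 failed: " + ex.getMessage());
        }
        // The expected behavior: the second attempt starts with only the last row.
        System.out.println("attempt 2 starts with " + sketch.runAttempt(rows, null));
    }
}
```

In this model the second attempt starts with `[e]`, which is the behavior I expected from the real activity.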

Actual Behavior

However, the retry is called with the details of the first heartbeat (remaining = 4).

Steps to Reproduce the Problem

  1. define an activity as follows:
package io.temporal.samples.springboot.propagation;

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.spring.boot.ActivityImpl;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.*;

import static io.temporal.samples.springboot.propagation.ChunkedWorkflow.*;

@Component
@ActivityImpl(taskQueues = "RowTaskQueue")
public class ChunkProcessingActivityImpl implements ChunkProcessingActivity {
    private static final Logger log = LoggerFactory.getLogger(ChunkProcessingActivityImpl.class);

    @AllArgsConstructor
    @NoArgsConstructor
    public static final class HeartbeatData {
        List<String> remaining;
        ProcessingResult progressSoFar;
    }

    @Override
    public ProcessingResult processRows(List<String> rowId) {

        ActivityExecutionContext context = Activity.getExecutionContext();
        Optional<HeartbeatData> heartbeatDetails = context.getHeartbeatDetails(HeartbeatData.class);

        ProcessingResult result =
                new ProcessingResult();
        result.rowsRead = 0;
        result.rowsWritten = 0;

        HeartbeatData heartbeatData = heartbeatDetails.orElse(new HeartbeatData(rowId, result));
        List<String> remainingUuids = heartbeatData.remaining;
        result = heartbeatData.progressSoFar;
        log.info("propagating {}, heartbeat {}, rows {}", remainingUuids.size(), heartbeatDetails, remainingUuids);

        int index = 0;

        // Iterate over the original list and remove from a copy, so the loop does not modify the list it iterates.
        List<String> newRemaining = new ArrayList<>(remainingUuids);
        for (String uuid : remainingUuids) {
            // Simulated failure: the fifth row of an attempt (index 4) throws, after four heartbeats were sent.
            if (index == 4) {
                throw new RuntimeException("foo error for uuid " + uuid);
            }
            newRemaining.remove(uuid);
            ProcessingResult innerResult = processSingleRow(UUID.fromString(uuid));
            log.info("   sending heartbeat with remaining {}, {}", newRemaining.size(), newRemaining);
            result.rowsRead += innerResult.rowsRead;
            result.rowsWritten += innerResult.rowsWritten;
            result.rowsProcessed.addAll(innerResult.rowsProcessed);
            // Report the rows still to be processed together with the progress so far.
            context.heartbeat(new HeartbeatData(newRemaining, result));
            index++;
        }
        log.info("processing {} rows {} … done", remainingUuids.size(), remainingUuids);
        log.info("processing result {}", result);


        return result;
    }

    private ProcessingResult processSingleRow(UUID uuid) {

        ProcessingResult result =
                new ProcessingResult();
        result.rowsRead = 1;
        result.rowsWritten = 1;
        log.info("processing single row {}", uuid);
        result.addRowId(uuid.toString());
        return result;
    }
}
  2. start the server with gradle bootRun
  3. on the hello world page, start the job
  4. watch the spring boot log:
2024-01-06T20:17:15.028+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    : propagating 5, heartbeat Optional.empty, rows [4ab57ee9-0daf-4e46-a6de-1e0643daeec1, 557d0cb6-e9a9-4a57-a8d7-d261a2bc90fd, 1350507c-4126-4a0a-868d-d0a96e90c744, b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:15.028+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    : processing single row 4ab57ee9-0daf-4e46-a6de-1e0643daeec1
2024-01-06T20:17:15.028+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    :    sending heartbeat with remaining 4, [557d0cb6-e9a9-4a57-a8d7-d261a2bc90fd, 1350507c-4126-4a0a-868d-d0a96e90c744, b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:15.049+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    : processing single row 557d0cb6-e9a9-4a57-a8d7-d261a2bc90fd
2024-01-06T20:17:15.049+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    :    sending heartbeat with remaining 3, [1350507c-4126-4a0a-868d-d0a96e90c744, b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:15.049+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    : processing single row 1350507c-4126-4a0a-868d-d0a96e90c744
2024-01-06T20:17:15.049+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    :    sending heartbeat with remaining 2, [b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:15.049+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    : processing single row b61a2443-5b5a-4888-bc37-f3e504340e77
2024-01-06T20:17:15.049+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    :    sending heartbeat with remaining 1, [56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:15.049+01:00  WARN 4593 --- [temporal-samples] [ce="default": 1] tivityTaskExecutors$ActivityTaskExecutor : Activity failure. ActivityId=646434e9-72c7-313a-8301-214499ef42ea, activityType=ProcessRows, attempt=1

java.lang.RuntimeException: foo error for uuid 56115510-3c5c-4bc8-ae77-e206bdf9f4fb
	at io.temporal.samples.springboot.propagation.ChunkProcessingActivityImpl.processRows(ChunkProcessingActivityImpl.java:68) ~[main/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at io.temporal.internal.activity.RootActivityInboundCallsInterceptor$POJOActivityInboundCallsInterceptor.executeActivity(RootActivityInboundCallsInterceptor.java:64) ~[temporal-sdk-1.22.2.jar:na]
	at io.temporal.internal.activity.RootActivityInboundCallsInterceptor.execute(RootActivityInboundCallsInterceptor.java:43) ~[temporal-sdk-1.22.2.jar:na]
	at io.temporal.internal.activity.ActivityTaskExecutors$BaseActivityTaskExecutor.execute(ActivityTaskExecutors.java:107) ~[temporal-sdk-1.22.2.jar:na]
	at io.temporal.internal.activity.ActivityTaskHandlerImpl.handle(ActivityTaskHandlerImpl.java:124) ~[temporal-sdk-1.22.2.jar:na]
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handleActivity(ActivityWorker.java:278) ~[temporal-sdk-1.22.2.jar:na]
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:243) ~[temporal-sdk-1.22.2.jar:na]
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:216) ~[temporal-sdk-1.22.2.jar:na]
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:105) ~[temporal-sdk-1.22.2.jar:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]


>>>>> here the retry receives the details of the first heartbeat (4 remaining) rather than those of the last heartbeat (1 remaining)

2024-01-06T20:17:16.092+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    : propagating 4, heartbeat Optional[io.temporal.samples.springboot.propagation.ChunkProcessingActivityImpl$HeartbeatData@7d0ec774], rows [557d0cb6-e9a9-4a57-a8d7-d261a2bc90fd, 1350507c-4126-4a0a-868d-d0a96e90c744, b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:16.093+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    : processing single row 557d0cb6-e9a9-4a57-a8d7-d261a2bc90fd
2024-01-06T20:17:16.093+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    :    sending heartbeat with remaining 3, [1350507c-4126-4a0a-868d-d0a96e90c744, b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:16.096+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    : processing single row 1350507c-4126-4a0a-868d-d0a96e90c744
2024-01-06T20:17:16.096+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    :    sending heartbeat with remaining 2, [b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:16.096+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    : processing single row b61a2443-5b5a-4888-bc37-f3e504340e77
2024-01-06T20:17:16.096+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    :    sending heartbeat with remaining 1, [56115510-3c5c-4bc8-ae77-e206bdf9f4fb]
2024-01-06T20:17:16.096+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    : processing single row 56115510-3c5c-4bc8-ae77-e206bdf9f4fb
2024-01-06T20:17:16.096+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    :    sending heartbeat with remaining 0, []
2024-01-06T20:17:16.096+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    : processing 4 rows [557d0cb6-e9a9-4a57-a8d7-d261a2bc90fd, 1350507c-4126-4a0a-868d-d0a96e90c744, b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb] … done
2024-01-06T20:17:16.097+01:00  INFO 4593 --- [temporal-samples] [ce="default": 1] i.t.s.s.p.ChunkProcessingActivityImpl    : processing result ChunkedWorkflow.ProcessingResult(rowsRead=5, rowsWritten=5, rowsProcessed=[4ab57ee9-0daf-4e46-a6de-1e0643daeec1, 557d0cb6-e9a9-4a57-a8d7-d261a2bc90fd, 1350507c-4126-4a0a-868d-d0a96e90c744, b61a2443-5b5a-4888-bc37-f3e504340e77, 56115510-3c5c-4bc8-ae77-e206bdf9f4fb])

Specifications

  • Version: 1.22.2
  • Platform: macOS / Java 17 / Spring Boot 3.2

Sample repo: https://github.com/rburgst/temporal-java-heartbeat-problem
(the Spring Boot sample)
