Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,8 @@ private void handleSingleEventLookahead(HistoryEvent event) {
// other state machines because a rejected update produces no event in history.
protocolStateMachines.entrySet().removeIf(entry -> entry.getValue().isFinalState());
break;
default:
break;
}
}

Expand Down Expand Up @@ -625,6 +627,9 @@ public List<Message> takeMessages() {
List<Message> result = new ArrayList<>(messageOutbox.size());
result.addAll(messageOutbox);
messageOutbox.clear();
// Remove any finished update protocol state machines. We can't remove them on an event like
// other state machines because a rejected update produces no event in history.
protocolStateMachines.entrySet().removeIf(entry -> entry.getValue().isFinalState());
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.*;
import org.junit.rules.TestWatcher;
import org.junit.rules.Timeout;
import org.junit.runner.Description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,19 +423,19 @@ public String toString() {
/** Represents an accepted update workflow execution request */
static final class UpdateWorkflowExecutionData {
final String id;
final CompletableFuture<UpdateWorkflowExecutionResponse> acceptance;
final CompletableFuture<UpdateWorkflowExecutionResponse> complete;
final CompletableFuture<Boolean> accepted;
final CompletableFuture<Outcome> outcome;
final Request initialRequest;

public UpdateWorkflowExecutionData(
String id,
Request initialRequest,
CompletableFuture<UpdateWorkflowExecutionResponse> acceptance,
CompletableFuture<UpdateWorkflowExecutionResponse> complete) {
CompletableFuture<Boolean> accepted,
CompletableFuture<Outcome> outcome) {
this.id = id;
this.initialRequest = initialRequest;
this.acceptance = acceptance;
this.complete = complete;
this.accepted = accepted;
this.outcome = outcome;
}

@Override
Expand Down Expand Up @@ -560,10 +560,10 @@ public static StateMachine<ChildWorkflowData> newChildWorkflowStateMachine(
public static StateMachine<UpdateWorkflowExecutionData> newUpdateWorkflowExecution(
String updateId,
Request initialRequest,
CompletableFuture<UpdateWorkflowExecutionResponse> acceptance,
CompletableFuture<UpdateWorkflowExecutionResponse> complete) {
CompletableFuture<Boolean> accepted,
CompletableFuture<Outcome> outcome) {
return new StateMachine<>(
new UpdateWorkflowExecutionData(updateId, initialRequest, acceptance, complete))
new UpdateWorkflowExecutionData(updateId, initialRequest, accepted, outcome))
.add(NONE, START, STARTED, StateMachines::acceptUpdate)
.add(STARTED, COMPLETE, COMPLETED, StateMachines::completeUpdate);
}
Expand Down Expand Up @@ -1805,19 +1805,10 @@ private static void acceptUpdate(
if (!ctx.getWorkflowMutableState().isTerminalState()) {
ctx.addEvent(event);
}

UpdateWorkflowExecutionResponse response =
UpdateWorkflowExecutionResponse.newBuilder()
.setUpdateRef(
UpdateRef.newBuilder()
.setWorkflowExecution(ctx.getExecution())
.setUpdateId(data.id))
.setStage(
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED)
.build();

data.acceptance.complete(response);
ctx.onCommit(
(int historySize) -> {
data.accepted.complete(true);
});
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -1849,20 +1840,10 @@ private static void completeUpdate(
if (!ctx.getWorkflowMutableState().isTerminalState()) {
ctx.addEvent(event);
}

UpdateWorkflowExecutionResponse updateResponse =
UpdateWorkflowExecutionResponse.newBuilder()
.setUpdateRef(
UpdateRef.newBuilder()
.setWorkflowExecution(ctx.getExecution())
.setUpdateId(data.id))
.setOutcome(response.getOutcome())
.setStage(
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED)
.build();

data.complete.complete(updateResponse);
ctx.onCommit(
(int historySize) -> {
data.outcome.complete(response.getOutcome());
});
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
Expand Down
Loading