Skip to content

Fixing ListenTask #521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions api/src/main/resources/schema/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -499,10 +499,6 @@ $defs:
description: Defines the properties of event to emit.
required: [ source, type ]
additionalProperties: true
cc:
$ref: '#/$defs/endpoint'
title: EmitCarbonCopyDefinition
description: Defines an additional endpoint, if any, to publish an event's carbon copy to.
required: [ event ]
forTask:
type: object
Expand Down Expand Up @@ -1343,7 +1339,7 @@ $defs:
- properties:
until: false
title: AnyEventUntilConsumed
required: [ any ]
required: [ any ]
- title: OneEventConsumptionStrategy
properties:
one:
Expand Down Expand Up @@ -1717,20 +1713,20 @@ $defs:
- properties:
amount:
type: integer
title: AsyncApiMessageConsumptionPolicyAmount
description: The amount of (filtered) messages to consume before disposing of the subscription.
title: AsyncApiMessageConsumptionPolicyAmount
required: [ amount ]
- properties:
while:
$ref: '#/$defs/runtimeExpression'
title: AsyncApiMessageConsumptionPolicyWhile
description: A runtime expression evaluated after each consumed (filtered) message to decide if message consumption should continue.
title: AsyncApiMessageConsumptionPolicyWhile
required: [ while ]
- properties:
until:
$ref: '#/$defs/runtimeExpression'
title: AsyncApiMessageConsumptionPolicyUntil
description: A runtime expression evaluated before each consumed (filtered) message to decide if message consumption should continue.
title: AsyncApiMessageConsumptionPolicyUntil
required: [ until ]
subscriptionIterator:
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,16 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

public abstract class ListenExecutor extends RegularTaskExecutor<ListenTask> {

protected final EventRegistrationBuilderCollection regBuilders;
protected final EventRegistrationBuilderCollection untilRegBuilders;
protected final Optional<WorkflowFilter> until;
protected final Optional<TaskExecutor<?>> loop;
protected final Function<CloudEvent, JsonNode> converter;
protected final EventConsumer eventConsumer;
protected final AtomicBoolean untilEvent = new AtomicBoolean(true);

private static record EventRegistrationBuilderCollection(
Collection<EventRegistrationBuilder> registrations, boolean isAnd) {}
Expand Down Expand Up @@ -177,22 +173,37 @@ protected void internalProcessCe(
arrayNode.add(node);
future.complete(node);
}

@Override
protected CompletableFuture<?> combine(CompletableFuture<JsonNode>[] completables) {
return CompletableFuture.allOf(completables);
}
}

public static class OrListenExecutor extends ListenExecutor {

private final Optional<WorkflowFilter> until;
private final EventRegistrationBuilderCollection untilRegBuilders;

public OrListenExecutor(ListenExecutorBuilder builder) {
super(builder);
this.until = Optional.ofNullable(builder.until);
this.untilRegBuilders = builder.untilRegistrations;
}

@Override
protected CompletableFuture<?> combine(CompletableFuture<JsonNode>[] completables) {
return CompletableFuture.anyOf(completables);
protected <T> CompletableFuture<?> buildFuture(
EventRegistrationBuilderCollection regCollection,
Collection<EventRegistration> registrations,
BiConsumer<CloudEvent, CompletableFuture<T>> consumer) {
CompletableFuture<?> combinedFuture =
super.buildFuture(regCollection, registrations, consumer);
if (untilRegBuilders != null) {
Collection<EventRegistration> untilRegistrations = new ArrayList<>();
CompletableFuture<?> untilFuture =
combine(untilRegBuilders, untilRegistrations, (ce, f) -> f.complete(null));
untilFuture.thenAccept(
v -> {
combinedFuture.complete(null);
untilRegistrations.forEach(reg -> eventConsumer.unregister(reg));
});
}
return combinedFuture;
}

protected void internalProcessCe(
Expand All @@ -206,14 +217,12 @@ protected void internalProcessCe(
|| until
.filter(u -> u.apply(workflow, taskContext, arrayNode).asBoolean())
.isPresent())
&& untilEvent.get()) {
&& untilRegBuilders == null) {
future.complete(arrayNode);
}
}
}

protected abstract CompletableFuture<?> combine(CompletableFuture<JsonNode>[] completables);

protected abstract void internalProcessCe(
JsonNode node,
ArrayNode arrayNode,
Expand All @@ -226,48 +235,37 @@ protected CompletableFuture<JsonNode> internalExecute(
WorkflowContext workflow, TaskContext taskContext) {
ArrayNode output = JsonUtils.mapper().createArrayNode();
Collection<EventRegistration> registrations = new ArrayList<>();
if (untilRegBuilders != null) {
untilEvent.set(false);
}
CompletableFuture<?> combinedFuture =
combine(
toCompletables(
regBuilders,
registrations,
(ce, future) ->
processCe(converter.apply(ce), output, workflow, taskContext, future)));
CompletableFuture<JsonNode> resultFuture =
combinedFuture.thenApply(
return buildFuture(
regBuilders,
registrations,
(BiConsumer<CloudEvent, CompletableFuture<JsonNode>>)
((ce, future) ->
processCe(converter.apply(ce), output, workflow, taskContext, future)))
.thenApply(
v -> {
registrations.forEach(reg -> eventConsumer.unregister(reg));
return output;
});
if (untilRegBuilders != null) {
Collection<EventRegistration> untilRegistrations = new ArrayList<>();
CompletableFuture<?>[] futures =
toCompletables(
untilRegBuilders, untilRegistrations, (ce, future) -> future.complete(null));
CompletableFuture<?> untilFuture =
untilRegBuilders.isAnd()
? CompletableFuture.allOf(futures)
: CompletableFuture.anyOf(futures);
untilFuture.thenAccept(
v -> {
untilEvent.set(true);
combinedFuture.complete(null);
untilRegistrations.forEach(reg -> eventConsumer.unregister(reg));
});
}
return resultFuture;
}

private <T> CompletableFuture<T>[] toCompletables(
protected <T> CompletableFuture<?> buildFuture(
EventRegistrationBuilderCollection regCollection,
Collection<EventRegistration> registrations,
BiConsumer<CloudEvent, CompletableFuture<T>> consumer) {
return combine(regCollection, registrations, consumer);
}

protected final <T> CompletableFuture<?> combine(
EventRegistrationBuilderCollection regCollection,
Collection<EventRegistration> registrations,
BiConsumer<CloudEvent, CompletableFuture<T>> consumer) {
return regCollection.registrations().stream()
.map(reg -> toCompletable(reg, registrations, consumer))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture<T>[] futures =
regCollection.registrations().stream()
.map(reg -> toCompletable(reg, registrations, consumer))
.toArray(size -> new CompletableFuture[size]);
return regCollection.isAnd()
? CompletableFuture.allOf(futures)
: CompletableFuture.anyOf(futures);
}

private <T> CompletableFuture<T> toCompletable(
Expand Down Expand Up @@ -307,9 +305,7 @@ protected ListenExecutor(ListenExecutorBuilder builder) {
super(builder);
this.eventConsumer = builder.application.eventConsumer();
this.regBuilders = builder.registrations;
this.until = Optional.ofNullable(builder.until);
this.loop = Optional.ofNullable(builder.loop);
this.converter = builder.converter;
this.untilRegBuilders = builder.untilRegistrations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ void testUntilConsumed() throws IOException {
emitOutDefinition.instance(Map.of()).start().join();
assertThat(future).isCompleted();
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.COMPLETED);
assertThat(waitingInstance.outputAsJsonNode()).isEqualTo(temperature());
}

private static Stream<Arguments> eventListenerParameters() {
Expand Down Expand Up @@ -106,4 +107,11 @@ private static JsonNode doctor() {
node.put("isSick", true);
return mapper.createArrayNode().add(node);
}

private static JsonNode temperature() {
ObjectMapper mapper = JsonUtils.mapper();
ObjectNode node = mapper.createObjectNode();
node.put("temperature", 39);
return mapper.createArrayNode().add(node);
}
}
Loading