Skip to content

Commit

Permalink
GH-8685: Re-fetch group after setting condition (#8686)
Browse files Browse the repository at this point in the history
* GH-8685: Re-fetch group after setting condition

Fixes #8685

The `AbstractCorrelatingMessageHandler` updates the group metadata in DB
not only for provided `condition`, but also a `lastModified` field.
A subsequent scheduling for group timeout takes the `lastModified`
to compare with the value in the store after re-fetching group in task.
This does not reflect reality since adding `condition` modifies the data in DB,
but in-memory state remains the same.

* Re-fetch a group from the store in the `AbstractCorrelatingMessageHandler.setGroupConditionIfAny()`.
* Verify expected behavior via new `ConfigurableMongoDbMessageGroupStoreTests.groupIsForceReleaseAfterTimeoutWhenGroupConditionIsSet()`

**Cherry-pick to `6.1.x`, `6.0.x` & `5.5.x`**

* * Fix Checkstyle violation in the test
  • Loading branch information
artembilan authored Jul 24, 2023
1 parent 4e464e9 commit f4212d8
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -566,7 +566,7 @@ private boolean processMessageForGroup(Message<?> message, Object correlationKey
this.logger.trace(() -> "Adding message to group [ " + messageGroupToLog + "]");
messageGroup = store(correlationKey, message);

setGroupConditionIfAny(message, messageGroup);
messageGroup = setGroupConditionIfAny(message, messageGroup);

if (this.releaseStrategy.canRelease(messageGroup)) {
Collection<Message<?>> completedMessages = null;
Expand Down Expand Up @@ -605,12 +605,19 @@ private void cancelScheduledFutureIfAny(Object correlationKey, UUID groupIdUuid,
}
}

private void setGroupConditionIfAny(Message<?> message, MessageGroup messageGroup) {
private MessageGroup setGroupConditionIfAny(Message<?> message, MessageGroup messageGroup) {
MessageGroup messageGroupToUse = messageGroup;

if (this.groupConditionSupplier != null) {
String condition = this.groupConditionSupplier.apply(message, messageGroup.getCondition());
this.messageStore.setGroupCondition(messageGroup.getGroupId(), condition);
messageGroup.setCondition(condition);
String condition = this.groupConditionSupplier.apply(message, messageGroupToUse.getCondition());
this.messageStore.setGroupCondition(messageGroupToUse.getGroupId(), condition);
messageGroupToUse = this.messageStore.getMessageGroup(messageGroupToUse.getGroupId());
if (this.sequenceAware) {
messageGroupToUse = new SequenceAwareMessageGroup(messageGroupToUse);
}
}

return messageGroupToUse;
}

protected boolean isExpireGroupsUponCompletion() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -65,6 +65,29 @@ void testWithAggregatorWithShutdown() {
super.testWithAggregatorWithShutdown("mongo-aggregator-configurable-config.xml");
}

@Test
void groupIsForceReleaseAfterTimeoutWhenGroupConditionIsSet() {
try (var context = new ClassPathXmlApplicationContext("mongo-aggregator-configurable-config.xml", getClass())) {
MessageChannel input = context.getBean("inputChannel", MessageChannel.class);
QueueChannel output = context.getBean("outputChannel", QueueChannel.class);

Message<?> message = MessageBuilder.withPayload("test")
.setSequenceNumber(1)
.setSequenceSize(10)
.setCorrelationId("test")
.build();

input.send(message);

Message<?> receive = output.receive(10_000);

assertThat(receive)
.extracting("payload")
.asList()
.hasSize(1);
}
}

@Test
@Disabled("The performance test. Enough slow. Also needs the release strategy changed to size() == 1000")
void messageGroupStoreLazyLoadPerformance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

<int:aggregator input-channel="inputChannel" output-channel="outputChannel" message-store="mongoStore"
release-strategy="releaseStrategy"
group-timeout="500"
send-partial-result-on-expiry="true"
group-condition-supplier="conditionSupplier"/>

<util:constant id="releaseStrategy"
Expand Down

0 comments on commit f4212d8

Please sign in to comment.