Description
In what version(s) of Spring Integration are you seeing this issue?
6.1.0
To Reproduce
Use MongoDbMessageStore
or ConfigurableMongoDbMessageStore
as the MessageGroupStore
implementation.
Create aggregator:
- Provide the
groupConditionSupplier
for theAbstractCorrelatingMessageHandler
. - Provide the
groupTimeoutExpression
for theAbstractCorrelatingMessageHandler
with a positive value of 5-10 seconds.
Describe the bug
Let's take a look at this function from AbstractCorrelatingMessageHandler:
private void processForceRelease(Object groupId, long timestamp, long lastModified) {
MessageGroup messageGroup = this.messageStore.getMessageGroup(groupId);
if (messageGroup.getTimestamp() == timestamp && messageGroup.getLastModified() == lastModified) {
this.forceReleaseProcessor.processMessageGroup(messageGroup);
}
}
To run the forceReleaseProcessor
the timestamp
and lastModified
must be the same as the corresponding properties from the message group that was fetched at the beginning of this function.
The timestamp
and lastModified
parameters come from the message group that was fetched in the processMessageForGroup
function.
The problem is that after fetching the message group in the processMessageForGroup
we do a call to setGroupConditionIfAny(message, messageGroup)
. And if we are using the group condition, this function will call messageStore.setGroupCondition()
which will update the message group condition and its lastModifiedTime
property in MongoDB implementation.
public void setGroupCondition(Object groupId, String condition) {
updateGroup(groupId, lastModifiedUpdate().set("condition", condition));
}
So we end up in a situation where the lastModifiedTime
is newer in the database but the message group fetched in the processMessageForGroup
has an old value. This means that the condition in the forceReleaseProcessor
will never be true if we are using the group condition.
Expected behavior
The message group instance in the processMessageForGroup
should be updated after a call to setGroupConditionIfAny
.