Aggregator: discardChannel() to have an optional flag to discard the whole group, instead of individual messages #9754

Closed
@shintasmith

Expected Behavior

We use the Aggregator pattern in Spring Integration 6.3.3 to group messages together, configured as follows:

        return IntegrationFlow.from(petStoreSubscriptionChannel)
                .enrichHeaders(spec -> spec.header("petStore.FlowName", "PetStoreIntake"))
                .aggregate(aggregatorSpec -> aggregatorSpec
                        // .messageStore(jdbcMessageStore)
                        .outputProcessor(petOutputProcessor)
                        .expireGroupsUponCompletion(true)
                        .groupTimeout(300 * 1000)
                        .sendPartialResultOnExpiry(true)
                        .correlationStrategy(message -> ((Pet) message.getPayload()).getBreed())
                        .releaseStrategy(group -> group.size() >= 3))
                .handle(petGroupHandler, "handle")
                .get();

(Link to full repo: here, link to IntegrationFlow where the Aggregator spec is defined)

We need to process a partial (incomplete) group differently from a complete group. We want to set .sendPartialResultOnExpiry(false) and configure a discard channel to handle the incomplete group. The problem is that the discard channel receives the individual Messages of the discarded group rather than a single MessageGroup.

We would like the ability to receive discarded messages as a partial/incomplete group.

Current Behavior

Currently, in AbstractCorrelatingMessageHandler.expireGroup(), messages are discarded individually, instead of as a group.

https://github.com/spring-projects/spring-integration/blob/6.3.x/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java#L878-L879

Context

Our unit of work in the handler (Service Activator) is really a group of messages. We do not want to process the discarded messages individually because there could be a great many of them.

Our current workaround is to call:

group.complete()

in our Release Strategy. We then implement a MessageGroupProcessor that checks group.isComplete() and inserts a boolean flag into a header. The Service Activator checks this header: if the flag indicates the group is complete, normal processing happens; otherwise, it takes a different path to handle the incomplete group (such as logging and alerting).
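
To make the workaround easier to follow, here is a rough plain-Java model of its logic. It is only a sketch and does not use the Spring Integration classes: Group and Output are hypothetical stand-ins for MessageGroup and the aggregated Message, and the header name petStore.groupComplete and the returned strings are made up for illustration.

```java
import java.util.List;
import java.util.Map;

// Plain-Java model of the workaround. None of these types are Spring
// Integration classes; they are hypothetical stand-ins for illustration.
final class PartialGroupSketch {

    // Hypothetical header name carrying the "group is complete" flag.
    static final String COMPLETE_HEADER = "petStore.groupComplete";

    /** Stand-in for a message group: its payloads plus a "complete" marker. */
    static final class Group {
        final List<String> payloads;
        private boolean complete;

        Group(List<String> payloads) {
            this.payloads = List.copyOf(payloads);
        }

        void complete() {
            this.complete = true;
        }

        boolean isComplete() {
            return this.complete;
        }
    }

    /** Stand-in for the aggregated output message (payload plus headers). */
    record Output(List<String> payload, Map<String, Object> headers) { }

    /** Release strategy: marks the group complete once it holds 3 messages. */
    static boolean canRelease(Group group) {
        if (group.payloads.size() >= 3) {
            group.complete();
            return true;
        }
        return false;
    }

    /** Group processor: copies the complete marker into an output header. */
    static Output process(Group group) {
        return new Output(group.payloads,
                Map.<String, Object>of(COMPLETE_HEADER, group.isComplete()));
    }

    /** Handler: picks the normal or the incomplete-group path from the header. */
    static String handle(Output output) {
        boolean complete = Boolean.TRUE.equals(output.headers().get(COMPLETE_HEADER));
        int size = output.payload().size();
        return complete
                ? "processed " + size + " pets"
                : "alert: incomplete group of " + size + " pets";
    }
}
```

A group released by the release strategy reaches the handler with the flag set to true, while a group that only expires (with sendPartialResultOnExpiry(true)) arrives with the flag set to false and takes the alerting path. In both cases the handler still receives the group as one unit, which is what we are after.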

Our issue is also posted on Stack Overflow here, where Artem suggested opening a GH issue.
