Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars

IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "gather-channel");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "gather-timeout");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "requires-reply");

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4296,8 +4296,16 @@ The list of component name patterns you want to track (e.g., tracked-components
<xsd:documentation>
Allows to specify how long the Scatter-Gather will wait for reply Messages for gathering.
By default it will wait indefinitely. Value is specified in milliseconds.
It will be applied only if the 'gather-channel' is specified and it is some blocking channel,
e.g. 'QueueChannel'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="requires-reply" type="xsd:string" use="optional" default="true">
<xsd:annotation>
<xsd:documentation>
Specify whether the Scatter-Gather must return a non-null value. This value is
'true' by default, hence a ReplyRequiredException will be thrown when
the underlying aggregator returns a null value after 'gather-timeout'.
Note, if the 'null' is a case, the 'gather-timeout' should be specified to avoid indefinite wait.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public void testAuction() {

Object reaper = this.beanFactory.getBean("reaper");
assertSame(gatherer.getMessageStore(), TestUtils.getPropertyValue(reaper, "messageGroupStore"));
assertTrue(TestUtils.getPropertyValue(scatterGather, "requiresReply", Boolean.class));
}

@Test
Expand Down
221 changes: 217 additions & 4 deletions src/reference/docbook/scatter-gather.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,235 @@
<section id="scatter-gather-introduction">
<title>Introduction</title>
<para>
TBD
Starting with <emphasis>version 4.1</emphasis>, Spring Integration provides an implementation
of the <ulink url="http://www.eaipatterns.com/BroadcastAggregate.html/">Scatter-Gather</ulink>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind removing the trailing on merge? The EIP site gets deal with simple HTMLs 😄 , hence "Resource not found"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Enterprise Integration Pattern. It is a compound endpoint, where the goal is to send a message
to the recipients and aggregate the results. Quoting the EIP Book, it is a component for scenarios like
<emphasis>best quote</emphasis>, when we need to request information from several suppliers
and decide which one provides us with the best term for the requested item.
</para>
<para>
Previously, the pattern could be configured using discrete components, this enhancement brings
more convenient configuration.
</para>
<para>
The <classname>ScatterGatherHandler</classname> is a <emphasis>request-reply</emphasis> endpoint
that combines a
<classname>PublishSubscribeChannel</classname> (or <classname>RecipientListRouter</classname>)
and an <classname>AggregatingMessageHandler</classname>. The request message is sent to the
<code>scatter</code> channel and the <classname>ScatterGatherHandler</classname> waits for the reply
from the aggregator to sends to the <code>outputChannel</code>.
</para>
</section>

<section id="scatter-gather-functionality">
<title>Functionality</title>
<para>
TBD
The <code>Scatter-Gather</code> pattern suggests two scenarios - <emphasis>Auction</emphasis> and
<emphasis>Distribution</emphasis>. In both cases, the <code>aggregation</code> function is the same and
provides all options available for the <classname>AggregatingMessageHandler</classname>. Actually the
<classname>ScatterGatherHandler</classname> just requires an <classname>AggregatingMessageHandler</classname>
as a constructor argument. See <xref linkend="aggregator"/> for more information.
</para>
<para><emphasis>Auction</emphasis></para>
<para>
The <emphasis>Auction</emphasis> <code>Scatter-Gather</code> variant uses
<code>publish-subscribe</code> logic for the request message, where the
<code>scatter</code> channel is a <classname>PublishSubscribeChannel</classname> with
<code>apply-sequence="true"</code>. However, this channel can be any
<interfacename>MessageChannel</interfacename> implementation as is the case with the <code>request-channel</code>
in the <code>ContentEnricher</code> (see <xref linkend="content-enricher"/>) but, in this case, the end-user
should support his own custom <code>correlationStrategy</code> for the <code>aggregation</code> function.
</para>
<para><emphasis>Distribution</emphasis></para>
<para>
The <emphasis>Distribution</emphasis> <code>Scatter-Gather</code> variant is based on the
<classname>RecipientListRouter</classname> (see <xref linkend="router-implementations-recipientlistrouter"/>)
with all available options for the <classname>RecipientListRouter</classname>. This is the second
<classname>ScatterGatherHandler</classname> constructor argument. If you want to rely just on the default
<code>correlationStrategy</code> for the <code>recipient-list-router</code> and the
<code>aggregator</code>, you should specify <code>apply-sequence="true"</code>. Otherwise, a custom
<code>correlationStrategy</code> should be supplied for the <code>aggregator</code>.
Unlike the <classname>PublishSubscribeChannel</classname> (<emphasis>Auction</emphasis>) variant, having a
<code>recipient-list-router</code> <code>selector</code> option, we can <emphasis>filter</emphasis>
target suppliers based on the message. With <code>apply-sequence="true"</code> the default
<code>sequenceSize</code> will be supplied and the <code>aggregator</code> will be able to release the group
correctly. The <emphasis>Distribution</emphasis> option is mutually exclusive with the
<emphasis>Auction</emphasis> option.
</para>
<para>
In both cases, the request (<emphasis>scatter</emphasis>) message is enriched with the
<code>gatherResultChannel</code> <classname>QueueChannel</classname> header, to wait for a reply message from
the <code>aggregator</code>.
</para>
<para>
By default, all suppliers should send their result to the <code>replyChannel</code> header
(usually by omitting the <code>output-channel</code> from the ultimate endpoint).
However, the <code>gatherChannel</code> option is also provided, allowing suppliers to send their
reply to that channel for the aggregation.
</para>
</section>

<section id="scatter-gather-namespace">
<title>Configuring a Scatter-Gather</title>
<title>Configuring a Scatter-Gather Endpoint</title>
<para>
For Java and Annotation configuration, the bean definition for the <code>Scatter-Gather</code>
is:
</para>
<programlisting language="java"><![CDATA[@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}

@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}]]></programlisting>
<para>
TBD
Here, we configure the <classname>RecipientListRouter</classname> <code>distributor</code> bean, with
<code>applySequence="true"</code> and the list of recipient channels. The next bean is for an
<classname>AggregatingMessageHandler</classname>. Finally, we inject both those beans into the
<classname>ScatterGatherHandler</classname> bean definition and mark it as a
<classname>@ServiceActivator</classname> to wire the Scatter-Gather component into the integration flow.
</para>
<para>
Configuring the <code>&lt;scatter-gather&gt;</code> endpoint using the XML namespace:
</para>
<programlisting language="xml"><![CDATA[<scatter-gather
id="" ]]><co id="sg1" linkends="sg1-txt" /><![CDATA[
auto-startup="" ]]><co id="sg2" linkends="sg2-txt" /><![CDATA[
input-channel="" ]]><co id="sg3" linkends="sg3-txt" /><![CDATA[
output-channel="" ]]><co id="sg4" linkends="sg4-txt" /><![CDATA[
scatter-channel="" ]]><co id="sg5" linkends="sg5-txt" /><![CDATA[
gather-channel="" ]]><co id="sg6" linkends="sg6-txt" /><![CDATA[
order="" ]]><co id="sg7" linkends="sg7-txt" /><![CDATA[
phase="" ]]><co id="sg8" linkends="sg8-txt" /><![CDATA[
send-timeout="" ]]><co id="sg9" linkends="sg9-txt" /><![CDATA[
gather-timeout="" ]]><co id="sg10" linkends="sg10-txt" /><![CDATA[
requires-reply="" >]]><co id="sg11" linkends="sg11-txt" /><![CDATA[
<scatterer/> ]]><co id="sg12" linkends="sg12-txt" /><![CDATA[
<gatherer/> ]]><co id="sg13" linkends="sg13-txt" /><![CDATA[
</scatter-gather>]]></programlisting>

<calloutlist>
<callout arearefs="sg1" id="sg1-txt">
<para>
The id of the Endpoint.
The <classname>ScatterGatherHandler</classname> bean is registered with <code>id + '.handler'</code>
alias. The <classname>RecipientListRouter</classname> - with <code>id + '.scatterer'</code>.
And the <classname>AggregatingMessageHandler</classname> with <code>id + '.gatherer'</code>.
<emphasis>Optional</emphasis> (a default id is generated value by <interfacename>BeanFactory</interfacename>).
</para>
</callout>

<callout arearefs="sg2" id="sg2-txt">
<para>Lifecycle attribute signaling if the Endpoint should be started during Application Context
initialization. In addition, the <classname>ScatterGatherHandler</classname> also implements
<interfacename>Lifecycle</interfacename> and starts/stops the <code>gatherEndpoint</code>, which
is created internally if a <code>gather-channel</code> is provided.
<emphasis>Optional</emphasis> (default is <code>true</code>).</para>
</callout>

<callout arearefs="sg3" id="sg3-txt">
<para>The channel to receive request messages to handle them in the <classname>ScatterGatherHandler</classname>.
<emphasis>Required</emphasis>.</para>
</callout>

<callout arearefs="sg4" id="sg4-txt">
<para>The channel to which the Scatter-Gather will send the aggregation
results. <emphasis>Optional (because incoming messages can specify a
reply channel themselves via <code>replyChannel</code> Message Header)</emphasis>.</para>
</callout>

<callout arearefs="sg5" id="sg5-txt">
<para>The channel to send the scatter message for the <emphasis>Auction</emphasis> scenario.
<emphasis>Optional</emphasis>. Mutually exclusive with <code>&lt;scatterer&gt;</code> sub
-element.</para>
</callout>

<callout arearefs="sg6" id="sg6-txt">
<para>
The channel to receive replies from each supplier for the aggregation. is used as the
<code>replyChannel</code> header in the scatter message.
<emphasis>Optional</emphasis>. By default the <classname>FixedSubscriberChannel</classname> is
created.
</para>
</callout>

<callout arearefs="sg7" id="sg7-txt">
<para>Order of this component when more than one handler is subscribed to the same DirectChannel
(use for load balancing purposes).
<emphasis>Optional</emphasis>.</para>
</callout>

<callout arearefs="sg8" id="sg8-txt">
<para>Specify the phase in which the endpoint
should be started and stopped. The startup order proceeds
from lowest to highest, and the shutdown order is the
reverse of that. By default this value is Integer.MAX_VALUE
meaning that this container starts as late as possible and
stops as soon as possible.
<emphasis>Optional</emphasis>.</para>
</callout>

<callout arearefs="sg9" id="sg9-txt">
<para>The timeout interval to wait when sending a reply
<interfacename>Message</interfacename> to the <code>output-channel</code>.
By default the send will block for one second.
It applies only if the output channel has some 'sending' limitations, e.g. a <classname>QueueChannel</classname>
with a fixed 'capacity' and is full. In this case, a <classname>MessageDeliveryException</classname> is thrown.
The <code>send-timeout</code> is ignored in case of <classname>AbstractSubscribableChannel</classname> implementations.
In case of <code>group-timeout(-expression)</code> the <classname>MessageDeliveryException</classname>
from the scheduled expire task leads this task to be rescheduled.
<emphasis>Optional</emphasis>.</para>
</callout>

<callout arearefs="sg10" id="sg10-txt">
<para>Allows you to specify how long the Scatter-Gather will wait for the reply message
before returning. By default it will wait indefinitely. 'null' is returned
if the reply times out.
<emphasis>Optional</emphasis>. Defaults to <code>-1</code> - indefinitely.</para>
</callout>

<callout arearefs="sg11" id="sg11-txt">
<para>
Specify whether the Scatter-Gather must return a non-null value. This value is
<code>true</code> by default, hence a <classname>ReplyRequiredException</classname> will be thrown
when the underlying aggregator returns a null value after <code>gather-timeout</code>.
Note, if the <code>null</code> is a case, the <code>gather-timeout</code> should be specified
to avoid indefinite wait.
</para>
</callout>

<callout arearefs="sg12" id="sg12-txt">
<para>The <code>&lt;recipient-list-router&gt;</code> options.
<emphasis>Optional</emphasis>. Mutually exclusive with <code>scatter-channel</code>
attribute.</para>
</callout>

<callout arearefs="sg12" id="sg12-txt">
<para>The <code>&lt;aggregator&gt;</code> options.
<emphasis>Required</emphasis>. </para>
</callout>

</calloutlist>
</section>

</section>