Skip to content

Commit af19bbf

Browse files
artembilangaryrussell
authored andcommitted
INT-3538: Docs for Scatter-Gather
JIRA: https://jira.spring.io/browse/INT-3538 Phase #1: `Scatter-Gather` S-G Doc Polishing INT-3538 Add `requires-reply` to `scatter-gather` Final S-G Doc Polish
1 parent 6f85509 commit af19bbf

File tree

4 files changed

+229
-6
lines changed

4 files changed

+229
-6
lines changed

spring-integration-core/src/main/java/org/springframework/integration/config/xml/ScatterGatherParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars
111111

112112
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "gather-channel");
113113
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "gather-timeout");
114+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "requires-reply");
114115

115116
return builder;
116117
}

spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.1.xsd

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4296,8 +4296,16 @@ The list of component name patterns you want to track (e.g., tracked-components
42964296
<xsd:documentation>
42974297
Allows to specify how long the Scatter-Gather will wait for reply Messages for gathering.
42984298
By default it will wait indefinitely. Value is specified in milliseconds.
4299-
It will be applied only if the 'gather-channel' is specified and it is some blocking channel,
4300-
e.g. 'QueueChannel'.
4299+
</xsd:documentation>
4300+
</xsd:annotation>
4301+
</xsd:attribute>
4302+
<xsd:attribute name="requires-reply" type="xsd:string" use="optional" default="true">
4303+
<xsd:annotation>
4304+
<xsd:documentation>
4305+
Specify whether the Scatter-Gather must return a non-null value. This value is
4306+
'true' by default, hence a ReplyRequiredException will be thrown when
4307+
the underlying aggregator returns a null value after 'gather-timeout'.
4308+
Note, if 'null' is a possibility, the 'gather-timeout' should be specified to avoid an indefinite wait.
43014309
</xsd:documentation>
43024310
</xsd:annotation>
43034311
</xsd:attribute>

spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherParserTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public void testAuction() {
6464

6565
Object reaper = this.beanFactory.getBean("reaper");
6666
assertSame(gatherer.getMessageStore(), TestUtils.getPropertyValue(reaper, "messageGroupStore"));
67+
assertTrue(TestUtils.getPropertyValue(scatterGather, "requiresReply", Boolean.class));
6768
}
6869

6970
@Test

src/reference/docbook/scatter-gather.xml

Lines changed: 217 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,235 @@
55
<section id="scatter-gather-introduction">
66
<title>Introduction</title>
77
<para>
8-
TBD
8+
Starting with <emphasis>version 4.1</emphasis>, Spring Integration provides an implementation
9+
of the <ulink url="http://www.eaipatterns.com/BroadcastAggregate.html">Scatter-Gather</ulink>
10+
Enterprise Integration Pattern. It is a compound endpoint, where the goal is to send a message
11+
to the recipients and aggregate the results. Quoting the EIP Book, it is a component for scenarios like
12+
<emphasis>best quote</emphasis>, when we need to request information from several suppliers
13+
and decide which one provides us with the best term for the requested item.
14+
</para>
15+
<para>
16+
Previously, the pattern could be configured using discrete components, this enhancement brings
17+
more convenient configuration.
18+
</para>
19+
<para>
20+
The <classname>ScatterGatherHandler</classname> is a <emphasis>request-reply</emphasis> endpoint
21+
that combines a
22+
<classname>PublishSubscribeChannel</classname> (or <classname>RecipientListRouter</classname>)
23+
and an <classname>AggregatingMessageHandler</classname>. The request message is sent to the
24+
<code>scatter</code> channel and the <classname>ScatterGatherHandler</classname> waits for the reply
25+
from the aggregator to sends to the <code>outputChannel</code>.
926
</para>
1027
</section>
1128

1229
<section id="scatter-gather-functionality">
1330
<title>Functionality</title>
1431
<para>
15-
TBD
32+
The <code>Scatter-Gather</code> pattern suggests two scenarios - <emphasis>Auction</emphasis> and
33+
<emphasis>Distribution</emphasis>. In both cases, the <code>aggregation</code> function is the same and
34+
provides all options available for the <classname>AggregatingMessageHandler</classname>. Actually the
35+
<classname>ScatterGatherHandler</classname> just requires an <classname>AggregatingMessageHandler</classname>
36+
as a constructor argument. See <xref linkend="aggregator"/> for more information.
37+
</para>
38+
<para><emphasis>Auction</emphasis></para>
39+
<para>
40+
The <emphasis>Auction</emphasis> <code>Scatter-Gather</code> variant uses
41+
<code>publish-subscribe</code> logic for the request message, where the
42+
<code>scatter</code> channel is a <classname>PublishSubscribeChannel</classname> with
43+
<code>apply-sequence="true"</code>. However, this channel can be any
44+
<interfacename>MessageChannel</interfacename> implementation as is the case with the <code>request-channel</code>
45+
in the <code>ContentEnricher</code> (see <xref linkend="content-enricher"/>) but, in this case, the end-user
46+
should support his own custom <code>correlationStrategy</code> for the <code>aggregation</code> function.
47+
</para>
48+
<para><emphasis>Distribution</emphasis></para>
49+
<para>
50+
The <emphasis>Distribution</emphasis> <code>Scatter-Gather</code> variant is based on the
51+
<classname>RecipientListRouter</classname> (see <xref linkend="router-implementations-recipientlistrouter"/>)
52+
with all available options for the <classname>RecipientListRouter</classname>. This is the second
53+
<classname>ScatterGatherHandler</classname> constructor argument. If you want to rely just on the default
54+
<code>correlationStrategy</code> for the <code>recipient-list-router</code> and the
55+
<code>aggregator</code>, you should specify <code>apply-sequence="true"</code>. Otherwise, a custom
56+
<code>correlationStrategy</code> should be supplied for the <code>aggregator</code>.
57+
Unlike the <classname>PublishSubscribeChannel</classname> (<emphasis>Auction</emphasis>) variant, having a
58+
<code>recipient-list-router</code> <code>selector</code> option, we can <emphasis>filter</emphasis>
59+
target suppliers based on the message. With <code>apply-sequence="true"</code> the default
60+
<code>sequenceSize</code> will be supplied and the <code>aggregator</code> will be able to release the group
61+
correctly. The <emphasis>Distribution</emphasis> option is mutually exclusive with the
62+
<emphasis>Auction</emphasis> option.
63+
</para>
64+
<para>
65+
In both cases, the request (<emphasis>scatter</emphasis>) message is enriched with the
66+
<code>gatherResultChannel</code> <classname>QueueChannel</classname> header, to wait for a reply message from
67+
the <code>aggregator</code>.
68+
</para>
69+
<para>
70+
By default, all suppliers should send their result to the <code>replyChannel</code> header
71+
(usually by omitting the <code>output-channel</code> from the ultimate endpoint).
72+
However, the <code>gatherChannel</code> option is also provided, allowing suppliers to send their
73+
reply to that channel for the aggregation.
1674
</para>
1775
</section>
1876

1977
<section id="scatter-gather-namespace">
20-
<title>Configuring a Scatter-Gather</title>
78+
<title>Configuring a Scatter-Gather Endpoint</title>
79+
<para>
80+
For Java and Annotation configuration, the bean definition for the <code>Scatter-Gather</code>
81+
is:
82+
</para>
83+
<programlisting language="java"><![CDATA[@Bean
84+
public MessageHandler distributor() {
85+
RecipientListRouter router = new RecipientListRouter();
86+
router.setApplySequence(true);
87+
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
88+
distributionChannel3()));
89+
return router;
90+
}
91+
92+
@Bean
93+
public MessageHandler gatherer() {
94+
return new AggregatingMessageHandler(
95+
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
96+
new SimpleMessageStore(),
97+
new HeaderAttributeCorrelationStrategy(
98+
IntegrationMessageHeaderAccessor.CORRELATION_ID),
99+
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
100+
}
101+
102+
@Bean
103+
@ServiceActivator(inputChannel = "distributionChannel")
104+
public MessageHandler scatterGatherDistribution() {
105+
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
106+
handler.setOutputChannel(output());
107+
return handler;
108+
}]]></programlisting>
21109
<para>
22-
TBD
110+
Here, we configure the <classname>RecipientListRouter</classname> <code>distributor</code> bean, with
111+
<code>applySequence="true"</code> and the list of recipient channels. The next bean is for an
112+
<classname>AggregatingMessageHandler</classname>. Finally, we inject both those beans into the
113+
<classname>ScatterGatherHandler</classname> bean definition and mark it as a
114+
<classname>@ServiceActivator</classname> to wire the Scatter-Gather component into the integration flow.
23115
</para>
116+
<para>
117+
Configuring the <code>&lt;scatter-gather&gt;</code> endpoint using the XML namespace:
118+
</para>
119+
<programlisting language="xml"><![CDATA[<scatter-gather
120+
id="" ]]><co id="sg1" linkends="sg1-txt" /><![CDATA[
121+
auto-startup="" ]]><co id="sg2" linkends="sg2-txt" /><![CDATA[
122+
input-channel="" ]]><co id="sg3" linkends="sg3-txt" /><![CDATA[
123+
output-channel="" ]]><co id="sg4" linkends="sg4-txt" /><![CDATA[
124+
scatter-channel="" ]]><co id="sg5" linkends="sg5-txt" /><![CDATA[
125+
gather-channel="" ]]><co id="sg6" linkends="sg6-txt" /><![CDATA[
126+
order="" ]]><co id="sg7" linkends="sg7-txt" /><![CDATA[
127+
phase="" ]]><co id="sg8" linkends="sg8-txt" /><![CDATA[
128+
send-timeout="" ]]><co id="sg9" linkends="sg9-txt" /><![CDATA[
129+
gather-timeout="" ]]><co id="sg10" linkends="sg10-txt" /><![CDATA[
130+
requires-reply="" >]]><co id="sg11" linkends="sg11-txt" /><![CDATA[
131+
<scatterer/> ]]><co id="sg12" linkends="sg12-txt" /><![CDATA[
132+
<gatherer/> ]]><co id="sg13" linkends="sg13-txt" /><![CDATA[
133+
</scatter-gather>]]></programlisting>
134+
135+
<calloutlist>
136+
<callout arearefs="sg1" id="sg1-txt">
137+
<para>
138+
The id of the Endpoint.
139+
The <classname>ScatterGatherHandler</classname> bean is registered with <code>id + '.handler'</code>
140+
alias. The <classname>RecipientListRouter</classname> - with <code>id + '.scatterer'</code>.
141+
And the <classname>AggregatingMessageHandler</classname> with <code>id + '.gatherer'</code>.
142+
<emphasis>Optional</emphasis> (a default id is generated value by <interfacename>BeanFactory</interfacename>).
143+
</para>
144+
</callout>
145+
146+
<callout arearefs="sg2" id="sg2-txt">
147+
<para>Lifecycle attribute signaling if the Endpoint should be started during Application Context
148+
initialization. In addition, the <classname>ScatterGatherHandler</classname> also implements
149+
<interfacename>Lifecycle</interfacename> and starts/stops the <code>gatherEndpoint</code>, which
150+
is created internally if a <code>gather-channel</code> is provided.
151+
<emphasis>Optional</emphasis> (default is <code>true</code>).</para>
152+
</callout>
153+
154+
<callout arearefs="sg3" id="sg3-txt">
155+
<para>The channel to receive request messages to handle them in the <classname>ScatterGatherHandler</classname>.
156+
<emphasis>Required</emphasis>.</para>
157+
</callout>
158+
159+
<callout arearefs="sg4" id="sg4-txt">
160+
<para>The channel to which the Scatter-Gather will send the aggregation
161+
results. <emphasis>Optional (because incoming messages can specify a
162+
reply channel themselves via <code>replyChannel</code> Message Header)</emphasis>.</para>
163+
</callout>
164+
165+
<callout arearefs="sg5" id="sg5-txt">
166+
<para>The channel to send the scatter message for the <emphasis>Auction</emphasis> scenario.
167+
<emphasis>Optional</emphasis>. Mutually exclusive with <code>&lt;scatterer&gt;</code> sub
168+
-element.</para>
169+
</callout>
170+
171+
<callout arearefs="sg6" id="sg6-txt">
172+
<para>
173+
The channel to receive replies from each supplier for the aggregation. is used as the
174+
<code>replyChannel</code> header in the scatter message.
175+
<emphasis>Optional</emphasis>. By default the <classname>FixedSubscriberChannel</classname> is
176+
created.
177+
</para>
178+
</callout>
179+
180+
<callout arearefs="sg7" id="sg7-txt">
181+
<para>Order of this component when more than one handler is subscribed to the same DirectChannel
182+
(use for load balancing purposes).
183+
<emphasis>Optional</emphasis>.</para>
184+
</callout>
185+
186+
<callout arearefs="sg8" id="sg8-txt">
187+
<para>Specify the phase in which the endpoint
188+
should be started and stopped. The startup order proceeds
189+
from lowest to highest, and the shutdown order is the
190+
reverse of that. By default this value is Integer.MAX_VALUE
191+
meaning that this container starts as late as possible and
192+
stops as soon as possible.
193+
<emphasis>Optional</emphasis>.</para>
194+
</callout>
195+
196+
<callout arearefs="sg9" id="sg9-txt">
197+
<para>The timeout interval to wait when sending a reply
198+
<interfacename>Message</interfacename> to the <code>output-channel</code>.
199+
By default the send will block for one second.
200+
It applies only if the output channel has some 'sending' limitations, e.g. a <classname>QueueChannel</classname>
201+
with a fixed 'capacity' and is full. In this case, a <classname>MessageDeliveryException</classname> is thrown.
202+
The <code>send-timeout</code> is ignored in case of <classname>AbstractSubscribableChannel</classname> implementations.
203+
In case of <code>group-timeout(-expression)</code> the <classname>MessageDeliveryException</classname>
204+
from the scheduled expire task leads this task to be rescheduled.
205+
<emphasis>Optional</emphasis>.</para>
206+
</callout>
207+
208+
<callout arearefs="sg10" id="sg10-txt">
209+
<para>Allows you to specify how long the Scatter-Gather will wait for the reply message
210+
before returning. By default it will wait indefinitely. 'null' is returned
211+
if the reply times out.
212+
<emphasis>Optional</emphasis>. Defaults to <code>-1</code> - indefinitely.</para>
213+
</callout>
214+
215+
<callout arearefs="sg11" id="sg11-txt">
216+
<para>
217+
Specify whether the Scatter-Gather must return a non-null value. This value is
218+
<code>true</code> by default, hence a <classname>ReplyRequiredException</classname> will be thrown
219+
when the underlying aggregator returns a null value after <code>gather-timeout</code>.
220+
Note, if <code>null</code> is a possibility, the <code>gather-timeout</code> should be specified
221+
to avoid an indefinite wait.
222+
</para>
223+
</callout>
224+
225+
<callout arearefs="sg12" id="sg12-txt">
226+
<para>The <code>&lt;recipient-list-router&gt;</code> options.
227+
<emphasis>Optional</emphasis>. Mutually exclusive with <code>scatter-channel</code>
228+
attribute.</para>
229+
</callout>
230+
231+
<callout arearefs="sg13" id="sg13-txt">
232+
<para>The <code>&lt;aggregator&gt;</code> options.
233+
<emphasis>Required</emphasis>. </para>
234+
</callout>
235+
236+
</calloutlist>
24237
</section>
25238

26239
</section>

0 commit comments

Comments
 (0)