|
5 | 5 | <section id="scatter-gather-introduction"> |
6 | 6 | <title>Introduction</title> |
7 | 7 | <para> |
8 | | - TBD |
| 8 | + Starting with <emphasis>version 4.1</emphasis> the Spring Integration provides the |
| 9 | + <ulink url="http://www.eaipatterns.com/BroadcastAggregate.html/">Scatter-Gather</ulink> |
| 10 | + Enterprise Integration Pattern implementation. It is a compound endpoint, which goal is to send a message |
| 11 | + to the recipients and aggregate results. Quoting EIP Book, it is a component for scenarios like |
| 12 | + <emphasis>best quote</emphasis>, when we need to request an info from several suppliers |
| 13 | + and decide which one provides us with the best term for the requested item. |
| 14 | + </para> |
| 15 | + <para> |
| 16 | + The <classname>ScatterGatherHandler</classname> is a <emphasis>request-reply</emphasis> combination of the |
| 17 | + <classname>PublishSubscribeChannel</classname> (or <classname>RecipientListRouter</classname>) |
| 18 | + and <classname>AggregatingMessageHandler</classname>. The request message is send to the |
| 19 | + <code>scatter</code> channel and <classname>ScatterGatherHandler</classname> waits for the reply |
| 20 | + from the aggregator to send it further to the <code>outputChannel</code>. |
9 | 21 | </para> |
10 | 22 | </section> |
11 | 23 |
|
12 | 24 | <section id="scatter-gather-functionality"> |
13 | 25 | <title>Functionality</title> |
14 | 26 | <para> |
15 | | - TBD |
| 27 | + The <code>Scatter-Gather</code> suggests two scenarios - <emphasis>Auction</emphasis> and |
| 28 | + <emphasis>Distribution</emphasis>. In both cases the <code>aggregation</code> function is the same and |
| 29 | + provides all options available for the <classname>AggregatingMessageHandler</classname>. Actually the |
| 30 | + <classname>ScatterGatherHandler</classname> just requires <classname>AggregatingMessageHandler</classname> |
| 31 | + as a constructor argument. See <xref linkend="aggregator"/> for more information. |
| 32 | + </para> |
| 33 | + <para><emphasis>Auction</emphasis></para> |
| 34 | + <para> |
| 35 | + The <emphasis>Auction</emphasis> <code>Scatter-Gather</code> variant presumes the |
| 36 | + <code>publish-subscribe</code> logic for the request message, when in classical case the |
| 37 | + <code>scatter</code> channel is <classname>PublishSubscribeChannel</classname> with |
| 38 | + <code>apply-sequence="true"</code>. However this channel can be any |
| 39 | + <interfacename>MessageChannel</interfacename> implementation as it is with <code>request-channel</code> |
| 40 | + in the <code>ContentEnricher</code> (see <xref linkend="content-enricher"/>), but in this case end-user |
| 41 | + should support his own custom <code>correlationStrategy</code> for the <code>aggregation</code> function. |
| 42 | + </para> |
| 43 | + <para><emphasis>Distribution</emphasis></para> |
| 44 | + <para> |
| 45 | + The <emphasis>Distribution</emphasis> <code>Scatter-Gather</code> variant is based on the |
| 46 | + <classname>RecipientListRouter</classname> (see <xref linkend="router-implementations-recipientlistrouter"/>) |
| 47 | + with all available options for the <classname>RecipientListRouter</classname>. This is the second |
| 48 | + <classname>ScatterGatherHandler</classname> constructor argument. If you want to rely just on the default |
| 49 | + <code>correlationStrategy</code> for the <code>recipient-list-router</code> and the next |
| 50 | + <code>aggregator</code> you should specify <code>apply-sequence="true"</code>. Otherwise the custom |
| 51 | + <code>correlationStrategy</code> should be supplied for the <code>aggregator</code>. |
| 52 | + Unlike the <classname>PublishSubscribeChannel</classname> (<emphasis>Auction</emphasis>), having the |
| 53 | + <code>recipient-list-router</code> <code>selector</code> option we can <emphasis>filter</emphasis> |
| 54 | + target suppliers from message to message. Anyway with <code>apply-sequence="true"</code> the default |
| 55 | + <code>sequenceSize</code> will be supplied and <code>aggregator</code> will be able to release group |
| 56 | + correctly.The <emphasis>Distribution</emphasis> is mutually exclusive with <emphasis>Auction</emphasis>. |
| 57 | + </para> |
| 58 | + <para> |
| 59 | + In both cases the request (<emphasis>scatter</emphasis>) message is enriched with the |
| 60 | + <code>gatherResultChannel</code> <classname>QueueChannel</classname> header to wait a reply message from |
| 61 | + the <code>aggregator</code>. |
| 62 | + </para> |
| 63 | + <para> |
| 64 | + By default default all suppliers should send their result to the <code>replyChannel</code> from header, |
| 65 | + however the <code>gatherChannel</code> option is also provided, assuming that suppliers will send their |
| 66 | + reply to that channel for the aggregation. |
16 | 67 | </para> |
17 | 68 | </section> |
18 | 69 |
|
19 | 70 | <section id="scatter-gather-namespace"> |
20 | 71 | <title>Configuring a Scatter-Gather</title> |
21 | 72 | <para> |
22 | | - TBD |
| 73 | + For Java and Annotations configuration the bean definition for the <code>Scatter-Gather</code> |
| 74 | + transparently: |
23 | 75 | </para> |
| 76 | + <programlisting language="java"><![CDATA[@Bean |
| 77 | +public MessageHandler distributor() { |
| 78 | + RecipientListRouter router = new RecipientListRouter(); |
| 79 | + router.setApplySequence(true); |
| 80 | + router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(), |
| 81 | + distributionChannel3())); |
| 82 | + return router; |
| 83 | +} |
| 84 | +
|
| 85 | +@Bean |
| 86 | +public MessageHandler gatherer() { |
| 87 | + return new AggregatingMessageHandler( |
| 88 | + new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"), |
| 89 | + new SimpleMessageStore(), |
| 90 | + new HeaderAttributeCorrelationStrategy( |
| 91 | + IntegrationMessageHeaderAccessor.CORRELATION_ID), |
| 92 | + new ExpressionEvaluatingReleaseStrategy("size() == 2")); |
| 93 | +} |
| 94 | +
|
| 95 | +@Bean |
| 96 | +@ServiceActivator(inputChannel = "distributionChannel") |
| 97 | +public MessageHandler scatterGatherDistribution() { |
| 98 | + ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer()); |
| 99 | + handler.setOutputChannel(output()); |
| 100 | + return handler; |
| 101 | +}]]></programlisting> |
| 102 | + <para> |
| 103 | + We configure here the <classname>RecipientListRouter</classname> <code>distributor</code> bean, with the |
| 104 | + <code>applySequence="true"</code> option and the list of recipient channels. The next bean is for |
| 105 | + <classname>AggregatingMessageHandler</classname>. In the end we inject both those beans to the |
| 106 | + <classname>ScatterGatherHandler</classname> bean definition and mark it with |
| 107 | + <classname>@ServiceActivator</classname> to wire the Scatter-Gather component to the integration flow. |
| 108 | + </para> |
| 109 | + <para> |
| 110 | + The whole list of <code><scatter-gather></code> option for the xml configuration: |
| 111 | + </para> |
| 112 | + <programlisting language="xml"><![CDATA[<scatter-gather |
| 113 | + id="" ]]><co id="sg1" linkends="sg1-txt" /><![CDATA[ |
| 114 | + auto-startup="" ]]><co id="sg2" linkends="sg2-txt" /><![CDATA[ |
| 115 | + input-channel="" ]]><co id="sg3" linkends="sg3-txt" /><![CDATA[ |
| 116 | + output-channel="" ]]><co id="sg4" linkends="sg4-txt" /><![CDATA[ |
| 117 | + scatter-channel="" ]]><co id="sg5" linkends="sg5-txt" /><![CDATA[ |
| 118 | + gather-channel="" ]]><co id="sg6" linkends="sg6-txt" /><![CDATA[ |
| 119 | + order="" ]]><co id="sg7" linkends="sg7-txt" /><![CDATA[ |
| 120 | + phase="" ]]><co id="sg8" linkends="sg8-txt" /><![CDATA[ |
| 121 | + send-timeout="" ]]><co id="sg9" linkends="sg9-txt" /><![CDATA[ |
| 122 | + gather-timeout="" >]]><co id="sg10" linkends="sg10-txt" /><![CDATA[ |
| 123 | + <scatterer/> ]]><co id="sg11" linkends="sg11-txt" /><![CDATA[ |
| 124 | + <gatherer/> ]]><co id="sg12" linkends="sg12-txt" /><![CDATA[ |
| 125 | +</scatter-gather>]]></programlisting> |
| 126 | + |
| 127 | + <calloutlist> |
| 128 | + <callout arearefs="sg1" id="sg1-txt"> |
| 129 | + <para> |
| 130 | + The id of the Endpoint. |
| 131 | + The <classname>ScatterGatherHandler</classname> bean is registered with <code>id + '.handler'</code> |
| 132 | + alias. The <classname>RecipientListRouter</classname> - with <code>id + '.scatterer'</code>. |
| 133 | + And the <classname>AggregatingMessageHandler</classname> with <code>id + '.gatherer'</code>. |
| 134 | + <emphasis>Optional</emphasis> (default is generated value by <interfacename>BeanFactory</interfacename>). |
| 135 | + </para> |
| 136 | + </callout> |
| 137 | + |
| 138 | + <callout arearefs="sg2" id="sg2-txt"> |
| 139 | + <para>Lifecycle attribute signaling if the Endpoint should be started during Application Context |
| 140 | + startup. In addition the <classname>ScatterGatherHandler</classname> also implements |
| 141 | + <interfacename>Lifecycle</interfacename> and starts/stops the <code>gatherEndpoint</code>, which |
| 142 | + is created internally if <code>gather-channel</code> is provided. |
| 143 | + <emphasis>Optional</emphasis> (default is <code>true</code>).</para> |
| 144 | + </callout> |
| 145 | + |
| 146 | + <callout arearefs="sg3" id="sg3-txt"> |
| 147 | + <para>The channel to receive request messages to handle them in the <classname>ScatterGatherHandler</classname>. |
| 148 | + <emphasis>Required</emphasis>.</para> |
| 149 | + </callout> |
| 150 | + |
| 151 | + <callout arearefs="sg4" id="sg4-txt"> |
| 152 | + <para>The channel to which the Scatter-Gather will send the aggregation |
| 153 | + results. <emphasis>Optional (because incoming messages can specify a |
| 154 | + reply channel themselves via <code>replyChannel</code> Message Header)</emphasis>.</para> |
| 155 | + </callout> |
| 156 | + |
| 157 | + <callout arearefs="sg5" id="sg5-txt"> |
| 158 | + <para>The channel to send the scatter message for the <emphasis>Auction</emphasis> scenario. |
| 159 | + <emphasis>Optional</emphasis>. Mutually exclusive with <code><scatterer></code> sub |
| 160 | + -element.</para> |
| 161 | + </callout> |
| 162 | + |
| 163 | + <callout arearefs="sg6" id="sg6-txt"> |
| 164 | + <para> |
| 165 | + The channel to receive replies from each supplier for the aggregation. is used as the |
| 166 | + <code>replyChannel</code> header in the scatter message. |
| 167 | + <emphasis>Optional</emphasis>. By default the <classname>FixedSubscriberChannel</classname> is |
| 168 | + created. |
| 169 | + </para> |
| 170 | + </callout> |
| 171 | + |
| 172 | + <callout arearefs="sg7" id="sg7-txt"> |
| 173 | + <para>Order of this component when more than one handle is subscribed to the same DirectChannel |
| 174 | + (use for load balancing purposes). |
| 175 | + <emphasis>Optional</emphasis>.</para> |
| 176 | + </callout> |
| 177 | + |
| 178 | + <callout arearefs="sg8" id="sg8-txt"> |
| 179 | + <para>Specify the phase in which the endpoint |
| 180 | + should be started and stopped. The startup order proceeds |
| 181 | + from lowest to highest, and the shutdown order is the |
| 182 | + reverse of that. By default this value is Integer.MAX_VALUE |
| 183 | + meaning that this container starts as late as possible and |
| 184 | + stops as soon as possible. |
| 185 | + <emphasis>Optional</emphasis>.</para> |
| 186 | + </callout> |
| 187 | + |
| 188 | + <callout arearefs="sg9" id="sg9-txt"> |
| 189 | + <para>The timeout interval to wait when sending a reply |
| 190 | + <interfacename>Message</interfacename> to the <code>output-channel</code>. |
| 191 | + By default the send will block for one second. |
| 192 | + It is applied only if the output channel has some 'sending' limitations, e.g. <classname>QueueChannel</classname> |
| 193 | + with a fixed 'capacity'. In this case a <classname>MessageDeliveryException</classname> is thrown. |
| 194 | + The <code>send-timeout</code> is ignored in case of <classname>AbstractSubscribableChannel</classname> implementations. |
| 195 | + In case of <code>group-timeout(-expression)</code> the <classname>MessageDeliveryException</classname> |
| 196 | + from the scheduled expire task leads this task to be rescheduled. |
| 197 | + <emphasis>Optional</emphasis>.</para> |
| 198 | + </callout> |
| 199 | + |
| 200 | + <callout arearefs="sg10" id="sg10-txt"> |
| 201 | + <para>Allows you to specify how long the Scatter-Gather will wait for the reply message |
| 202 | + before returning. By default it will wait indefinitely. 'null' is returned |
| 203 | + if the gateway times out. |
| 204 | + <emphasis>Optional</emphasis>. Defaults to <code>-1</code> - indefinitely.</para> |
| 205 | + </callout> |
| 206 | + |
| 207 | + <callout arearefs="sg11" id="sg11-txt"> |
| 208 | + <para>The <code><recipient-list-router></code> options. |
| 209 | + <emphasis>Optional</emphasis>. Mutually exclusive with <code>scatter-channel</code> |
| 210 | + attribute.</para> |
| 211 | + </callout> |
| 212 | + |
| 213 | + <callout arearefs="sg12" id="sg12-txt"> |
| 214 | + <para>The <code><aggregator></code> options. |
| 215 | + <emphasis>Required</emphasis>. </para> |
| 216 | + </callout> |
| 217 | + |
| 218 | + </calloutlist> |
24 | 219 | </section> |
25 | 220 |
|
26 | 221 | </section> |
0 commit comments