-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Spring Integration 6.0 to 6.1 Migration Guide
- Removal of MQTT
ConsumerStopAction
- Do Not Block By Default
- Filter WARN About Discarded Message
- No
IntegrationComponentSpec.get()
any more - Endpoint is async by default for output channel as
FluxMessageChannel
Previously deprecated org.springframework.integration.mqtt.core.ConsumerStopAction
and its functionality has been removed in favor of existing org.eclipse.paho.client.mqttv3.ConnectionOptions.cleanSession
option.
According to distributed systems design and bad experience of demo developing it is not OK to block forever.
Most of the timeouts in the framework are now 30
seconds by default.
They were previously -1
or Long.MIN_VALUE
which means wait for operation completion indefinitely.
Only one remained as 1
seconds is a PollingConsumer
where it is better to not block even for those 30
seconds when no messages in the queue, but let the polling task be rescheduled.
This fixed the problem with a single thread in a pool for auto-configured TaskScheduler
.
Now with a 1
second of wait time we are able to switch to other scheduled tasks even with only 1
thread in the pool.
The MessageFilter
now emits a WARN logging message when a Message
is dropped: no discardChannel
and throwExceptionOnRejection
is false
.
Essentially, a default behavior of the filter is to not be silent, but rather WARN in the logs.
If the previous behavior is acceptable, it is recommended to set a nullChannel
as a discardChannel
for filter endpoint.
The IntegrationComponentSpec
is a FactoryBean
by itself and therefore it follows its lifecycle requirements when it is declared as a bean.
In addition, the IntegrationComponentSpec
might not only be a wrapper around one target object: it may manage and expose other supporting components for target Spring Integration endpoints.
For this purpose a ComponentsRegistration
contract can be implemented on the IntegrationComponentSpec
.
Spring Integration Java DSL and its supporting BeanPostProcessor
handles an IntegrationComponentSpec
and its ComponentsRegistration
aspect, but for this task to be performed properly the IntegrationComponentSpec
must be provided for the framework as is.
Therefore we have deprecated a get()
method with removal in the next version.
For example, a bean definition like this:
@Bean(PollerMetadata.DEFAULT_POLLER)
PollerMetadata defaultPoller() {
return Pollers.fixedDelay(100).get();
}
Must migrate now to this:
@Bean(PollerMetadata.DEFAULT_POLLER)
PollerSpec defaultPoller() {
return Pollers.fixedDelay(100);
}
If the output channel of the AbstractMessageProducingHandler
is configured to a ReactiveStreamsSubscribableChannel
, the async
mode is turned on by default.
If the handler result is not a reactive type or CompletableFuture<?>
, then regular reply producing process happens despite the output channel type.
This code:
.<Mono<?>>handle((p, h) -> p, e -> e.async(true))
.channel(MessageChannels.flux())
now simply can be migrated to:
.handle((p, h) -> p)
.channel(MessageChannels.flux())