Skip to content

Commit a167170

Browse files
mjd507artembilan
authored andcommitted
GH-10083: Apply Nullability to core handler package
Related to: #10083 `MessageProcessorMessageSource` now provides a fake message to the underlying messageProcessor since the contract requires. add `@Nullable` on `CheckedCallable.call()`, since in `LockRequestHandlerAdvice.doInvoke()`, there is a lambda `ExecutionCallback::execute` which will be running inside the `call()`, and its Nullable. revert `@Nullable` on Throwable parameter in `RecoveryCallback.recover()` revert `@Nullable` on `CheckedCallable.call()` as it is nullable from generic type definition, for `LockRequestHandlerAdvice`, add NullAway since `ExecutionCallback.execute` is Nullable as well. move fakeMessage to a static final constant in `MessageProcessorMessageSource` add NullAway on `RequestHandlerRetryAdvice.recoveryCallback.recover()`, need rework on retry component use `Map.computeIfAbsent` in `PayloadsArgumentResolver` and `PayloadExpressionArgumentResolver` add more detailed reason (critical path, dataflow analysis) on NullAway method. Signed-off-by: Jiandong Ma <jiandong.ma.cn@gmail.com>
1 parent b683f9b commit a167170

File tree

47 files changed

+357
-272
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+357
-272
lines changed

spring-integration-core/src/main/java/org/springframework/integration/core/ErrorMessagePublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrat
7070
this.errorMessageStrategy = errorMessageStrategy;
7171
}
7272

73-
public final void setChannel(MessageChannel channel) {
73+
public final void setChannel(@Nullable MessageChannel channel) {
7474
this.channel = channel;
7575
}
7676

spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProcessorMessageSource.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.jspecify.annotations.Nullable;
2020

2121
import org.springframework.integration.handler.MessageProcessor;
22+
import org.springframework.integration.support.MutableMessageBuilder;
23+
import org.springframework.messaging.Message;
2224

2325
/**
2426
* The {@link org.springframework.integration.core.MessageSource} strategy implementation
@@ -27,11 +29,15 @@
2729
*
2830
* @author Artem Bilan
2931
* @author Gary Russell
32+
* @author Jiandong Ma
3033
*
3134
* @since 5.0
3235
*/
3336
public class MessageProcessorMessageSource extends AbstractMessageSource<Object> {
3437

38+
// provide a fake message since the contract of processMessage requires a NonNull Message.
39+
static final Message<Object> FAKE_MESSAGE = MutableMessageBuilder.withPayload(new Object(), false).build();
40+
3541
private final MessageProcessor<?> messageProcessor;
3642

3743
public MessageProcessorMessageSource(MessageProcessor<?> messageProcessor) {
@@ -45,7 +51,7 @@ public String getComponentType() {
4551

4652
@Override
4753
protected @Nullable Object doReceive() {
48-
return this.messageProcessor.processMessage(null);
54+
return this.messageProcessor.processMessage(FAKE_MESSAGE);
4955
}
5056

5157
}

spring-integration-core/src/main/java/org/springframework/integration/filter/MessageFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public boolean isRunning() {
176176
}
177177

178178
@Override
179-
public Object postProcess(Message<?> message, Object result) {
179+
public @Nullable Object postProcess(Message<?> message, @Nullable Object result) {
180180
if (result == null) {
181181
MessageChannel channelToDiscard = getDiscardChannel();
182182
if (channelToDiscard != null) {

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void setObservationConvention(@Nullable MessageReceiverObservationConvent
5555
this.observationConvention = observationConvention;
5656
}
5757

58-
@Override // NOSONAR
58+
@Override
5959
public void handleMessage(Message<?> message) {
6060
Assert.notNull(message, "Message must not be null");
6161
if (isLoggingEnabled()) {

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
public abstract class AbstractMessageProducingHandler extends AbstractMessageHandler
7777
implements MessageProducer, HeaderPropagationAware {
7878

79-
protected final MessagingTemplate messagingTemplate = new MessagingTemplate(); // NOSONAR final
79+
protected final MessagingTemplate messagingTemplate = new MessagingTemplate();
8080

8181
private boolean async;
8282

@@ -88,7 +88,7 @@ public abstract class AbstractMessageProducingHandler extends AbstractMessageHan
8888
@Nullable
8989
private MessageChannel outputChannel;
9090

91-
private String[] notPropagatedHeaders;
91+
private String @Nullable [] notPropagatedHeaders;
9292

9393
private boolean selectiveHeaderPropagation;
9494

@@ -113,7 +113,7 @@ public void setOutputChannel(MessageChannel outputChannel) {
113113
@Override
114114
public void setOutputChannelName(String outputChannelName) {
115115
Assert.hasText(outputChannelName, "'outputChannelName' must not be empty");
116-
this.outputChannelName = outputChannelName; //NOSONAR (inconsistent sync)
116+
this.outputChannelName = outputChannelName;
117117
}
118118

119119
/**
@@ -213,12 +213,10 @@ public void addNotPropagatedHeaders(String... headers) {
213213
@Override
214214
protected void onInit() {
215215
super.onInit();
216-
Assert.state(!(this.outputChannelName != null && this.outputChannel != null), //NOSONAR (inconsistent sync)
216+
Assert.state(!(this.outputChannelName != null && this.outputChannel != null),
217217
"'outputChannelName' and 'outputChannel' are mutually exclusive.");
218218
BeanFactory beanFactory = getBeanFactory();
219-
if (beanFactory != null) {
220-
this.messagingTemplate.setBeanFactory(beanFactory);
221-
}
219+
this.messagingTemplate.setBeanFactory(beanFactory);
222220
this.messagingTemplate.setDestinationResolver(getChannelResolver());
223221
setAsyncIfCan();
224222
if (!this.sendTimeoutSet) {
@@ -244,9 +242,9 @@ public MessageChannel getOutputChannel() {
244242
return this.outputChannel;
245243
}
246244

247-
protected void sendOutputs(Object result, Message<?> requestMessage) {
248-
if (result instanceof Iterable<?> && shouldSplitOutput((Iterable<?>) result)) {
249-
for (Object o : (Iterable<?>) result) {
245+
protected void sendOutputs(@Nullable Object result, Message<?> requestMessage) {
246+
if (result instanceof Iterable<?> iterableResult && shouldSplitOutput(iterableResult)) {
247+
for (Object o : iterableResult) {
250248
produceOutput(o, requestMessage);
251249
}
252250
}
@@ -294,13 +292,12 @@ protected void produceOutput(Object replyArg, final Message<?> requestMessage) {
294292
private Map<?, ?> obtainRoutingSlipHeader(MessageHeaders requestHeaders, Object reply) {
295293
Map<?, ?> routingSlipHeader = requestHeaders.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
296294
if (routingSlipHeader == null) {
297-
if (reply instanceof Message) {
298-
routingSlipHeader = ((Message<?>) reply).getHeaders()
295+
if (reply instanceof Message<?> replyMessage) {
296+
routingSlipHeader = replyMessage.getHeaders()
299297
.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
300298
}
301-
else if (reply instanceof AbstractIntegrationMessageBuilder<?>) {
302-
routingSlipHeader = ((AbstractIntegrationMessageBuilder<?>) reply)
303-
.getHeader(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
299+
else if (reply instanceof AbstractIntegrationMessageBuilder<?> messageBuilder) {
300+
routingSlipHeader = messageBuilder.getHeader(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
304301
}
305302
}
306303
return routingSlipHeader;
@@ -310,12 +307,11 @@ else if (reply instanceof AbstractIntegrationMessageBuilder<?>) {
310307
private Object obtainReplyChannel(MessageHeaders requestHeaders, Object reply) {
311308
Object replyChannel = requestHeaders.getReplyChannel();
312309
if (replyChannel == null) {
313-
if (reply instanceof Message) {
314-
replyChannel = ((Message<?>) reply).getHeaders().getReplyChannel();
310+
if (reply instanceof Message<?> replyMessage) {
311+
replyChannel = replyMessage.getHeaders().getReplyChannel();
315312
}
316-
else if (reply instanceof AbstractIntegrationMessageBuilder<?>) {
317-
replyChannel = ((AbstractIntegrationMessageBuilder<?>) reply)
318-
.getHeader(MessageHeaders.REPLY_CHANNEL, Object.class);
313+
else if (reply instanceof AbstractIntegrationMessageBuilder<?> messageBuilder) {
314+
replyChannel = messageBuilder.getHeader(MessageHeaders.REPLY_CHANNEL, Object.class);
319315
}
320316
}
321317
return replyChannel;
@@ -438,15 +434,15 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) {
438434
return builder;
439435
}
440436

441-
private Object getOutputChannelFromRoutingSlip(Object reply, Message<?> requestMessage, List<?> routingSlip,
437+
private @Nullable Object getOutputChannelFromRoutingSlip(Object reply, Message<?> requestMessage, List<?> routingSlip,
442438
AtomicInteger routingSlipIndex) {
443439

444440
if (routingSlipIndex.get() >= routingSlip.size()) {
445441
return null;
446442
}
447443

448444
Object path = routingSlip.get(routingSlipIndex.get());
449-
Object routingSlipPathValue = null;
445+
Object routingSlipPathValue;
450446

451447
if (path instanceof String string) {
452448
routingSlipPathValue = getBeanFactory().getBean(string);
@@ -558,7 +554,7 @@ protected boolean shouldCopyRequestHeaders() {
558554
protected void sendErrorMessage(Message<?> requestMessage, Throwable ex) {
559555
Object errorChannel = resolveErrorChannel(requestMessage.getHeaders());
560556
Throwable result = ex;
561-
if (!(ex instanceof MessagingException)) {
557+
if (!(result instanceof MessagingException)) {
562558
result = new MessageHandlingException(requestMessage, ex);
563559
}
564560
if (errorChannel == null) {
@@ -578,7 +574,7 @@ protected void sendErrorMessage(Message<?> requestMessage, Throwable ex) {
578574
}
579575
}
580576

581-
protected Object resolveErrorChannel(final MessageHeaders requestHeaders) {
577+
protected @Nullable Object resolveErrorChannel(final MessageHeaders requestHeaders) {
582578
Object errorChannel = requestHeaders.getErrorChannel();
583579
if (errorChannel == null) {
584580
try {
@@ -594,12 +590,10 @@ protected Object resolveErrorChannel(final MessageHeaders requestHeaders) {
594590
protected void setupMessageProcessor(MessageProcessor<?> processor) {
595591
if (processor instanceof AbstractMessageProcessor<?> abstractMessageProcessor) {
596592
ConversionService conversionService = getConversionService();
597-
if (conversionService != null) {
598-
abstractMessageProcessor.setConversionService(conversionService);
599-
}
593+
abstractMessageProcessor.setConversionService(conversionService);
600594
}
601595
BeanFactory beanFactory = getBeanFactory();
602-
if (processor instanceof BeanFactoryAware beanFactoryAware && beanFactory != null) {
596+
if (processor instanceof BeanFactoryAware beanFactoryAware) {
603597
beanFactoryAware.setBeanFactory(beanFactory);
604598
}
605599
if (processor instanceof InitializingBean initializingBean) {
@@ -628,7 +622,7 @@ private final class ReplyFutureCallback implements BiConsumer<Object, Throwable>
628622
}
629623

630624
@Override
631-
public void accept(Object result, Throwable exception) {
625+
public void accept(@Nullable Object result, @Nullable Throwable exception) {
632626
if (result != null) {
633627
Message<?> replyMessage = null;
634628
try {
@@ -637,7 +631,7 @@ public void accept(Object result, Throwable exception) {
637631
}
638632
catch (Exception ex) {
639633
Exception exceptionToLogAndSend = ex;
640-
if (!(ex instanceof MessagingException)) { // NOSONAR
634+
if (!(ex instanceof MessagingException)) {
641635
exceptionToLogAndSend = new MessageHandlingException(this.requestMessage, ex);
642636
if (replyMessage != null) {
643637
exceptionToLogAndSend = new MessagingException(replyMessage, exceptionToLogAndSend);
@@ -648,7 +642,8 @@ public void accept(Object result, Throwable exception) {
648642
}
649643
}
650644
else if (exception != null) {
651-
onFailure(exception instanceof CompletionException ? exception.getCause() : exception);
645+
onFailure(exception instanceof CompletionException && exception.getCause() != null
646+
? exception.getCause() : exception);
652647
}
653648
}
654649

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.springframework.integration.handler.advice.HandleMessageAdvice;
3232
import org.springframework.messaging.Message;
3333
import org.springframework.util.Assert;
34-
import org.springframework.util.ClassUtils;
3534

3635
/**
3736
* Base class for MessageHandlers that are capable of producing replies.
@@ -53,11 +52,12 @@ public abstract class AbstractReplyProducingMessageHandler extends AbstractMessa
5352

5453
private final List<Advice> adviceChain = new LinkedList<>();
5554

56-
private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
55+
@SuppressWarnings("NullAway.Init")
56+
private ClassLoader beanClassLoader;
5757

5858
private boolean requiresReply = false;
5959

60-
private volatile RequestHandler advisedRequestHandler;
60+
private volatile @Nullable RequestHandler advisedRequestHandler;
6161

6262
/**
6363
* Flag whether a reply is required. If true an incoming message MUST result in a reply message being sent.
@@ -160,8 +160,8 @@ else if (!isAsync() && isLoggingEnabled()) {
160160
}
161161
}
162162

163-
@Nullable
164-
protected Object doInvokeAdvisedRequestHandler(Message<?> message) {
163+
@SuppressWarnings("NullAway") // dataflow analysis limitation
164+
protected @Nullable Object doInvokeAdvisedRequestHandler(Message<?> message) {
165165
return this.advisedRequestHandler.handleRequestMessage(message);
166166
}
167167

spring-integration-core/src/main/java/org/springframework/integration/handler/BeanNameMessageProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,15 @@ public class BeanNameMessageProcessor<T> implements MessageProcessor<T>, BeanFac
3737

3838
private final String beanName;
3939

40-
private final String methodName;
40+
private final @Nullable String methodName;
4141

42+
@SuppressWarnings("NullAway.Init")
4243
private MessageProcessor<T> delegate;
4344

45+
@SuppressWarnings("NullAway.Init")
4446
private BeanFactory beanFactory;
4547

46-
public BeanNameMessageProcessor(String object, String methodName) {
48+
public BeanNameMessageProcessor(String object, @Nullable String methodName) {
4749
this.beanName = object;
4850
this.methodName = methodName;
4951
}

spring-integration-core/src/main/java/org/springframework/integration/handler/ControlBusMessageProcessor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.util.List;
2020

21+
import org.jspecify.annotations.Nullable;
22+
2123
import org.springframework.expression.Expression;
2224
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2325
import org.springframework.integration.IntegrationPattern;
@@ -41,6 +43,7 @@
4143
public class ControlBusMessageProcessor extends AbstractMessageProcessor<Object>
4244
implements IntegrationPattern {
4345

46+
@SuppressWarnings("NullAway.Init")
4447
private ControlBusCommandRegistry controlBusCommandRegistry;
4548

4649
public ControlBusMessageProcessor() {
@@ -69,7 +72,7 @@ protected void onInit() {
6972
}
7073

7174
@Override
72-
public Object processMessage(Message<?> message) {
75+
public @Nullable Object processMessage(Message<?> message) {
7376
String command = message.getPayload().toString();
7477
@SuppressWarnings("unchecked")
7578
List<Object> arguments =

0 commit comments

Comments
 (0)