Skip to content

Introduce NullMarked to some packages in spring-integration-core module #10097

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Provides classes related to message acknowledgment.
*/
@org.springframework.lang.NonNullApi
@org.jspecify.annotations.NullMarked
package org.springframework.integration.acks;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,6 +56,7 @@ public abstract class AbstractAggregatingMessageGroupProcessor implements Messag

private boolean messageBuilderFactorySet;

@SuppressWarnings("NullAway.Init")
private BeanFactory beanFactory;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,10 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP

private boolean releaseStrategySet;

@SuppressWarnings("NullAway.Init")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct on this property.
It is initialized in the getDiscardChannel().
So, it has to be @Nullable and that getter has to be used instead in other places.

private MessageChannel discardChannel;

@Nullable
private String discardChannelName;

private boolean sendPartialResultOnExpiry;
Expand All @@ -143,18 +145,23 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP

private boolean releasePartialSequences;

@Nullable
private Expression groupTimeoutExpression;

@Nullable
private List<Advice> forceReleaseAdviceChain;

private long expireTimeout;

@Nullable
private Duration expireDuration;

private MessageGroupProcessor forceReleaseProcessor = new ForceReleaseMessageGroupProcessor();

@SuppressWarnings("NullAway.Init")
private EvaluationContext evaluationContext;

@Nullable
private ApplicationEventPublisher applicationEventPublisher;

private boolean expireGroupsUponTimeout = true;
Expand All @@ -165,10 +172,11 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP

private volatile boolean running;

@Nullable
private BiFunction<Message<?>, String, String> groupConditionSupplier;

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
@Nullable CorrelationStrategy correlationStrategy, @Nullable ReleaseStrategy releaseStrategy) {

Assert.notNull(processor, "'processor' must not be null");
Assert.notNull(store, "'store' must not be null");
Expand Down Expand Up @@ -504,6 +512,7 @@ public MessageChannel getDiscardChannel() {
return this.discardChannel;
}

@Nullable
protected String getDiscardChannelName() {
return this.discardChannelName;
}
Expand Down Expand Up @@ -532,6 +541,7 @@ protected boolean isReleasePartialSequences() {
return this.releasePartialSequences;
}

@Nullable
protected Expression getGroupTimeoutExpression() {
return this.groupTimeoutExpression;
}
Expand Down Expand Up @@ -753,7 +763,7 @@ private void discardMessage(Message<?> message) {
* @param group The group.
* @param completedMessages The completed messages.
*/
protected abstract void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages);
protected abstract void afterRelease(MessageGroup group, @Nullable Collection<Message<?>> completedMessages);

/**
* Subclasses may override if special action is needed because the group was released or discarded
Expand Down Expand Up @@ -912,14 +922,12 @@ protected void expireGroup(Object correlationKey, MessageGroup group, Lock lock)
}

protected void completeGroup(Object correlationKey, MessageGroup group, Lock lock) {
Message<?> first = null;
if (group != null) {
first = group.getOne();
}
Message<?> first = group.getOne();
completeGroup(first, correlationKey, group, lock);
}

@SuppressWarnings("unchecked")
@Nullable
protected Collection<Message<?>> completeGroup(Message<?> message, Object correlationKey, MessageGroup group,
Lock lock) {

Expand All @@ -929,6 +937,7 @@ protected Collection<Message<?>> completeGroup(Message<?> message, Object correl
this.logger.debug(() -> "Completing group with correlationKey [" + correlationKey + "]");

result = this.outputProcessor.processMessageGroup(group);
Assert.state(result != null, "The processorMessageGroup returned a null result. Null result is not expected.");
if (isResultCollectionOfMessages(result)) {
partialSequence = (Collection<Message<?>>) result;
}
Expand Down Expand Up @@ -988,6 +997,7 @@ private static boolean isResultCollectionOfMessages(Object result) {
return false;
}

@Nullable
protected Object obtainGroupTimeout(MessageGroup group) {
if (this.groupTimeoutExpression != null) {
Object timeout = this.groupTimeoutExpression.getValue(this.evaluationContext, group);
Expand Down Expand Up @@ -1062,6 +1072,7 @@ public void purgeOrphanedGroups() {

protected static class SequenceAwareMessageGroup extends SimpleMessageGroup {

@Nullable
private final SimpleMessageGroup sourceGroup;

public SequenceAwareMessageGroup(MessageGroup messageGroup) {
Expand Down Expand Up @@ -1124,6 +1135,7 @@ private class ForceReleaseMessageGroupProcessor implements MessageGroupProcessor
}

@Override
@Nullable
public Object processMessageGroup(MessageGroup group) {
forceComplete(group);
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@

import java.util.Collection;

import org.jspecify.annotations.Nullable;

import org.springframework.integration.IntegrationPatternType;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
Expand Down Expand Up @@ -95,7 +97,7 @@ protected boolean shouldSplitOutput(Iterable<?> reply) {
* @param completedMessages The completed messages. Ignored in this implementation.
*/
@Override
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages) {
protected void afterRelease(MessageGroup messageGroup, @Nullable Collection<Message<?>> completedMessages) {
Object groupId = messageGroup.getGroupId();
MessageGroupStore messageStore = getMessageStore();
messageStore.completeGroup(groupId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ public class BarrierMessageHandler extends AbstractReplyProducingMessageHandler

private final MessageGroupProcessor messageGroupProcessor;

@Nullable
private String discardChannelName;

@Nullable
private MessageChannel discardChannel;

/**
Expand Down Expand Up @@ -159,7 +161,7 @@ public BarrierMessageHandler(long requestTimeout, long triggerTimeout, Correlati
* @since 5.4
*/
public BarrierMessageHandler(long requestTimeout, long triggerTimeout, MessageGroupProcessor outputProcessor,
CorrelationStrategy correlationStrategy) {
@Nullable CorrelationStrategy correlationStrategy) {

Assert.notNull(outputProcessor, "'messageGroupProcessor' cannot be null");
this.messageGroupProcessor = outputProcessor;
Expand Down Expand Up @@ -218,6 +220,7 @@ public IntegrationPatternType getIntegrationPatternType() {
}

@Override
@Nullable
protected Object handleRequestMessage(Message<?> requestMessage) {
Object key = this.correlationStrategy.getCorrelationKey(requestMessage);
if (key == null) {
Expand Down Expand Up @@ -247,6 +250,7 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
return null;
}

@Nullable
private Object processRelease(Object key, Message<?> requestMessage, Message<?> releaseMessage) {
this.suspensions.remove(key);
if (releaseMessage.getPayload() instanceof Throwable) {
Expand All @@ -266,6 +270,7 @@ private Object processRelease(Object key, Message<?> requestMessage, Message<?>
* @param releaseMessage the release message.
* @return the result.
*/
@Nullable
protected Object buildResult(Object key, Message<?> requestMessage, Message<?> releaseMessage) {
SimpleMessageGroup group = new SimpleMessageGroup(key);
group.add(requestMessage);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,13 +20,16 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.jspecify.annotations.Nullable;

import org.springframework.core.log.LogMessage;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/**
* This Endpoint serves as a barrier for messages that should not be processed yet. The decision when a message can be
Expand Down Expand Up @@ -58,8 +61,10 @@ public class CorrelatingMessageBarrier extends AbstractMessageHandler implements

private final MessageGroupStore store;

@Nullable
private CorrelationStrategy correlationStrategy;

@Nullable
private ReleaseStrategy releaseStrategy;

public CorrelatingMessageBarrier() {
Expand Down Expand Up @@ -88,7 +93,9 @@ public void setReleaseStrategy(ReleaseStrategy releaseStrategy) {

@Override
protected void handleMessageInternal(Message<?> message) {
Assert.notNull(this.correlationStrategy, "'correlationStrategy' must not be null");
Object correlationKey = this.correlationStrategy.getCorrelationKey(message);
Assert.notNull(correlationKey, "The correlation key is required");
Object lock = getLock(correlationKey);
synchronized (lock) {
this.store.addMessagesToGroup(correlationKey, message);
Expand All @@ -103,12 +110,14 @@ private Object getLock(Object correlationKey) {

@SuppressWarnings("unchecked")
@Override
@Nullable
public Message<Object> receive() {
for (Object key : this.correlationLocks.keySet()) {
Object lock = getLock(key);
synchronized (lock) {
MessageGroup group = this.store.getMessageGroup(key);
//group might be removed by another thread
Assert.notNull(this.releaseStrategy, "'releaseStrategy' must not be null");
if (group != null && this.releaseStrategy.canRelease(group)) {
Message<?> nextMessage = null;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.integration.aggregator;

import org.jspecify.annotations.Nullable;

import org.springframework.messaging.Message;

/**
Expand All @@ -35,6 +37,7 @@ public interface CorrelationStrategy {
* @param message The message.
* @return The correlation key.
*/
@Nullable
Object getCorrelationKey(Message<?> message);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,6 +61,7 @@ public class DelegatingMessageGroupProcessor implements MessageGroupProcessor, B

private volatile boolean messageBuilderFactorySet;

@SuppressWarnings("NullAway.Init")
private BeanFactory beanFactory;

public DelegatingMessageGroupProcessor(MessageGroupProcessor delegate,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@
import org.springframework.beans.factory.BeanFactory;
import org.springframework.core.convert.ConversionService;
import org.springframework.integration.store.MessageGroup;
import org.springframework.util.Assert;

/**
* A {@link MessageGroupProcessor} implementation that evaluates a SpEL expression. The SpEL context root is the list of
Expand Down Expand Up @@ -59,7 +60,9 @@ public void setExpectedType(Class<?> expectedType) {
*/
@Override
protected Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
return this.processor.process(group.getMessages());
Object object = this.processor.process(group.getMessages());
Assert.state(object != null, "The process returned a null result. Null result is not expected.");
return object;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@

import java.util.Collection;

import org.jspecify.annotations.Nullable;

import org.springframework.expression.Expression;
import org.springframework.expression.ParseException;
import org.springframework.integration.util.AbstractExpressionEvaluator;
Expand All @@ -37,6 +39,7 @@ public class ExpressionEvaluatingMessageListProcessor extends AbstractExpression

private final Expression expression;

@Nullable
private volatile Class<?> expectedType = null;

/**
Expand Down Expand Up @@ -102,7 +105,9 @@ public void setExpectedType(Class<?> expectedType) {
*/
@Override
public Object process(Collection<? extends Message<?>> messages) {
return this.evaluateExpression(this.expression, messages, this.expectedType);
Object object = this.evaluateExpression(this.expression, messages, this.expectedType);
Assert.state(object != null, "The evaluation of the expression returned a null. Null result is not expected." + this.expression);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably you mean: Null result is not expected: " + this.expression ?

return object;
}

}
Loading