Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
*/
public class AggregatorSpec extends CorrelationHandlerSpec<AggregatorSpec, AggregatingMessageHandler> {

private Function<MessageGroup, Map<String, Object>> headersFunction;
private @Nullable Function<MessageGroup, Map<String, Object>> headersFunction;

protected AggregatorSpec() {
super(new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor()));
Expand Down Expand Up @@ -123,7 +123,7 @@ public AggregatorSpec headersFunction(Function<MessageGroup, Map<String, Object>
}

@Override
public Map<Object, String> getComponentsToRegister() {
public Map<Object, @Nullable String> getComponentsToRegister() {
if (this.headersFunction != null) {
MessageGroupProcessor outputProcessor = this.handler.getOutputProcessor();
if (outputProcessor instanceof AbstractAggregatingMessageGroupProcessor abstractAggregatingMessageGroupProcessor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,13 @@ public class BarrierSpec extends ConsumerEndpointSpec<BarrierSpec, BarrierMessag
private CorrelationStrategy correlationStrategy =
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID);

@Nullable
private MessageChannel discardChannel;
private @Nullable MessageChannel discardChannel;

@Nullable
private String discardChannelName;
private @Nullable String discardChannelName;

@Nullable
private Long triggerTimeout;
private @Nullable Long triggerTimeout;

protected BarrierSpec(long timeout) {
super(null);
this.timeout = timeout;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

import org.springframework.aop.framework.Advised;
import org.springframework.aop.framework.AopInfrastructureBean;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
Expand Down Expand Up @@ -130,15 +129,15 @@ public abstract class BaseIntegrationFlowDefinition<B extends BaseIntegrationFlo

protected static final SpelExpressionParser PARSER = new SpelExpressionParser(); //NOSONAR - final

protected final Map<Object, String> integrationComponents = new LinkedHashMap<>(); //NOSONAR - final
protected final Map<Object, @Nullable String> integrationComponents = new LinkedHashMap<>(); //NOSONAR - final

private MessageChannel currentMessageChannel;
private @Nullable MessageChannel currentMessageChannel;

private Object currentComponent;
private @Nullable Object currentComponent;

private boolean implicitChannel;

private StandardIntegrationFlow integrationFlow;
private @Nullable StandardIntegrationFlow integrationFlow;

protected BaseIntegrationFlowDefinition() {
}
Expand All @@ -152,14 +151,14 @@ protected B addComponent(Object component, @Nullable String beanName) {
return _this();
}

protected B addComponents(Map<Object, String> components) {
protected B addComponents(Map<Object, @Nullable String> components) {
if (!CollectionUtils.isEmpty(components)) {
this.integrationComponents.putAll(components);
}
return _this();
}

protected Map<Object, String> getIntegrationComponents() {
protected Map<Object, @Nullable String> getIntegrationComponents() {
return this.integrationComponents;
}

Expand All @@ -168,8 +167,7 @@ protected B currentComponent(@Nullable Object component) {
return _this();
}

@Nullable
protected Object getCurrentComponent() {
protected @Nullable Object getCurrentComponent() {
return this.currentComponent;
}

Expand All @@ -178,8 +176,7 @@ protected B currentMessageChannel(@Nullable MessageChannel currentMessageChannel
return _this();
}

@Nullable
protected MessageChannel getCurrentMessageChannel() {
protected @Nullable MessageChannel getCurrentMessageChannel() {
return this.currentMessageChannel;
}

Expand Down Expand Up @@ -1591,7 +1588,7 @@ public B resequence(@Nullable Consumer<ResequencerSpec> resequencer) {
* @return the current {@link BaseIntegrationFlowDefinition}.
*/
public B aggregate() {
return aggregate((Consumer<AggregatorSpec>) null);
return aggregate(null);
}

/**
Expand Down Expand Up @@ -1829,8 +1826,8 @@ protected <R extends AbstractMessageRouter, S extends AbstractRouterSpec<? super
BridgeHandler bridgeHandler = new BridgeHandler();
boolean registerSubflowBridge = false;

Map<Object, String> componentsToRegister = null;
Map<Object, String> routerComponents = routerSpec.getComponentsToRegister();
Map<Object, @Nullable String> componentsToRegister = null;
Map<Object, @Nullable String> routerComponents = routerSpec.getComponentsToRegister();
if (!CollectionUtils.isEmpty(routerComponents)) {
componentsToRegister = new LinkedHashMap<>(routerComponents);
routerComponents.clear();
Expand Down Expand Up @@ -2571,7 +2568,7 @@ protected <T> Publisher<Message<T>> toReactivePublisher() {
protected <T> Publisher<Message<T>> toReactivePublisher(boolean autoStartOnSubscribe) {
MessageChannel channelForPublisher = getCurrentMessageChannel();
Publisher<Message<T>> publisher;
Map<Object, String> components = getIntegrationComponents();
Map<Object, @Nullable String> components = getIntegrationComponents();
if (channelForPublisher instanceof Publisher) {
publisher = (Publisher<Message<T>>) channelForPublisher;
}
Expand Down Expand Up @@ -2700,7 +2697,7 @@ protected StandardIntegrationFlow get() {
"EIP-method in the 'IntegrationFlow' definition.");
}

Map<Object, String> components = getIntegrationComponents();
Map<Object, @Nullable String> components = getIntegrationComponents();
if (components.size() == 1) {
Object currComponent = getCurrentComponent();
if (currComponent != null) {
Expand Down Expand Up @@ -2736,17 +2733,8 @@ protected void checkReuse(MessageProducer replyHandler) {
REFERENCED_REPLY_PRODUCERS.add(replyHandler);
}

@Nullable
protected static Object extractProxyTarget(@Nullable Object target) {
if (!(target instanceof Advised advised)) {
return target;
}
try {
return extractProxyTarget(advised.getTargetSource().getTarget());
}
catch (Exception e) {
throw new BeanCreationException("Could not extract target", e);
}
protected static @Nullable Object extractProxyTarget(@Nullable Object target) {
return IntegrationFlow.extractProxyTarget(target);
}

public static final class ReplyProducerCleaner implements DestructionAwareBeanPostProcessor, AopInfrastructureBean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.LinkedHashMap;
import java.util.Map;

import org.jspecify.annotations.Nullable;

import org.springframework.integration.channel.BroadcastCapableChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
Expand All @@ -36,7 +38,7 @@ public class BroadcastPublishSubscribeSpec
extends IntegrationComponentSpec<BroadcastPublishSubscribeSpec, BroadcastCapableChannel>
implements ComponentsRegistration {

private final Map<Object, String> subscriberFlows = new LinkedHashMap<>();
private final Map<Object, @Nullable String> subscriberFlows = new LinkedHashMap<>();

private int order;

Expand Down Expand Up @@ -74,7 +76,7 @@ public BroadcastPublishSubscribeSpec subscribe(IntegrationFlow subFlow) {
}

@Override
public Map<Object, String> getComponentsToRegister() {
public Map<Object, @Nullable String> getComponentsToRegister() {
return this.subscriberFlows;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,13 @@ public abstract class ConsumerEndpointSpec<S extends ConsumerEndpointSpec<S, H>,
@Nullable
private Boolean async;

@Nullable
private String[] notPropagatedHeaders;
private String @Nullable [] notPropagatedHeaders;

protected ConsumerEndpointSpec() {
super(new ConsumerEndpointFactoryBean());
}

protected ConsumerEndpointSpec(@Nullable H messageHandler) {
protected ConsumerEndpointSpec(H messageHandler) {
super(messageHandler, new ConsumerEndpointFactoryBean());
}

Expand Down Expand Up @@ -373,6 +376,7 @@ protected Tuple2<ConsumerEndpointFactoryBean, H> doGet() {
.acceptIfNotEmpty(this.adviceChain, producingMessageHandler::setAdviceChain);
}

Assert.state(this.handler != null, "'this.handler' must not be null.");
this.endpointFactoryBean.setHandler(this.handler);
return super.doGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,18 @@ public abstract class EndpointSpec<S extends EndpointSpec<S, F, H>, F extends Be
extends IntegrationComponentSpec<S, Tuple2<F, H>>
implements ComponentsRegistration {

protected final Map<Object, String> componentsToRegister = new LinkedHashMap<>(); // NOSONAR final
protected final Map<Object, @Nullable String> componentsToRegister = new LinkedHashMap<>(); // NOSONAR final

protected final F endpointFactoryBean; // NOSONAR final

protected H handler; // NOSONAR
@SuppressWarnings("NullAway.Init")
protected H handler;

protected EndpointSpec(@Nullable H handler, F endpointFactoryBean) {
protected EndpointSpec(F endpointFactoryBean) {
this.endpointFactoryBean = endpointFactoryBean;
}

protected EndpointSpec(H handler, F endpointFactoryBean) {
this.endpointFactoryBean = endpointFactoryBean;
this.handler = handler;
}
Expand Down Expand Up @@ -85,7 +90,7 @@ public S poller(Function<PollerFactory, PollerSpec> pollers) {
* @see PollerSpec
*/
public S poller(PollerSpec pollerMetadataSpec) {
Map<Object, String> components = pollerMetadataSpec.getComponentsToRegister();
Map<Object, @Nullable String> components = pollerMetadataSpec.getComponentsToRegister();
this.componentsToRegister.putAll(components);
return poller(pollerMetadataSpec.getObject());
}
Expand Down Expand Up @@ -122,17 +127,14 @@ public S poller(PollerSpec pollerMetadataSpec) {
public abstract S role(String role);

@Override
public Map<Object, String> getComponentsToRegister() {
public Map<Object, @Nullable String> getComponentsToRegister() {
return this.componentsToRegister;
}

@Override
protected Tuple2<F, H> doGet() {
return Tuples.of(this.endpointFactoryBean, this.handler);
}

protected void assertHandler() {
Assert.state(this.handler != null, "'this.handler' must not be null.");
return Tuples.of(this.endpointFactoryBean, this.handler);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected FilterEndpointSpec(MessageFilter messageFilter) {
/**
* The default value is <code>false</code> meaning that rejected
* Messages will be quietly dropped or sent to the discard channel if
* available. Typically this value would not be <code>true</code> when
* available. Typically, this value would not be <code>true</code> when
* a discard channel is provided, but if so, it will still apply
* (in such a case, the Message will be sent to the discard channel,
* and <em>then</em> the exception will be thrown).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public class HeaderEnricherSpec extends ConsumerEndpointSpec<HeaderEnricherSpec,
protected final HeaderEnricher headerEnricher = new HeaderEnricher(this.headerToAdd); // NOSONAR - final

protected HeaderEnricherSpec() {
super(null);
this.handler = new MessageTransformingHandler(this.headerEnricher);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

import org.springframework.beans.factory.BeanInitializationException;
Expand Down Expand Up @@ -49,9 +48,10 @@ public abstract class IntegrationComponentSpec<S extends IntegrationComponentSpe

protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR - final

@SuppressWarnings("NullAway.Init")
protected volatile T target; // NOSONAR

private String id;
private @Nullable String id;

/**
* Configure the component identifier. Used as the {@code beanName} to register the
Expand All @@ -78,13 +78,14 @@ public Class<?> getObjectType() {
* !!! This method must not be called from the target configuration !!!
* @return the object backed by this factory bean.
*/
@NonNull
@Override
public T getObject() {
if (this.target == null) {
this.target = doGet();
T targetToReturn = this.target;
if (targetToReturn == null) {
targetToReturn = doGet();
this.target = targetToReturn;
}
return this.target;
return targetToReturn;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
* .aggregate()
* .channel(MessageChannels.queue("routerTwoSubFlowsOutput"));
* }
*
* </pre>
* <p>
* Also, this interface can be implemented directly to encapsulate the integration logic
Expand Down Expand Up @@ -115,8 +114,7 @@ public interface IntegrationFlow {
* @return the channel.
* @since 5.0.4
*/
@Nullable
default MessageChannel getInputChannel() {
default @Nullable MessageChannel getInputChannel() {
return null;
}

Expand All @@ -125,8 +123,8 @@ default MessageChannel getInputChannel() {
* @return the map of integration components managed by this flow.
* @since 5.5.4
*/
default Map<Object, String> getIntegrationComponents() {
return Collections.emptyMap();
default Map<Object, @Nullable String> getIntegrationComponents() {
return Collections.<Object, @Nullable String>emptyMap();
}

/**
Expand Down Expand Up @@ -281,7 +279,7 @@ static IntegrationFlowBuilder from(MessageSource<?> messageSource) {
/**
* Populate the provided {@link MessageSource} object to the {@link IntegrationFlowBuilder} chain.
* The {@link IntegrationFlow} {@code startMessageSource}.
* In addition use {@link SourcePollingChannelAdapterSpec} to provide options for the underlying
* In addition, use {@link SourcePollingChannelAdapterSpec} to provide options for the underlying
* {@link org.springframework.integration.endpoint.SourcePollingChannelAdapter} endpoint.
* @param messageSource the {@link MessageSource} to populate.
* @param endpointConfigurer the {@link Consumer} to provide more options for the
Expand Down Expand Up @@ -441,7 +439,7 @@ static IntegrationFlowBuilder from(Publisher<? extends Message<?>> publisher) {
*/
@SuppressWarnings("overloads")
static IntegrationFlowBuilder from(IntegrationFlow other) {
Map<Object, String> integrationComponents = other.getIntegrationComponents();
Map<Object, @Nullable String> integrationComponents = other.getIntegrationComponents();
Assert.notEmpty(integrationComponents, () ->
"The provided integration flow to compose from '" + other +
"' must be declared as a bean in the application context");
Expand Down Expand Up @@ -499,9 +497,8 @@ private static IntegrationFlowBuilder registerComponents(Object spec) {
return null;
}

@Nullable
@SuppressWarnings("unchecked")
private static <T> T extractProxyTarget(@Nullable T target) {
static <T> @Nullable T extractProxyTarget(@Nullable T target) {
if (!(target instanceof Advised advised)) {
return target;
}
Expand Down
Loading