Skip to content

Apply Nullability to spring-integration-kafka #10138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.jspecify.annotations.Nullable;

import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.support.KafkaHeaders;
Expand All @@ -42,7 +44,7 @@ public abstract class AbstractKafkaChannel extends AbstractMessageChannel {

protected final String topic; // NOSONAR final

private String groupId;
private @Nullable String groupId;

/**
* Construct an instance with the provided {@link KafkaOperations} and topic.
Expand All @@ -64,7 +66,7 @@ public void setGroupId(String groupId) {
this.groupId = groupId;
}

protected String getGroupId() {
protected @Nullable String getGroupId() {
return this.groupId;
}

Expand All @@ -82,7 +84,8 @@ protected boolean doSend(Message<?> message, long timeout) {
return false;
}
catch (ExecutionException e) {
this.logger.error(e.getCause(), () -> "Interrupted while waiting for send result for: " + message);
Throwable cause = e.getCause() != null ? e.getCause() : e;
this.logger.error(cause, () -> "Interrupted while waiting for send result for: " + message);
return false;
}
catch (TimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2024 the original author or authors.
* Copyright 2020-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,12 +20,13 @@
import java.util.Deque;
import java.util.List;

import org.jspecify.annotations.Nullable;

import org.springframework.integration.channel.ExecutorChannelInterceptorAware;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.support.management.metrics.CounterFacade;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
Expand All @@ -47,7 +48,7 @@ public class PollableKafkaChannel extends AbstractKafkaChannel

private final KafkaMessageSource<?, ?> source;

private CounterFacade receiveCounter;
private @Nullable CounterFacade receiveCounter;

private volatile int executorInterceptorsSize;

Expand Down Expand Up @@ -197,8 +198,8 @@ public boolean hasExecutorInterceptors() {

private static String topic(KafkaMessageSource<?, ?> source) {
Assert.notNull(source, "'source' cannot be null");
String[] topics = source.getConsumerProperties().getTopics();
Assert.isTrue(topics != null && topics.length == 1, "Only one topic is allowed");
@Nullable String @Nullable [] topics = source.getConsumerProperties().getTopics();
Assert.isTrue(topics != null && topics.length == 1 && topics[0] != null, "Only one topic is allowed");
return topics[0];
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2024 the original author or authors.
* Copyright 2020-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jspecify.annotations.Nullable;

import org.springframework.integration.dispatcher.MessageDispatcher;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
Expand Down Expand Up @@ -56,8 +57,10 @@ public class SubscribableKafkaChannel extends AbstractKafkaChannel implements Su

private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener();

@SuppressWarnings("NullAway.Init")
private MessageDispatcher dispatcher;

@SuppressWarnings("NullAway.Init")
private MessageListenerContainer container;

private boolean autoStartup = true;
Expand Down Expand Up @@ -183,8 +186,8 @@ private class IntegrationRecordMessageListener extends RecordMessagingMessageLis
}

@Override
public void onMessage(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
public void onMessage(ConsumerRecord<Object, Object> record, @Nullable Acknowledgment acknowledgment,
@Nullable Consumer<?, ?> consumer) {

SubscribableKafkaChannel.this.dispatcher.dispatch(toMessagingMessage(record, acknowledgment, consumer));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/**
* Provides classes related to message channel implementations for Apache Kafka.
*/
@org.jspecify.annotations.NullMarked
package org.springframework.integration.kafka.channel;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.integration.kafka.config.xml;

import org.jspecify.annotations.Nullable;
import org.w3c.dom.Element;

import org.springframework.beans.factory.support.BeanDefinitionBuilder;
Expand All @@ -38,7 +39,7 @@
public class KafkaChannelParser extends AbstractChannelParser {

@Override
protected BeanDefinitionBuilder buildBeanDefinition(Element element, ParserContext parserContext) {
protected @Nullable BeanDefinitionBuilder buildBeanDefinition(Element element, ParserContext parserContext) {
BeanDefinitionBuilder builder;
String factory = element.getAttribute("container-factory");
boolean hasFactory = StringUtils.hasText(factory);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/**
* Provides parser classes to provide Xml namespace support for the Apache Kafka components.
*/
@org.jspecify.annotations.NullMarked
package org.springframework.integration.kafka.config.xml;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 the original author or authors.
* Copyright 2020-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,9 +16,10 @@

package org.springframework.integration.kafka.dsl;

import org.jspecify.annotations.Nullable;

import org.springframework.integration.dsl.MessageChannelSpec;
import org.springframework.integration.kafka.channel.AbstractKafkaChannel;
import org.springframework.lang.Nullable;

/**
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.jspecify.annotations.Nullable;

import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.integration.dsl.IntegrationComponentSpec;
Expand All @@ -28,7 +29,6 @@
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;

/**
* A helper class in the Builder pattern style to delegate options to the
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,12 +16,13 @@

package org.springframework.integration.kafka.dsl;

import org.jspecify.annotations.Nullable;

import org.springframework.integration.dsl.IntegrationComponentSpec;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.Nullable;

/**
* An {@link IntegrationComponentSpec} implementation for the {@link KafkaTemplate}.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/**
* Provides Spring Integration Java DSL Components support for Apache Kafka.
*/
@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
@org.jspecify.annotations.NullMarked
package org.springframework.integration.kafka.dsl;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jspecify.annotations.Nullable;

import org.springframework.core.AttributeAccessor;
import org.springframework.kafka.KafkaException;
Expand Down Expand Up @@ -63,8 +64,8 @@ public interface KafkaInboundEndpoint {
* @param consumer the consumer.
* @param runnable the runnable.
*/
default void doWithRetry(RetryTemplate template, RecoveryCallback<?> callback, ConsumerRecord<?, ?> record,
Acknowledgment acknowledgment, Consumer<?, ?> consumer, Runnable runnable) {
default void doWithRetry(RetryTemplate template, @Nullable RecoveryCallback<?> callback, ConsumerRecord<?, ?> record,
@Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer, Runnable runnable) {

try {
template.execute(context -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.jspecify.annotations.Nullable;

import org.springframework.core.AttributeAccessor;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
Expand All @@ -50,7 +51,6 @@
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
Expand Down Expand Up @@ -82,11 +82,11 @@ public class KafkaInboundGateway<K, V, R> extends MessagingGatewaySupport

private final KafkaTemplate<K, R> kafkaTemplate;

private RetryTemplate retryTemplate;
private @Nullable RetryTemplate retryTemplate;

private RecoveryCallback<?> recoveryCallback;
private @Nullable RecoveryCallback<?> recoveryCallback;

private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;
private @Nullable BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;

private boolean bindSourceRecord;

Expand Down Expand Up @@ -271,7 +271,7 @@ private void setAttributesIfNecessary(Object record, @Nullable Message<?> messag
}

@Override
protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
protected AttributeAccessor getErrorMessageAttributes(@Nullable Message<?> message) {
AttributeAccessor attributes = ATTRIBUTES_HOLDER.get();
if (attributes == null) {
return super.getErrorMessageAttributes(message);
Expand All @@ -295,7 +295,8 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
}

@Override
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment acknowledgment,
@Nullable Consumer<?, ?> consumer) {
Message<?> message = null;
try {
message = toMessagingMessage(record, acknowledgment, consumer);
Expand All @@ -321,8 +322,8 @@ public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment
}
}

private void sendAndReceive(ConsumerRecord<K, V> record, Message<?> message, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
private void sendAndReceive(ConsumerRecord<K, V> record, Message<?> message,
@Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer) {

RetryTemplate template = KafkaInboundGateway.this.retryTemplate;
if (template != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.jspecify.annotations.Nullable;

import org.springframework.core.AttributeAccessor;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
Expand Down Expand Up @@ -59,7 +60,6 @@
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.retry.RecoveryCallback;
Expand Down Expand Up @@ -91,17 +91,17 @@ public class KafkaMessageDrivenChannelAdapter<K, V> extends MessageProducerSuppo

private final ListenerMode mode;

private RecordFilterStrategy<K, V> recordFilterStrategy;
private @Nullable RecordFilterStrategy<K, V> recordFilterStrategy;

private boolean ackDiscarded;

private RetryTemplate retryTemplate;
private @Nullable RetryTemplate retryTemplate;

private RecoveryCallback<?> recoveryCallback;
private @Nullable RecoveryCallback<?> recoveryCallback;

private boolean filterInRetry;

private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;
private @Nullable BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;

private boolean bindSourceRecord;

Expand Down Expand Up @@ -378,7 +378,7 @@ private void setAttributesIfNecessary(Object record, @Nullable Message<?> messag
}

@Override
protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
protected AttributeAccessor getErrorMessageAttributes(@Nullable Message<?> message) {
AttributeAccessor attributes = ATTRIBUTES_HOLDER.get();
if (attributes == null) {
return super.getErrorMessageAttributes(message);
Expand Down Expand Up @@ -436,7 +436,8 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
}

@Override
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment acknowledgment,
@Nullable Consumer<?, ?> consumer) {
Message<?> message;
try {
message = toMessagingMessage(record, acknowledgment, consumer);
Expand Down Expand Up @@ -524,8 +525,8 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
}

@Override
public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
@Nullable Consumer<?, ?> consumer) {

Message<?> message = null;
if (!KafkaMessageDrivenChannelAdapter.this.filterInRetry) {
Expand All @@ -537,8 +538,8 @@ public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowl
}

@Nullable
private Message<?> toMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
private Message<?> toMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
@Nullable Consumer<?, ?> consumer) {

Message<?> message = null;
try {
Expand Down
Loading
Loading