Skip to content

No Issue : Internal Refactoring to improve code readability. #3422

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
* @author Pawel Lozinski
* @author Adrian Chlebosz
* @author Soby Chacko
* @author Sanghyeok An
*
* @since 3.1
*/
Expand Down Expand Up @@ -532,7 +533,7 @@ public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd
List<String> notEmbedded = Arrays.stream(topicsToConsume)
.filter(topic -> !this.topics.contains(topic))
.collect(Collectors.toList());
if (notEmbedded.size() > 0) {
if (!notEmbedded.isEmpty()) {
throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list");
}
final AtomicReference<Collection<TopicPartition>> assigned = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
* @author Pawel Lozinski
* @author Adrian Chlebosz
* @author Soby Chacko
* @author Sanghyeok An
*
* @since 2.2
*/
Expand Down Expand Up @@ -734,7 +735,7 @@ public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd
List<String> notEmbedded = Arrays.stream(topicsToConsume)
.filter(topic -> !this.topics.contains(topic))
.collect(Collectors.toList());
if (notEmbedded.size() > 0) {
if (!notEmbedded.isEmpty()) {
throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list");
}
final AtomicReference<Collection<TopicPartition>> assigned = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
* @author Filip Halemba
* @author Tomaz Fernandes
* @author Wang Zhiyang
* @author Sanghyeok An
*
* @see KafkaListener
* @see KafkaListenerErrorHandler
Expand Down Expand Up @@ -349,7 +350,7 @@ private void buildEnhancer() {
if (this.applicationContext != null) {
Map<String, AnnotationEnhancer> enhancersMap =
this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false);
if (enhancersMap.size() > 0) {
if (!enhancersMap.isEmpty()) {
List<AnnotationEnhancer> enhancers = enhancersMap.values()
.stream()
.sorted(new OrderComparator())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* Detect and register Avro types for Apache Kafka listeners.
*
* @author Gary Russell
* @author Sagnhyeok An
* @since 3.0
*
*/
Expand Down Expand Up @@ -80,7 +81,7 @@ public BeanRegistrationAotContribution processAheadOfTime(RegisteredBean registe
}
}, method -> method.getName().equals("onMessage"));
}
if (avroTypes.size() > 0) {
if (!avroTypes.isEmpty()) {
return (generationContext, beanRegistrationCode) -> {
ReflectionHints reflectionHints = generationContext.getRuntimeHints().reflection();
avroTypes.forEach(type -> reflectionHints.registerType(type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* {@link Optional#empty()} indicating the broker defaults will be applied.
*
* @author Gary Russell
* @author Sanghyeok An
* @since 2.3
*
*/
Expand Down Expand Up @@ -132,7 +133,7 @@ public NewTopic build() {
NewTopic topic = this.replicasAssignments == null
? new NewTopic(this.name, this.partitions, this.replicas)
: new NewTopic(this.name, this.replicasAssignments);
if (this.configs.size() > 0) {
if (!this.configs.isEmpty()) {
topic.configs(this.configs);
}
return topic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
* @author Chris Gilbert
* @author Adrian Gygax
* @author Yaniv Nahoum
* @author Sanghyeok An
*/
public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory
implements ConsumerFactory<K, V>, BeanNameAware, ApplicationContextAware {
Expand Down Expand Up @@ -398,7 +399,7 @@ protected Consumer<K, V> createKafkaConsumer(@Nullable String groupId, @Nullable
boolean shouldModifyClientId = (this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
&& StringUtils.hasText(clientIdSuffix)) || overrideClientIdPrefix;
if (groupId == null
&& (properties == null || properties.stringPropertyNames().size() == 0)
&& (properties == null || properties.stringPropertyNames().isEmpty())
&& !shouldModifyClientId) {
return createKafkaConsumer(new HashMap<>(this.configs));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Adrian Gygax
* @author Sanghyeok An
*
* @since 1.3
*/
Expand Down Expand Up @@ -239,7 +240,7 @@ public void afterSingletonsInstantiated() {
*/
public final boolean initialize() {
Collection<NewTopic> newTopics = newTopics();
if (newTopics.size() > 0) {
if (!newTopics.isEmpty()) {
AdminClient adminClient = null;
try {
adminClient = createAdmin();
Expand Down Expand Up @@ -399,7 +400,7 @@ protected Map<String, Object> getAdminConfig() {
}

private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
if (topics.size() > 0) {
if (!topics.isEmpty()) {
Map<String, NewTopic> topicNameToTopic = new HashMap<>();
topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t));
DescribeTopicsResult topicInfo = adminClient
Expand All @@ -409,10 +410,10 @@ private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTo
List<NewTopic> topicsToAdd = new ArrayList<>();
Map<String, NewPartitions> topicsWithPartitionMismatches =
checkPartitions(topicNameToTopic, topicInfo, topicsToAdd);
if (topicsToAdd.size() > 0) {
if (!topicsToAdd.isEmpty()) {
addTopics(adminClient, topicsToAdd);
}
if (topicsWithPartitionMismatches.size() > 0) {
if (!topicsWithPartitionMismatches.isEmpty()) {
createMissingPartitions(adminClient, topicsWithPartitionMismatches);
}
if (this.modifyTopicConfigs) {
Expand Down Expand Up @@ -457,7 +458,7 @@ private Map<ConfigResource, List<ConfigEntry>> checkTopicsForConfigMismatches(
configMismatchesEntries.add(actualConfigParameter);
}
}
if (configMismatchesEntries.size() > 0) {
if (!configMismatchesEntries.isEmpty()) {
configMismatches.put(topicConfig.getKey(), configMismatchesEntries);
}
}
Expand Down Expand Up @@ -491,7 +492,7 @@ private void adjustConfigMismatches(AdminClient adminClient, Collection<NewTopic
desiredConfigs.get(mismatchConfigEntry.name())),
OpType.SET));
}
if (alterConfigOperations.size() > 0) {
if (!alterConfigOperations.isEmpty()) {
try {
AlterConfigsResult alterConfigsResult = adminClient
.incrementalAlterConfigs(Map.of(topicConfigResource, alterConfigOperations));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
* @author Tomaz Fernandes
* @author Wang Zhiyang
* @author Soby Chacko
* @author Sanghyeok An
*/
public abstract class AbstractMessageListenerContainer<K, V>
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
Expand Down Expand Up @@ -570,7 +571,7 @@ protected void checkTopics() {
catch (Exception e) {
this.logger.error(e, "Failed to check topic existence");
}
if (missing != null && missing.size() > 0) {
if (missing != null && !missing.isEmpty()) {
throw new IllegalStateException(
"Topic(s) " + missing.toString()
+ " is/are not present and missingTopicsFatal is true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* Common consumer properties.
*
* @author Gary Russell
* @author Sagnhyeok An
* @since 2.3
*
*/
Expand Down Expand Up @@ -520,7 +521,7 @@ protected final String renderProperties() {
+ (this.offsetAndMetadataProvider != null ? "\n offsetAndMetadataProvider=" + this.offsetAndMetadataProvider : "")
+ "\n syncCommits=" + this.syncCommits
+ (this.syncCommitTimeout != null ? "\n syncCommitTimeout=" + this.syncCommitTimeout : "")
+ (this.kafkaConsumerProperties.size() > 0 ? "\n properties=" + this.kafkaConsumerProperties : "")
+ (!this.kafkaConsumerProperties.isEmpty() ? "\n properties=" + this.kafkaConsumerProperties : "")
+ "\n authExceptionRetryInterval=" + this.authExceptionRetryInterval
+ "\n commitRetries=" + this.commitRetries
+ "\n fixTxOffsets" + this.fixTxOffsets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* idle.
*
* @author Gary Russell
* @author Sanghyeok An
* @since 2.7.3
*
*/
Expand Down Expand Up @@ -186,7 +187,7 @@ public void initialize() {
for (String group : this.groupNames) {
this.groups.add(this.applicationContext.getBean(group + ".group", ContainerGroup.class));
}
if (this.groups.size() > 0) {
if (!this.groups.isEmpty()) {
this.iterator = this.groups.iterator();
this.currentGroup = this.iterator.next();
this.groups.forEach(grp -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* @author Gary Russell
* @author Francois Rosiere
* @author Wang Zhiyang
* @author Sanghyeok An
*
* @since 1.3.5
*
Expand Down Expand Up @@ -210,7 +211,7 @@ public void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
records.forEach(rec -> offsets.put(new TopicPartition(rec.topic(), rec.partition()),
ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
if (offsets.size() > 0 && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
if (!offsets.isEmpty() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
this.kafkaTemplate.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
}
clearThreadState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
* @author Artem Bilan
* @author Venil Noronha
* @author Wang ZhiYang
* @author Sanghyeok An
* @since 1.1
*/
public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
Expand Down Expand Up @@ -95,6 +96,9 @@ public void setBatchMessageConverter(BatchMessageConverter messageConverter) {
if (recordMessageConverter != null) {
setMessageConverter(recordMessageConverter);
}
else {
logger.warn("No batch message converter is set. because record message converter is null.");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* @param <R> the reply data type.
*
* @author Gary Russell
* @author Sanghyeok An
* @since 2.3
*
*/
Expand Down Expand Up @@ -162,7 +163,7 @@ public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>>
}
}
});
if (completed.size() > 0) {
if (!completed.isEmpty()) {
super.onMessage(completed);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* @author Tomaz Fernandes
* @author Gary Russell
* @author Adrian Chlebosz
* @author Sanghyeok An
* @since 2.7
*
*/
Expand All @@ -46,8 +47,7 @@ public DestinationTopic(String destinationName, Properties properties) {
}

public DestinationTopic(String destinationName, DestinationTopic sourceDestinationtopic, String suffix, Type type) {
this.destinationName = destinationName;
this.properties = new Properties(sourceDestinationtopic.properties, suffix, type);
this(destinationName, new Properties(sourceDestinationtopic.properties, suffix, type));
}

public Long getDestinationDelay() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
*
* @author Gary Russell
* @author Artem Bilan
* @author Sanghyeok An
*
* @since 2.1.3
*
Expand Down Expand Up @@ -216,7 +217,7 @@ protected boolean matchesForInbound(String header) {
if (this.outbound) {
return true;
}
if (this.matchers.size() == 0) {
if (this.matchers.isEmpty()) {
return true;
}
return doesMatch(header);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@
* @author Gary Russell
* @author Dariusz Szablinski
* @author Biju Kunjummen
* @author Sanghyeok An
* @since 1.1
*/
public class BatchMessagingMessageConverter implements BatchMessageConverter {

protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR

@Nullable
Copy link
Contributor Author

@chickenchickenlove chickenchickenlove Aug 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI

RecordMessageConverter can be null. Please refer to codes below.

// BatchMessagingMessageConverter.java
	public BatchMessagingMessageConverter() {
		this(null);
	}

	public BatchMessagingMessageConverter(RecordMessageConverter recordConverter) {
		this.recordConverter = recordConverter;
		if (JacksonPresent.isJackson2Present()) {
			this.headerMapper = new DefaultKafkaHeaderMapper();
		}
	}

Then, getRecordMessageConverter() can return null.
BatchMessagingMessageListenerAdapter#setBatchMessageConverter() call getRecordMessageConverter() internally to set messageConverter.

If user created BatchMessagingMessageConverter by using default constructor, there is no message converter even if BatchMessagingMessageListenerAdapter#setBatchMessageConverter() is completed without any warning logs.

private final RecordMessageConverter recordConverter;

private boolean generateMessageId = false;
Expand Down Expand Up @@ -123,6 +125,7 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) {
this.headerMapper = headerMapper;
}

@Nullable
@Override
public RecordMessageConverter getRecordMessageConverter() {
return this.recordConverter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
*
* @author Gary Russell
* @author Wang Zhiyang
* @author Sanghyeok An
*
* @since 2.8
*
Expand Down Expand Up @@ -108,7 +109,7 @@ public void setCaseSensitive(boolean caseSensitive) {

@SuppressWarnings(UNCHECKED)
protected void configure(Map<String, ?> configs, boolean isKey) {
if (this.delegates.size() > 0) {
if (!this.delegates.isEmpty()) {
this.delegates.values().forEach(delegate -> configureDelegate(configs, isKey, delegate));
}
this.forKeys = isKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
* @author Wang Zhiyang
* @author Mikael Carlstedt
* @author Borahm Lee
* @author Sanghyeok An
*/
@EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2,
KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4,
Expand Down Expand Up @@ -815,7 +816,7 @@ else if (polled.get() == 2) {
latch1.countDown();
latch2.countDown();
acks.add(ack);
if (latch1.getCount() == 0 && records1.values().size() > 0
if (latch1.getCount() == 0 && !records1.isEmpty()
&& records1.values().iterator().next().size() == 4) {
acks.get(3).acknowledge();
acks.get(2).acknowledge();
Expand Down