Skip to content

Commit

Permalink
Internal Refactoring to improve code readability. (#3422)
Browse files Browse the repository at this point in the history
* Replace *.size() =|> 0 with isEmpty().
* Modify to reuse the constructor of DestinationTopic.
* Add @nullable annotation to potential null values.
* Add author.
  • Loading branch information
chickenchickenlove authored Aug 15, 2024
1 parent bcb462e commit 78d7724
Show file tree
Hide file tree
Showing 18 changed files with 44 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
* @author Pawel Lozinski
* @author Adrian Chlebosz
* @author Soby Chacko
* @author Sanghyeok An
*
* @since 3.1
*/
Expand Down Expand Up @@ -532,7 +533,7 @@ public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd
List<String> notEmbedded = Arrays.stream(topicsToConsume)
.filter(topic -> !this.topics.contains(topic))
.collect(Collectors.toList());
if (notEmbedded.size() > 0) {
if (!notEmbedded.isEmpty()) {
throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list");
}
final AtomicReference<Collection<TopicPartition>> assigned = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
* @author Pawel Lozinski
* @author Adrian Chlebosz
* @author Soby Chacko
* @author Sanghyeok An
*
* @since 2.2
*/
Expand Down Expand Up @@ -734,7 +735,7 @@ public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd
List<String> notEmbedded = Arrays.stream(topicsToConsume)
.filter(topic -> !this.topics.contains(topic))
.collect(Collectors.toList());
if (notEmbedded.size() > 0) {
if (!notEmbedded.isEmpty()) {
throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list");
}
final AtomicReference<Collection<TopicPartition>> assigned = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
* @author Filip Halemba
* @author Tomaz Fernandes
* @author Wang Zhiyang
* @author Sanghyeok An
*
* @see KafkaListener
* @see KafkaListenerErrorHandler
Expand Down Expand Up @@ -349,7 +350,7 @@ private void buildEnhancer() {
if (this.applicationContext != null) {
Map<String, AnnotationEnhancer> enhancersMap =
this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false);
if (enhancersMap.size() > 0) {
if (!enhancersMap.isEmpty()) {
List<AnnotationEnhancer> enhancers = enhancersMap.values()
.stream()
.sorted(new OrderComparator())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* Detect and register Avro types for Apache Kafka listeners.
*
* @author Gary Russell
* @author Sagnhyeok An
* @since 3.0
*
*/
Expand Down Expand Up @@ -80,7 +81,7 @@ public BeanRegistrationAotContribution processAheadOfTime(RegisteredBean registe
}
}, method -> method.getName().equals("onMessage"));
}
if (avroTypes.size() > 0) {
if (!avroTypes.isEmpty()) {
return (generationContext, beanRegistrationCode) -> {
ReflectionHints reflectionHints = generationContext.getRuntimeHints().reflection();
avroTypes.forEach(type -> reflectionHints.registerType(type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* {@link Optional#empty()} indicating the broker defaults will be applied.
*
* @author Gary Russell
* @author Sanghyeok An
* @since 2.3
*
*/
Expand Down Expand Up @@ -132,7 +133,7 @@ public NewTopic build() {
NewTopic topic = this.replicasAssignments == null
? new NewTopic(this.name, this.partitions, this.replicas)
: new NewTopic(this.name, this.replicasAssignments);
if (this.configs.size() > 0) {
if (!this.configs.isEmpty()) {
topic.configs(this.configs);
}
return topic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
* @author Chris Gilbert
* @author Adrian Gygax
* @author Yaniv Nahoum
* @author Sanghyeok An
*/
public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory
implements ConsumerFactory<K, V>, BeanNameAware, ApplicationContextAware {
Expand Down Expand Up @@ -398,7 +399,7 @@ protected Consumer<K, V> createKafkaConsumer(@Nullable String groupId, @Nullable
boolean shouldModifyClientId = (this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
&& StringUtils.hasText(clientIdSuffix)) || overrideClientIdPrefix;
if (groupId == null
&& (properties == null || properties.stringPropertyNames().size() == 0)
&& (properties == null || properties.stringPropertyNames().isEmpty())
&& !shouldModifyClientId) {
return createKafkaConsumer(new HashMap<>(this.configs));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Adrian Gygax
* @author Sanghyeok An
*
* @since 1.3
*/
Expand Down Expand Up @@ -239,7 +240,7 @@ public void afterSingletonsInstantiated() {
*/
public final boolean initialize() {
Collection<NewTopic> newTopics = newTopics();
if (newTopics.size() > 0) {
if (!newTopics.isEmpty()) {
AdminClient adminClient = null;
try {
adminClient = createAdmin();
Expand Down Expand Up @@ -399,7 +400,7 @@ protected Map<String, Object> getAdminConfig() {
}

private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
if (topics.size() > 0) {
if (!topics.isEmpty()) {
Map<String, NewTopic> topicNameToTopic = new HashMap<>();
topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t));
DescribeTopicsResult topicInfo = adminClient
Expand All @@ -409,10 +410,10 @@ private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTo
List<NewTopic> topicsToAdd = new ArrayList<>();
Map<String, NewPartitions> topicsWithPartitionMismatches =
checkPartitions(topicNameToTopic, topicInfo, topicsToAdd);
if (topicsToAdd.size() > 0) {
if (!topicsToAdd.isEmpty()) {
addTopics(adminClient, topicsToAdd);
}
if (topicsWithPartitionMismatches.size() > 0) {
if (!topicsWithPartitionMismatches.isEmpty()) {
createMissingPartitions(adminClient, topicsWithPartitionMismatches);
}
if (this.modifyTopicConfigs) {
Expand Down Expand Up @@ -457,7 +458,7 @@ private Map<ConfigResource, List<ConfigEntry>> checkTopicsForConfigMismatches(
configMismatchesEntries.add(actualConfigParameter);
}
}
if (configMismatchesEntries.size() > 0) {
if (!configMismatchesEntries.isEmpty()) {
configMismatches.put(topicConfig.getKey(), configMismatchesEntries);
}
}
Expand Down Expand Up @@ -491,7 +492,7 @@ private void adjustConfigMismatches(AdminClient adminClient, Collection<NewTopic
desiredConfigs.get(mismatchConfigEntry.name())),
OpType.SET));
}
if (alterConfigOperations.size() > 0) {
if (!alterConfigOperations.isEmpty()) {
try {
AlterConfigsResult alterConfigsResult = adminClient
.incrementalAlterConfigs(Map.of(topicConfigResource, alterConfigOperations));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
* @author Tomaz Fernandes
* @author Wang Zhiyang
* @author Soby Chacko
* @author Sanghyeok An
*/
public abstract class AbstractMessageListenerContainer<K, V>
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
Expand Down Expand Up @@ -570,7 +571,7 @@ protected void checkTopics() {
catch (Exception e) {
this.logger.error(e, "Failed to check topic existence");
}
if (missing != null && missing.size() > 0) {
if (missing != null && !missing.isEmpty()) {
throw new IllegalStateException(
"Topic(s) " + missing.toString()
+ " is/are not present and missingTopicsFatal is true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* Common consumer properties.
*
* @author Gary Russell
* @author Sagnhyeok An
* @since 2.3
*
*/
Expand Down Expand Up @@ -520,7 +521,7 @@ protected final String renderProperties() {
+ (this.offsetAndMetadataProvider != null ? "\n offsetAndMetadataProvider=" + this.offsetAndMetadataProvider : "")
+ "\n syncCommits=" + this.syncCommits
+ (this.syncCommitTimeout != null ? "\n syncCommitTimeout=" + this.syncCommitTimeout : "")
+ (this.kafkaConsumerProperties.size() > 0 ? "\n properties=" + this.kafkaConsumerProperties : "")
+ (!this.kafkaConsumerProperties.isEmpty() ? "\n properties=" + this.kafkaConsumerProperties : "")
+ "\n authExceptionRetryInterval=" + this.authExceptionRetryInterval
+ "\n commitRetries=" + this.commitRetries
+ "\n fixTxOffsets" + this.fixTxOffsets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* idle.
*
* @author Gary Russell
* @author Sanghyeok An
* @since 2.7.3
*
*/
Expand Down Expand Up @@ -186,7 +187,7 @@ public void initialize() {
for (String group : this.groupNames) {
this.groups.add(this.applicationContext.getBean(group + ".group", ContainerGroup.class));
}
if (this.groups.size() > 0) {
if (!this.groups.isEmpty()) {
this.iterator = this.groups.iterator();
this.currentGroup = this.iterator.next();
this.groups.forEach(grp -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* @author Gary Russell
* @author Francois Rosiere
* @author Wang Zhiyang
* @author Sanghyeok An
*
* @since 1.3.5
*
Expand Down Expand Up @@ -210,7 +211,7 @@ public void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
records.forEach(rec -> offsets.put(new TopicPartition(rec.topic(), rec.partition()),
ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
if (offsets.size() > 0 && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
if (!offsets.isEmpty() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
this.kafkaTemplate.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
}
clearThreadState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
* @author Artem Bilan
* @author Venil Noronha
* @author Wang ZhiYang
* @author Sanghyeok An
* @since 1.1
*/
public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
Expand Down Expand Up @@ -95,6 +96,9 @@ public void setBatchMessageConverter(BatchMessageConverter messageConverter) {
if (recordMessageConverter != null) {
setMessageConverter(recordMessageConverter);
}
else {
logger.warn("No batch message converter is set. because record message converter is null.");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* @param <R> the reply data type.
*
* @author Gary Russell
* @author Sanghyeok An
* @since 2.3
*
*/
Expand Down Expand Up @@ -162,7 +163,7 @@ public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>>
}
}
});
if (completed.size() > 0) {
if (!completed.isEmpty()) {
super.onMessage(completed);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* @author Tomaz Fernandes
* @author Gary Russell
* @author Adrian Chlebosz
* @author Sanghyeok An
* @since 2.7
*
*/
Expand All @@ -46,8 +47,7 @@ public DestinationTopic(String destinationName, Properties properties) {
}

public DestinationTopic(String destinationName, DestinationTopic sourceDestinationtopic, String suffix, Type type) {
this.destinationName = destinationName;
this.properties = new Properties(sourceDestinationtopic.properties, suffix, type);
this(destinationName, new Properties(sourceDestinationtopic.properties, suffix, type));
}

public Long getDestinationDelay() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
*
* @author Gary Russell
* @author Artem Bilan
* @author Sanghyeok An
*
* @since 2.1.3
*
Expand Down Expand Up @@ -216,7 +217,7 @@ protected boolean matchesForInbound(String header) {
if (this.outbound) {
return true;
}
if (this.matchers.size() == 0) {
if (this.matchers.isEmpty()) {
return true;
}
return doesMatch(header);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@
* @author Gary Russell
* @author Dariusz Szablinski
* @author Biju Kunjummen
* @author Sanghyeok An
* @since 1.1
*/
public class BatchMessagingMessageConverter implements BatchMessageConverter {

protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR

@Nullable
private final RecordMessageConverter recordConverter;

private boolean generateMessageId = false;
Expand Down Expand Up @@ -123,6 +125,7 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) {
this.headerMapper = headerMapper;
}

@Nullable
@Override
public RecordMessageConverter getRecordMessageConverter() {
return this.recordConverter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
*
* @author Gary Russell
* @author Wang Zhiyang
* @author Sanghyeok An
*
* @since 2.8
*
Expand Down Expand Up @@ -108,7 +109,7 @@ public void setCaseSensitive(boolean caseSensitive) {

@SuppressWarnings(UNCHECKED)
protected void configure(Map<String, ?> configs, boolean isKey) {
if (this.delegates.size() > 0) {
if (!this.delegates.isEmpty()) {
this.delegates.values().forEach(delegate -> configureDelegate(configs, isKey, delegate));
}
this.forKeys = isKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
* @author Wang Zhiyang
* @author Mikael Carlstedt
* @author Borahm Lee
* @author Sanghyeok An
*/
@EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2,
KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4,
Expand Down Expand Up @@ -815,7 +816,7 @@ else if (polled.get() == 2) {
latch1.countDown();
latch2.countDown();
acks.add(ack);
if (latch1.getCount() == 0 && records1.values().size() > 0
if (latch1.getCount() == 0 && !records1.isEmpty()
&& records1.values().iterator().next().size() == 4) {
acks.get(3).acknowledge();
acks.get(2).acknowledge();
Expand Down

0 comments on commit 78d7724

Please sign in to comment.