From bce10bdccfe0230653229b7705719ce0023d9dc7 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Fri, 13 Sep 2024 10:32:03 -0500 Subject: [PATCH] Ensure default topics are fully qualified (#849) * Ensure default topics are fully qualified Prior to this commit, only the topics specified on the `@PulsarListener` were being fully qualified in the `DefaultPulsarConsumerFactory`. With this change all topics (including the default topics driven by the Spring Boot `spring.pulsar.consumer.topics` config prop) are fully-qualified. --- .../PulsarListenerIntegrationTests.java | 5 +++-- .../core/DefaultPulsarConsumerFactory.java | 18 ++++++++++++++---- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/PulsarListenerIntegrationTests.java b/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/PulsarListenerIntegrationTests.java index f8854919..25bce16c 100644 --- a/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/PulsarListenerIntegrationTests.java +++ b/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/PulsarListenerIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -160,7 +160,8 @@ static class ConfigPropsDrivenListenerConfig { public void listen(String ignored, Consumer consumer) { assertThat(consumer).extracting("conf", InstanceOfAssertFactories.type(ConsumerConfigurationData.class)) .satisfies((conf) -> { - assertThat(conf.getSingleTopic()).isEqualTo("plit-config-props-topic-dev"); + assertThat(conf.getSingleTopic()) + .isEqualTo("persistent://public/default/plit-config-props-topic-dev"); assertThat(conf.getSubscriptionType()).isEqualTo(SubscriptionType.Shared); assertThat(conf.getSubscriptionName()).isEqualTo("plit-config-props-subs-dev"); }); diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java index fb2827e4..88c4b017 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java @@ -101,10 +101,11 @@ public Consumer createConsumer(Schema schema, @Nullable Collection Objects.requireNonNull(schema, "Schema must be specified"); ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer(schema); - // Apply the default config customizer (preserve the topic) + // Apply the default config customizer if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) { this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(consumerBuilder))); } + // Preserve the passed in topics (don't let default config customizer win) if (topics != null) { replaceTopicsOnBuilder(consumerBuilder, topics); } @@ -117,6 +118,9 @@ public Consumer createConsumer(Schema schema, @Nullable Collection if (!CollectionUtils.isEmpty(customizers)) { customizers.forEach(customizer -> customizer.customize(consumerBuilder)); } + if (this.topicBuilder != null) { + this.ensureTopicNamesFullyQualified(consumerBuilder); + } try { return consumerBuilder.subscribe(); } @@ -126,9 +130,6 @@ public Consumer createConsumer(Schema schema, @Nullable Collection } private void replaceTopicsOnBuilder(ConsumerBuilder builder, Collection topics) { - if (this.topicBuilder != null) { - topics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList(); - } var builderImpl = (ConsumerBuilderImpl) builder; builderImpl.getConf().setTopicNames(new HashSet<>(topics)); } @@ -139,4 +140,13 @@ private void replaceMetadataPropertiesOnBuilder(ConsumerBuilder builder, builderImpl.getConf().setProperties(new TreeMap<>(metadataProperties)); } + protected void ensureTopicNamesFullyQualified(ConsumerBuilder builder) { + var builderImpl = (ConsumerBuilderImpl) builder; + var topics = builderImpl.getConf().getTopicNames(); + if (!CollectionUtils.isEmpty(topics)) { + var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList(); + builderImpl.getConf().setTopicNames(new HashSet<>(fullyQualifiedTopics)); + } + } + }