Skip to content

Use enum for PulsarListener subscriptionType #122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions spring-pulsar-docs/src/main/asciidoc/pulsar.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -536,17 +536,17 @@ Here is an example.
====
[source, java]
----
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = "failover")
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = "failover")
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = "failover")
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
System.out.println("Message Received 3: " + foo);
}
Expand All @@ -563,12 +563,12 @@ Here is an example.
====
[source, java]
----
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = "shared")
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = "shared")
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -57,19 +58,19 @@ ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
}

@PulsarListener(subscriptionName = "failover-subscription-demo", topics = "failover-demo-topic",
subscriptionType = "failover")
subscriptionType = SubscriptionType.Failover)
void listen1(String foo) {
this.logger.info("failover-listen1 : " + foo);
}

@PulsarListener(subscriptionName = "failover-subscription-demo", topics = "failover-demo-topic",
subscriptionType = "failover")
subscriptionType = SubscriptionType.Failover)
void listen2(String foo) {
this.logger.info("failover-listen2 : " + foo);
}

@PulsarListener(subscriptionName = "failover-subscription-demo", topics = "failover-demo-topic",
subscriptionType = "failover")
subscriptionType = SubscriptionType.Failover)
void listen(String foo) {
this.logger.info("failover-listen3 : " + foo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;

import org.springframework.messaging.handler.annotation.MessageMapping;
Expand Down Expand Up @@ -76,7 +77,7 @@
* Pulsar subscription type for this listener.
* @return the {@code subscriptionType} for this listener
*/
String subscriptionType() default "";
SubscriptionType subscriptionType() default SubscriptionType.Exclusive;

SchemaType schemaType() default SchemaType.NONE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.SubscriptionType;

import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
Expand Down Expand Up @@ -347,7 +345,7 @@ private void processPulsarListenerAnnotation(MethodPulsarListenerEndpoint<?> end
endpoint.setId(getEndpointId(pulsarListener));
endpoint.setTopics(topics);
endpoint.setTopicPattern(topicPattern);
endpoint.setSubscriptionType(getEndpointSubscriptionType(pulsarListener));
endpoint.setSubscriptionType(pulsarListener.subscriptionType());
endpoint.setSchemaType(pulsarListener.schemaType());
endpoint.setAckMode(pulsarListener.ackMode());

Expand Down Expand Up @@ -511,32 +509,14 @@ private String getEndpointSubscriptionName(PulsarListener pulsarListener) {
if (StringUtils.hasText(pulsarListener.subscriptionName())) {
return resolveExpressionAsString(pulsarListener.subscriptionName(), "subscriptionName");
}
else {
return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
}
}

private SubscriptionType getEndpointSubscriptionType(PulsarListener pulsarListener) {
final String subscriptionType = pulsarListener.subscriptionType().toLowerCase();
if (StringUtils.hasText(subscriptionType)) {
return switch (subscriptionType) {
case "exclusive" -> SubscriptionType.Exclusive;
case "failover" -> SubscriptionType.Failover;
case "shared" -> SubscriptionType.Shared;
case "key_shared" -> SubscriptionType.Key_Shared;
default -> SubscriptionType.Exclusive;
};
}
return null;
return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
}

private String getEndpointId(PulsarListener pulsarListener) {
if (StringUtils.hasText(pulsarListener.id())) {
return resolveExpressionAsString(pulsarListener.id(), "id");
}
else {
return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
}
return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
}

private String getTopicPattern(PulsarListener pulsarListener) {
Expand Down Expand Up @@ -642,8 +622,7 @@ private Collection<PulsarListener> findListenerAnnotations(Class<?> clazz) {
}
PulsarListeners anns = AnnotationUtils.findAnnotation(clazz, PulsarListeners.class);
if (anns != null) {
listeners
.addAll(Arrays.stream(anns.value()).map(anno -> enhance(clazz, anno)).collect(Collectors.toList()));
listeners.addAll(Arrays.stream(anns.value()).map(anno -> enhance(clazz, anno)).toList());
}
return listeners;
}
Expand All @@ -657,8 +636,7 @@ private Set<PulsarListener> findListenerAnnotations(Method method) {
}
PulsarListeners anns = AnnotationUtils.findAnnotation(method, PulsarListeners.class);
if (anns != null) {
listeners.addAll(
Arrays.stream(anns.value()).map(anno -> enhance(method, anno)).collect(Collectors.toList()));
listeners.addAll(Arrays.stream(anns.value()).map(anno -> enhance(method, anno)).toList());
}
return listeners;
}
Expand All @@ -667,11 +645,8 @@ private PulsarListener enhance(AnnotatedElement element, PulsarListener ann) {
if (this.enhancer == null) {
return ann;
}
else {
return AnnotationUtils.synthesizeAnnotation(
this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), PulsarListener.class,
null);
}
return AnnotationUtils.synthesizeAnnotation(
this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), PulsarListener.class, null);
}

private void addFormatters(FormatterRegistry registry) {
Expand Down Expand Up @@ -817,9 +792,7 @@ public boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) {
|| target.equals(byte.class) || target.equals(Long.class) || target.equals(Integer.class)
|| target.equals(Short.class) || target.equals(Byte.class);
}
else {
return false;
}
return false;
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
Expand Down Expand Up @@ -224,7 +225,7 @@ void listen1(String message) {
}

@PulsarListener(id = "bar", topics = "concurrency-on-pl", subscriptionName = "subscription-2",
subscriptionType = "failover", concurrency = "3")
subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen2(String message) {
latch1.countDown();
}
Expand Down Expand Up @@ -264,7 +265,7 @@ static class NegativeAckRedeliveryConfig {

@PulsarListener(id = "withNegRedeliveryBackoff", subscriptionName = "withNegRedeliveryBackoffSubscription",
topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
subscriptionType = "Shared")
subscriptionType = SubscriptionType.Shared)
void listen(String msg) {
nackRedeliveryBackoffLatch.countDown();
throw new RuntimeException("fail " + msg);
Expand Down Expand Up @@ -300,8 +301,8 @@ static class AckTimeoutRedeliveryConfig {
@PulsarListener(id = "withAckTimeoutRedeliveryBackoff",
subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
topics = "withAckTimeoutRedeliveryBackoff-test-topic",
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff", subscriptionType = "Shared",
properties = { "ackTimeoutMillis=1" })
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1" })
void listen(String msg) {
ackTimeoutRedeliveryBackoffLatch.countDown();
throw new RuntimeException();
Expand Down Expand Up @@ -344,8 +345,7 @@ void listen(String msg) {
throw new RuntimeException("fail " + msg);
}

@PulsarListener(id = "pceh-dltListener", subscriptionType = "dltListenerSubscription",
topics = "pceht-topic-pceht-subscription-DLT")
@PulsarListener(id = "pceh-dltListener", topics = "pceht-topic-pceht-subscription-DLT")
void listenDlq(String msg) {
dltLatch.countDown();
}
Expand Down Expand Up @@ -381,14 +381,14 @@ void pulsarListenerWithDeadLetterPolicy() throws Exception {
static class DeadLetterPolicyConfig {

@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "dlpt-topic-1", deadLetterPolicy = "deadLetterPolicy", subscriptionType = "Shared",
properties = { "ackTimeoutMillis=1" })
topics = "dlpt-topic-1", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1" })
void listen(String msg) {
latch.countDown();
throw new RuntimeException("fail " + msg);
}

@PulsarListener(id = "dlqListener", subscriptionType = "dlqListenerSubscription", topics = "dlpt-dlq-topic")
@PulsarListener(id = "dlqListener", topics = "dlpt-dlq-topic")
void listenDlq(String msg) {
dlqLatch.countDown();
}
Expand Down