Skip to content

Minor cleanup and refactoring #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.config.PulsarListenerConfigUtils;
import org.springframework.pulsar.config.PulsarListenerContainerFactoryImpl;
import org.springframework.pulsar.config.DefaultPulsarListenerContainerFactory;
import org.springframework.pulsar.config.PulsarListenerBeanNames;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.listener.PulsarContainerProperties;

/**
* Configuration for Pulsar annotation-driven support.
*
* @author Soby Chacko
* @author Chris Bono
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnablePulsar.class)
Expand All @@ -44,9 +45,9 @@ public PulsarAnnotationDrivenConfiguration(PulsarProperties pulsarProperties) {

@Bean
@ConditionalOnMissingBean(name = "pulsarListenerContainerFactory")
PulsarListenerContainerFactoryImpl<?, ?> pulsarListenerContainerFactory(
DefaultPulsarListenerContainerFactory<?, ?> pulsarListenerContainerFactory(
ObjectProvider<PulsarConsumerFactory<Object>> pulsarConsumerFactory) {
PulsarListenerContainerFactoryImpl<Object, Object> factory = new PulsarListenerContainerFactoryImpl<>();
DefaultPulsarListenerContainerFactory<Object, Object> factory = new DefaultPulsarListenerContainerFactory<>();

final PulsarConsumerFactory<Object> pulsarConsumerFactory1 = pulsarConsumerFactory.getIfAvailable();
factory.setPulsarConsumerFactory(pulsarConsumerFactory1);
Expand All @@ -68,7 +69,7 @@ public PulsarAnnotationDrivenConfiguration(PulsarProperties pulsarProperties) {

@Configuration(proxyBeanMethods = false)
@EnablePulsar
@ConditionalOnMissingBean(name = PulsarListenerConfigUtils.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@ConditionalOnMissingBean(name = PulsarListenerBeanNames.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
static class EnableKafkaConfiguration {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
import org.springframework.pulsar.core.PulsarTemplate;

/**
* Tests for {@link PulsarListener}.
*
* @author Soby Chacko
* @author Chris Bono
*/
class PulsarListenerTests extends AbstractContainerBaseTests {

Expand Down Expand Up @@ -86,7 +89,7 @@ public void listen(String foo) {
@Import(PulsarAutoConfiguration.class)
public static class BatchListenerConfig {

@PulsarListener(subscriptionName = "test-exclusive-sub-2", topics = "hello-pulsar-exclusive", batch = "true")
@PulsarListener(subscriptionName = "test-exclusive-sub-2", topics = "hello-pulsar-exclusive", batch = true)
public void listen(List<String> foo) {
foo.forEach(t -> latch2.countDown());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@
import org.springframework.context.annotation.Import;

/**
* Enable Pulsar listener annotated endpoints that are created under the covers by a
* {@link org.springframework.pulsar.config.AbstractPulsarListenerContainerFactory}.
* Enables detection of {@link PulsarListener} annotations on any Spring-managed bean in the container.
*
* @author Soby Chacko
* @author Chris Bono
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(PulsarListenerConfigurationSelector.class)
public @interface EnablePulsar {
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.pulsar.config.PulsarListenerConfigUtils;
import org.springframework.pulsar.config.PulsarListenerBeanNames;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;

/**
Expand All @@ -32,6 +32,7 @@
* annotation.
*
* @author Soby Chacko
* @author Chris Bono
*
* @see PulsarListenerAnnotationBeanPostProcessor
* @see PulsarListenerEndpointRegistry
Expand All @@ -41,15 +42,13 @@ public class PulsarBootstrapConfiguration implements ImportBeanDefinitionRegistr

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
if (!registry.containsBeanDefinition(
PulsarListenerConfigUtils.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {

registry.registerBeanDefinition(PulsarListenerConfigUtils.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
if (!registry.containsBeanDefinition(PulsarListenerBeanNames.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
registry.registerBeanDefinition(PulsarListenerBeanNames.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
new RootBeanDefinition(PulsarListenerAnnotationBeanPostProcessor.class));
}

if (!registry.containsBeanDefinition(PulsarListenerConfigUtils.PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
registry.registerBeanDefinition(PulsarListenerConfigUtils.PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
if (!registry.containsBeanDefinition(PulsarListenerBeanNames.PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
registry.registerBeanDefinition(PulsarListenerBeanNames.PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
new RootBeanDefinition(PulsarListenerEndpointRegistry.class));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.pulsar.common.schema.SchemaType;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.pulsar.config.PulsarListenerContainerFactory;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;

/**
* Annotation that marks a method to be the target of a Pulsar message listener on the
Expand All @@ -43,6 +45,7 @@
* </p>
*
* @author Soby Chacko
* @author Chris Bono
*/
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
Expand All @@ -55,7 +58,7 @@
* <p>If none is specified an auto-generated id is used.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.pulsar.config.PulsarListenerEndpointRegistry#getListenerContainer(String)
* @see PulsarListenerEndpointRegistry#getListenerContainer(String)
*/
String id() default "";

Expand All @@ -74,8 +77,13 @@
SchemaType schemaType() default SchemaType.NONE;

/**
* Specific container factory to use on this listener.
* @return {@code containerFactory} to use on this Pulsar listener.
* The bean name of the {@link PulsarListenerContainerFactory}
* to use to create the message listener container responsible to serve this endpoint.
* <p>
* If not specified, the default container factory is used, if any. If a SpEL
* expression is provided ({@code #{...}}), the expression can either evaluate to a
* container factory instance or a bean name.
* @return the container factory bean name.
*/
String containerFactory() default "";

Expand Down Expand Up @@ -108,7 +116,7 @@
*
* @return whether this listener is in batch mode or not.
*/
String batch() default "";
boolean batch() default false;

/**
* A pseudo bean name used in SpEL expressions within this annotation to reference
Expand All @@ -120,5 +128,25 @@
*/
String beanRef() default "__listener";

/**
* Pulsar consumer properties; they will supersede any properties with the same name
* defined in the consumer factory (if the consumer factory supports property overrides).
* <p>
* <b>Supported Syntax</b>
* <p>The supported syntax for key-value pairs is the same as the
* syntax defined for entries in a Java
* {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
* <ul>
* <li>{@code key=value}</li>
* <li>{@code key:value}</li>
* <li>{@code key value}</li>
* </ul>
* {@code group.id} and {@code client.id} are ignored.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* SpEL expressions must resolve to a {@link String}, a @{link String[]} or a
* {@code Collection<String>} where each member of the array or collection is a
* property name + value with the above formats.
* @return the properties.
*/
String[] properties() default {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
Expand All @@ -61,7 +60,6 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.OrderComparator;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationUtils;
Expand All @@ -80,8 +78,9 @@
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.pulsar.config.MethodPulsarListenerEndpoint;
import org.springframework.pulsar.config.PulsarListenerConfigUtils;
import org.springframework.pulsar.config.PulsarListenerBeanNames;
import org.springframework.pulsar.config.PulsarListenerContainerFactory;
import org.springframework.pulsar.config.PulsarListenerEndpoint;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistrar;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;
import org.springframework.util.Assert;
Expand All @@ -92,12 +91,12 @@
/**
* Bean post-processor that registers methods annotated with {@link PulsarListener}
* to be invoked by a Pulsar message listener container created under the covers
* by a {@link org.springframework.pulsar.config.PulsarListenerContainerFactory}
* by a {@link PulsarListenerContainerFactory}
* according to the parameters of the annotation.
*
* <p>Annotated methods can use flexible arguments as defined by {@link PulsarListener}.
*
* <p>This post-processor is automatically registered by Spring's {@link EnablePulsar}
* <p>This post-processor is automatically registered by the {@link EnablePulsar}
* annotation.
*
* <p>Auto-detect any {@link PulsarListenerConfigurer} instances in the container,
Expand All @@ -109,13 +108,14 @@
* @param <V> the value type.
*
* @author Soby Chacko
* @author Chris Bono
*
* @see PulsarListener
* @see EnablePulsar
* @see PulsarListenerConfigurer
* @see PulsarListenerEndpointRegistrar
* @see PulsarListenerEndpointRegistry
* @see org.springframework.pulsar.config.PulsarListenerEndpoint
* @see PulsarListenerEndpoint
* @see MethodPulsarListenerEndpoint
*/
public class PulsarListenerAnnotationBeanPostProcessor<K, V> implements BeanPostProcessor, Ordered, ApplicationContextAware, InitializingBean, SmartInitializingSingleton {
Expand All @@ -136,17 +136,20 @@ public class PulsarListenerAnnotationBeanPostProcessor<K, V> implements BeanPost
private static final String GENERATED_ID_PREFIX = "org.springframework.Pulsar.PulsarListenerEndpointContainer#";

private ApplicationContext applicationContext;

private BeanFactory beanFactory;

private BeanExpressionResolver resolver;

private BeanExpressionContext expressionContext;

private PulsarListenerEndpointRegistry endpointRegistry;

private String defaultContainerFactoryBeanName = DEFAULT_PULSAR_LISTENER_CONTAINER_FACTORY_BEAN_NAME;

private final PulsarListenerEndpointRegistrar registrar = new PulsarListenerEndpointRegistrar();
private final PulsarHandlerMethodFactoryAdapter messageHandlerMethodFactory =
new PulsarHandlerMethodFactoryAdapter();

private final PulsarHandlerMethodFactoryAdapter messageHandlerMethodFactory = new PulsarHandlerMethodFactoryAdapter();

private Charset charset = StandardCharsets.UTF_8;

Expand Down Expand Up @@ -179,23 +182,20 @@ public void setCharset(Charset charset) {
}

@Override
public void afterPropertiesSet() throws Exception {
public void afterPropertiesSet() {
buildEnhancer();
}

private void buildEnhancer() {
if (this.applicationContext != null) {
Map<String, AnnotationEnhancer> enhancersMap =
this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false);
if (enhancersMap.size() > 0) {
List<AnnotationEnhancer> enhancers = enhancersMap.values()
.stream()
.sorted(new OrderComparator())
.collect(Collectors.toList());
List<AnnotationEnhancer> enhancers = this.applicationContext
.getBeanProvider(AnnotationEnhancer.class, false)
.orderedStream()
.toList();
if (!enhancers.isEmpty()) {
this.enhancer = (attrs, element) -> {
Map<String, Object> newAttrs = attrs;
for (AnnotationEnhancer enh : enhancers) {
newAttrs = enh.apply(newAttrs, element);
attrs = enh.apply(attrs, element);
}
return attrs;
};
Expand All @@ -207,20 +207,15 @@ private void buildEnhancer() {
public void afterSingletonsInstantiated() {
this.registrar.setBeanFactory(this.beanFactory);

if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, PulsarListenerConfigurer> instances =
((ListableBeanFactory) this.beanFactory).getBeansOfType(PulsarListenerConfigurer.class);
for (PulsarListenerConfigurer configurer : instances.values()) {
configurer.configurePulsarListeners(this.registrar);
}
}
this.beanFactory.getBeanProvider(PulsarListenerConfigurer.class)
.forEach(c -> c.configurePulsarListeners(this.registrar));

if (this.registrar.getEndpointRegistry() == null) {
if (this.endpointRegistry == null) {
Assert.state(this.beanFactory != null,
"BeanFactory must be set to find endpoint registry by bean name");
this.endpointRegistry = this.beanFactory.getBean(
PulsarListenerConfigUtils.PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
PulsarListenerBeanNames.PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
PulsarListenerEndpointRegistry.class);
}
this.registrar.setEndpointRegistry(this.endpointRegistry);
Expand All @@ -230,7 +225,7 @@ public void afterSingletonsInstantiated() {
this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
}

// Set the custom handler method factory once resolved by the configurer
// Set the custom handler method factory once resolved by the configurer - otherwise register default formatters
MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
if (handlerMethodFactory != null) {
this.messageHandlerMethodFactory.setHandlerMethodFactory(handlerMethodFactory);
Expand All @@ -252,7 +247,6 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class<?> targetClass = AopUtils.getTargetClass(bean);
Collection<PulsarListener> classLevelListeners = findListenerAnnotations(targetClass);
Map<Method, Set<PulsarListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<PulsarListener>>) method -> {
Set<PulsarListener> listenerMethods = findListenerAnnotations(method);
Expand Down Expand Up @@ -360,9 +354,7 @@ private void processPulsarListenerAnnotation(MethodPulsarListenerEndpoint<?> end
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
}
resolvePulsarProperties(endpoint, pulsarListener.properties());
if (StringUtils.hasText(pulsarListener.batch())) {
endpoint.setBatchListener(Boolean.parseBoolean(pulsarListener.batch()));
}
endpoint.setBatchListener(pulsarListener.batch());
endpoint.setBeanFactory(this.beanFactory);
}

Expand Down Expand Up @@ -588,26 +580,9 @@ private PulsarListener enhance(AnnotatedElement element, PulsarListener ann) {


private void addFormatters(FormatterRegistry registry) {
for (Converter<?, ?> converter : getBeansOfType(Converter.class)) {
registry.addConverter(converter);
}
for (GenericConverter converter : getBeansOfType(GenericConverter.class)) {
registry.addConverter(converter);
}
for (Formatter<?> formatter : getBeansOfType(Formatter.class)) {
registry.addFormatter(formatter);
}
}

private <T> Collection<T> getBeansOfType(Class<T> type) {
if (PulsarListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ListableBeanFactory) {
return ((ListableBeanFactory) PulsarListenerAnnotationBeanPostProcessor.this.beanFactory)
.getBeansOfType(type)
.values();
}
else {
return Collections.emptySet();
}
this.beanFactory.getBeanProvider(Converter.class).forEach(registry::addConverter);
this.beanFactory.getBeanProvider(GenericConverter.class).forEach(registry::addConverter);
this.beanFactory.getBeanProvider(Formatter.class).forEach(registry::addFormatter);
}

@Override
Expand Down Expand Up @@ -802,4 +777,3 @@ public interface AnnotationEnhancer extends BiFunction<Map<String, Object>, Anno


}

Loading