Skip to content

Remove final keyword in methods #213

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro
}

@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class<?> targetClass = AopUtils.getTargetClass(bean);
Map<Method, Set<PulsarListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro
}

@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class<?> targetClass = AopUtils.getTargetClass(bean);
Map<Method, Set<ReactivePulsarListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,22 @@ protected PulsarMessagingMessageListenerAdapter<V> createMessageListener(PulsarM
Assert.state(this.messageHandlerMethodFactory != null,
"Could not create message listener - MessageHandlerMethodFactory not set");
PulsarMessagingMessageListenerAdapter<V> messageListener = createMessageListenerInstance(messageConverter);
final HandlerAdapter handlerMethod = configureListenerAdapter(messageListener);
HandlerAdapter handlerMethod = configureListenerAdapter(messageListener);
messageListener.setHandlerMethod(handlerMethod);

// Since we have access to the handler method here, check if we can type infer the
// Schema used.

// TODO: filter out the payload type by excluding Consumer, Message, Messages etc.

final MethodParameter[] methodParameters = handlerMethod.getInvokerHandlerMethod().getMethodParameters();
MethodParameter[] methodParameters = handlerMethod.getInvokerHandlerMethod().getMethodParameters();
MethodParameter messageParameter = null;
final Optional<MethodParameter> parameter = Arrays.stream(methodParameters)
Optional<MethodParameter> parameter = Arrays.stream(methodParameters)
.filter(methodParameter1 -> !methodParameter1.getParameterType().equals(Consumer.class)
|| !methodParameter1.getParameterType().equals(Acknowledgement.class)
|| !methodParameter1.hasParameterAnnotation(Header.class))
.findFirst();
final long count = Arrays.stream(methodParameters)
long count = Arrays.stream(methodParameters)
.filter(methodParameter1 -> !methodParameter1.getParameterType().equals(Consumer.class)
&& !methodParameter1.getParameterType().equals(Acknowledgement.class)
&& !methodParameter1.hasParameterAnnotation(Header.class))
Expand All @@ -145,9 +145,9 @@ protected PulsarMessagingMessageListenerAdapter<V> createMessageListener(PulsarM
messageParameter = parameter.get();
}

final ConcurrentPulsarMessageListenerContainer<?> containerInstance = (ConcurrentPulsarMessageListenerContainer<?>) container;
final PulsarContainerProperties pulsarContainerProperties = containerInstance.getContainerProperties();
final SchemaType schemaType = pulsarContainerProperties.getSchemaType();
ConcurrentPulsarMessageListenerContainer<?> containerInstance = (ConcurrentPulsarMessageListenerContainer<?>) container;
PulsarContainerProperties pulsarContainerProperties = containerInstance.getContainerProperties();
SchemaType schemaType = pulsarContainerProperties.getSchemaType();
if (schemaType != SchemaType.NONE) {
switch (schemaType) {
case STRING -> pulsarContainerProperties.setSchema(Schema.STRING);
Expand Down Expand Up @@ -193,7 +193,7 @@ protected PulsarMessagingMessageListenerAdapter<V> createMessageListener(PulsarM
}
}
}
final SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType();
SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType();
pulsarContainerProperties.setSchemaType(type);

container.setNegativeAckRedeliveryBackoff(this.negativeAckRedeliveryBackoff);
Expand All @@ -206,7 +206,7 @@ protected PulsarMessagingMessageListenerAdapter<V> createMessageListener(PulsarM

private Schema<?> getMessageSchema(MethodParameter messageParameter, Function<Class<?>, Schema<?>> schemaFactory) {
ResolvableType messageType = resolvableType(messageParameter);
final Class<?> messageClass = messageType.getRawClass();
Class<?> messageClass = messageType.getRawClass();
return schemaFactory.apply(messageClass);
}

Expand All @@ -221,7 +221,7 @@ private Schema<?> getMessageKeyValueSchema(MethodParameter messageParameter) {

private ResolvableType resolvableType(MethodParameter methodParameter) {
ResolvableType resolvableType = ResolvableType.forMethodParameter(methodParameter);
final Class<?> rawClass = resolvableType.getRawClass();
Class<?> rawClass = resolvableType.getRawClass();
if (rawClass != null && isContainerType(rawClass)) {
resolvableType = resolvableType.getGeneric(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,22 +114,22 @@ protected PulsarMessagingMessageListenerAdapter<V> createMessageHandler(
Assert.state(this.messageHandlerMethodFactory != null,
"Could not create message listener - MessageHandlerMethodFactory not set");
PulsarMessagingMessageListenerAdapter<V> messageListener = createMessageListenerInstance(messageConverter);
final HandlerAdapter handlerMethod = configureListenerAdapter(messageListener);
HandlerAdapter handlerMethod = configureListenerAdapter(messageListener);
messageListener.setHandlerMethod(handlerMethod);

// Since we have access to the handler method here, check if we can type infer the
// Schema used.

// TODO: filter out the payload type by excluding Consumer, Message, Messages etc.

final MethodParameter[] methodParameters = handlerMethod.getInvokerHandlerMethod().getMethodParameters();
MethodParameter[] methodParameters = handlerMethod.getInvokerHandlerMethod().getMethodParameters();
MethodParameter messageParameter = null;
final Optional<MethodParameter> parameter = Arrays.stream(methodParameters)
Optional<MethodParameter> parameter = Arrays.stream(methodParameters)
.filter(methodParameter1 -> !methodParameter1.getParameterType().equals(Consumer.class)
|| !methodParameter1.getParameterType().equals(Acknowledgement.class)
|| !methodParameter1.hasParameterAnnotation(Header.class))
.findFirst();
final long count = Arrays.stream(methodParameters)
long count = Arrays.stream(methodParameters)
.filter(methodParameter1 -> !methodParameter1.getParameterType().equals(Consumer.class)
&& !methodParameter1.getParameterType().equals(Acknowledgement.class)
&& !methodParameter1.hasParameterAnnotation(Header.class))
Expand All @@ -139,10 +139,9 @@ protected PulsarMessagingMessageListenerAdapter<V> createMessageHandler(
messageParameter = parameter.get();
}

final DefaultReactivePulsarMessageListenerContainer<?> containerInstance = (DefaultReactivePulsarMessageListenerContainer<?>) container;
final ReactivePulsarContainerProperties<?> pulsarContainerProperties = containerInstance
.getContainerProperties();
final SchemaType schemaType = pulsarContainerProperties.getSchemaType();
DefaultReactivePulsarMessageListenerContainer<?> containerInstance = (DefaultReactivePulsarMessageListenerContainer<?>) container;
ReactivePulsarContainerProperties<?> pulsarContainerProperties = containerInstance.getContainerProperties();
SchemaType schemaType = pulsarContainerProperties.getSchemaType();
if (schemaType != SchemaType.NONE) {
switch (schemaType) {
case STRING -> pulsarContainerProperties.setSchema((Schema) Schema.STRING);
Expand Down Expand Up @@ -188,7 +187,7 @@ protected PulsarMessagingMessageListenerAdapter<V> createMessageHandler(
}
}
}
final SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType();
SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType();
pulsarContainerProperties.setSchemaType(type);

ReactiveMessageConsumerBuilderCustomizer<V> customizer1 = b -> b.deadLetterPolicy(this.deadLetterPolicy);
Expand All @@ -204,7 +203,7 @@ protected PulsarMessagingMessageListenerAdapter<V> createMessageHandler(

private Schema<?> getMessageSchema(MethodParameter messageParameter, Function<Class<?>, Schema<?>> schemaFactory) {
ResolvableType messageType = resolvableType(messageParameter);
final Class<?> messageClass = messageType.getRawClass();
Class<?> messageClass = messageType.getRawClass();
return schemaFactory.apply(messageClass);
}

Expand All @@ -219,7 +218,7 @@ private Schema<?> getMessageKeyValueSchema(MethodParameter messageParameter) {

private ResolvableType resolvableType(MethodParameter methodParameter) {
ResolvableType resolvableType = ResolvableType.forMethodParameter(methodParameter);
final Class<?> rawClass = resolvableType.getRawClass();
Class<?> rawClass = resolvableType.getRawClass();
if (rawClass != null && isContainerType(rawClass)) {
resolvableType = resolvableType.getGeneric(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public CachingPulsarProducerFactory(PulsarClient pulsarClient, Map<String, Objec
@Override
protected Producer<T> doCreateProducer(Schema<T> schema, @Nullable String topic,
@Nullable Collection<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers) {
final String topicName = ProducerUtils.resolveTopicName(topic, this);
String topicName = ProducerUtils.resolveTopicName(topic, this);
ProducerCacheKey<T> producerCacheKey = new ProducerCacheKey<>(schema, topicName,
encryptionKeys == null ? null : new HashSet<>(encryptionKeys), customizers);
return this.producerCache.get(producerCacheKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,14 @@ private CompletableFuture<MessageId> doSendAsync(@Nullable String topic,
@Nullable Collection<String> encryptionKeys, T message,
@Nullable TypedMessageBuilderCustomizer<T> typedMessageBuilderCustomizer,
@Nullable ProducerBuilderCustomizer<T> producerCustomizer) throws PulsarClientException {
final String topicName = ProducerUtils.resolveTopicName(topic, this.producerFactory);
String topicName = ProducerUtils.resolveTopicName(topic, this.producerFactory);
this.logger.trace(() -> String.format("Sending msg to '%s' topic", topicName));

PulsarMessageSenderContext senderContext = PulsarMessageSenderContext.newContext(topicName, this.beanName);
Observation observation = newObservation(senderContext);
try {
observation.start();
final Producer<T> producer = prepareProducerForSend(topic, message, encryptionKeys, producerCustomizer);
Producer<T> producer = prepareProducerForSend(topic, message, encryptionKeys, producerCustomizer);
TypedMessageBuilder<T> messageBuilder = producer.newMessage().value(message);
if (typedMessageBuilderCustomizer != null) {
typedMessageBuilderCustomizer.customize(messageBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ public ReactiveMessageSender<T> createSender(String topic, Schema<T> schema,

private ReactiveMessageSender<T> doCreateReactiveMessageSender(String topic, Schema<T> schema,
List<ReactiveMessageSenderBuilderCustomizer<T>> customizers) {
final String resolvedTopic = ReactiveMessageSenderUtils.resolveTopicName(topic, this);
String resolvedTopic = ReactiveMessageSenderUtils.resolveTopicName(topic, this);
this.logger.trace(() -> String.format("Creating reactive message sender for '%s' topic", resolvedTopic));
final ReactiveMessageSenderBuilder<T> sender = this.reactivePulsarClient.messageSender(schema);
ReactiveMessageSenderBuilder<T> sender = this.reactivePulsarClient.messageSender(schema);
sender.applySpec(this.reactiveMessageSenderSpec);
sender.topic(resolvedTopic);
if (this.reactiveMessageSenderCache != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void setSchema(Schema<T> schema) {
private Mono<MessageId> doSend(String topic, T message,
MessageSpecBuilderCustomizer<T> messageSpecBuilderCustomizer,
ReactiveMessageSenderBuilderCustomizer<T> customizer) {
final String topicName = ReactiveMessageSenderUtils.resolveTopicName(topic, this.reactiveMessageSenderFactory);
String topicName = ReactiveMessageSenderUtils.resolveTopicName(topic, this.reactiveMessageSenderFactory);
this.logger.trace(() -> String.format("Sending reactive message to '%s' topic", topicName));
ReactiveMessageSender<T> sender = createMessageSender(topic, message, customizer);
return sender.sendOne(getMessageSpec(messageSpecBuilderCustomizer, message)).doOnError(
Expand All @@ -100,7 +100,7 @@ private Mono<MessageId> doSend(String topic, T message,
}

private Flux<MessageId> doSendMany(String topic, Flux<T> messages) {
final String topicName = ReactiveMessageSenderUtils.resolveTopicName(topic, this.reactiveMessageSenderFactory);
String topicName = ReactiveMessageSenderUtils.resolveTopicName(topic, this.reactiveMessageSenderFactory);
this.logger.trace(() -> String.format("Sending reactive messages to '%s' topic", topicName));

if (this.schema != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public DefaultPulsarConsumerErrorHandler(PulsarMessageRecovererFactory<T> pulsar

@Override
public boolean shouldRetryMessage(Exception exception, Message<T> message) {
final Pair pair = this.backOffExecutionThreadLocal.get();
Pair pair = this.backOffExecutionThreadLocal.get();
long nextBackOff;
BackOffExecution backOffExecution;
if (pair != null && pair.message.equals(message)) {
Expand Down Expand Up @@ -85,7 +85,7 @@ public void recoverMessage(Consumer<T> consumer, Message<T> message, Exception e
@SuppressWarnings("unchecked")
public Message<T> currentMessage() {
// there is only one message tracked at any time.
final Pair pair = this.backOffExecutionThreadLocal.get();
Pair pair = this.backOffExecutionThreadLocal.get();
if (pair == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,28 +274,28 @@ private Map<String, Object> extractDirectConsumerProperties() {

private void populateAllNecessaryPropertiesIfNeedBe(Map<String, Object> currentProperties) {
if (currentProperties.containsKey("topicNames")) {
final String topicsFromMap = (String) currentProperties.get("topicNames");
final String[] topicNames = StringUtils.delimitedListToStringArray(topicsFromMap, ",");
final Set<String> propertiesDefinedTopics = Set.of(topicNames);
String topicsFromMap = (String) currentProperties.get("topicNames");
String[] topicNames = StringUtils.delimitedListToStringArray(topicsFromMap, ",");
Set<String> propertiesDefinedTopics = Set.of(topicNames);
if (!propertiesDefinedTopics.isEmpty()) {
currentProperties.put("topicNames", propertiesDefinedTopics);
}
}
if (!currentProperties.containsKey("subscriptionType")) {
final SubscriptionType subscriptionType = this.containerProperties.getSubscriptionType();
SubscriptionType subscriptionType = this.containerProperties.getSubscriptionType();
if (subscriptionType != null) {
currentProperties.put("subscriptionType", subscriptionType);
}
}
if (!currentProperties.containsKey("topicNames")) {
final String[] topics = this.containerProperties.getTopics();
final Set<String> listenerDefinedTopics = new HashSet<>(Arrays.stream(topics).toList());
String[] topics = this.containerProperties.getTopics();
Set<String> listenerDefinedTopics = new HashSet<>(Arrays.stream(topics).toList());
if (!listenerDefinedTopics.isEmpty()) {
currentProperties.put("topicNames", listenerDefinedTopics);
}
}
if (!currentProperties.containsKey("topicsPattern")) {
final String topicsPattern = this.containerProperties.getTopicsPattern();
String topicsPattern = this.containerProperties.getTopicsPattern();
if (topicsPattern != null) {
currentProperties.put("topicsPattern", topicsPattern);
}
Expand All @@ -305,15 +305,15 @@ private void populateAllNecessaryPropertiesIfNeedBe(Map<String, Object> currentP
currentProperties.put("subscriptionName", this.containerProperties.getSubscriptionName());
}
}
final RedeliveryBackoff negativeAckRedeliveryBackoff = DefaultPulsarMessageListenerContainer.this.negativeAckRedeliveryBackoff;
RedeliveryBackoff negativeAckRedeliveryBackoff = DefaultPulsarMessageListenerContainer.this.negativeAckRedeliveryBackoff;
if (negativeAckRedeliveryBackoff != null) {
currentProperties.put("negativeAckRedeliveryBackoff", negativeAckRedeliveryBackoff);
}
final RedeliveryBackoff ackTimeoutRedeliveryBackoff = DefaultPulsarMessageListenerContainer.this.ackTimeoutRedeliveryBackoff;
RedeliveryBackoff ackTimeoutRedeliveryBackoff = DefaultPulsarMessageListenerContainer.this.ackTimeoutRedeliveryBackoff;
if (ackTimeoutRedeliveryBackoff != null) {
currentProperties.put("ackTimeoutRedeliveryBackoff", ackTimeoutRedeliveryBackoff);
}
final DeadLetterPolicy deadLetterPolicy = DefaultPulsarMessageListenerContainer.this.deadLetterPolicy;
DeadLetterPolicy deadLetterPolicy = DefaultPulsarMessageListenerContainer.this.deadLetterPolicy;
if (deadLetterPolicy != null) {
currentProperties.put("deadLetterPolicy", deadLetterPolicy);
}
Expand Down Expand Up @@ -380,8 +380,7 @@ public void run() {
this.consumer.acknowledge(messages);
}
else {
final Stream<Message<T>> stream = StreamSupport.stream(messages.spliterator(),
true);
Stream<Message<T>> stream = StreamSupport.stream(messages.spliterator(), true);
Message<T> last = stream.reduce((a, b) -> b).orElse(null);
this.consumer.acknowledgeCumulative(last);
}
Expand Down Expand Up @@ -493,7 +492,7 @@ private List<Message<T>> invokeBatchListenerErrorHandler(AtomicBoolean inRetryMo

PulsarBatchListenerFailedException pulsarBatchListenerFailedException = (PulsarBatchListenerFailedException) exception;
Message<T> pulsarMessage = getPulsarMessageCausedTheException(pulsarBatchListenerFailedException);
final Message<T> theCurrentPulsarMessageTracked = this.pulsarConsumerErrorHandler.currentMessage();
Message<T> theCurrentPulsarMessageTracked = this.pulsarConsumerErrorHandler.currentMessage();
// Previous message in error handled during retry but another msg in sublist
// caused error;
// resetting state in order to track it
Expand All @@ -507,10 +506,10 @@ private List<Message<T>> invokeBatchListenerErrorHandler(AtomicBoolean inRetryMo
// handled on the retry. Otherwise, if we are out of retries then the sublist
// does not include
// the message in error (it instead gets recovered).
final int indexOfFailedMessage = messageList.indexOf(pulsarMessage);
int indexOfFailedMessage = messageList.indexOf(pulsarMessage);
messageList = messageList.subList(indexOfFailedMessage, messageList.size());
final boolean toBeRetried = this.pulsarConsumerErrorHandler
.shouldRetryMessage(pulsarBatchListenerFailedException, pulsarMessage);
boolean toBeRetried = this.pulsarConsumerErrorHandler.shouldRetryMessage(pulsarBatchListenerFailedException,
pulsarMessage);
if (toBeRetried) {
inRetryMode.set(true);
}
Expand All @@ -535,7 +534,7 @@ private List<Message<T>> invokeBatchListenerErrorHandler(AtomicBoolean inRetryMo
}

private void invokeRecordListenerErrorHandler(AtomicBoolean inRetryMode, Message<T> message, Exception e) {
final boolean toBeRetried = this.pulsarConsumerErrorHandler.shouldRetryMessage(e, message);
boolean toBeRetried = this.pulsarConsumerErrorHandler.shouldRetryMessage(e, message);
if (toBeRetried) {
inRetryMode.set(true);
}
Expand Down Expand Up @@ -576,7 +575,7 @@ private void handleAcks(Messages<T> messages) {
this.consumer.acknowledge(messages);
}
else {
final Stream<Message<T>> stream = StreamSupport.stream(messages.spliterator(), true);
Stream<Message<T>> stream = StreamSupport.stream(messages.spliterator(), true);
Message<T> last = stream.reduce((a, b) -> b).orElse(null);
this.consumer.acknowledgeCumulative(last);
}
Expand Down
Loading