Skip to content

Use type safe structures to configure the consumer #188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spring-pulsar-docs/src/main/asciidoc/pulsar.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ class DeadLetterPolicyConfig {

@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1" })
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1000" })
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.core;

import org.apache.pulsar.client.api.ConsumerBuilder;

/**
* The interface to customize a {@link ConsumerBuilder}.
*
* @param <T> The message payload type
* @author Christophe Bornet
*/
@FunctionalInterface
public interface ConsumerBuilderCustomizer<T> {

/**
* Customizes a {@link ConsumerBuilder}.
* @param consumerBuilder the builder to customize
*/
void customize(ConsumerBuilder<T> consumerBuilder);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package org.springframework.pulsar.core;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
Expand All @@ -30,6 +33,7 @@
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.Schema;

import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;

/**
Expand All @@ -38,75 +42,78 @@
* @param <T> underlying payload type for the consumer.
* @author Soby Chacko
* @author Alexander Preuß
* @author Christophe Bornet
*/
public class DefaultPulsarConsumerFactory<T> implements PulsarConsumerFactory<T> {

private final Map<String, Object> consumerConfig = new HashMap<>();
private final Map<String, Object> consumerConfig;

private final List<Consumer<T>> consumers = new ArrayList<>();

private PulsarClient pulsarClient;
private final PulsarClient pulsarClient;

public DefaultPulsarConsumerFactory(PulsarClient pulsarClient) {
this(pulsarClient, Collections.emptyMap());
}

public DefaultPulsarConsumerFactory(PulsarClient pulsarClient, Map<String, Object> consumerConfig) {
this.pulsarClient = pulsarClient;
if (!CollectionUtils.isEmpty(consumerConfig)) {
this.consumerConfig.putAll(consumerConfig);
}
this.consumerConfig = Collections.unmodifiableMap(consumerConfig);
}

@Override
public Consumer<T> createConsumer(Schema<T> schema, Map<String, Object> propertiesToOverride)
throws PulsarClientException {

final ConsumerBuilder<T> consumerBuilder = this.pulsarClient.newConsumer(schema);

final Map<String, Object> properties = new HashMap<>(this.consumerConfig);
properties.putAll(propertiesToOverride);
public Consumer<T> createConsumer(Schema<T> schema) throws PulsarClientException {
return createConsumer(schema, null, null, Collections.emptyList());
}

if (!CollectionUtils.isEmpty(properties)) {
consumerBuilder.loadConf(properties);
}
Consumer<T> consumer = consumerBuilder.subscribe();
this.consumers.add(consumer);
return consumer;
@Override
public Consumer<T> createConsumer(Schema<T> schema, Collection<String> topics) throws PulsarClientException {
return createConsumer(schema, topics, null, Collections.emptyList());
}

@Override
public Consumer<T> createConsumer(Schema<T> schema, BatchReceivePolicy batchReceivePolicy,
Map<String, Object> propertiesToOverride) throws PulsarClientException {
public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String> topics,
@Nullable Map<String, String> properties, @Nullable List<ConsumerBuilderCustomizer<T>> customizers)
throws PulsarClientException {
ConsumerBuilder<T> consumerBuilder = this.pulsarClient.newConsumer(schema);
Map<String, Object> config = new HashMap<>(this.consumerConfig);

final ConsumerBuilder<T> consumerBuilder = this.pulsarClient.newConsumer(schema);
final Map<String, Object> properties = new HashMap<>(this.consumerConfig);
properties.putAll(propertiesToOverride);
if (topics != null) {
config.put("topicNames", new HashSet<>(topics));
}
if (properties != null) {
config.put("properties", new TreeMap<>(properties));
}

// Remove deadLetterPolicy from the properties here and save it to re-apply after
// calling `loadConf` (https://github.com/apache/pulsar/issues/11646)
DeadLetterPolicy deadLetterPolicy = null;
if (properties.containsKey("deadLetterPolicy")) {
deadLetterPolicy = (DeadLetterPolicy) properties.remove("deadLetterPolicy");
if (config.containsKey("deadLetterPolicy")) {
deadLetterPolicy = (DeadLetterPolicy) config.remove("deadLetterPolicy");
}

if (!CollectionUtils.isEmpty(properties)) {
consumerBuilder.loadConf(properties);
}
consumerBuilder.loadConf(config);

if (deadLetterPolicy != null) {
consumerBuilder.deadLetterPolicy(deadLetterPolicy);
}

if (properties.containsKey("negativeAckRedeliveryBackoff")) {
final RedeliveryBackoff negativeAckRedeliveryBackoff = (RedeliveryBackoff) properties
if (config.containsKey("negativeAckRedeliveryBackoff")) {
RedeliveryBackoff negativeAckRedeliveryBackoff = (RedeliveryBackoff) config
.get("negativeAckRedeliveryBackoff");
consumerBuilder.negativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff);
}

if (properties.containsKey("ackTimeoutRedeliveryBackoff")) {
final RedeliveryBackoff ackTimeoutRedeliveryBackoff = (RedeliveryBackoff) properties
if (config.containsKey("ackTimeoutRedeliveryBackoff")) {
RedeliveryBackoff ackTimeoutRedeliveryBackoff = (RedeliveryBackoff) config
.get("ackTimeoutRedeliveryBackoff");
consumerBuilder.ackTimeoutRedeliveryBackoff(ackTimeoutRedeliveryBackoff);
}

consumerBuilder.batchReceivePolicy(batchReceivePolicy);
if (!CollectionUtils.isEmpty(customizers)) {
customizers.forEach(customizer -> customizer.customize(consumerBuilder));
}

Consumer<T> consumer = consumerBuilder.subscribe();
this.consumers.add(consumer);
return consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,67 @@

package org.springframework.pulsar.core;

import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

import org.springframework.lang.Nullable;

/**
* Pulsar consumer factory interface.
*
* @param <T> payload type for the consumer.
* @author Soby Chacko
* @author Christophe Bornet
*/
public interface PulsarConsumerFactory<T> {

Consumer<T> createConsumer(Schema<T> schema, Map<String, Object> propertiesToOverride) throws PulsarClientException;
/**
* Create a consumer.
* @param schema the schema of the messages to be sent
* @return the consumer
* @throws PulsarClientException if any error occurs
*/
Consumer<T> createConsumer(Schema<T> schema) throws PulsarClientException;

/**
* Create a consumer.
* @param schema the schema of the messages to be sent
* @param topics the topics the consumer will subscribe to
* @return the consumer
* @throws PulsarClientException if any error occurs
*/
Consumer<T> createConsumer(Schema<T> schema, Collection<String> topics) throws PulsarClientException;

Consumer<T> createConsumer(Schema<T> schema, BatchReceivePolicy batchReceivePolicy,
Map<String, Object> propertiesToOverride) throws PulsarClientException;
/**
* Create a consumer.
* @param schema the schema of the messages to be sent
* @param topics the topics the consumer will subscribe to overriding the default ones
* or {@code null} to use the default topics. Beware that using
* {@link ConsumerBuilder#topic} or {@link ConsumerBuilder#topics} will add to the
* default topics, not override them.
* @param properties the properties to set to the consumer overriding the default ones
* or {@code null} to use the default properties. Beware that using
* {@link ConsumerBuilder#property} or {@link ConsumerBuilder#properties} will add to
* the default properties, not override them.
* @param customizers the optional list of customizers to apply to the consumer
* builder
* @return the consumer
* @throws PulsarClientException if any error occurs
*/
Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String> topics,
@Nullable Map<String, String> properties, @Nullable List<ConsumerBuilderCustomizer<T>> customizers)
throws PulsarClientException;

/**
* Return the configuration options to use when creating consumers.
* @return the configuration options
*/
Map<String, Object> getConsumerConfig();

}
Loading