Skip to content

Commit 2ffe7c4

Browse files
author
Attila Tóth
committed
[pulsar-spark] added option for configuring Pulsar client
1 parent 0372947 commit 2ffe7c4

File tree

4 files changed

+103
-30
lines changed

4 files changed

+103
-30
lines changed

pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.concurrent.ConcurrentHashMap;
5757
import java.util.concurrent.TimeUnit;
5858
import java.util.concurrent.atomic.AtomicInteger;
59+
import java.util.function.Function;
5960

6061
import static org.mockito.Mockito.any;
6162
import static org.mockito.Mockito.mock;
@@ -557,6 +558,16 @@ public void seek(long timestamp) throws PulsarClientException {
557558

558559
}
559560

561+
@Override
562+
public void seek(Function<String, Object> function) throws PulsarClientException {
563+
564+
}
565+
566+
@Override
567+
public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
568+
return null;
569+
}
570+
560571
@Override
561572
public CompletableFuture<Void> seekAsync(MessageId messageId) {
562573
return null;

pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,12 @@
2222
import static com.google.common.base.Preconditions.checkNotNull;
2323

2424
import java.io.Serializable;
25+
import java.util.HashMap;
26+
import java.util.Map;
2527

26-
import org.apache.pulsar.client.api.Authentication;
27-
import org.apache.pulsar.client.api.Consumer;
28-
import org.apache.pulsar.client.api.MessageListener;
29-
import org.apache.pulsar.client.api.PulsarClient;
30-
import org.apache.pulsar.client.api.PulsarClientException;
28+
import org.apache.pulsar.client.api.*;
3129
import org.apache.pulsar.client.impl.PulsarClientImpl;
30+
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
3231
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
3332
import org.apache.spark.storage.StorageLevel;
3433
import org.apache.spark.streaming.receiver.Receiver;
@@ -43,34 +42,52 @@ public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
4342
private static final Logger LOG = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
4443

4544
private String serviceUrl;
46-
private ConsumerConfigurationData<byte[]> conf;
45+
private Map<String,Object> clientConfig;
46+
private ConsumerConfigurationData<byte[]> consumerConfig;
4747
private Authentication authentication;
4848
private PulsarClient pulsarClient;
4949
private Consumer<byte[]> consumer;
5050

5151
public SparkStreamingPulsarReceiver(
5252
String serviceUrl,
53-
ConsumerConfigurationData<byte[]> conf,
53+
ConsumerConfigurationData<byte[]> consumerConfig,
5454
Authentication authentication) {
55-
this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, conf, authentication);
55+
this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, new HashMap<>(), consumerConfig, authentication);
56+
}
57+
58+
public SparkStreamingPulsarReceiver(
59+
String serviceUrl,
60+
Map<String,Object> clientConfig,
61+
ConsumerConfigurationData<byte[]> consumerConfig,
62+
Authentication authentication) {
63+
this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, clientConfig, consumerConfig, authentication);
64+
}
65+
66+
public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
67+
String serviceUrl,
68+
ConsumerConfigurationData<byte[]> consumerConf,
69+
Authentication authentication) {
70+
this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, new HashMap<>(), consumerConf, authentication);
5671
}
5772

5873
public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
5974
String serviceUrl,
60-
ConsumerConfigurationData<byte[]> conf,
75+
Map<String,Object> clientConfig,
76+
ConsumerConfigurationData<byte[]> consumerConfig,
6177
Authentication authentication) {
6278
super(storageLevel);
6379

6480
checkNotNull(serviceUrl, "serviceUrl must not be null");
65-
checkNotNull(conf, "ConsumerConfigurationData must not be null");
66-
checkArgument(conf.getTopicNames().size() > 0, "TopicNames must be set a value.");
67-
checkNotNull(conf.getSubscriptionName(), "SubscriptionName must not be null");
81+
checkNotNull(consumerConfig, "ConsumerConfigurationData must not be null");
82+
checkNotNull(clientConfig, "Client configuration map must not be null");
83+
checkArgument(consumerConfig.getTopicNames().size() > 0, "TopicNames must be set a value.");
84+
checkNotNull(consumerConfig.getSubscriptionName(), "SubscriptionName must not be null");
6885

6986
this.serviceUrl = serviceUrl;
7087
this.authentication = authentication;
7188

72-
if (conf.getMessageListener() == null) {
73-
conf.setMessageListener((MessageListener<byte[]> & Serializable) (consumer, msg) -> {
89+
if (consumerConfig.getMessageListener() == null) {
90+
consumerConfig.setMessageListener((MessageListener<byte[]> & Serializable) (consumer, msg) -> {
7491
try {
7592
store(msg.getData());
7693
consumer.acknowledgeAsync(msg);
@@ -80,13 +97,18 @@ public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
8097
}
8198
});
8299
}
83-
this.conf = conf;
100+
this.clientConfig = clientConfig;
101+
this.consumerConfig = consumerConfig;
84102
}
85103

86104
public void onStart() {
87105
try {
88-
pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
89-
consumer = ((PulsarClientImpl) pulsarClient).subscribeAsync(conf).join();
106+
ClientBuilder builder = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication);
107+
if (!clientConfig.isEmpty()) {
108+
builder.loadConf(clientConfig);
109+
}
110+
pulsarClient = builder.build();
111+
consumer = ((PulsarClientImpl) pulsarClient).subscribeAsync(consumerConfig).join();
90112
} catch (Exception e) {
91113
LOG.error("Failed to start subscription : {}", e.getMessage());
92114
restart("Restart a consumer");

tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/PulsarKafkaProducerThreadSafeTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
* under the License.
1818
*/
1919
package org.apache.pulsar.tests.integration.compat.kafka;
20-
2120
import org.apache.kafka.clients.producer.KafkaProducer;
2221
import org.apache.kafka.clients.producer.Producer;
2322
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -38,7 +37,7 @@ private String getPlainTextServiceUrl() {
3837
}
3938

4039
@BeforeTest
41-
private void setup() {
40+
private void setupThreadSafeTest() {
4241
Properties producerProperties = new Properties();
4342
producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
4443
producerProperties.put("key.serializer", IntegerSerializer.class.getName());

tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.HashSet;
2828
import java.util.Map;
2929
import java.util.Set;
30+
import java.util.function.Supplier;
3031

3132
import org.apache.commons.lang3.mutable.MutableInt;
3233
import org.apache.pulsar.client.api.Consumer;
@@ -48,7 +49,7 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
4849
private static final String EXPECTED_MESSAGE = "pulsar-spark test message";
4950

5051
@Test(dataProvider = "ServiceUrls")
51-
public void testReceivedMessage(String serviceUrl) throws Exception {
52+
public void testReceivedMessage(Supplier<String> serviceUrl) throws Exception {
5253
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
5354

5455
Set<String> set = new HashSet<>();
@@ -68,14 +69,14 @@ public void received(Consumer consumer, Message msg) {
6869
consConf.setMessageListener(msgListener);
6970

7071
SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
71-
serviceUrl,
72+
serviceUrl.get(),
7273
consConf,
7374
new AuthenticationDisabled());
7475

7576
receiver.onStart();
7677
waitForTransmission();
7778

78-
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
79+
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
7980
Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
8081
producer.send(EXPECTED_MESSAGE.getBytes());
8182

@@ -85,7 +86,7 @@ public void received(Consumer consumer, Message msg) {
8586
}
8687

8788
@Test(dataProvider = "ServiceUrls")
88-
public void testDefaultSettingsOfReceiver(String serviceUrl) {
89+
public void testDefaultSettingsOfReceiver(Supplier<String> serviceUrl) {
8990
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
9091

9192
Set<String> set = new HashSet<>();
@@ -94,7 +95,7 @@ public void testDefaultSettingsOfReceiver(String serviceUrl) {
9495
consConf.setSubscriptionName(SUBS);
9596

9697
SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
97-
serviceUrl,
98+
serviceUrl.get(),
9899
consConf,
99100
new AuthenticationDisabled());
100101

@@ -103,7 +104,7 @@ public void testDefaultSettingsOfReceiver(String serviceUrl) {
103104
}
104105

105106
@Test(dataProvider = "ServiceUrls")
106-
public void testSharedSubscription(String serviceUrl) throws Exception {
107+
public void testSharedSubscription(Supplier<String> serviceUrl) throws Exception {
107108
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
108109

109110
Set<String> set = new HashSet<>();
@@ -120,20 +121,20 @@ public void testSharedSubscription(String serviceUrl) throws Exception {
120121
});
121122

122123
SparkStreamingPulsarReceiver receiver1 = new SparkStreamingPulsarReceiver(
123-
serviceUrl,
124+
serviceUrl.get(),
124125
consConf,
125126
new AuthenticationDisabled());
126127

127128
SparkStreamingPulsarReceiver receiver2 = new SparkStreamingPulsarReceiver(
128-
serviceUrl,
129+
serviceUrl.get(),
129130
consConf,
130131
new AuthenticationDisabled());
131132

132133
receiver1.onStart();
133134
receiver2.onStart();
134135
waitForTransmission();
135136

136-
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
137+
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
137138
Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
138139
for (int i = 0; i < 10; i++) {
139140
producer.send(EXPECTED_MESSAGE.getBytes());
@@ -149,8 +150,48 @@ public void testSharedSubscription(String serviceUrl) throws Exception {
149150
@Test(expectedExceptions = NullPointerException.class,
150151
expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
151152
dataProvider = "ServiceUrls")
152-
public void testReceiverWhenClientConfigurationIsNull(String serviceUrl) {
153-
new SparkStreamingPulsarReceiver(serviceUrl, null, new AuthenticationDisabled());
153+
public void testReceiverWhenConsumerConfigurationIsNull(Supplier<String> serviceUrl) {
154+
new SparkStreamingPulsarReceiver(
155+
serviceUrl.get(),
156+
null,
157+
new AuthenticationDisabled());
158+
}
159+
160+
@Test(dataProvider = "ServiceUrls")
161+
public void testOverrideServiceUrlWithClientConfiguration(Supplier<String> serviceUrl) {
162+
Map<String,Object> testClientConfig = new HashMap<>();
163+
testClientConfig.put("serviceUrl",serviceUrl.get());
164+
165+
ConsumerConfigurationData<byte[]> testConsumerConfig = new ConsumerConfigurationData<>();
166+
Set<String> set = new HashSet<>();
167+
set.add(TOPIC);
168+
testConsumerConfig.setTopicNames(set);
169+
testConsumerConfig.setSubscriptionName(SUBS);
170+
testConsumerConfig.setSubscriptionType(SubscriptionType.Shared);
171+
testConsumerConfig.setReceiverQueueSize(1);
172+
173+
String deliberatelyWrongServiceUrl = "http://invalid.service.url:1234";
174+
175+
SparkStreamingPulsarReceiver testReceiver = new SparkStreamingPulsarReceiver(
176+
deliberatelyWrongServiceUrl,
177+
testClientConfig,
178+
testConsumerConfig,
179+
new AuthenticationDisabled());
180+
181+
testReceiver.onStart();
182+
waitForTransmission();
183+
testReceiver.onStop();
184+
}
185+
186+
@Test(expectedExceptions = NullPointerException.class,
187+
expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
188+
dataProvider = "ServiceUrls")
189+
public void testReceiverWhenClientConfigurationIsNull(Supplier<String> serviceUrl) {
190+
new SparkStreamingPulsarReceiver(
191+
serviceUrl.get(),
192+
null,
193+
null,
194+
new AuthenticationDisabled());
154195
}
155196

156197
private static void waitForTransmission() {

0 commit comments

Comments
 (0)