Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import static com.google.common.base.Preconditions.checkNotNull;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.spark.storage.StorageLevel;
Expand All @@ -43,34 +46,52 @@ public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
private static final Logger LOG = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);

private String serviceUrl;
private ConsumerConfigurationData<byte[]> conf;
private Map<String,Object> clientConfig;
private ConsumerConfigurationData<byte[]> consumerConfig;
private Authentication authentication;
private PulsarClient pulsarClient;
private Consumer<byte[]> consumer;

public SparkStreamingPulsarReceiver(
String serviceUrl,
ConsumerConfigurationData<byte[]> conf,
ConsumerConfigurationData<byte[]> consumerConfig,
Authentication authentication) {
this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, conf, authentication);
this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, new HashMap<>(), consumerConfig, authentication);
}

public SparkStreamingPulsarReceiver(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we are going to introduce so many new builders?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea behind adding 2 new constructors to the 2 existing ones is I wanted to allow users of the interface to be able to configure Pulsar clients regardless whether they want to specify a storage level or leave it to default. Also, I wanted to keep previous constructors as well to keep backward compatibility.

Please let me know what do you think about this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be happy to remove eg. the one having a default storage level:

public SparkStreamingPulsarReceiver(
            String serviceUrl,
            Map<String,Object> clientConfig,
            ConsumerConfigurationData<byte[]> consumerConfig,
            Authentication authentication) {
        this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, clientConfig, consumerConfig, authentication);
    }

In that way we might have broader functionality (we just enforce the user to set the storage level whenever client configuration is provided).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie What do you think about this proposal? Shall I change the code like this?

String serviceUrl,
Map<String,Object> clientConfig,
ConsumerConfigurationData<byte[]> consumerConfig,
Authentication authentication) {
this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, clientConfig, consumerConfig, authentication);
}

public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are introducing a new parameter storageLevel. But it was not used.

Copy link
Author

@atezs82 atezs82 Jun 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the finding, corrected this in the upcoming patchset.

String serviceUrl,
ConsumerConfigurationData<byte[]> consumerConf,
Authentication authentication) {
this(storageLevel, serviceUrl, new HashMap<>(), consumerConf, authentication);
}

public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
String serviceUrl,
ConsumerConfigurationData<byte[]> conf,
Map<String,Object> clientConfig,
ConsumerConfigurationData<byte[]> consumerConfig,
Authentication authentication) {
super(storageLevel);

checkNotNull(serviceUrl, "serviceUrl must not be null");
checkNotNull(conf, "ConsumerConfigurationData must not be null");
checkArgument(conf.getTopicNames().size() > 0, "TopicNames must be set a value.");
checkNotNull(conf.getSubscriptionName(), "SubscriptionName must not be null");
checkNotNull(consumerConfig, "ConsumerConfigurationData must not be null");
checkNotNull(clientConfig, "Client configuration map must not be null");
checkArgument(consumerConfig.getTopicNames().size() > 0, "TopicNames must be set a value.");
checkNotNull(consumerConfig.getSubscriptionName(), "SubscriptionName must not be null");

this.serviceUrl = serviceUrl;
this.authentication = authentication;

if (conf.getMessageListener() == null) {
conf.setMessageListener((MessageListener<byte[]> & Serializable) (consumer, msg) -> {
if (consumerConfig.getMessageListener() == null) {
consumerConfig.setMessageListener((MessageListener<byte[]> & Serializable) (consumer, msg) -> {
try {
store(msg.getData());
consumer.acknowledgeAsync(msg);
Expand All @@ -80,13 +101,18 @@ public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
}
});
}
this.conf = conf;
this.clientConfig = clientConfig;
this.consumerConfig = consumerConfig;
}

public void onStart() {
try {
pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
consumer = ((PulsarClientImpl) pulsarClient).subscribeAsync(conf).join();
ClientBuilder builder = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication);
if (!clientConfig.isEmpty()) {
builder.loadConf(clientConfig);
}
pulsarClient = builder.build();
consumer = ((PulsarClientImpl) pulsarClient).subscribeAsync(consumerConfig).join();
} catch (Exception e) {
LOG.error("Failed to start subscription : {}", e.getMessage());
restart("Restart a consumer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,51 @@ public void testSharedSubscription(Supplier<String> serviceUrl) throws Exception
assertEquals(receveidCounts.size(), 2);
}

@Test(expectedExceptions = NullPointerException.class,
expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
dataProvider = "ServiceUrls")
public void testReceiverWhenConsumerConfigurationIsNull(Supplier<String> serviceUrl) {
new SparkStreamingPulsarReceiver(
serviceUrl.get(),
null,
new AuthenticationDisabled());
}

@Test(dataProvider = "ServiceUrls")
public void testOverrideServiceUrlWithClientConfiguration(Supplier<String> serviceUrl) {
Map<String,Object> testClientConfig = new HashMap<>();
testClientConfig.put("serviceUrl",serviceUrl.get());

ConsumerConfigurationData<byte[]> testConsumerConfig = new ConsumerConfigurationData<>();
Set<String> set = new HashSet<>();
set.add(TOPIC);
testConsumerConfig.setTopicNames(set);
testConsumerConfig.setSubscriptionName(SUBS);
testConsumerConfig.setSubscriptionType(SubscriptionType.Shared);
testConsumerConfig.setReceiverQueueSize(1);

String deliberatelyWrongServiceUrl = "http://invalid.service.url:1234";

SparkStreamingPulsarReceiver testReceiver = new SparkStreamingPulsarReceiver(
deliberatelyWrongServiceUrl,
testClientConfig,
testConsumerConfig,
new AuthenticationDisabled());

testReceiver.onStart();
waitForTransmission();
testReceiver.onStop();
}

@Test(expectedExceptions = NullPointerException.class,
expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
dataProvider = "ServiceUrls")
public void testReceiverWhenClientConfigurationIsNull(Supplier<String> serviceUrl) {
new SparkStreamingPulsarReceiver(serviceUrl.get(), null, new AuthenticationDisabled());
new SparkStreamingPulsarReceiver(
serviceUrl.get(),
null,
null,
new AuthenticationDisabled());
}

private static void waitForTransmission() {
Expand Down