@@ -47,7 +47,6 @@
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.utils.FunctionCommon;

import java.time.Duration;
import java.util.AbstractMap;
@@ -59,7 +58,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
@@ -277,9 +275,7 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
.topic(partitionName).subscribeAsync();
int partitionIndex = i;
TopicPartition tp = new TopicPartition(
TopicName.get(topic).getLocalName(),
partitionIndex);
TopicPartition tp = normalizedTopicPartition(topic, partitionIndex);
futures.add(future.thenApply(consumer -> {
log.info("Add consumer {} for partition {}", consumer, tp);
consumers.putIfAbsent(tp, consumer);
@@ -291,9 +287,7 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
// Topic has a single partition
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
.subscribeAsync();
TopicPartition tp = new TopicPartition(
TopicName.get(topic).getLocalName(),
0);
TopicPartition tp = normalizedTopicPartition(topic, 0);
futures.add(future.thenApply(consumer -> {
log.info("Add consumer {} for partition {}", consumer, tp);
consumers.putIfAbsent(tp, consumer);
@@ -327,6 +321,15 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
}
}

private TopicPartition normalizedTopicPartition(TopicPartition tp) {
return normalizedTopicPartition(tp.topic(), tp.partition());
}

private TopicPartition normalizedTopicPartition(String topic, int partition) {
String name = TopicName.get(topic).getPartitionedTopicName();
return new TopicPartition(name, partition);
}

@Override
public void assign(Collection<TopicPartition> partitions) {
Set<String> topics = partitions.stream().map(p -> p.topic()).collect(Collectors.toSet());
@@ -372,7 +375,7 @@ public ConsumerRecords<K, V> poll(long timeoutMillis) {

while (item != null) {
TopicName topicName = TopicName.get(item.consumer.getTopic());
String topic = topicName.getLocalName();
String topic = topicName.getPartitionedTopicName();
int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
Message<byte[]> msg = item.message;
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
@@ -504,12 +507,15 @@ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCo
}

private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
Preconditions.checkNotNull(offsets);
List<CompletableFuture<Void>> futures = new ArrayList<>();

applyConsumerInterceptorsOnCommit(interceptors, offsets);
offsets.forEach((topicPartition, offsetAndMetadata) -> {
offsets.forEach((tp, offsetAndMetadata) -> {
TopicPartition topicPartition = normalizedTopicPartition(tp);
org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
lastCommittedOffset.put(topicPartition, offsetAndMetadata);

lastCommittedOffset.put(tp, offsetAndMetadata);
futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
});

@@ -566,7 +572,8 @@ private void applyConsumerInterceptorsOnCommit(List<ConsumerInterceptor<K, V>> i
@Override
public void seek(TopicPartition partition, long offset) {
MessageId msgId = MessageIdUtils.getMessageId(offset);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(partition);
TopicPartition topicPartition = normalizedTopicPartition(partition);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(topicPartition);
if (c == null) {
throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
}
@@ -594,12 +601,14 @@ public void seekToBeginning(Collection<TopicPartition> partitions) {
lastReceivedOffset.clear();

for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
TopicPartition normalizedTp = normalizedTopicPartition(tp);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(normalizedTp);
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
} else {
futures.add(c.seekAsync(MessageId.earliest));
unpolledPartitions.add(tp);
}
}

@@ -617,12 +626,15 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
lastReceivedOffset.clear();

for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
TopicPartition normalizedTp = normalizedTopicPartition(tp);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(normalizedTp);

if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
} else {
futures.add(c.seekAsync(MessageId.latest));
unpolledPartitions.add(tp);
}
}

@@ -712,7 +724,8 @@ public Set<TopicPartition> paused() {
@Override
public void pause(Collection<TopicPartition> partitions) {
partitions.forEach(p -> {
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(p);
TopicPartition topicPartition = normalizedTopicPartition(p);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(topicPartition);
if (c != null) {
c.pause();
}
@@ -722,7 +735,8 @@ public void pause(Collection<TopicPartition> partitions) {
@Override
public void resume(Collection<TopicPartition> partitions) {
partitions.forEach(p -> {
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(p);
TopicPartition topicPartition = normalizedTopicPartition(p);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(topicPartition);
if (c != null) {
c.resume();
}
@@ -141,6 +141,7 @@ public void testPulsarKafkaProducerWithSerializer(int partitions) throws Excepti
producer.send(message);
}
producer.close();
Thread.sleep(500);

Contributor:
It's better to avoid using sleep in the test; could you please use Awaitility instead?

Contributor Author:
I agree that it would be nice, but I suggest we merge this to unblock the release and file an issue to fix it later. It will take time to figure out what to wait for specifically, especially since this test failed only on CI and passed on local runs, and there are other tests that already rely on sleep: https://github.com/apache/pulsar-adapters/search?q=Thread.Sleep

Contributor:
I am fine with keeping the sleep for now; it is not clear what we have to wait for at the moment.
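
For reference, a minimal sketch of the Awaitility pattern discussed above (not part of this change; the polled condition below is a stand-in, since the thread notes it is still unclear what exactly the test should wait for):

import static org.awaitility.Awaitility.await;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitilitySketch {
    void waitForDelivery() {
        // Stand-in condition: e.g. a counter bumped by a consumer callback.
        AtomicInteger received = new AtomicInteger();
        // ... trigger the produce/consume under test here ...
        await().atMost(10, TimeUnit.SECONDS)              // hard upper bound instead of a fixed sleep
               .pollInterval(100, TimeUnit.MILLISECONDS)  // re-check the condition periodically
               .until(() -> received.get() >= 10);        // passes as soon as the condition holds
    }
}

Compared with Thread.sleep(500), this returns as soon as the condition holds while still bounding how long CI can hang.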


// (2) Consume using simple consumer
PulsarKafkaSimpleConsumer consumer = new PulsarKafkaSimpleConsumer(serviceUrl, 0, 0, 0, "clientId");
@@ -158,6 +159,7 @@ public void testPulsarKafkaProducerWithSerializer(int partitions) throws Excepti
.build();
FetchResponse fetchResponse = consumer.fetch(fReq);

Thread.sleep(500);
long lastOffset = 0;
MessageId offset = null;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topicName, partition)) {
@@ -27,6 +27,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.client.api.Consumer;
@@ -48,7 +49,7 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
private static final String EXPECTED_MESSAGE = "pulsar-spark test message";

@Test(dataProvider = "ServiceUrls")
public void testReceivedMessage(String serviceUrl) throws Exception {
public void testReceivedMessage(Supplier<String> serviceUrl) throws Exception {
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();

Set<String> set = new HashSet<>();
@@ -68,14 +69,14 @@ public void received(Consumer consumer, Message msg) {
consConf.setMessageListener(msgListener);

SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
serviceUrl,
serviceUrl.get(),
consConf,
new AuthenticationDisabled());

receiver.onStart();
waitForTransmission();

PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
producer.send(EXPECTED_MESSAGE.getBytes());

@@ -85,7 +86,7 @@ public void received(Consumer consumer, Message msg) {
}

@Test(dataProvider = "ServiceUrls")
public void testDefaultSettingsOfReceiver(String serviceUrl) {
public void testDefaultSettingsOfReceiver(Supplier<String> serviceUrl) {
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();

Set<String> set = new HashSet<>();
@@ -94,7 +95,7 @@ public void testDefaultSettingsOfReceiver(String serviceUrl) {
consConf.setSubscriptionName(SUBS);

SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
serviceUrl,
serviceUrl.get(),
consConf,
new AuthenticationDisabled());

@@ -103,7 +104,7 @@ public void testDefaultSettingsOfReceiver(String serviceUrl) {
}

@Test(dataProvider = "ServiceUrls")
public void testSharedSubscription(String serviceUrl) throws Exception {
public void testSharedSubscription(Supplier<String> serviceUrl) throws Exception {
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();

Set<String> set = new HashSet<>();
@@ -120,20 +121,20 @@ public void testSharedSubscription(String serviceUrl) throws Exception {
});

SparkStreamingPulsarReceiver receiver1 = new SparkStreamingPulsarReceiver(
serviceUrl,
serviceUrl.get(),
consConf,
new AuthenticationDisabled());

SparkStreamingPulsarReceiver receiver2 = new SparkStreamingPulsarReceiver(
serviceUrl,
serviceUrl.get(),
consConf,
new AuthenticationDisabled());

receiver1.onStart();
receiver2.onStart();
waitForTransmission();

PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
for (int i = 0; i < 10; i++) {
producer.send(EXPECTED_MESSAGE.getBytes());
@@ -149,8 +150,8 @@ public void testSharedSubscription(String serviceUrl) throws Exception {
@Test(expectedExceptions = NullPointerException.class,
expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
dataProvider = "ServiceUrls")
public void testReceiverWhenClientConfigurationIsNull(String serviceUrl) {
new SparkStreamingPulsarReceiver(serviceUrl, null, new AuthenticationDisabled());
public void testReceiverWhenClientConfigurationIsNull(Supplier<String> serviceUrl) {
new SparkStreamingPulsarReceiver(serviceUrl.get(), null, new AuthenticationDisabled());
}

private static void waitForTransmission() {