Skip to content

Commit

Permalink
Fix NPE for void functions (apache#258)
Browse files Browse the repository at this point in the history
  • Loading branch information
sijie committed Mar 4, 2018
1 parent ab93a18 commit 2fe4a38
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,20 +231,22 @@ public void run() {
}
}

// before processing the message, we have a producer connection setup for producing results.
Producer producer = null;
while (null == producer) {
try {
producer = sinkProducer.getProducer(msg.getTopicName(), msg.getTopicPartition());
} catch (PulsarClientException e) {
// `ProducerBusy` is thrown when an producer with same name is still connected.
// This can happen when a active consumer is changed for a given source topic partition
// so we need to wait until the old active consumer release the produce connection.
if (!(e instanceof ProducerBusyException)) {
log.error("Failed to get a producer for producing results computed from source topic {}",
msg.getTopicName());
if (null != sinkProducer) {
// before processing the message, we have a producer connection setup for producing results.
Producer producer = null;
while (null == producer) {
try {
producer = sinkProducer.getProducer(msg.getTopicName(), msg.getTopicPartition());
} catch (PulsarClientException e) {
// `ProducerBusy` is thrown when an producer with same name is still connected.
// This can happen when a active consumer is changed for a given source topic partition
// so we need to wait until the old active consumer release the produce connection.
if (!(e instanceof ProducerBusyException)) {
log.error("Failed to get a producer for producing results computed from source topic {}",
msg.getTopicName());
}
TimeUnit.MILLISECONDS.sleep(500);
}
TimeUnit.MILLISECONDS.sleep(500);
}
}

Expand Down Expand Up @@ -320,20 +322,24 @@ private void loadJars() throws Exception {
public void becameActive(Consumer consumer, int partitionId) {
// if the instance becomes active for a given topic partition,
// open a producer for the results computed from this topic partition.
try {
this.sinkProducer.getProducer(consumer.getTopic(), partitionId);
} catch (PulsarClientException e) {
// this can be ignored, because producer can be lazily created when accessing it.
log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}",
consumer.getTopic(), partitionId);
if (null != sinkProducer) {
try {
this.sinkProducer.getProducer(consumer.getTopic(), partitionId);
} catch (PulsarClientException e) {
// this can be ignored, because producer can be lazily created when accessing it.
log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}",
consumer.getTopic(), partitionId);
}
}
}

@Override
public void becameInactive(Consumer consumer, int partitionId) {
// if I lost the ownership of a partition, close its corresponding topic partition.
// this is to allow the new active consumer be able to produce to the result topic.
this.sinkProducer.closeProducer(consumer.getTopic(), partitionId);
if (null != sinkProducer) {
// if I lost the ownership of a partition, close its corresponding topic partition.
// this is to allow the new active consumer be able to produce to the result topic.
this.sinkProducer.closeProducer(consumer.getTopic(), partitionId);
}
}

private void setupStateTable() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.instance.producers.Producers;
import org.apache.pulsar.functions.instance.producers.SimpleOneSinkTopicProducers;
Expand All @@ -86,6 +87,7 @@
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.testng.PowerMockObjectFactory;
import org.powermock.reflect.Whitebox;
import org.testng.IObjectFactory;
import org.testng.annotations.BeforeMethod;
Expand All @@ -96,7 +98,7 @@
* Test the processing logic of a {@link JavaInstanceRunnable}.
*/
@Slf4j
@PrepareForTest({ JavaInstanceRunnable.class, StorageClientBuilder.class, MessageBuilder.class })
@PrepareForTest({ JavaInstanceRunnable.class, StorageClientBuilder.class, MessageBuilder.class, Reflections.class })
@PowerMockIgnore({ "javax.management.*", "org.apache.pulsar.common.api.proto.*", "org.apache.logging.log4j.*" })
public class JavaInstanceRunnableProcessTest {

Expand Down Expand Up @@ -132,6 +134,16 @@ public String process(String input, Context context) throws Exception {
}
}

private static class TestVoidFunction implements PulsarFunction<String, Void> {

@Override
public Void process(String input, Context context) throws Exception {
log.info("process input '{}'", input);
voidFunctionQueue.put(input);
return null;
}
}

@Data
private static class ConsumerInstance {
private final Consumer consumer;
Expand Down Expand Up @@ -195,6 +207,7 @@ public synchronized void addSendFuture(CompletableFuture<MessageId> future) {


private static final String TEST_STORAGE_SERVICE_URL = "127.0.0.1:4181";
private static final LinkedBlockingQueue<String> voidFunctionQueue = new LinkedBlockingQueue<>();

private FunctionConfig fnConfig;
private InstanceConfig config;
Expand Down Expand Up @@ -938,4 +951,56 @@ public void testEffectivelyOnceProcessingFailures() throws Exception {
.acknowledgeCumulativeAsync(same(msgs[1]));
}
}

@Test
public void testVoidFunction() throws Exception {
FunctionConfig newFnConfig = FunctionConfig.newBuilder(fnConfig)
.setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
.setClassName(TestVoidFunction.class.getName())
.build();
config.setFunctionConfig(newFnConfig);

@Cleanup("shutdown")
ExecutorService executorService = Executors.newSingleThreadExecutor();

try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
config,
fnCache,
"test-jar-file",
mockClient,
null)) {

executorService.submit(runnable);

Pair<String, String> consumerId = Pair.of(
newFnConfig.getInputs(0),
FunctionConfigUtils.getFullyQualifiedName(newFnConfig));
ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
while (null == consumerInstance) {
TimeUnit.MILLISECONDS.sleep(20);
consumerInstance = mockConsumers.get(consumerId);
}

// once we get consumer id, simulate receiving 10 messages from consumer
for (int i = 0; i < 10; i++) {
Message msg = mock(Message.class);
when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
when(msg.getMessageId())
.thenReturn(new MessageIdImpl(1L, i, 0));
consumerInstance.addMessage(msg);
consumerInstance.getConf().getMessageListener()
.received(consumerInstance.getConsumer(), msg);
}

// wait until all the messages are published
for (int i = 0; i < 10; i++) {
String msg = voidFunctionQueue.take();
log.info("Processed message {}", msg);
assertEquals("message-" + i, msg);
}

// no producer should be initialized
assertTrue(mockProducers.isEmpty());
}
}
}

0 comments on commit 2fe4a38

Please sign in to comment.