Skip to content

Commit

Permalink
[client] Cleanup consumer on multitopic subscribe failure (apache#9419)
Browse files Browse the repository at this point in the history
Currently, when a multi-topic subscribe fails (via a set of topics or a
regex) we can leave consumers connected, as the multitopic consumer
doesn't close any of the topics.

This means we rely on the client to call closeAsync, otherwise, the
consumer is left in partially open state.

This fix changes that, and ensures we call close in the case of an
exception

Co-authored-by: Sijie Guo <sijie@apache.org>
  • Loading branch information
addisonj and sijie authored Feb 12, 2021
1 parent 1318aa1 commit cbe9816
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,13 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
subscribeFuture().complete(MultiTopicsConsumerImpl.this);
})
.exceptionally(ex -> {
log.warn("[{}] Failed to subscribe topics: {}", topic, ex.getMessage());
subscribeFuture.completeExceptionally(ex);
log.warn("[{}] Failed to subscribe topics: {}, closing consumer", topic, ex.getMessage());
closeAsync().whenComplete((res, closeEx) -> {
if (closeEx != null) {
log.error("[{}] Failed to unsubscribe after failed consumer creation: {}", topic, closeEx.getMessage());
}
subscribeFuture.completeExceptionally(ex);
});
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ static <T> CompletableFuture<T> createDelayedCompletedFuture(T result, int delay
return future;
}

static <T> CompletableFuture<T> createExceptionFuture(Throwable ex, int delayMillis) {
CompletableFuture<T> future = new CompletableFuture<>();
SCHEDULER.schedule(() -> future.completeExceptionally(ex), delayMillis, TimeUnit.MILLISECONDS);
return future;
}

public static ExecutorService createMockedExecutor() {
return mock(ExecutorService.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Sets;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -31,13 +32,19 @@
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;

import static org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedCompletedFuture;
import static org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture;
import static org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Expand Down Expand Up @@ -137,4 +144,32 @@ public void testBatchReceiveAsyncCanBeCancelled() {
assertFalse(consumer.hasPendingBatchReceive());
}

@Test
public void testConsumerCleanupOnSubscribeFailure() throws InterruptedException, TimeoutException, ExecutionException {
ExecutorService listenerExecutor = mock(ExecutorService.class);
ConsumerConfigurationData<byte[]> consumerConfData = new ConsumerConfigurationData<>();
consumerConfData.setSubscriptionName("subscriptionName");
consumerConfData.setTopicNames(new HashSet<>(Arrays.asList("a", "b", "c")));
int completionDelayMillis = 10;
Schema<byte[]> schema = Schema.BYTES;
PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx();
when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createExceptionFuture(
new PulsarClientException.InvalidConfigurationException("a mock exception"), completionDelayMillis));
CompletableFuture<Consumer<byte[]>> completeFuture = new CompletableFuture<>();
MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, listenerExecutor,
completeFuture, schema, null, true);
// assert that we don't start in closed, then we move to closed and get an exception
// indicating that closeAsync was called
assertEquals(impl.getState(), HandlerState.State.Uninitialized);
try {
completeFuture.get(15, TimeUnit.MILLISECONDS);
} catch (Throwable ex) {
// just ignore the exception
}
assertTrue(completeFuture.isCompletedExceptionally());
assertEquals(impl.getConsumers().size(), 0);
assertEquals(impl.getState(), HandlerState.State.Closed);
verify(clientMock, times(1)).cleanupConsumer(any());
}

}

0 comments on commit cbe9816

Please sign in to comment.