diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java index 48dcf259edb1b..625d27329d329 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java @@ -61,6 +61,8 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler { private final PulsarService pulsarService; private final PulsarClientImpl pulsarClient; + private final int randomKeyForSelectConnection; + private static final AtomicIntegerFieldUpdater REQUEST_CREDITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class, "requestCredits"); private volatile int requestCredits; @@ -74,6 +76,7 @@ public TransactionBufferHandlerImpl(PulsarService pulsarService, HashedWheelTime this.operationTimeoutInMills = operationTimeoutInMills; this.timer = timer; this.requestCredits = Math.max(100, maxConcurrentRequests); + this.randomKeyForSelectConnection = pulsarClient.getCnxPool().genRandomKeyToSelectCon(); } @Override @@ -296,7 +299,7 @@ protected OpRequestSend newObject(Handle handle) { } public CompletableFuture getClientCnxWithLookup(String topic) { - return pulsarClient.getConnection(topic); + return pulsarClient.getConnection(topic, randomKeyForSelectConnection); } public CompletableFuture getClientCnx(String topic) { @@ -317,7 +320,8 @@ public CompletableFuture getClientCnx(String topic) { } InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()); - return pulsarClient.getConnection(brokerAddress, brokerAddress); + return pulsarClient.getConnection(brokerAddress, brokerAddress, + randomKeyForSelectConnection); } else { // Bundle is unloading, lookup topic return getClientCnxWithLookup(topic); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index 294a9b341ec69..f8034c37971cc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -40,6 +40,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; @@ -62,8 +63,11 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception { final PulsarService pulsar = mock(PulsarService.class); final BrokerService broker = mock(BrokerService.class); final Topic localTopic = mock(Topic.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); final PulsarClientImpl localClient = mock(PulsarClientImpl.class); + when(localClient.getCnxPool()).thenReturn(connectionPool); final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class); + when(remoteClient.getCnxPool()).thenReturn(connectionPool); final ProducerBuilder producerBuilder = mock(ProducerBuilder.class); final ConcurrentOpenHashMap>> topics = new ConcurrentOpenHashMap<>(); when(broker.executor()).thenReturn(eventLoopGroup); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 02b10dd09a271..22b9949b7ee6c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -105,6 +105,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; @@ -1706,6 +1707,8 @@ public void testAtomicReplicationRemoval() throws Exception { ManagedCursor cursor = mock(ManagedCursorImpl.class); doReturn(remoteCluster).when(cursor).getName(); PulsarClientImpl pulsarClientMock = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(pulsarClientMock.getCnxPool()).thenReturn(connectionPool); when(pulsarClientMock.newProducer(any())).thenAnswer( invocation -> { ProducerBuilderImpl producerBuilder = diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java index af365ed31934f..945a4c7ea2534 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java @@ -20,6 +20,7 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.netty.channel.EventLoopGroup; import java.io.IOException; import java.util.Collections; @@ -42,6 +43,7 @@ import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.naming.TopicName; @@ -76,7 +78,10 @@ public NonStartableTestPulsarService(SpyConfig spyConfig, ServiceConfiguration c throw new RuntimeException(e); } setSchemaRegistryService(spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class)); - setClient(mock(PulsarClientImpl.class)); + PulsarClientImpl mockClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(mockClient.getCnxPool()).thenReturn(connectionPool); + setClient(mockClient); this.namespaceService = mock(NamespaceService.class); try { startNamespaceService(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java index 2cfc9f46f0ebe..3873d9d37b20b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.transaction.buffer; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; @@ -51,6 +53,7 @@ import org.apache.pulsar.client.api.transaction.TransactionBufferClientException; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.naming.NamespaceName; @@ -253,14 +256,21 @@ public void testTransactionBufferMetrics() throws Exception { assertEquals(pending.size(), 1); } + /** + * This is a flaky test. + */ @Test public void testTransactionBufferClientTimeout() throws Exception { PulsarService pulsarService = pulsarServiceList.get(0); - PulsarClient mockClient = mock(PulsarClientImpl.class); + PulsarClientImpl mockClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(mockClient.getCnxPool()).thenReturn(connectionPool); CompletableFuture completableFuture = new CompletableFuture<>(); ClientCnx clientCnx = mock(ClientCnx.class); completableFuture.complete(clientCnx); when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture); + when(((PulsarClientImpl)mockClient).getConnection(anyString(), anyInt())).thenReturn(completableFuture); + when(((PulsarClientImpl)mockClient).getConnection(any(), any(), anyInt())).thenReturn(completableFuture); ChannelHandlerContext cnx = mock(ChannelHandlerContext.class); when(clientCnx.ctx()).thenReturn(cnx); Channel channel = mock(Channel.class); @@ -287,7 +297,9 @@ public PulsarClient answer(InvocationOnMock invocation) throws Throwable { ConcurrentSkipListMap outstandingRequests = (ConcurrentSkipListMap) field.get(transactionBufferHandler); - assertEquals(outstandingRequests.size(), 1); + Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(outstandingRequests.size(), 1); + }); Awaitility.await().atLeast(2, TimeUnit.SECONDS).until(() -> { if (outstandingRequests.size() == 0) { @@ -307,11 +319,13 @@ public PulsarClient answer(InvocationOnMock invocation) throws Throwable { @Test public void testTransactionBufferChannelUnActive() throws PulsarServerException { PulsarService pulsarService = pulsarServiceList.get(0); - PulsarClient mockClient = mock(PulsarClientImpl.class); + PulsarClientImpl mockClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(mockClient.getCnxPool()).thenReturn(connectionPool); CompletableFuture completableFuture = new CompletableFuture<>(); ClientCnx clientCnx = mock(ClientCnx.class); completableFuture.complete(clientCnx); - when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture); + when(((PulsarClientImpl)mockClient).getConnection(anyString(), anyInt())).thenReturn(completableFuture); ChannelHandlerContext cnx = mock(ChannelHandlerContext.class); when(clientCnx.ctx()).thenReturn(cnx); Channel channel = mock(Channel.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java index d6ec092c4456f..278cdbac1f09d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java @@ -24,6 +24,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -31,8 +32,8 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -46,7 +47,9 @@ public class TransactionBufferHandlerImplTest { @Test public void testRequestCredits() throws PulsarServerException { - PulsarClient pulsarClient = mock(PulsarClientImpl.class); + PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(pulsarClient.getCnxPool()).thenReturn(connectionPool); PulsarService pulsarService = mock(PulsarService.class); NamespaceService namespaceService = mock(NamespaceService.class); when(pulsarService.getNamespaceService()).thenReturn(namespaceService); @@ -54,7 +57,10 @@ public void testRequestCredits() throws PulsarServerException { when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))); Optional opData = Optional.empty(); when(namespaceService.getOwnerAsync(any())).thenReturn(CompletableFuture.completedFuture(opData)); - when(((PulsarClientImpl)pulsarClient).getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class))); + when(((PulsarClientImpl)pulsarClient).getConnection(anyString(), anyInt())) + .thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class))); + when(((PulsarClientImpl)pulsarClient).getConnection(anyString())) + .thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class))); TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(pulsarService, null, 1000, 3000)); doNothing().when(handler).endTxn(any()); doReturn(CompletableFuture.completedFuture(mock(ClientCnx.class))).when(handler).getClientCnx(anyString()); @@ -75,7 +81,9 @@ public void testRequestCredits() throws PulsarServerException { @Test public void testMinRequestCredits() throws PulsarServerException { - PulsarClient pulsarClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); + when(pulsarClient.getCnxPool()).thenReturn(connectionPool); PulsarService pulsarService = mock(PulsarService.class); when(pulsarService.getClient()).thenReturn(pulsarClient); TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(pulsarService, null, 50, 3000)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java index fb564bd5083c1..e7613879d5152 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java @@ -31,9 +31,13 @@ import java.util.function.Supplier; import java.util.stream.IntStream; import io.netty.util.concurrent.Promise; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.util.netty.EventLoopUtil; +import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -80,6 +84,36 @@ public void testSingleIpAddress() throws Exception { eventLoop.shutdownGracefully(); } + @Test + public void testSelectConnectionForSameProducer() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://sample/standalone/ns/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + final CommandCloseProducer commandCloseProducer = new CommandCloseProducer(); + // 10 connection per broker. + final PulsarClient clientWith10ConPerBroker = PulsarClient.builder().connectionsPerBroker(10) + .serviceUrl(lookupUrl.toString()).build(); + ProducerImpl producer = (ProducerImpl) clientWith10ConPerBroker.newProducer().topic(topicName).create(); + commandCloseProducer.setProducerId(producer.producerId); + // An error will be reported when the Producer reconnects using a different connection. + // If no error is reported, the same connection was used when reconnecting. + for (int i = 0; i < 20; i++) { + // Trigger reconnect + ClientCnx cnx = producer.getClientCnx(); + if (cnx != null) { + cnx.handleCloseProducer(commandCloseProducer); + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(producer.getState().toString(), HandlerState.State.Ready.toString(), + "The producer uses a different connection when reconnecting") + ); + } + } + + // cleanup. + producer.close(); + clientWith10ConPerBroker.close(); + admin.topics().delete(topicName); + } + @Test public void testDoubleIpAddress() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); @@ -200,14 +234,16 @@ protected void doResolveAll(SocketAddress socketAddress, Promise promise) throws ClientCnx cnx = pool.getConnection( InetSocketAddress.createUnresolved("proxy", 9999), - InetSocketAddress.createUnresolved("proxy", 9999)).get(); + InetSocketAddress.createUnresolved("proxy", 9999), + pool.genRandomKeyToSelectCon()).get(); Assert.assertEquals(cnx.remoteHostName, "proxy"); Assert.assertNull(cnx.proxyToTargetBrokerAddress); cnx.close(); cnx = pool.getConnection( InetSocketAddress.createUnresolved("broker", 9999), - InetSocketAddress.createUnresolved("proxy", 9999)).get(); + InetSocketAddress.createUnresolved("proxy", 9999), + pool.genRandomKeyToSelectCon()).get(); Assert.assertEquals(cnx.remoteHostName, "proxy"); Assert.assertEquals(cnx.proxyToTargetBrokerAddress, "broker:9999"); cnx.close(); @@ -215,7 +251,8 @@ protected void doResolveAll(SocketAddress socketAddress, Promise promise) throws cnx = pool.getConnection( InetSocketAddress.createUnresolved("broker", 9999), - InetSocketAddress.createUnresolved("broker", 9999)).get(); + InetSocketAddress.createUnresolved("broker", 9999), + pool.genRandomKeyToSelectCon()).get(); Assert.assertEquals(cnx.remoteHostName, "broker"); Assert.assertNull(cnx.proxyToTargetBrokerAddress); cnx.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java index 0ba8511c0ca74..4f7ac6e8de942 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java @@ -122,7 +122,7 @@ public CompletableFuture getConnection(String topic) { result.completeExceptionally(new IOException("New connections are rejected.")); return result; } else { - return super.getConnection(topic); + return super.getConnection(topic, getCnxPool().genRandomKeyToSelectCon()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index 5f83ef97d32ca..464f05186fa30 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -19,6 +19,8 @@ package org.apache.pulsar.compaction; import static org.apache.pulsar.client.impl.RawReaderTest.extractKey; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.RawMessageImpl; import org.apache.pulsar.common.policies.data.ClusterData; @@ -258,7 +261,10 @@ public void testCompactEmptyTopic() throws Exception { public void testPhaseOneLoopTimeConfiguration() { ServiceConfiguration configuration = new ServiceConfiguration(); configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60); - TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, Mockito.mock(PulsarClientImpl.class), + PulsarClientImpl mockClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(mockClient.getCnxPool()).thenReturn(connectionPool); + TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, mockClient, Mockito.mock(BookKeeper.class), compactionScheduler); Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java index 263507dac1dc6..fc7c89c3ce693 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java @@ -43,6 +43,7 @@ public class ConnectionHandler { private volatile long epoch = -1L; protected volatile long lastConnectionClosedTimestamp = 0L; private final AtomicBoolean duringConnect = new AtomicBoolean(false); + protected final int randomKeyForSelectConnection; interface Connection { @@ -58,6 +59,7 @@ default void connectionFailed(PulsarClientException e) { protected ConnectionHandler(HandlerState state, Backoff backoff, Connection connection) { this.state = state; + this.randomKeyForSelectConnection = state.client.getCnxPool().genRandomKeyToSelectCon(); this.connection = connection; this.backoff = backoff; CLIENT_CNX_UPDATER.set(this, null); @@ -88,11 +90,11 @@ protected void grabCnx() { if (state.redirectedClusterURI != null) { InetSocketAddress address = InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(), state.redirectedClusterURI.getPort()); - cnxFuture = state.client.getConnection(address, address); + cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection); } else if (state.topic == null) { cnxFuture = state.client.getConnectionToServiceUrl(); } else { - cnxFuture = state.client.getConnection(state.topic); // + cnxFuture = state.client.getConnection(state.topic, randomKeyForSelectConnection); } cnxFuture.thenCompose(cnx -> connection.connectionOpened(cnx)) .thenAccept(__ -> duringConnect.set(false)) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 1420d81c688ee..ef3a9249c820a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -165,8 +165,18 @@ private static AddressResolver createAddressResolver(ClientCo private static final Random random = new Random(); + public int genRandomKeyToSelectCon() { + if (maxConnectionsPerHosts == 0) { + return -1; + } + return signSafeMod(random.nextInt(), maxConnectionsPerHosts); + } + public CompletableFuture getConnection(final InetSocketAddress address) { - return getConnection(address, address); + if (maxConnectionsPerHosts == 0) { + return getConnection(address, address, -1); + } + return getConnection(address, address, signSafeMod(random.nextInt(), maxConnectionsPerHosts)); } void closeAllConnections() { @@ -204,14 +214,12 @@ void closeAllConnections() { * @return a future that will produce the ClientCnx object */ public CompletableFuture getConnection(InetSocketAddress logicalAddress, - InetSocketAddress physicalAddress) { + InetSocketAddress physicalAddress, final int randomKey) { if (maxConnectionsPerHosts == 0) { // Disable pooling return createConnection(logicalAddress, physicalAddress, -1); } - final int randomKey = signSafeMod(random.nextInt(), maxConnectionsPerHosts); - final ConcurrentMap> innerPool = pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>()); CompletableFuture completableFuture = innerPool diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 6c749a8cf4354..fdabb5fa8cfa5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -944,10 +944,20 @@ public void updateTlsTrustStorePathAndPassword(String tlsTrustStorePath, String conf.setTlsTrustStorePassword(tlsTrustStorePassword); } + public CompletableFuture getConnection(final String topic, int randomKeyForSelectConnection) { + TopicName topicName = TopicName.get(topic); + return lookup.getBroker(topicName) + .thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight(), randomKeyForSelectConnection)); + } + + /** + * Only for test. + */ + @VisibleForTesting public CompletableFuture getConnection(final String topic) { TopicName topicName = TopicName.get(topic); return lookup.getBroker(topicName) - .thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight())); + .thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight(), cnxPool.genRandomKeyToSelectCon())); } public CompletableFuture getConnectionToServiceUrl() { @@ -956,12 +966,13 @@ public CompletableFuture getConnectionToServiceUrl() { "Can't get client connection to HTTP service URL", null)); } InetSocketAddress address = lookup.resolveHost(); - return getConnection(address, address); + return getConnection(address, address, cnxPool.genRandomKeyToSelectCon()); } public CompletableFuture getConnection(final InetSocketAddress logicalAddress, - final InetSocketAddress physicalAddress) { - return cnxPool.getConnection(logicalAddress, physicalAddress); + final InetSocketAddress physicalAddress, + final int randomKeyForSelectConnection) { + return cnxPool.getConnection(logicalAddress, physicalAddress, randomKeyForSelectConnection); } /** visible for pulsar-functions. **/ diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index 0418a54c772cc..1d1a6f85bfd41 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -65,6 +65,8 @@ public void setup() throws NoSuchFieldException, IllegalAccessException { ConcurrentOpenHashMap.newBuilder().build(); cnx = spy(new ClientCnxTest(new ClientConfigurationData(), eventLoopGroup)); PulsarClientImpl client = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.getCnxPool()).thenReturn(connectionPool); doReturn(client).when(consumer).getClient(); doReturn(cnx).when(consumer).getClientCnx(); doReturn(new ConsumerStatsRecorderImpl()).when(consumer).getStats(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java index 36ffa30296bb0..63fbb239439bd 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -121,6 +122,8 @@ public void testInitialize() { AutoClusterFailover autoClusterFailover = Mockito.spy((AutoClusterFailover) provider); PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(pulsarClient.getCnxPool()).thenReturn(connectionPool); Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary); Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary); Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration(); @@ -163,6 +166,8 @@ public void testAutoClusterFailoverSwitchWithoutAuthentication() { AutoClusterFailover autoClusterFailover = Mockito.spy((AutoClusterFailover) provider); PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(pulsarClient.getCnxPool()).thenReturn(connectionPool); Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary); Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary); Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration(); @@ -217,6 +222,8 @@ public void testAutoClusterFailoverSwitchWithAuthentication() throws IOException AutoClusterFailover autoClusterFailover = Mockito.spy((AutoClusterFailover) provider); PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(pulsarClient.getCnxPool()).thenReturn(connectionPool); Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary); Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary); Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration(); @@ -270,6 +277,8 @@ public void testAutoClusterFailoverSwitchTlsTrustStore() throws IOException { AutoClusterFailover autoClusterFailover = Mockito.spy((AutoClusterFailover) provider); PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(pulsarClient.getCnxPool()).thenReturn(connectionPool); Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary); Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary); Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java index 4b80e19c256d7..abb195c9830d0 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java @@ -105,6 +105,8 @@ public void recoveryAfterOom() { final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData(); producerConfigurationData.setCompressionType(CompressionType.NONE); PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(pulsarClient.getCnxPool()).thenReturn(connectionPool); MemoryLimitController memoryLimitController = mock(MemoryLimitController.class); when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController); try { @@ -148,6 +150,8 @@ public void testMessagesSize() throws Exception { final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData(); producerConfigurationData.setCompressionType(CompressionType.NONE); PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(pulsarClient.getCnxPool()).thenReturn(connectionPool); MemoryLimitController memoryLimitController = mock(MemoryLimitController.class); when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController); try { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java index 4db5dbe877685..ff7d7f12dd452 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java @@ -19,7 +19,9 @@ package org.apache.pulsar.client.impl; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import io.netty.channel.ChannelHandlerContext; @@ -72,11 +74,16 @@ static PulsarClientImpl mockClientCnx(PulsarClientImpl clientMock) { .thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class))); when(clientCnxMock.channel().remoteAddress()).thenReturn(mock(SocketAddress.class)); when(clientMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock)); - when(clientMock.getConnection(any(), any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock)); + when(clientMock.getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(clientCnxMock)); + when(clientMock.getConnection(anyString(), anyInt())) + .thenReturn(CompletableFuture.completedFuture(clientCnxMock)); + when(clientMock.getConnection(any(), any(), anyInt())) + .thenReturn(CompletableFuture.completedFuture(clientCnxMock)); ConnectionPool connectionPoolMock = mock(ConnectionPool.class); when(clientMock.getCnxPool()).thenReturn(connectionPoolMock); when(connectionPoolMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock)); - when(connectionPoolMock.getConnection(any(), any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock)); + when(connectionPoolMock.getConnection(any(), any(), anyInt())) + .thenReturn(CompletableFuture.completedFuture(clientCnxMock)); return clientMock; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index 8dbd23f9c29c9..3fe136630462f 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -76,6 +76,8 @@ public class ConsumerBuilderImplTest { @BeforeMethod(alwaysRun = true) public void setup() { PulsarClientImpl client = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.getCnxPool()).thenReturn(connectionPool); ConsumerConfigurationData consumerConfigurationData = mock(ConsumerConfigurationData.class); when(consumerConfigurationData.getTopicsPattern()).thenReturn(Pattern.compile("\\w+")); when(consumerConfigurationData.getSubscriptionName()).thenReturn("testSubscriptionName"); @@ -104,6 +106,8 @@ public void testConsumerBuilderImpl() throws PulsarClientException { @Test(expectedExceptions = IllegalArgumentException.class) public void testConsumerBuilderImplWhenSchemaIsNull() { PulsarClientImpl client = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.getCnxPool()).thenReturn(connectionPool); ConsumerConfigurationData consumerConfigurationData = mock(ConsumerConfigurationData.class); new ConsumerBuilderImpl(client, consumerConfigurationData, null); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java index 570b139832806..227e0db10b724 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java @@ -31,6 +31,7 @@ import org.testng.annotations.Test; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @Test(groups = "broker-impl") public class ControlledClusterFailoverTest { @@ -88,6 +89,8 @@ public void testControlledClusterFailoverSwitch() throws IOException { ControlledClusterFailover controlledClusterFailover = Mockito.spy((ControlledClusterFailover) provider); PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(pulsarClient.getCnxPool()).thenReturn(connectionPool); controlledClusterFailover.initialize(pulsarClient); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java index 223881d85a87b..2bd18f69386f1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java @@ -70,6 +70,8 @@ public class PartitionedProducerImplTest { @BeforeMethod(alwaysRun = true) public void setup() { client = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.getCnxPool()).thenReturn(connectionPool); schema = mock(Schema.class); producerInterceptors = mock(ProducerInterceptors.class); producerCreatedFuture = new CompletableFuture<>(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java index bb3e3fc3accf6..b830d375303bb 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java @@ -52,6 +52,8 @@ public class ProducerBuilderImplTest { public void setup() { Producer producer = mock(Producer.class); client = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.getCnxPool()).thenReturn(connectionPool); producerBuilderImpl = new ProducerBuilderImpl<>(client, Schema.BYTES); when(client.newProducer()).thenReturn(producerBuilderImpl); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java index 32d0eff6e792e..27e2dcb37cee0 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java @@ -40,6 +40,8 @@ public void testIncrementNumAcksReceived() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); conf.setStatsIntervalSeconds(1); PulsarClientImpl client = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.getCnxPool()).thenReturn(connectionPool); when(client.getConfiguration()).thenReturn(conf); Timer timer = new HashedWheelTimer(); when(client.timer()).thenReturn(timer); @@ -60,6 +62,8 @@ public void testGetStatsAndCancelStatsTimeoutWithoutArriveUpdateInterval() { ClientConfigurationData conf = new ClientConfigurationData(); conf.setStatsIntervalSeconds(60); PulsarClientImpl client = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.getCnxPool()).thenReturn(connectionPool); when(client.getConfiguration()).thenReturn(conf); Timer timer = new HashedWheelTimer(); when(client.timer()).thenReturn(timer); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index 54d13538d7867..e0b25db891247 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; @@ -122,7 +123,7 @@ public void testConsumerIsClosed() throws Exception { when(cnx.ctx()).thenReturn(ctx); when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong())) .thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class))); - when(pool.getConnection(any(InetSocketAddress.class), any(InetSocketAddress.class))) + when(pool.getConnection(any(InetSocketAddress.class), any(InetSocketAddress.class), anyInt())) .thenReturn(CompletableFuture.completedFuture(cnx)); ClientConfigurationData conf = new ClientConfigurationData(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java index 9959a2038555c..eee8ba4e8f41a 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java @@ -52,6 +52,8 @@ public void setup() { Reader reader = mock(Reader.class); when(reader.readNextAsync()).thenReturn(CompletableFuture.allOf()); client = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.getCnxPool()).thenReturn(connectionPool); when(client.newReader(any(Schema.class))) .thenReturn(new ReaderBuilderImpl(client, Schema.BYTES)); when(client.createReaderAsync(any(ReaderConfigurationData.class), any(Schema.class))) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java index 68c886bc7211a..6a866034ddbf8 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java @@ -38,6 +38,8 @@ public class TableViewImplTest { @BeforeClass(alwaysRun = true) public void setup() { client = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.getCnxPool()).thenReturn(connectionPool); when(client.newReader(any(Schema.class))) .thenReturn(new ReaderBuilderImpl(client, Schema.BYTES)); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java index e462bb4d62cb5..1b39448fbe770 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java @@ -28,7 +28,9 @@ import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; import org.apache.pulsar.common.naming.NamespaceName; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -52,6 +54,9 @@ public class TopicListWatcherTest { public void setup() { listener = mock(TopicsChangedListener.class); client = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.getCnxPool()).thenReturn(connectionPool); + when(connectionPool.genRandomKeyToSelectCon()).thenReturn(0); when(client.getConfiguration()).thenReturn(new ClientConfigurationData()); clientCnxFuture = new CompletableFuture<>(); when(client.getConnectionToServiceUrl()).thenReturn(clientCnxFuture); @@ -59,6 +64,9 @@ public void setup() { when(client.timer()).thenReturn(timer); String topic = "persistent://tenant/ns/topic\\d+"; when(client.getConnection(topic)).thenReturn(clientCnxFuture); + when(client.getConnection(topic, 0)).thenReturn(clientCnxFuture); + when(client.getConnection(any(), any(), anyInt())).thenReturn(clientCnxFuture); + when(connectionPool.getConnection(any(), any(), anyInt())).thenReturn(clientCnxFuture); watcherFuture = new CompletableFuture<>(); watcher = new TopicListWatcher(listener, client, Pattern.compile(topic), 7, @@ -67,7 +75,7 @@ public void setup() { @Test public void testWatcherGrabsConnection() { - verify(client).getConnection(any()); + verify(client).getConnection(anyString(), anyInt()); } @Test diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java index f6c668703d9db..91ad321048226 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java @@ -45,6 +45,8 @@ public class UnAckedMessageTrackerTest { @Test public void testAddAndRemove() { PulsarClientImpl client = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.getCnxPool()).thenReturn(connectionPool); Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); when(client.timer()).thenReturn(timer); @@ -83,6 +85,8 @@ public void testAddAndRemove() { @Test public void testTrackChunkedMessageId() { PulsarClientImpl client = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.getCnxPool()).thenReturn(connectionPool); Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); when(client.timer()).thenReturn(timer); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java index 8959e67023463..bfd6af37e3ea6 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.schema.AvroSchema; @@ -46,6 +47,8 @@ public class MultiVersionSchemaInfoProviderTest { @BeforeMethod public void setup() { PulsarClientImpl client = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.getCnxPool()).thenReturn(connectionPool); when(client.getLookup()).thenReturn(mock(LookupService.class)); schemaProvider = new MultiVersionSchemaInfoProvider( TopicName.get("persistent://public/default/my-topic"), client); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index e0ebb52da7490..90f7df37fa196 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.ProducerBase; @@ -99,6 +100,8 @@ public void setup() throws PulsarClientException { producer = mock(Producer.class); client = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.getCnxPool()).thenReturn(connectionPool); when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES)); when(client.createProducerAsync(any(ProducerConfigurationData.class), any(), any())) .thenReturn(CompletableFuture.completedFuture(producer)); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java index fdac39512cc24..799bad839a451 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; @@ -56,6 +57,7 @@ import org.apache.pulsar.client.api.schema.GenericSchema; import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; import org.apache.pulsar.client.api.schema.SchemaBuilder; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.common.functions.FunctionConfig; @@ -95,6 +97,8 @@ public byte[] serialize(String input) { */ private static PulsarClientImpl getPulsarClient() throws PulsarClientException { PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(pulsarClient.getCnxPool()).thenReturn(connectionPool); ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class); doReturn(consumerBuilder).when(consumerBuilder).topics(anyList()); doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString()); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index 91e4c06fe5b49..5d6e4a3dc75e7 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -21,6 +21,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertSame; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; @@ -44,6 +46,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.functions.ConsumerConfig; @@ -105,6 +108,8 @@ public static Object[] getPulsarSourceImpls() { */ private static PulsarClientImpl getPulsarClient() throws PulsarClientException { PulsarClientImpl pulsarClient = Mockito.mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(pulsarClient.getCnxPool()).thenReturn(connectionPool); ConsumerBuilder goodConsumerBuilder = Mockito.mock(ConsumerBuilder.class); ConsumerBuilder badConsumerBuilder = Mockito.mock(ConsumerBuilder.class); Mockito.doReturn(goodConsumerBuilder).when(goodConsumerBuilder) diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java index 8da24fd1b7250..5c10a59bd1388 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java @@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -76,7 +77,8 @@ public LeaderServiceTest() { @BeforeMethod public void setup() throws PulsarClientException { mockClient = mock(PulsarClientImpl.class); - + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(mockClient.getCnxPool()).thenReturn(connectionPool); mockConsumer = mock(ConsumerImpl.class); ConsumerBuilder mockConsumerBuilder = mock(ConsumerBuilder.class); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index e066bb24e6ef0..ac3176b3135e2 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.functions.WorkerInfo; @@ -68,7 +69,8 @@ public MembershipManagerTest() { private static PulsarClient mockPulsarClient() throws PulsarClientException { PulsarClientImpl mockClient = mock(PulsarClientImpl.class); - + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(mockClient.getCnxPool()).thenReturn(connectionPool); ConsumerImpl mockConsumer = mock(ConsumerImpl.class); ConsumerBuilder mockConsumerBuilder = mock(ConsumerBuilder.class);