Skip to content

Commit

Permalink
[fix] [client] fix same producer/consumer use more than one connectio…
Browse files Browse the repository at this point in the history
…n per broker (apache#21144)

Motivation: Pulsar has two mechanisms to guarantee that a producer connects to the broker multiple times the result is still correct.

- In a connection, the second connection waits for the first connection to complete.
- In a topic, the second connection will override the previous one.

However, if a producer can use different connections to connect to the broker, these two mechanisms will not work.

When the config `connectionsPerBroker` of `PulsarClient` is larger than `1`, a producer could use more than one connection, leading to the error above. You can reproduce this issue by the test `testSelectConnectionForSameProducer.`

Modifications: Make the same producer/consumer usage the same connection
(cherry picked from commit f2b9a3f)
  • Loading branch information
poorbarcode committed Sep 14, 2023
1 parent 086c59a commit b510415
Show file tree
Hide file tree
Showing 32 changed files with 207 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
private final PulsarService pulsarService;
private final PulsarClientImpl pulsarClient;

private final int randomKeyForSelectConnection;

private static final AtomicIntegerFieldUpdater<TransactionBufferHandlerImpl> REQUEST_CREDITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class, "requestCredits");
private volatile int requestCredits;
Expand All @@ -74,6 +76,7 @@ public TransactionBufferHandlerImpl(PulsarService pulsarService, HashedWheelTime
this.operationTimeoutInMills = operationTimeoutInMills;
this.timer = timer;
this.requestCredits = Math.max(100, maxConcurrentRequests);
this.randomKeyForSelectConnection = pulsarClient.getCnxPool().genRandomKeyToSelectCon();
}

@Override
Expand Down Expand Up @@ -296,7 +299,7 @@ protected OpRequestSend newObject(Handle<OpRequestSend> handle) {
}

public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
return pulsarClient.getConnection(topic);
return pulsarClient.getConnection(topic, randomKeyForSelectConnection);
}

public CompletableFuture<ClientCnx> getClientCnx(String topic) {
Expand All @@ -317,7 +320,8 @@ public CompletableFuture<ClientCnx> getClientCnx(String topic) {
}
InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
return pulsarClient.getConnection(brokerAddress, brokerAddress);
return pulsarClient.getConnection(brokerAddress, brokerAddress,
randomKeyForSelectConnection);
} else {
// Bundle is unloading, lookup topic
return getClientCnxWithLookup(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
Expand All @@ -62,8 +63,11 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
final PulsarService pulsar = mock(PulsarService.class);
final BrokerService broker = mock(BrokerService.class);
final Topic localTopic = mock(Topic.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
final PulsarClientImpl localClient = mock(PulsarClientImpl.class);
when(localClient.getCnxPool()).thenReturn(connectionPool);
final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
when(remoteClient.getCnxPool()).thenReturn(connectionPool);
final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
when(broker.executor()).thenReturn(eventLoopGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
Expand Down Expand Up @@ -1706,6 +1707,8 @@ public void testAtomicReplicationRemoval() throws Exception {
ManagedCursor cursor = mock(ManagedCursorImpl.class);
doReturn(remoteCluster).when(cursor).getName();
PulsarClientImpl pulsarClientMock = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
when(pulsarClientMock.getCnxPool()).thenReturn(connectionPool);
when(pulsarClientMock.newProducer(any())).thenAnswer(
invocation -> {
ProducerBuilderImpl producerBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Collections;
Expand All @@ -42,6 +43,7 @@
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -76,7 +78,10 @@ public NonStartableTestPulsarService(SpyConfig spyConfig, ServiceConfiguration c
throw new RuntimeException(e);
}
setSchemaRegistryService(spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class));
setClient(mock(PulsarClientImpl.class));
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
when(mockClient.getCnxPool()).thenReturn(connectionPool);
setClient(mockClient);
this.namespaceService = mock(NamespaceService.class);
try {
startNamespaceService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.transaction.buffer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -51,6 +53,7 @@
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -253,14 +256,21 @@ public void testTransactionBufferMetrics() throws Exception {
assertEquals(pending.size(), 1);
}

/**
* This is a flaky test.
*/
@Test
public void testTransactionBufferClientTimeout() throws Exception {
PulsarService pulsarService = pulsarServiceList.get(0);
PulsarClient mockClient = mock(PulsarClientImpl.class);
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
when(mockClient.getCnxPool()).thenReturn(connectionPool);
CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<>();
ClientCnx clientCnx = mock(ClientCnx.class);
completableFuture.complete(clientCnx);
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
when(((PulsarClientImpl)mockClient).getConnection(anyString(), anyInt())).thenReturn(completableFuture);
when(((PulsarClientImpl)mockClient).getConnection(any(), any(), anyInt())).thenReturn(completableFuture);
ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
when(clientCnx.ctx()).thenReturn(cnx);
Channel channel = mock(Channel.class);
Expand All @@ -287,7 +297,9 @@ public PulsarClient answer(InvocationOnMock invocation) throws Throwable {
ConcurrentSkipListMap<Long, Object> outstandingRequests =
(ConcurrentSkipListMap<Long, Object>) field.get(transactionBufferHandler);

assertEquals(outstandingRequests.size(), 1);
Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertEquals(outstandingRequests.size(), 1);
});

Awaitility.await().atLeast(2, TimeUnit.SECONDS).until(() -> {
if (outstandingRequests.size() == 0) {
Expand All @@ -307,11 +319,13 @@ public PulsarClient answer(InvocationOnMock invocation) throws Throwable {
@Test
public void testTransactionBufferChannelUnActive() throws PulsarServerException {
PulsarService pulsarService = pulsarServiceList.get(0);
PulsarClient mockClient = mock(PulsarClientImpl.class);
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
when(mockClient.getCnxPool()).thenReturn(connectionPool);
CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<>();
ClientCnx clientCnx = mock(ClientCnx.class);
completableFuture.complete(clientCnx);
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
when(((PulsarClientImpl)mockClient).getConnection(anyString(), anyInt())).thenReturn(completableFuture);
ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
when(clientCnx.ctx()).thenReturn(cnx);
Channel channel = mock(Channel.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand All @@ -46,15 +47,20 @@ public class TransactionBufferHandlerImplTest {

@Test
public void testRequestCredits() throws PulsarServerException {
PulsarClient pulsarClient = mock(PulsarClientImpl.class);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
PulsarService pulsarService = mock(PulsarService.class);
NamespaceService namespaceService = mock(NamespaceService.class);
when(pulsarService.getNamespaceService()).thenReturn(namespaceService);
when(pulsarService.getClient()).thenReturn(pulsarClient);
when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class)));
Optional<NamespaceEphemeralData> opData = Optional.empty();
when(namespaceService.getOwnerAsync(any())).thenReturn(CompletableFuture.completedFuture(opData));
when(((PulsarClientImpl)pulsarClient).getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
when(((PulsarClientImpl)pulsarClient).getConnection(anyString(), anyInt()))
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
when(((PulsarClientImpl)pulsarClient).getConnection(anyString()))
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(pulsarService, null, 1000, 3000));
doNothing().when(handler).endTxn(any());
doReturn(CompletableFuture.completedFuture(mock(ClientCnx.class))).when(handler).getClientCnx(anyString());
Expand All @@ -75,7 +81,9 @@ public void testRequestCredits() throws PulsarServerException {

@Test
public void testMinRequestCredits() throws PulsarServerException {
PulsarClient pulsarClient = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
PulsarService pulsarService = mock(PulsarService.class);
when(pulsarService.getClient()).thenReturn(pulsarClient);
TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(pulsarService, null, 50, 3000));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@
import java.util.function.Supplier;
import java.util.stream.IntStream;
import io.netty.util.concurrent.Promise;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -80,6 +84,36 @@ public void testSingleIpAddress() throws Exception {
eventLoop.shutdownGracefully();
}

@Test
public void testSelectConnectionForSameProducer() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://sample/standalone/ns/tp_");
admin.topics().createNonPartitionedTopic(topicName);
final CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
// 10 connection per broker.
final PulsarClient clientWith10ConPerBroker = PulsarClient.builder().connectionsPerBroker(10)
.serviceUrl(lookupUrl.toString()).build();
ProducerImpl producer = (ProducerImpl) clientWith10ConPerBroker.newProducer().topic(topicName).create();
commandCloseProducer.setProducerId(producer.producerId);
// An error will be reported when the Producer reconnects using a different connection.
// If no error is reported, the same connection was used when reconnecting.
for (int i = 0; i < 20; i++) {
// Trigger reconnect
ClientCnx cnx = producer.getClientCnx();
if (cnx != null) {
cnx.handleCloseProducer(commandCloseProducer);
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(producer.getState().toString(), HandlerState.State.Ready.toString(),
"The producer uses a different connection when reconnecting")
);
}
}

// cleanup.
producer.close();
clientWith10ConPerBroker.close();
admin.topics().delete(topicName);
}

@Test
public void testDoubleIpAddress() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
Expand Down Expand Up @@ -200,22 +234,25 @@ protected void doResolveAll(SocketAddress socketAddress, Promise promise) throws

ClientCnx cnx = pool.getConnection(
InetSocketAddress.createUnresolved("proxy", 9999),
InetSocketAddress.createUnresolved("proxy", 9999)).get();
InetSocketAddress.createUnresolved("proxy", 9999),
pool.genRandomKeyToSelectCon()).get();
Assert.assertEquals(cnx.remoteHostName, "proxy");
Assert.assertNull(cnx.proxyToTargetBrokerAddress);
cnx.close();

cnx = pool.getConnection(
InetSocketAddress.createUnresolved("broker", 9999),
InetSocketAddress.createUnresolved("proxy", 9999)).get();
InetSocketAddress.createUnresolved("proxy", 9999),
pool.genRandomKeyToSelectCon()).get();
Assert.assertEquals(cnx.remoteHostName, "proxy");
Assert.assertEquals(cnx.proxyToTargetBrokerAddress, "broker:9999");
cnx.close();


cnx = pool.getConnection(
InetSocketAddress.createUnresolved("broker", 9999),
InetSocketAddress.createUnresolved("broker", 9999)).get();
InetSocketAddress.createUnresolved("broker", 9999),
pool.genRandomKeyToSelectCon()).get();
Assert.assertEquals(cnx.remoteHostName, "broker");
Assert.assertNull(cnx.proxyToTargetBrokerAddress);
cnx.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public CompletableFuture<ClientCnx> getConnection(String topic) {
result.completeExceptionally(new IOException("New connections are rejected."));
return result;
} else {
return super.getConnection(topic);
return super.getConnection(topic, getCnxPool().genRandomKeyToSelectCon());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.compaction;

import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -48,6 +50,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -258,7 +261,10 @@ public void testCompactEmptyTopic() throws Exception {
public void testPhaseOneLoopTimeConfiguration() {
ServiceConfiguration configuration = new ServiceConfiguration();
configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60);
TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, Mockito.mock(PulsarClientImpl.class),
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
when(mockClient.getCnxPool()).thenReturn(connectionPool);
TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, mockClient,
Mockito.mock(BookKeeper.class), compactionScheduler);
Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class ConnectionHandler {
private volatile long epoch = -1L;
protected volatile long lastConnectionClosedTimestamp = 0L;
private final AtomicBoolean duringConnect = new AtomicBoolean(false);
protected final int randomKeyForSelectConnection;

interface Connection {

Expand All @@ -58,6 +59,7 @@ default void connectionFailed(PulsarClientException e) {

protected ConnectionHandler(HandlerState state, Backoff backoff, Connection connection) {
this.state = state;
this.randomKeyForSelectConnection = state.client.getCnxPool().genRandomKeyToSelectCon();
this.connection = connection;
this.backoff = backoff;
CLIENT_CNX_UPDATER.set(this, null);
Expand Down Expand Up @@ -88,11 +90,11 @@ protected void grabCnx() {
if (state.redirectedClusterURI != null) {
InetSocketAddress address = InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(),
state.redirectedClusterURI.getPort());
cnxFuture = state.client.getConnection(address, address);
cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
} else if (state.topic == null) {
cnxFuture = state.client.getConnectionToServiceUrl();
} else {
cnxFuture = state.client.getConnection(state.topic); //
cnxFuture = state.client.getConnection(state.topic, randomKeyForSelectConnection);
}
cnxFuture.thenCompose(cnx -> connection.connectionOpened(cnx))
.thenAccept(__ -> duringConnect.set(false))
Expand Down
Loading

0 comments on commit b510415

Please sign in to comment.