Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [client] fix same producer/consumer use more than one connection per broker #21144

Merged
merged 10 commits into from
Sep 11, 2023
Next Next commit
[fix] [client] fix same producer/consumer use more than one connectio…
…n per broker
poorbarcode committed Sep 7, 2023
commit 91ac75577084726c6922dc9be6b9c367f3f2bae3
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
@@ -61,6 +62,8 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
private final PulsarService pulsarService;
private final PulsarClientImpl pulsarClient;

private final int randomKeyForSelectConnection;

private static final AtomicIntegerFieldUpdater<TransactionBufferHandlerImpl> REQUEST_CREDITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class, "requestCredits");
private volatile int requestCredits;
@@ -74,6 +77,7 @@ public TransactionBufferHandlerImpl(PulsarService pulsarService, HashedWheelTime
this.operationTimeoutInMills = operationTimeoutInMills;
this.timer = timer;
this.requestCredits = Math.max(100, maxConcurrentRequests);
this.randomKeyForSelectConnection = pulsarClient.getCnxPool().genRandomKeyToSelectCon();
}

@Override
@@ -296,7 +300,7 @@ protected OpRequestSend newObject(Handle<OpRequestSend> handle) {
}

public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
return pulsarClient.getConnection(topic);
return pulsarClient.getConnection(topic, randomKeyForSelectConnection);
}

public CompletableFuture<ClientCnx> getClientCnx(String topic) {
@@ -317,7 +321,8 @@ public CompletableFuture<ClientCnx> getClientCnx(String topic) {
}
InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
return pulsarClient.getConnection(brokerAddress, brokerAddress);
return pulsarClient.getConnection(brokerAddress, brokerAddress,
randomKeyForSelectConnection);
} else {
// Bundle is unloading, lookup topic
return getClientCnxWithLookup(topic);
Original file line number Diff line number Diff line change
@@ -28,13 +28,20 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import io.netty.util.concurrent.Promise;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.mockito.Mockito;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -80,6 +87,33 @@ public void testSingleIpAddress() throws Exception {
eventLoop.shutdownGracefully();
}

@Test
public void testSelectConnectionForSameProducer() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://sample/standalone/ns/tp_");
admin.topics().createNonPartitionedTopic(topicName);
final CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
// 10 connection per broker.
final PulsarClient clientWith10ConPerBroker = PulsarClient.builder().connectionsPerBroker(10)
.serviceUrl(lookupUrl.toString()).build();
ProducerImpl producer = (ProducerImpl) clientWith10ConPerBroker.newProducer().topic(topicName).create();
commandCloseProducer.setProducerId(producer.producerId);
// An error will be reported when the Producer reconnects using a different connection.
// If no error is reported, the same connection was used when reconnecting.
for (int i = 0; i < 20; i++) {
// Trigger reconnect
producer.getClientCnx().handleCloseProducer(commandCloseProducer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please clarify why calling handleCloseProducer multiple times is valid?

    protected void handleCloseProducer(CommandCloseProducer closeProducer) {
        log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
        final long producerId = closeProducer.getProducerId();
        ProducerImpl<?> producer = producers.remove(producerId);
        if (producer != null) {
            producer.connectionClosed(this);
        } else {
            log.warn("Producer with id {} not found while closing producer ", producerId);
        }
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the Pulsar client chooses a random connection in the original implementation, it might happen to select the correct one. So the more times we reconnect, the more likely we are to reproduce the issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This risks throwing an NPE.

    public void connectionClosed(ClientCnx cnx) {
        lastConnectionClosedTimestamp = System.currentTimeMillis();
        duringConnect.set(false);
        state.client.getCnxPool().releaseConnection(cnx);
        if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
            if (!isValidStateForReconnection()) {
                log.info("[{}] [{}] Ignoring reconnection request (state: {})",
                        state.topic, state.getHandlerName(), state.getState());
                return;
            }
            long delayMs = backoff.next();
            state.setState(State.Connecting);
            log.info("[{}] [{}] Closed connection {} -- Will try again in {} s",
                    state.topic, state.getHandlerName(), cnx.channel(),
                    delayMs / 1000.0);
            state.client.timer().newTimeout(timeout -> {
                log.info("[{}] [{}] Reconnecting after timeout", state.topic, state.getHandlerName());
                grabCnx();
            }, delayMs, TimeUnit.MILLISECONDS);
        }
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Awaitility.await().untilAsserted(() ->
Assert.assertEquals(producer.getState().toString(), HandlerState.State.Ready.toString(),
"The producer uses a different connection when reconnecting")
);
}

// cleanup.
producer.close();
clientWith10ConPerBroker.close();
admin.topics().delete(topicName);
}

@Test
public void testDoubleIpAddress() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
@@ -205,20 +239,23 @@ protected void doResolveAll(SocketAddress socketAddress, Promise promise) throws

ClientCnx cnx = pool.getConnection(
InetSocketAddress.createUnresolved("proxy", 9999),
InetSocketAddress.createUnresolved("proxy", 9999)).get();
InetSocketAddress.createUnresolved("proxy", 9999),
pool.genRandomKeyToSelectCon()).get();
Assert.assertEquals(cnx.remoteHostName, "proxy");
Assert.assertNull(cnx.proxyToTargetBrokerAddress);

cnx = pool.getConnection(
InetSocketAddress.createUnresolved("broker1", 9999),
InetSocketAddress.createUnresolved("proxy", 9999)).get();
InetSocketAddress.createUnresolved("proxy", 9999),
pool.genRandomKeyToSelectCon()).get();
Assert.assertEquals(cnx.remoteHostName, "proxy");
Assert.assertEquals(cnx.proxyToTargetBrokerAddress, "broker1:9999");


cnx = pool.getConnection(
InetSocketAddress.createUnresolved("broker2", 9999),
InetSocketAddress.createUnresolved("broker2", 9999)).get();
InetSocketAddress.createUnresolved("broker2", 9999),
pool.genRandomKeyToSelectCon()).get();
Assert.assertEquals(cnx.remoteHostName, "broker2");
Assert.assertNull(cnx.proxyToTargetBrokerAddress);

Original file line number Diff line number Diff line change
@@ -122,7 +122,7 @@ public CompletableFuture<ClientCnx> getConnection(String topic) {
result.completeExceptionally(new IOException("New connections are rejected."));
return result;
} else {
return super.getConnection(topic);
return super.getConnection(topic, getCnxPool().genRandomKeyToSelectCon());
}
}

Original file line number Diff line number Diff line change
@@ -44,6 +44,8 @@ public class ConnectionHandler {
protected volatile long lastConnectionClosedTimestamp = 0L;
private final AtomicBoolean duringConnect = new AtomicBoolean(false);

protected final int randomKeyForSelectConnection;

interface Connection {

/**
@@ -58,6 +60,7 @@ default void connectionFailed(PulsarClientException e) {

protected ConnectionHandler(HandlerState state, Backoff backoff, Connection connection) {
this.state = state;
this.randomKeyForSelectConnection = state.client.getCnxPool().genRandomKeyToSelectCon();
this.connection = connection;
this.backoff = backoff;
CLIENT_CNX_UPDATER.set(this, null);
@@ -88,11 +91,11 @@ protected void grabCnx() {
if (state.redirectedClusterURI != null) {
InetSocketAddress address = InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(),
state.redirectedClusterURI.getPort());
cnxFuture = state.client.getConnection(address, address);
cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
} else if (state.topic == null) {
cnxFuture = state.client.getConnectionToServiceUrl();
} else {
cnxFuture = state.client.getConnection(state.topic); //
cnxFuture = state.client.getConnection(state.topic, randomKeyForSelectConnection); //
}
cnxFuture.thenCompose(cnx -> connection.connectionOpened(cnx))
.thenAccept(__ -> duringConnect.set(false))
Original file line number Diff line number Diff line change
@@ -165,8 +165,12 @@ private static AddressResolver<InetSocketAddress> createAddressResolver(ClientCo

private static final Random random = new Random();

/**
 * Generates a key that a caller can keep and pass to
 * {@code getConnection(logicalAddress, physicalAddress, randomKey)} on every
 * (re)connect, so that the same key is presented to the pool each time.
 *
 * @return a value in {@code [0, maxConnectionsPerHosts)}, or {@code -1} when
 *         {@code maxConnectionsPerHosts} is 0 (pooling disabled)
 */
public int genRandomKeyToSelectCon() {
    if (maxConnectionsPerHosts == 0) {
        // Pooling is disabled: Random#nextInt(0) would throw IllegalArgumentException.
        // getConnection ignores the key in this mode and creates the connection with -1.
        return -1;
    }
    return random.nextInt(maxConnectionsPerHosts);
}

public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress address) {
return getConnection(address, address);
return getConnection(address, address, signSafeMod(random.nextInt(), maxConnectionsPerHosts));
}

void closeAllConnections() {
@@ -204,14 +208,12 @@ void closeAllConnections() {
* @return a future that will produce the ClientCnx object
*/
public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddress,
InetSocketAddress physicalAddress) {
InetSocketAddress physicalAddress, final int randomKey) {
if (maxConnectionsPerHosts == 0) {
// Disable pooling
return createConnection(logicalAddress, physicalAddress, -1);
}

final int randomKey = signSafeMod(random.nextInt(), maxConnectionsPerHosts);

final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>());
CompletableFuture<ClientCnx> completableFuture = innerPool
Original file line number Diff line number Diff line change
@@ -944,10 +944,20 @@ public void updateTlsTrustStorePathAndPassword(String tlsTrustStorePath, String
conf.setTlsTrustStorePassword(tlsTrustStorePassword);
}

/**
 * Looks up the broker for {@code topic} and then requests a connection to it,
 * forwarding the caller-supplied key to the address-based
 * {@code getConnection} overload.
 *
 * @param topic the topic name, parsed with {@code TopicName.get}
 * @param randomKeyForSelectConnection key forwarded unchanged to the
 *        address-based overload
 * @return a future that completes with the connection to the looked-up broker
 */
public CompletableFuture<ClientCnx> getConnection(final String topic, int randomKeyForSelectConnection) {
    final TopicName parsedTopic = TopicName.get(topic);
    return lookup.getBroker(parsedTopic).thenCompose(brokerAddresses ->
            getConnection(brokerAddresses.getLeft(), brokerAddresses.getRight(), randomKeyForSelectConnection));
}

/**
* Only for test.
*/
@VisibleForTesting
public CompletableFuture<ClientCnx> getConnection(final String topic) {
TopicName topicName = TopicName.get(topic);
return lookup.getBroker(topicName)
.thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight()));
.thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
}

public CompletableFuture<ClientCnx> getConnectionToServiceUrl() {
@@ -956,12 +966,13 @@ public CompletableFuture<ClientCnx> getConnectionToServiceUrl() {
"Can't get client connection to HTTP service URL", null));
}
InetSocketAddress address = lookup.resolveHost();
return getConnection(address, address);
return getConnection(address, address, cnxPool.genRandomKeyToSelectCon());
}

public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress logicalAddress,
final InetSocketAddress physicalAddress) {
return cnxPool.getConnection(logicalAddress, physicalAddress);
final InetSocketAddress physicalAddress,
final int randomKeyForSelectConnection) {
return cnxPool.getConnection(logicalAddress, physicalAddress, randomKeyForSelectConnection);
}

/** visible for pulsar-functions. **/
Original file line number Diff line number Diff line change
@@ -72,11 +72,12 @@ static PulsarClientImpl mockClientCnx(PulsarClientImpl clientMock) {
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
when(clientCnxMock.channel().remoteAddress()).thenReturn(mock(SocketAddress.class));
when(clientMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
when(clientMock.getConnection(any(), any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
when(clientMock.getConnection(any(), any(), any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
ConnectionPool connectionPoolMock = mock(ConnectionPool.class);
when(clientMock.getCnxPool()).thenReturn(connectionPoolMock);
when(connectionPoolMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
when(connectionPoolMock.getConnection(any(), any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
when(connectionPoolMock.getConnection(any(), any(), any()))
.thenReturn(CompletableFuture.completedFuture(clientCnxMock));
return clientMock;
}

Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
@@ -122,7 +123,7 @@ public void testConsumerIsClosed() throws Exception {
when(cnx.ctx()).thenReturn(ctx);
when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong()))
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
when(pool.getConnection(any(InetSocketAddress.class), any(InetSocketAddress.class)))
when(pool.getConnection(any(InetSocketAddress.class), any(InetSocketAddress.class), anyInt()))
.thenReturn(CompletableFuture.completedFuture(cnx));

ClientConfigurationData conf = new ClientConfigurationData();