Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [client] fix same producer/consumer use more than one connection per broker #21144

Merged
merged 10 commits into from
Sep 11, 2023

Conversation

poorbarcode
Copy link
Contributor

@poorbarcode poorbarcode commented Sep 7, 2023

Motivation

2023-09-07 17:02:24.024 [pulsar-client-io-1-3] [WARN] org.apache.pulsar.client.impl.ClientCnx [id: 0x41a97e71, L:/127.0.0.1:42574 - R:broker-1/127.0.0.1:6651] Received error from server: org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic 
2023-09-07 17:02:24.024 [pulsar-client-io-1-3] [ERROR] org.apache.pulsar.client.impl.ProducerImpl [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic","reqId":2397430458619844271, "remote":"broker-1/127.0.0.1:6651", "local":"/127.0.0.1:42574"} 
2023-09-07 17:02:24.024 [pulsar-client-io-1-3] [WARN] org.apache.pulsar.client.impl.ConnectionHandler [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Could not get connection to broker: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic","reqId":2397430458619844271, "remote":"broker-1/127.0.0.1:6651", "local":"/127.0.0.1:42574"} -- Will try again in 1.589 s 
2023-09-07 17:02:26.026 [pulsar-timer-29-1] [INFO] org.apache.pulsar.client.impl.ConnectionHandler [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Reconnecting after connection was closed 
2023-09-07 17:02:26.026 [pulsar-client-io-1-1] [INFO] org.apache.pulsar.client.impl.ProducerImpl [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Creating producer on cnx [id: 0x41a97e71, L:/127.0.0.1:42574 - R:broker-1/127.0.0.1:6651] 
2023-09-07 17:02:26.026 [pulsar-client-io-1-3] [WARN] org.apache.pulsar.client.impl.ClientCnx [id: 0x41a97e71, L:/127.0.0.1:42574 - R:broker-1/127.0.0.1:6651] Received error from server: org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic 
2023-09-07 17:02:26.026 [pulsar-client-io-1-3] [ERROR] org.apache.pulsar.client.impl.ProducerImpl [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic","reqId":2397430458619844879, "remote":"broker-1/127.0.0.1:6651", "local":"/127.0.0.1:42574"} 
2023-09-07 17:02:26.026 [pulsar-client-io-1-3] [WARN] org.apache.pulsar.client.impl.ConnectionHandler [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Could not get connection to broker: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic","reqId":2397430458619844879, "remote":"broker-1/127.0.0.1:6651", "local":"/127.0.0.1:42574"} -- Will try again in 2.967 s 
2023-09-07 17:02:29.029 [pulsar-timer-29-1] [INFO] org.apache.pulsar.client.impl.ConnectionHandler [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Reconnecting after connection was closed 
2023-09-07 17:02:29.029 [pulsar-client-io-1-1] [INFO] org.apache.pulsar.client.impl.ProducerImpl [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Creating producer on cnx [id: 0x41a97e71, L:/127.0.0.1:42574 - R:broker-1/127.0.0.1:6651] 
2023-09-07 17:02:29.029 [pulsar-client-io-1-3] [WARN] org.apache.pulsar.client.impl.ClientCnx [id: 0x41a97e71, L:/127.0.0.1:42574 - R:broker-1/127.0.0.1:6651] Received error from server: org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic 
2023-09-07 17:02:29.029 [pulsar-client-io-1-3] [ERROR] org.apache.pulsar.client.impl.ProducerImpl [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic","reqId":2397430458619845297, "remote":"broker-1/127.0.0.1:6651", "local":"/127.0.0.1:42574"} 
2023-09-07 17:02:29.029 [pulsar-client-io-1-3] [WARN] org.apache.pulsar.client.impl.ConnectionHandler [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Could not get connection to broker: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic","reqId":2397430458619845297, "remote":"broker-1/127.0.0.1:6651", "local":"/127.0.0.1:42574"} -- Will try again in 6.166 s 
2023-09-07 17:02:35.035 [pulsar-timer-29-1] [INFO] org.apache.pulsar.client.impl.ConnectionHandler [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Reconnecting after connection was closed 
2023-09-07 17:02:35.035 [pulsar-client-io-1-1] [INFO] org.apache.pulsar.client.impl.ProducerImpl [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Creating producer on cnx [id: 0x41a97e71, L:/127.0.0.1:42574 - R:broker-1/127.0.0.1:6651] 
2023-09-07 17:02:35.035 [pulsar-client-io-1-3] [WARN] org.apache.pulsar.client.impl.ClientCnx [id: 0x41a97e71, L:/127.0.0.1:42574 - R:broker-1/127.0.0.1:6651] Received error from server: org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic 
2023-09-07 17:02:35.035 [pulsar-client-io-1-3] [ERROR] org.apache.pulsar.client.impl.ProducerImpl [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic","reqId":2397430458619845853, "remote":"broker-1/127.0.0.1:6651", "local":"/127.0.0.1:42574"} 
2023-09-07 17:02:35.035 [pulsar-client-io-1-3] [WARN] org.apache.pulsar.client.impl.ConnectionHandler [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Could not get connection to broker: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic","reqId":2397430458619845853, "remote":"broker-1/127.0.0.1:6651", "local":"/127.0.0.1:42574"} -- Will try again in 12.561 s 
2023-09-07 17:02:48.048 [pulsar-timer-29-1] [INFO] org.apache.pulsar.client.impl.ConnectionHandler [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Reconnecting after connection was closed 
2023-09-07 17:02:48.048 [pulsar-client-io-1-1] [INFO] org.apache.pulsar.client.impl.ProducerImpl [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Creating producer on cnx [id: 0x41a97e71, L:/127.0.0.1:42574 - R:broker-1/127.0.0.1:6651] 
2023-09-07 17:02:48.048 [pulsar-client-io-1-3] [WARN] org.apache.pulsar.client.impl.ClientCnx [id: 0x41a97e71, L:/127.0.0.1:42574 - R:broker-1/127.0.0.1:6651] Received error from server: org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic 
2023-09-07 17:02:48.048 [pulsar-client-io-1-3] [ERROR] org.apache.pulsar.client.impl.ProducerImpl [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic","reqId":2397430458619846487, "remote":"broker-1/127.0.0.1:6651", "local":"/127.0.0.1:42574"} 
2023-09-07 17:02:48.048 [pulsar-client-io-1-3] [WARN] org.apache.pulsar.client.impl.ConnectionHandler [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Could not get connection to broker: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic","reqId":2397430458619846487, "remote":"broker-1/127.0.0.1:6651", "local":"/127.0.0.1:42574"} -- Will try again in 4.927 s 
2023-09-07 17:02:53.053 [pulsar-timer-29-1] [INFO] org.apache.pulsar.client.impl.ConnectionHandler [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Reconnecting after connection was closed 
2023-09-07 17:02:53.053 [pulsar-client-io-1-1] [INFO] org.apache.pulsar.client.impl.ProducerImpl [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Creating producer on cnx [id: 0x41a97e71, L:/127.0.0.1:42574 - R:broker-1/127.0.0.1:6651] 
2023-09-07 17:02:53.053 [pulsar-client-io-1-3] [WARN] org.apache.pulsar.client.impl.ClientCnx [id: 0x41a97e71, L:/127.0.0.1:42574 - R:broker-1/127.0.0.1:6651] Received error from server: org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic 
2023-09-07 17:02:53.053 [pulsar-client-io-1-3] [ERROR] org.apache.pulsar.client.impl.ProducerImpl [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic","reqId":2397430458619846876, "remote":"broker-1/127.0.0.1:6651", "local":"/127.0.0.1:42574"} 
2023-09-07 17:02:53.053 [pulsar-client-io-1-3] [WARN] org.apache.pulsar.client.impl.ConnectionHandler [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Could not get connection to broker: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'testProducer-b1fc76' is already connected to topic","reqId":2397430458619846876, "remote":"broker-1/127.0.0.1:6651", "local":"/127.0.0.1:42574"} -- Will try again in 47.925 s
2023-09-07 17:02:56.056 [pulsar-timer-29-1] [INFO] org.apache.pulsar.client.impl.ProducerImpl [persistent://public/default/tp1-partition-2] [testProducer-b1fc76] Message send timed out. Failing 1 messages
org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer testProducer-b1fc76 can not send message to the topic persistent://public/default/tp1-partition-2 within given timeout : createdAt 30.003 seconds ago, firstSentAt 1.3327835314E7 seconds ago, lastSentAt 1.3327835314E7 seconds ago, retryCount 0

Pulsar has two mechanisms to guarantee that a producer connects to the broker multiple times the result is still correct.

  • In a connection, the second connection waits for the first connection to complete[1]
  • In a topic, the second connection will override the previous one[2]

However, if a producer can use different connections to connect to the broker, these two mechanisms will not work.

When the config connectionsPerBroker of PulsarClient is larger than 1, a producer could use more than one connection[3], leading to the error above. You can reproduce this issue by the test testSelectConnectionForSameProducer.

[1]
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1398-L1423

CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId, producerFuture);

if (existingProducerFuture != null) {
    if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
        Producer producer = existingProducerFuture.getNow(null);
        log.info("[{}] Producer with the same id is already created:"
                + " producerId={}, producer={}", remoteAddress, producerId, producer);
        commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(),
                producer.getSchemaVersion());
        return null;
    } else {
        // There was an early request to create a producer with same producerId.
        // This can happen when client timeout is lower than the broker timeouts.
        // We need to wait until the previous producer creation request
        // either complete or fails.
        ServerError error = null;
        if (!existingProducerFuture.isDone()) {
            error = ServerError.ServiceNotReady;
        } else {
            error = getErrorCode(existingProducerFuture);
            // remove producer with producerId as it's already completed with exception
            producers.remove(producerId, existingProducerFuture);
        }
        log.warn("[{}][{}] Producer with id is already present on the connection, producerId={}",
                remoteAddress, topicName, producerId);
        commandSender.sendErrorResponse(requestId, error, "Producer is already present on the connection");
        return null;
    }
}

[2]
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java#L983-L999

private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
        throws BrokerServiceException {
    if (newProducer.isSuccessorTo(oldProducer) && !isUserProvidedProducerName(oldProducer)
            && !isUserProvidedProducerName(newProducer)) {
        oldProducer.close(false);
        if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
            // Met concurrent update, throw exception here so that client can try reconnect later.
            throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
                    + "' replace concurrency error");
        } else {
            handleProducerRemoved(oldProducer);
        }
    } else {
        throw new BrokerServiceException.NamingException(
                "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
    }
}

[3]
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java#L213-L218

final int randomKey = signSafeMod(random.nextInt(), maxConnectionsPerHosts);

final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
        pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>());
CompletableFuture<ClientCnx> completableFuture = innerPool
        .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));

Modifications

Make the same producer/consumer usage the same connection

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode poorbarcode self-assigned this Sep 7, 2023
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 7, 2023
@poorbarcode poorbarcode added this to the 3.2.0 milestone Sep 7, 2023
@codelipenghui codelipenghui added the category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost label Sep 8, 2023
@Technoboy- Technoboy- closed this Sep 8, 2023
@Technoboy- Technoboy- reopened this Sep 8, 2023
Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just fix the isSuccessorTo method?

    public boolean isSuccessorTo(Producer other) {
        return Objects.equals(producerName, other.producerName)
                && Objects.equals(topic, other.topic)
                && producerId == other.producerId
                && Objects.equals(cnx, other.cnx) // remove this line
                && other.getEpoch() < epoch;
    }

It appears logically incorrect to assume that there can only be one connection used by the same producer.

If we want to fix the issue on the client side, multiple clients need to catch up on the fix.

As you mentioned in the PR's description

Pulsar has two mechanisms to guarantee that a producer connects to the broker multiple times the result is still correct.

1. In a connection, the second connection waits for the first connection to complete[1]
2. In a topic, the second connection will override the previous one[2]

However, if a producer can use different connections to connect to the broker, these two mechanisms will not work.

Fixed the isSuccessorTo method will fix the second mechanism to make it can really work for multiple connections.

@codelipenghui
Copy link
Contributor

Some how, it looks like related to this change #12846?

@poorbarcode
Copy link
Contributor Author

poorbarcode commented Sep 8, 2023

Some how, it looks like related to this change #12846?

No, the PR #12846 just used isSuccessorTo instead of hashcode & equals.

Why not just fix the isSuccessorTo method?

In our design, we assume that the IO events are sequential. But one producer/consumer can use different connections, making the sequential broken. I think the current solution is better.

@codelipenghui
Copy link
Contributor

codelipenghui commented Sep 11, 2023

@poorbarcode

No, the PR #12846 just used isSuccessorTo instead of hashcode & equals.

Oh, thanks for the explanation. It hasn't changed the behavior.

In our design, we assume that the IO events are sequential. But one producer/consumer can use different connections, making the sequential broken. I think the current solution is better.

This is quite convincing.

How about the connection being broken? Both the broker and client will handle the channelInactive event. However, the order of completion remains unknown to us. If the broker hasn't done the cleanup job from the broken connection, but the client tries to re-create the producer with a new connection. Will the client still get a producer already connected error?

Maybe we should fix both of them? The client always uses the same connection only when the connection is not available. The broker should make sure the same producer with a newer epoch can fence the existing producer with an older epoch.

@codelipenghui
Copy link
Contributor

#21155 (comment) Here is the answer for my question.

The delineation between the client and the broker has become clearer, with the client being responsible for using the same connection as possible, while the broker solely handles the fence issues pertaining to that particular connection. As for fence problems (disconnects) associated with different connections, the broker is given time to clean up old producers without applying the fence mechanism, while the client persists in retrying.

@poorbarcode
Copy link
Contributor Author

@codelipenghui

How about the connection being broken? Both the broker and client will handle the channelInactive event. However, the order of completion remains unknown to us. If the broker hasn't done the cleanup job from the broken connection, but the client tries to re-create the producer with a new connection. Will the client still get a producer already connected error?
Maybe we should fix both of them? The client always uses the same connection only when the connection is not available. The broker should make sure the same producer with a newer epoch can fence the existing producer with an older epoch.

I pushed a new PR #21155 to improve the method Producer.isSuccessorTo. It does these two improvements:

  • If a producer with the same name tries to use a new connection: let the client retry and async checks if the old connection is available(The producers related to the unavailable connection will be automatically cleaned up).
  • Since multiple producers created by the same client will have different producer-id, the same producer will use the same connection. We can just use connection + producer-id to check whether the new producer can override the old one.

// If no error is reported, the same connection was used when reconnecting.
for (int i = 0; i < 20; i++) {
// Trigger reconnect
producer.getClientCnx().handleCloseProducer(commandCloseProducer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please help to clarify why multiple call handleCloseProducer is valid?

    protected void handleCloseProducer(CommandCloseProducer closeProducer) {
        log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
        final long producerId = closeProducer.getProducerId();
        ProducerImpl<?> producer = producers.remove(producerId);
        if (producer != null) {
            producer.connectionClosed(this);
        } else {
            log.warn("Producer with id {} not found while closing producer ", producerId);
        }
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the pulsar client chooses a random connection in the original implementation, it might select the correct one. So more times to reconnect, more possible to reproduce the issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has a risk to throw NPE.

    public void connectionClosed(ClientCnx cnx) {
        lastConnectionClosedTimestamp = System.currentTimeMillis();
        duringConnect.set(false);
        state.client.getCnxPool().releaseConnection(cnx);
        if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
            if (!isValidStateForReconnection()) {
                log.info("[{}] [{}] Ignoring reconnection request (state: {})",
                        state.topic, state.getHandlerName(), state.getState());
                return;
            }
            long delayMs = backoff.next();
            state.setState(State.Connecting);
            log.info("[{}] [{}] Closed connection {} -- Will try again in {} s",
                    state.topic, state.getHandlerName(), cnx.channel(),
                    delayMs / 1000.0);
            state.client.timer().newTimeout(timeout -> {
                log.info("[{}] [{}] Reconnecting after timeout", state.topic, state.getHandlerName());
                grabCnx();
            }, delayMs, TimeUnit.MILLISECONDS);
        }
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@poorbarcode poorbarcode merged commit f2b9a3f into apache:master Sep 11, 2023
Technoboy- pushed a commit that referenced this pull request Sep 12, 2023
…n per broker (#21144)

Motivation: Pulsar has two mechanisms to guarantee that a producer connects to the broker multiple times the result is still correct.

- In a connection, the second connection waits for the first connection to complete.
- In a topic, the second connection will override the previous one.

However, if a producer can use different connections to connect to the broker, these two mechanisms will not work.

When the config `connectionsPerBroker` of `PulsarClient` is larger than `1`, a producer could use more than one connection, leading to the error above. You can reproduce this issue by the test `testSelectConnectionForSameProducer.`

Modifications: Make the same producer/consumer usage the same connection
poorbarcode added a commit that referenced this pull request Sep 14, 2023
…n per broker (#21144)

Motivation: Pulsar has two mechanisms to guarantee that a producer connects to the broker multiple times the result is still correct.

- In a connection, the second connection waits for the first connection to complete.
- In a topic, the second connection will override the previous one.

However, if a producer can use different connections to connect to the broker, these two mechanisms will not work.

When the config `connectionsPerBroker` of `PulsarClient` is larger than `1`, a producer could use more than one connection, leading to the error above. You can reproduce this issue by the test `testSelectConnectionForSameProducer.`

Modifications: Make the same producer/consumer usage the same connection
(cherry picked from commit f2b9a3f)
poorbarcode added a commit that referenced this pull request Sep 14, 2023
…n per broker (#21144)

Motivation: Pulsar has two mechanisms to guarantee that a producer connects to the broker multiple times the result is still correct.

- In a connection, the second connection waits for the first connection to complete.
- In a topic, the second connection will override the previous one.

However, if a producer can use different connections to connect to the broker, these two mechanisms will not work.

When the config `connectionsPerBroker` of `PulsarClient` is larger than `1`, a producer could use more than one connection, leading to the error above. You can reproduce this issue by the test `testSelectConnectionForSameProducer.`

Modifications: Make the same producer/consumer usage the same connection
(cherry picked from commit f2b9a3f)
poorbarcode added a commit that referenced this pull request Sep 14, 2023
…n per broker (#21144)

Motivation: Pulsar has two mechanisms to guarantee that a producer connects to the broker multiple times the result is still correct.

- In a connection, the second connection waits for the first connection to complete.
- In a topic, the second connection will override the previous one.

However, if a producer can use different connections to connect to the broker, these two mechanisms will not work.

When the config `connectionsPerBroker` of `PulsarClient` is larger than `1`, a producer could use more than one connection, leading to the error above. You can reproduce this issue by the test `testSelectConnectionForSameProducer.`

Modifications: Make the same producer/consumer usage the same connection
(cherry picked from commit f2b9a3f)
liangyepianzhou added a commit that referenced this pull request Sep 18, 2023
BewareMyPower added a commit to BewareMyPower/pulsar-client-cpp that referenced this pull request Nov 6, 2023
…ers per connection

### Motivation

This is a catch up for apache/pulsar#21144

When a producer or consumer reconnects, a random number will be generated as the key suffix in `ConnectionPool` to create or get the `ClientConnection` object from the pool.

https://github.com/apache/pulsar-client-cpp/blob/81cc562f7b366fad97e1b80c07ef9334a808390d/lib/ConnectionPool.cc#L75

If a new connection is created with the same producer or consumer name to the broker, the broker will respond with a  `ProducerBusy` or `ConsumerBusy` error so that the reconnection will never succeed.

### Modifications

- Add an overload of `ConnectionPool::getConnectionAsync` that accepts an integer parameter as the key suffix. If it's not specified, generate the random number as the suffix. In this method, choose the executor by `key suffix % size`.
- Generate the random number and save it when creating the `HandlerBase` object. When connecting the owner broker of its topic, pass that index so that the reconnection will always reuse the same `ClientConnection` object.

### Verifying this change

`ProducerTest.testReconnectMultiConnectionsPerBroker` is added to protected the change.
BewareMyPower added a commit to apache/pulsar-client-cpp that referenced this pull request Nov 7, 2023
…ers per connection (#337)

### Motivation

This is a catch up for apache/pulsar#21144

When a producer or consumer reconnects, a random number will be generated as the key suffix in `ConnectionPool` to create or get the `ClientConnection` object from the pool.

https://github.com/apache/pulsar-client-cpp/blob/81cc562f7b366fad97e1b80c07ef9334a808390d/lib/ConnectionPool.cc#L75

If a new connection is created with the same producer or consumer name to the broker, the broker will respond with a  `ProducerBusy` or `ConsumerBusy` error so that the reconnection will never succeed.

### Modifications

- Add an overload of `ConnectionPool::getConnectionAsync` that accepts an integer parameter as the key suffix. If it's not specified, generate the random number as the suffix. In this method, choose the executor by `key suffix % size`.
- Generate the random number and save it when creating the `HandlerBase` object. When connecting the owner broker of its topic, pass that index so that the reconnection will always reuse the same `ClientConnection` object.

### Verifying this change

`ProducerTest.testReconnectMultiConnectionsPerBroker` is added to protected the change.
BewareMyPower added a commit to apache/pulsar-client-cpp that referenced this pull request Nov 7, 2023
…ers per connection (#337)

### Motivation

This is a catch up for apache/pulsar#21144

When a producer or consumer reconnects, a random number will be generated as the key suffix in `ConnectionPool` to create or get the `ClientConnection` object from the pool.

https://github.com/apache/pulsar-client-cpp/blob/81cc562f7b366fad97e1b80c07ef9334a808390d/lib/ConnectionPool.cc#L75

If a new connection is created with the same producer or consumer name to the broker, the broker will respond with a  `ProducerBusy` or `ConsumerBusy` error so that the reconnection will never succeed.

### Modifications

- Add an overload of `ConnectionPool::getConnectionAsync` that accepts an integer parameter as the key suffix. If it's not specified, generate the random number as the suffix. In this method, choose the executor by `key suffix % size`.
- Generate the random number and save it when creating the `HandlerBase` object. When connecting the owner broker of its topic, pass that index so that the reconnection will always reuse the same `ClientConnection` object.

### Verifying this change

`ProducerTest.testReconnectMultiConnectionsPerBroker` is added to protected the change.

(cherry picked from commit 6f115e7)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/client category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost cherry-picked/branch-2.10 cherry-picked/branch-2.11 cherry-picked/branch-3.0 cherry-picked/branch-3.1 doc-not-needed Your PR changes do not impact docs ready-to-test release/2.10.6 release/2.11.3 release/3.0.2 release/3.1.1 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants