-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix] [client] fix same producer/consumer use more than one connection per broker #21144
[fix] [client] fix same producer/consumer use more than one connection per broker #21144
Conversation
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just fix the isSuccessorTo
method?
public boolean isSuccessorTo(Producer other) {
return Objects.equals(producerName, other.producerName)
&& Objects.equals(topic, other.topic)
&& producerId == other.producerId
&& Objects.equals(cnx, other.cnx) // remove this line
&& other.getEpoch() < epoch;
}
It appears logically incorrect to assume that there can only be one connection used by the same producer.
If we want to fix the issue on the client side, multiple clients need to catch up on the fix.
As you mentioned in the PR's description
Pulsar has two mechanisms to guarantee that a producer connects to the broker multiple times the result is still correct.
1. In a connection, the second connection waits for the first connection to complete[1]
2. In a topic, the second connection will override the previous one[2]
However, if a producer can use different connections to connect to the broker, these two mechanisms will not work.
Fixed the isSuccessorTo
method will fix the second mechanism to make it can really work for multiple connections.
Some how, it looks like related to this change #12846? |
No, the PR #12846 just used
In our design, we assume that the IO events are sequential. But one producer/consumer can use different connections, making the sequential broken. I think the current solution is better. |
Oh, thanks for the explanation. It hasn't changed the behavior.
This is quite convincing. How about the connection being broken? Both the broker and client will handle the Maybe we should fix both of them? The client always uses the same connection only when the connection is not available. The broker should make sure the same producer with a newer epoch can fence the existing producer with an older epoch. |
#21155 (comment) Here is the answer for my question. The delineation between the client and the broker has become clearer, with the client being responsible for using the same connection as possible, while the broker solely handles the fence issues pertaining to that particular connection. As for fence problems (disconnects) associated with different connections, the broker is given time to clean up old producers without applying the fence mechanism, while the client persists in retrying. |
I pushed a new PR #21155 to improve the method
|
// If no error is reported, the same connection was used when reconnecting. | ||
for (int i = 0; i < 20; i++) { | ||
// Trigger reconnect | ||
producer.getClientCnx().handleCloseProducer(commandCloseProducer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please help to clarify why multiple call handleCloseProducer
is valid?
protected void handleCloseProducer(CommandCloseProducer closeProducer) {
log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
final long producerId = closeProducer.getProducerId();
ProducerImpl<?> producer = producers.remove(producerId);
if (producer != null) {
producer.connectionClosed(this);
} else {
log.warn("Producer with id {} not found while closing producer ", producerId);
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the pulsar client chooses a random connection in the original implementation, it might select the correct one. So more times to reconnect, more possible to reproduce the issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has a risk to throw NPE.
public void connectionClosed(ClientCnx cnx) {
lastConnectionClosedTimestamp = System.currentTimeMillis();
duringConnect.set(false);
state.client.getCnxPool().releaseConnection(cnx);
if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
if (!isValidStateForReconnection()) {
log.info("[{}] [{}] Ignoring reconnection request (state: {})",
state.topic, state.getHandlerName(), state.getState());
return;
}
long delayMs = backoff.next();
state.setState(State.Connecting);
log.info("[{}] [{}] Closed connection {} -- Will try again in {} s",
state.topic, state.getHandlerName(), cnx.channel(),
delayMs / 1000.0);
state.client.timer().newTimeout(timeout -> {
log.info("[{}] [{}] Reconnecting after timeout", state.topic, state.getHandlerName());
grabCnx();
}, delayMs, TimeUnit.MILLISECONDS);
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
…n per broker (#21144) Motivation: Pulsar has two mechanisms to guarantee that a producer connects to the broker multiple times the result is still correct. - In a connection, the second connection waits for the first connection to complete. - In a topic, the second connection will override the previous one. However, if a producer can use different connections to connect to the broker, these two mechanisms will not work. When the config `connectionsPerBroker` of `PulsarClient` is larger than `1`, a producer could use more than one connection, leading to the error above. You can reproduce this issue by the test `testSelectConnectionForSameProducer.` Modifications: Make the same producer/consumer usage the same connection
…n per broker (#21144) Motivation: Pulsar has two mechanisms to guarantee that a producer connects to the broker multiple times the result is still correct. - In a connection, the second connection waits for the first connection to complete. - In a topic, the second connection will override the previous one. However, if a producer can use different connections to connect to the broker, these two mechanisms will not work. When the config `connectionsPerBroker` of `PulsarClient` is larger than `1`, a producer could use more than one connection, leading to the error above. You can reproduce this issue by the test `testSelectConnectionForSameProducer.` Modifications: Make the same producer/consumer usage the same connection (cherry picked from commit f2b9a3f)
…n per broker (#21144) Motivation: Pulsar has two mechanisms to guarantee that a producer connects to the broker multiple times the result is still correct. - In a connection, the second connection waits for the first connection to complete. - In a topic, the second connection will override the previous one. However, if a producer can use different connections to connect to the broker, these two mechanisms will not work. When the config `connectionsPerBroker` of `PulsarClient` is larger than `1`, a producer could use more than one connection, leading to the error above. You can reproduce this issue by the test `testSelectConnectionForSameProducer.` Modifications: Make the same producer/consumer usage the same connection (cherry picked from commit f2b9a3f)
…n per broker (#21144) Motivation: Pulsar has two mechanisms to guarantee that a producer connects to the broker multiple times the result is still correct. - In a connection, the second connection waits for the first connection to complete. - In a topic, the second connection will override the previous one. However, if a producer can use different connections to connect to the broker, these two mechanisms will not work. When the config `connectionsPerBroker` of `PulsarClient` is larger than `1`, a producer could use more than one connection, leading to the error above. You can reproduce this issue by the test `testSelectConnectionForSameProducer.` Modifications: Make the same producer/consumer usage the same connection (cherry picked from commit f2b9a3f)
…ers per connection ### Motivation This is a catch up for apache/pulsar#21144 When a producer or consumer reconnects, a random number will be generated as the key suffix in `ConnectionPool` to create or get the `ClientConnection` object from the pool. https://github.com/apache/pulsar-client-cpp/blob/81cc562f7b366fad97e1b80c07ef9334a808390d/lib/ConnectionPool.cc#L75 If a new connection is created with the same producer or consumer name to the broker, the broker will respond with a `ProducerBusy` or `ConsumerBusy` error so that the reconnection will never succeed. ### Modifications - Add an overload of `ConnectionPool::getConnectionAsync` that accepts an integer parameter as the key suffix. If it's not specified, generate the random number as the suffix. In this method, choose the executor by `key suffix % size`. - Generate the random number and save it when creating the `HandlerBase` object. When connecting the owner broker of its topic, pass that index so that the reconnection will always reuse the same `ClientConnection` object. ### Verifying this change `ProducerTest.testReconnectMultiConnectionsPerBroker` is added to protected the change.
…ers per connection (#337) ### Motivation This is a catch up for apache/pulsar#21144 When a producer or consumer reconnects, a random number will be generated as the key suffix in `ConnectionPool` to create or get the `ClientConnection` object from the pool. https://github.com/apache/pulsar-client-cpp/blob/81cc562f7b366fad97e1b80c07ef9334a808390d/lib/ConnectionPool.cc#L75 If a new connection is created with the same producer or consumer name to the broker, the broker will respond with a `ProducerBusy` or `ConsumerBusy` error so that the reconnection will never succeed. ### Modifications - Add an overload of `ConnectionPool::getConnectionAsync` that accepts an integer parameter as the key suffix. If it's not specified, generate the random number as the suffix. In this method, choose the executor by `key suffix % size`. - Generate the random number and save it when creating the `HandlerBase` object. When connecting the owner broker of its topic, pass that index so that the reconnection will always reuse the same `ClientConnection` object. ### Verifying this change `ProducerTest.testReconnectMultiConnectionsPerBroker` is added to protected the change.
…ers per connection (#337) ### Motivation This is a catch up for apache/pulsar#21144 When a producer or consumer reconnects, a random number will be generated as the key suffix in `ConnectionPool` to create or get the `ClientConnection` object from the pool. https://github.com/apache/pulsar-client-cpp/blob/81cc562f7b366fad97e1b80c07ef9334a808390d/lib/ConnectionPool.cc#L75 If a new connection is created with the same producer or consumer name to the broker, the broker will respond with a `ProducerBusy` or `ConsumerBusy` error so that the reconnection will never succeed. ### Modifications - Add an overload of `ConnectionPool::getConnectionAsync` that accepts an integer parameter as the key suffix. If it's not specified, generate the random number as the suffix. In this method, choose the executor by `key suffix % size`. - Generate the random number and save it when creating the `HandlerBase` object. When connecting the owner broker of its topic, pass that index so that the reconnection will always reuse the same `ClientConnection` object. ### Verifying this change `ProducerTest.testReconnectMultiConnectionsPerBroker` is added to protected the change. (cherry picked from commit 6f115e7)
Motivation
Pulsar has two mechanisms to guarantee that a producer connects to the broker multiple times the result is still correct.
However, if a producer can use different connections to connect to the broker, these two mechanisms will not work.
When the config
connectionsPerBroker
ofPulsarClient
is larger than1
, a producer could use more than one connection[3], leading to the error above. You can reproduce this issue by the testtestSelectConnectionForSameProducer
.[1]
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1398-L1423
[2]
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java#L983-L999
[3]
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java#L213-L218
Modifications
Make the same producer/consumer usage the same connection
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: x