Skip to content

Commit ebae92e

Browse files
authored
Delay the timing of setting reconnectionPending to false to avoid double attempt at reconnecting (#328)
Related Issue: #235 ### Motivation A potential double scheduling of reconnection due to a broker shutdown was observed. The reconnect can be scheduled with either of the following codes [ https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ConsumerImpl.cc#L1209](https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ConsumerImpl.cc#L1209) or [ https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ClientConnection.cc#L1350](https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ClientConnection.cc#L1350) -> [ https://github.com/apache/pulsar-client-cpp/blob/af45a54c10ec5b06e80b683010afd3531457ac64/lib/HandlerBase.cc#L121](https://github.com/apache/pulsar-client-cpp/blob/af45a54c10ec5b06e80b683010afd3531457ac64/lib/HandlerBase.cc#L121) If a second reconnection request is received during the first reconnection attempt, it triggers additional reconnection attempts. If the second reconnection is successful, the consumer is removed from `cnx`: [ https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ConsumerImpl.cc#L285](https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ConsumerImpl.cc#L285) -> [ https://github.com/apache/pulsar-client-cpp/blob/af45a54c10ec5b06e80b683010afd3531457ac64/lib/HandlerBase.cc#L63](https://github.com/apache/pulsar-client-cpp/blob/af45a54c10ec5b06e80b683010afd3531457ac64/lib/HandlerBase.cc#L63) --> [ https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ConsumerImpl.cc#L217](https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ConsumerImpl.cc#L217) The problem is that the consumer will no longer be able to manage events coming from the broker. To cope with this issue, a new flag `reconnectionPending_` has been introduced via #310 . However, while the above change reduces the likelihood of the problem occurring, it doesn't eliminate the problem entirely. In fact, the double reconnects have been observed even after #310(I tried with b35ae1a): ``` # Consumer is connected to broker1, but broker1 shutdown closes Consumer and reconnection is scheduled. ... 2023-09-26 15:42:05.736 INFO [140591970158336] ConsumerImpl:1207 | Broker notification of Closed consumer: 5 2023-09-26 15:42:05.736 INFO [140591970158336] HandlerBase:147 | [0x7fde18046510, dummy_24, 5] Schedule reconnection in 0.1 s ... # Consumer attempts to connect to broker1, but fails, and a reconnection is scheduled again. ... 2023-09-26 15:42:05.836 INFO [140591970158336] HandlerBase:80 | [0x7fde18046510, dummy_24, 5] Getting connection from pool 2023-09-26 15:42:05.837 WARN [140591970158336] ClientConnection:1741 | [<host(client)>:55304 -> <host(broker1)>:<prot>] Received error response from server: Retryable (Namespace is being unloaded, cannot add topic persistent://shustsud-test2/test/partitioned-topic-partition-5) -- req_id: 16 2023-09-26 15:42:05.837 WARN [140591970158336] ConsumerImpl:317 | [0x7fde18046510, dummy_24, 5] Failed to reconnect consumer: Retryable 2023-09-26 15:42:05.837 INFO [140591970158336] HandlerBase:147 | [0x7fde18046510, dummy_24, 5] Schedule reconnection in 0.194 s ... # During the connection attempt, the connection to broker1 is closed and further reconnection is scheduled. # After that, two subscribe requests are sent to broker2. 2023-09-26 15:42:06.034 INFO [140591970158336] HandlerBase:80 | [0x7fde18046510, dummy_24, 5] Getting connection from pool ... 2023-09-26 15:42:06.515 ERROR [140591970158336] ClientConnection:1330 | [<host(client)>:55304 -> <host(broker1)>:<prot>] Connection closed with ConnectError 2023-09-26 15:42:06.515 INFO [140591970158336] ConnectionPool:122 | Remove connection for pulsar+ssl://<host(broker1)>:<prot> 2023-09-26 15:42:06.515 INFO [140591970158336] HandlerBase:147 | [0x7fde18046510, dummy_24, 5] Schedule reconnection in 0.392 s ... 2023-09-26 15:42:06.907 INFO [140591970158336] HandlerBase:80 | [0x7fde18046510, dummy_24, 5] Getting connection from pool ... 2023-09-26 15:42:06.912 INFO [140591970158336] ConsumerImpl:282 | [0x7fde18046510, dummy_24, 5] Created consumer on broker [<host(client)>:54582 -> <host(broker2)>:<prot>] ... 2023-09-26 15:42:07.103 INFO [140591970158336] ConsumerImpl:282 | [0x7fde18046510, dummy_24, 5] Created consumer on broker [<host(client)>:54582 -> <host(broker2)>:<prot>] ... ``` To completely eliminate the possibility of the double reconnects, I suggest adjusting the timing of when reconnectionPending_ is set to false. Ideally, this should be done after the handleCreateConsumer method or the handleCreateProducer method has been completed. ### Modifications The timing for setting `reconnectionPending_` to false has been changed.
1 parent 33085eb commit ebae92e

File tree

8 files changed

+99
-44
lines changed

8 files changed

+99
-44
lines changed

lib/ConsumerImpl.cc

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -218,10 +218,14 @@ void ConsumerImpl::onNegativeAcksSend(const std::set<MessageId>& messageIds) {
218218
interceptors_->onNegativeAcksSend(Consumer(shared_from_this()), messageIds);
219219
}
220220

221-
void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
221+
Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
222+
// Do not use bool, only Result.
223+
Promise<Result, bool> promise;
224+
222225
if (state_ == Closed) {
223226
LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed");
224-
return;
227+
promise.setFailed(ResultAlreadyClosed);
228+
return promise.getFuture();
225229
}
226230

227231
// Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
@@ -249,9 +253,20 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
249253
subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
250254
config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
251255
config_.getKeySharedPolicy(), config_.getPriorityLevel());
256+
257+
// Keep a reference to ensure object is kept alive.
258+
auto self = get_shared_this_ptr();
252259
cnx->sendRequestWithId(cmd, requestId)
253-
.addListener(std::bind(&ConsumerImpl::handleCreateConsumer, get_shared_this_ptr(), cnx,
254-
std::placeholders::_1));
260+
.addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) {
261+
Result handleResult = handleCreateConsumer(cnx, result);
262+
if (handleResult == ResultOk) {
263+
promise.setSuccess();
264+
} else {
265+
promise.setFailed(handleResult);
266+
}
267+
});
268+
269+
return promise.getFuture();
255270
}
256271

257272
void ConsumerImpl::connectionFailed(Result result) {
@@ -271,7 +286,9 @@ void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int n
271286
}
272287
}
273288

274-
void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
289+
Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
290+
Result handleResult = ResultOk;
291+
275292
static bool firstTime = true;
276293
if (result == ResultOk) {
277294
if (firstTime) {
@@ -313,20 +330,21 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
313330
if (consumerCreatedPromise_.isComplete()) {
314331
// Consumer had already been initially created, we need to retry connecting in any case
315332
LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
316-
scheduleReconnection();
333+
handleResult = ResultRetryable;
317334
} else {
318335
// Consumer was not yet created, retry to connect to broker if it's possible
319-
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
320-
if (isResultRetryable(result)) {
321-
LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result));
322-
scheduleReconnection();
336+
handleResult = convertToTimeoutIfNecessary(result, creationTimestamp_);
337+
if (isResultRetryable(handleResult)) {
338+
LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(handleResult));
323339
} else {
324-
LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result));
325-
consumerCreatedPromise_.setFailed(result);
340+
LOG_ERROR(getName() << "Failed to create consumer: " << strResult(handleResult));
341+
consumerCreatedPromise_.setFailed(handleResult);
326342
state_ = Failed;
327343
}
328344
}
329345
}
346+
347+
return handleResult;
330348
}
331349

332350
void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {

lib/ConsumerImpl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,14 +142,14 @@ class ConsumerImpl : public ConsumerImplBase {
142142

143143
protected:
144144
// overrided methods from HandlerBase
145-
void connectionOpened(const ClientConnectionPtr& cnx) override;
145+
Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx) override;
146146
void connectionFailed(Result result) override;
147147

148148
// impl methods from ConsumerImpl base
149149
bool hasEnoughMessagesForBatchReceive() const override;
150150
void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;
151151

152-
void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result);
152+
Result handleCreateConsumer(const ClientConnectionPtr& cnx, Result result);
153153

154154
void internalListener();
155155

lib/ConsumerImplBase.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,12 @@ class ConsumerImplBase : public HandlerBase {
8484

8585
protected:
8686
// overrided methods from HandlerBase
87-
void connectionOpened(const ClientConnectionPtr& cnx) override {}
87+
Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx) override {
88+
// Do not use bool, only Result.
89+
Promise<Result, bool> promise;
90+
promise.setSuccess();
91+
return promise.getFuture();
92+
}
8893
void connectionFailed(Result result) override {}
8994

9095
// consumer impl generic method.

lib/Future.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ class Promise {
138138

139139
bool setFailed(Result result) const { return state_->complete(result, {}); }
140140

141+
bool setSuccess() const { return setValue({}); }
142+
141143
bool isComplete() const { return state_->completed(); }
142144

143145
Future<Result, Type> getFuture() const { return Future<Result, Type>{state_}; }

lib/HandlerBase.cc

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,15 @@ void HandlerBase::setCnx(const ClientConnectionPtr& cnx) {
6767
}
6868

6969
void HandlerBase::grabCnx() {
70-
if (getCnx().lock()) {
71-
LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
70+
bool expectedState = false;
71+
if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
72+
LOG_INFO(getName() << "Ignoring reconnection attempt since there's already a pending reconnection");
7273
return;
7374
}
7475

75-
bool expectedState = false;
76-
if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
77-
LOG_DEBUG(getName() << "Ignoring reconnection attempt since there's already a pending reconnection");
76+
if (getCnx().lock()) {
77+
LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
78+
reconnectionPending_ = false;
7879
return;
7980
}
8081

@@ -83,17 +84,23 @@ void HandlerBase::grabCnx() {
8384
if (!client) {
8485
LOG_WARN(getName() << "Client is invalid when calling grabCnx()");
8586
connectionFailed(ResultConnectError);
87+
reconnectionPending_ = false;
8688
return;
8789
}
8890
auto self = shared_from_this();
8991
client->getConnection(*topic_).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
90-
reconnectionPending_ = false;
91-
9292
if (result == ResultOk) {
9393
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
94-
connectionOpened(cnx);
94+
connectionOpened(cnx).addListener([this, self](Result result, bool) {
95+
// Do not use bool, only Result.
96+
reconnectionPending_ = false;
97+
if (isResultRetryable(result)) {
98+
scheduleReconnection();
99+
}
100+
});
95101
} else {
96102
connectionFailed(result);
103+
reconnectionPending_ = false;
97104
scheduleReconnection();
98105
}
99106
});

lib/HandlerBase.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <string>
2727

2828
#include "Backoff.h"
29+
#include "Future.h"
2930

3031
namespace pulsar {
3132

@@ -74,10 +75,13 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
7475
virtual void beforeConnectionChange(ClientConnection& cnx) = 0;
7576

7677
/*
77-
* connectionOpened will be implemented by derived class to receive notification
78+
* connectionOpened will be implemented by derived class to receive notification.
79+
*
80+
* @return ResultOk if the connection is successfully completed.
81+
* @return ResultError if there was a failure. ResultRetryable if reconnection is needed.
82+
* @return Do not use bool, only Result.
7883
*/
79-
80-
virtual void connectionOpened(const ClientConnectionPtr& connection) = 0;
84+
virtual Future<Result, bool> connectionOpened(const ClientConnectionPtr& connection) = 0;
8185

8286
virtual void connectionFailed(Result result) = 0;
8387

lib/ProducerImpl.cc

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,14 @@ void ProducerImpl::beforeConnectionChange(ClientConnection& connection) {
135135
connection.removeProducer(producerId_);
136136
}
137137

138-
void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
138+
Future<Result, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
139+
// Do not use bool, only Result.
140+
Promise<Result, bool> promise;
141+
139142
if (state_ == Closed) {
140143
LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
141-
return;
144+
promise.setFailed(ResultAlreadyClosed);
145+
return promise.getFuture();
142146
}
143147

144148
ClientImplPtr client = client_.lock();
@@ -149,9 +153,20 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
149153
userProvidedProducerName_, conf_.isEncryptionEnabled(),
150154
static_cast<proto::ProducerAccessMode>(conf_.getAccessMode()),
151155
topicEpoch, conf_.impl_->initialSubscriptionName);
156+
157+
// Keep a reference to ensure object is kept alive.
158+
auto self = shared_from_this();
152159
cnx->sendRequestWithId(cmd, requestId)
153-
.addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx,
154-
std::placeholders::_1, std::placeholders::_2));
160+
.addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) {
161+
Result handleResult = handleCreateProducer(cnx, result, responseData);
162+
if (handleResult == ResultOk) {
163+
promise.setSuccess();
164+
} else {
165+
promise.setFailed(handleResult);
166+
}
167+
});
168+
169+
return promise.getFuture();
155170
}
156171

157172
void ProducerImpl::connectionFailed(Result result) {
@@ -167,8 +182,10 @@ void ProducerImpl::connectionFailed(Result result) {
167182
}
168183
}
169184

170-
void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
171-
const ResponseData& responseData) {
185+
Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
186+
const ResponseData& responseData) {
187+
Result handleResult = ResultOk;
188+
172189
Lock lock(mutex_);
173190

174191
LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));
@@ -190,7 +207,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
190207
lock.unlock();
191208
producerCreatedPromise_.setFailed(ResultAlreadyClosed);
192209
}
193-
return;
210+
return ResultAlreadyClosed;
194211
}
195212

196213
if (result == ResultOk) {
@@ -259,6 +276,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
259276
}
260277
lock.unlock();
261278
producerCreatedPromise_.setFailed(result);
279+
handleResult = result;
262280
} else if (producerCreatedPromise_.isComplete()) {
263281
if (result == ResultProducerBlockedQuotaExceededException) {
264282
LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer");
@@ -269,22 +287,23 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
269287

270288
// Producer had already been initially created, we need to retry connecting in any case
271289
LOG_WARN(getName() << "Failed to reconnect producer: " << strResult(result));
272-
scheduleReconnection();
290+
handleResult = ResultRetryable;
273291
} else {
274292
// Producer was not yet created, retry to connect to broker if it's possible
275-
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
276-
if (isResultRetryable(result)) {
277-
LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result));
278-
scheduleReconnection();
293+
handleResult = convertToTimeoutIfNecessary(result, creationTimestamp_);
294+
if (isResultRetryable(handleResult)) {
295+
LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(handleResult));
279296
} else {
280-
LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
281-
failPendingMessages(result, false);
297+
LOG_ERROR(getName() << "Failed to create producer: " << strResult(handleResult));
298+
failPendingMessages(handleResult, false);
282299
state_ = Failed;
283300
lock.unlock();
284-
producerCreatedPromise_.setFailed(result);
301+
producerCreatedPromise_.setFailed(handleResult);
285302
}
286303
}
287304
}
305+
306+
return handleResult;
288307
}
289308

290309
auto ProducerImpl::getPendingCallbacksWhenFailed() -> decltype(pendingMessagesQueue_) {

lib/ProducerImpl.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,15 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase {
120120

121121
// overrided methods from HandlerBase
122122
void beforeConnectionChange(ClientConnection& connection) override;
123-
void connectionOpened(const ClientConnectionPtr& connection) override;
123+
Future<Result, bool> connectionOpened(const ClientConnectionPtr& connection) override;
124124
void connectionFailed(Result result) override;
125125
const std::string& getName() const override { return producerStr_; }
126126

127127
private:
128128
void printStats();
129129

130-
void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
131-
const ResponseData& responseData);
130+
Result handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
131+
const ResponseData& responseData);
132132

133133
void resendMessages(ClientConnectionPtr cnx);
134134

0 commit comments

Comments
 (0)