Skip to content

Delay the timing of setting reconnectionPending to false to avoid double attempt at reconnecting #328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,14 @@ void ConsumerImpl::onNegativeAcksSend(const std::set<MessageId>& messageIds) {
interceptors_->onNegativeAcksSend(Consumer(shared_from_this()), messageIds);
}

void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
// Do not use bool, only Result.
Promise<Result, bool> promise;

if (state_ == Closed) {
LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed");
return;
promise.setFailed(ResultAlreadyClosed);
return promise.getFuture();
}

// Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
Expand Down Expand Up @@ -252,9 +256,20 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
config_.getKeySharedPolicy(), config_.getPriorityLevel());

// Keep a reference to ensure object is kept alive.
auto self = get_shared_this_ptr();
cnx->sendRequestWithId(cmd, requestId)
.addListener(std::bind(&ConsumerImpl::handleCreateConsumer, get_shared_this_ptr(), cnx,
std::placeholders::_1));
.addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) {
Result handleResult = handleCreateConsumer(cnx, result);
if (handleResult == ResultOk) {
promise.setSuccess();
} else {
promise.setFailed(handleResult);
}
});

return promise.getFuture();
}

void ConsumerImpl::connectionFailed(Result result) {
Expand All @@ -274,7 +289,9 @@ void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int n
}
}

void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
Result handleResult = ResultOk;

static bool firstTime = true;
if (result == ResultOk) {
if (firstTime) {
Expand Down Expand Up @@ -316,20 +333,21 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
if (consumerCreatedPromise_.isComplete()) {
// Consumer had already been initially created, we need to retry connecting in any case
LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
scheduleReconnection();
handleResult = ResultRetryable;
} else {
// Consumer was not yet created, retry to connect to broker if it's possible
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (isResultRetryable(result)) {
LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result));
scheduleReconnection();
handleResult = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (isResultRetryable(handleResult)) {
LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(handleResult));
} else {
LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result));
consumerCreatedPromise_.setFailed(result);
LOG_ERROR(getName() << "Failed to create consumer: " << strResult(handleResult));
consumerCreatedPromise_.setFailed(handleResult);
state_ = Failed;
}
}
}

return handleResult;
}

void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
Expand Down
4 changes: 2 additions & 2 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ class ConsumerImpl : public ConsumerImplBase {

protected:
// overrided methods from HandlerBase
void connectionOpened(const ClientConnectionPtr& cnx) override;
Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx) override;
void connectionFailed(Result result) override;

// impl methods from ConsumerImpl base
bool hasEnoughMessagesForBatchReceive() const override;
void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;

void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result);
Result handleCreateConsumer(const ClientConnectionPtr& cnx, Result result);

void internalListener();

Expand Down
7 changes: 6 additions & 1 deletion lib/ConsumerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,12 @@ class ConsumerImplBase : public HandlerBase {

protected:
// overrided methods from HandlerBase
void connectionOpened(const ClientConnectionPtr& cnx) override {}
Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx) override {
// Do not use bool, only Result.
Promise<Result, bool> promise;
promise.setSuccess();
return promise.getFuture();
}
void connectionFailed(Result result) override {}

// consumer impl generic method.
Expand Down
2 changes: 2 additions & 0 deletions lib/Future.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ class Promise {

bool setFailed(Result result) const { return state_->complete(result, {}); }

bool setSuccess() const { return setValue({}); }

bool isComplete() const { return state_->completed(); }

Future<Result, Type> getFuture() const { return Future<Result, Type>{state_}; }
Expand Down
23 changes: 15 additions & 8 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ void HandlerBase::setCnx(const ClientConnectionPtr& cnx) {
}

void HandlerBase::grabCnx() {
if (getCnx().lock()) {
LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
bool expectedState = false;
if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
LOG_INFO(getName() << "Ignoring reconnection attempt since there's already a pending reconnection");
return;
}

bool expectedState = false;
if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
LOG_DEBUG(getName() << "Ignoring reconnection attempt since there's already a pending reconnection");
if (getCnx().lock()) {
LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
reconnectionPending_ = false;
return;
}

Expand All @@ -83,17 +84,23 @@ void HandlerBase::grabCnx() {
if (!client) {
LOG_WARN(getName() << "Client is invalid when calling grabCnx()");
connectionFailed(ResultConnectError);
reconnectionPending_ = false;
return;
}
auto self = shared_from_this();
client->getConnection(*topic_).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
reconnectionPending_ = false;

if (result == ResultOk) {
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
connectionOpened(cnx);
connectionOpened(cnx).addListener([this, self](Result result, bool) {
// Do not use bool, only Result.
reconnectionPending_ = false;
if (isResultRetryable(result)) {
scheduleReconnection();
}
});
} else {
connectionFailed(result);
reconnectionPending_ = false;
scheduleReconnection();
}
});
Expand Down
10 changes: 7 additions & 3 deletions lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <string>

#include "Backoff.h"
#include "Future.h"

namespace pulsar {

Expand Down Expand Up @@ -74,10 +75,13 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
virtual void beforeConnectionChange(ClientConnection& cnx) = 0;

/*
* connectionOpened will be implemented by derived class to receive notification
* connectionOpened will be implemented by derived class to receive notification.
*
* @return ResultOk if the connection is successfully completed.
* @return ResultError if there was a failure. ResultRetryable if reconnection is needed.
* @return Do not use bool, only Result.
*/

virtual void connectionOpened(const ClientConnectionPtr& connection) = 0;
virtual Future<Result, bool> connectionOpened(const ClientConnectionPtr& connection) = 0;

virtual void connectionFailed(Result result) = 0;

Expand Down
49 changes: 34 additions & 15 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,14 @@ void ProducerImpl::beforeConnectionChange(ClientConnection& connection) {
connection.removeProducer(producerId_);
}

void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Future<Result, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
// Do not use bool, only Result.
Promise<Result, bool> promise;

if (state_ == Closed) {
LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
return;
promise.setFailed(ResultAlreadyClosed);
return promise.getFuture();
}

ClientImplPtr client = client_.lock();
Expand All @@ -149,9 +153,20 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
userProvidedProducerName_, conf_.isEncryptionEnabled(),
static_cast<proto::ProducerAccessMode>(conf_.getAccessMode()),
topicEpoch, conf_.impl_->initialSubscriptionName);

// Keep a reference to ensure object is kept alive.
auto self = shared_from_this();
cnx->sendRequestWithId(cmd, requestId)
.addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx,
std::placeholders::_1, std::placeholders::_2));
.addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) {
Result handleResult = handleCreateProducer(cnx, result, responseData);
if (handleResult == ResultOk) {
promise.setSuccess();
} else {
promise.setFailed(handleResult);
}
});

return promise.getFuture();
}

void ProducerImpl::connectionFailed(Result result) {
Expand All @@ -167,8 +182,10 @@ void ProducerImpl::connectionFailed(Result result) {
}
}

void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData) {
Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData) {
Result handleResult = ResultOk;

Lock lock(mutex_);

LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));
Expand All @@ -190,7 +207,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
lock.unlock();
producerCreatedPromise_.setFailed(ResultAlreadyClosed);
}
return;
return ResultAlreadyClosed;
}

if (result == ResultOk) {
Expand Down Expand Up @@ -259,6 +276,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
}
lock.unlock();
producerCreatedPromise_.setFailed(result);
handleResult = result;
} else if (producerCreatedPromise_.isComplete()) {
if (result == ResultProducerBlockedQuotaExceededException) {
LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer");
Expand All @@ -269,22 +287,23 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r

// Producer had already been initially created, we need to retry connecting in any case
LOG_WARN(getName() << "Failed to reconnect producer: " << strResult(result));
scheduleReconnection();
handleResult = ResultRetryable;
} else {
// Producer was not yet created, retry to connect to broker if it's possible
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (isResultRetryable(result)) {
LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result));
scheduleReconnection();
handleResult = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (isResultRetryable(handleResult)) {
LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(handleResult));
} else {
LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
failPendingMessages(result, false);
LOG_ERROR(getName() << "Failed to create producer: " << strResult(handleResult));
failPendingMessages(handleResult, false);
state_ = Failed;
lock.unlock();
producerCreatedPromise_.setFailed(result);
producerCreatedPromise_.setFailed(handleResult);
}
}
}

return handleResult;
}

auto ProducerImpl::getPendingCallbacksWhenFailed() -> decltype(pendingMessagesQueue_) {
Expand Down
6 changes: 3 additions & 3 deletions lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,15 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase {

// overrided methods from HandlerBase
void beforeConnectionChange(ClientConnection& connection) override;
void connectionOpened(const ClientConnectionPtr& connection) override;
Future<Result, bool> connectionOpened(const ClientConnectionPtr& connection) override;
void connectionFailed(Result result) override;
const std::string& getName() const override { return producerStr_; }

private:
void printStats();

void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData);
Result handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData);

void resendMessages(ClientConnectionPtr cnx);

Expand Down