Skip to content

Commit b35ae1a

Browse files
Avoid accessing a null ClientConnection instance (#317)
### Motivation We observed several null `ClientConnection` accesses in the test environment. See the `this=0x0` outputs in the following two typical stacks. ``` #8 bytesWritten (this=0xb8, size=371) at lib/SharedBuffer.h:166 #9 pulsar::ClientConnection::handleRead (this=0x0, err=..., bytesTransferred=371, minReadSize=4) at lib/ClientConnection.cc:609 ``` ``` #12 0x00007f33202933d2 in unique_lock (__m=..., this=0x7f3311c82800) at /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/std_mutex.h:197 #13 pulsar::ClientConnection::sendPendingCommands (this=0x0) at lib/ClientConnection.cc:1071 #14 0x00007f3320293d2d in pulsar::ClientConnection::handleSendPair (this=0x0, err=...) at lib/ClientConnection.cc:1066 ``` Though `shared_from_this()` is always passed to the `std::bind` function, when the method of `ClientConnection` is called, the pointer is still `null`. ### Modifications First, replace all `std::bind` calls with lambda expressions that capture a `std::weak_ptr<ClientConnection>` and perform null checks explicitly on the value returned by the `lock()` method. Since now all asio callbacks don't hold a `shared_ptr`, the owner of the `ClientConnection` object should be `ConnectionPool`, i.e. the pool maintains some connections, while all asio callbacks use `weak_ptr` to test if the connection is present. Second, make `ClientImpl::getConnection` return `shared_ptr` rather than `weak_ptr` so that the caller side does not need to check if `lock()` returns null in the callback of this future. We cannot make `ConnectionPool::getConnectionAsync` return `shared_ptr` because it could return the future of `connectPromise_`, which is held by `ClientConnection` itself. We should avoid holding a `shared_ptr` of `ClientConnection` because its owner is `ConnectionPool`.
1 parent ba5902a commit b35ae1a

12 files changed

+319
-170
lines changed

lib/BinaryProtoLookupService.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,12 @@ void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str
122122
promise->setFailed(result);
123123
return;
124124
}
125+
auto conn = clientCnx.lock();
126+
if (!conn) {
127+
promise->setFailed(ResultConnectError);
128+
return;
129+
}
125130
LookupDataResultPromisePtr lookupPromise = std::make_shared<LookupDataResultPromise>();
126-
ClientConnectionPtr conn = clientCnx.lock();
127131
uint64_t requestId = newRequestId();
128132
conn->newPartitionedMetadataLookup(topicName, requestId, lookupPromise);
129133
lookupPromise->getFuture().addListener(std::bind(&BinaryProtoLookupService::handlePartitionMetadataLookup,
@@ -212,7 +216,11 @@ void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string
212216
return;
213217
}
214218

215-
ClientConnectionPtr conn = clientCnx.lock();
219+
auto conn = clientCnx.lock();
220+
if (!conn) {
221+
promise->setFailed(ResultConnectError);
222+
return;
223+
}
216224
uint64_t requestId = newRequestId();
217225
LOG_DEBUG("sendGetTopicsOfNamespaceRequest. requestId: " << requestId << " nsName: " << nsName);
218226
conn->newGetTopicsOfNamespace(nsName, mode, requestId)

lib/ClientConnection.cc

Lines changed: 206 additions & 79 deletions
Large diffs are not rendered by default.

lib/ClientConnection.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ using TcpResolverPtr = std::shared_ptr<boost::asio::ip::tcp::resolver>;
5454
class ExecutorService;
5555
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
5656

57+
class ConnectionPool;
5758
class ClientConnection;
5859
typedef std::shared_ptr<ClientConnection> ClientConnectionPtr;
5960
typedef std::weak_ptr<ClientConnection> ClientConnectionWeakPtr;
@@ -127,16 +128,21 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
127128
*/
128129
ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress,
129130
ExecutorServicePtr executor, const ClientConfiguration& clientConfiguration,
130-
const AuthenticationPtr& authentication, const std::string& clientVersion);
131+
const AuthenticationPtr& authentication, const std::string& clientVersion,
132+
ConnectionPool& pool);
131133
~ClientConnection();
132134

135+
#if __cplusplus < 201703L
136+
std::weak_ptr<ClientConnection> weak_from_this() noexcept { return shared_from_this(); }
137+
#endif
138+
133139
/*
134140
* starts tcp connect_async
135141
* @return future<ConnectionPtr> which is not yet set
136142
*/
137143
void tcpConnectAsync();
138144

139-
void close(Result result = ResultConnectError);
145+
void close(Result result = ResultConnectError, bool detach = true);
140146

141147
bool isClosed() const;
142148

@@ -383,6 +389,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
383389
bool isTlsAllowInsecureConnection_ = false;
384390

385391
const std::string clientVersion_;
392+
ConnectionPool& pool_;
386393
friend class PulsarFriend;
387394

388395
void closeSocket();

lib/ClientImpl.cc

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -517,8 +517,8 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
517517
}
518518
}
519519

520-
Future<Result, ClientConnectionWeakPtr> ClientImpl::getConnection(const std::string& topic) {
521-
Promise<Result, ClientConnectionWeakPtr> promise;
520+
Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic) {
521+
Promise<Result, ClientConnectionPtr> promise;
522522

523523
const auto topicNamePtr = TopicName::get(topic);
524524
if (!topicNamePtr) {
@@ -537,7 +537,12 @@ Future<Result, ClientConnectionWeakPtr> ClientImpl::getConnection(const std::str
537537
pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress)
538538
.addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
539539
if (result == ResultOk) {
540-
promise.setValue(weakCnx);
540+
auto cnx = weakCnx.lock();
541+
if (cnx) {
542+
promise.setValue(cnx);
543+
} else {
544+
promise.setFailed(ResultConnectError);
545+
}
541546
} else {
542547
promise.setFailed(result);
543548
}

lib/ClientImpl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class ConsumerImplBase;
5050
typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
5151

5252
class ClientConnection;
53-
using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
53+
using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
5454

5555
class LookupService;
5656
using LookupServicePtr = std::shared_ptr<LookupService>;
@@ -96,7 +96,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
9696

9797
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
9898

99-
Future<Result, ClientConnectionWeakPtr> getConnection(const std::string& topic);
99+
Future<Result, ClientConnectionPtr> getConnection(const std::string& topic);
100100

101101
void closeAsync(CloseCallback callback);
102102
void shutdown();

lib/ConnectionPool.cc

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,13 @@ bool ConnectionPool::close() {
4848
return false;
4949
}
5050

51-
std::unique_lock<std::mutex> lock(mutex_);
51+
std::unique_lock<std::recursive_mutex> lock(mutex_);
5252
if (poolConnections_) {
5353
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
54-
ClientConnectionPtr cnx = cnxIt->second.lock();
54+
auto& cnx = cnxIt->second;
5555
if (cnx) {
56-
cnx->close(ResultDisconnected);
56+
// The 2nd argument is false because removing a value during the iteration will cause segfault
57+
cnx->close(ResultDisconnected, false);
5758
}
5859
}
5960
pool_.clear();
@@ -69,22 +70,22 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
6970
return promise.getFuture();
7071
}
7172

72-
std::unique_lock<std::mutex> lock(mutex_);
73+
std::unique_lock<std::recursive_mutex> lock(mutex_);
7374

7475
if (poolConnections_) {
7576
PoolMap::iterator cnxIt = pool_.find(logicalAddress);
7677
if (cnxIt != pool_.end()) {
77-
ClientConnectionPtr cnx = cnxIt->second.lock();
78+
auto& cnx = cnxIt->second;
7879

79-
if (cnx && !cnx->isClosed()) {
80+
if (!cnx->isClosed()) {
8081
// Found a valid or pending connection in the pool
8182
LOG_DEBUG("Got connection from pool for " << logicalAddress << " use_count: " //
82-
<< (cnx.use_count() - 1) << " @ " << cnx.get());
83+
<< (cnx.use_count()) << " @ " << cnx.get());
8384
return cnx->getConnectFuture();
8485
} else {
85-
// Deleting stale connection
86-
LOG_INFO("Deleting stale connection from pool for "
87-
<< logicalAddress << " use_count: " << (cnx.use_count() - 1) << " @ " << cnx.get());
86+
// The closed connection should have been removed from the pool in ClientConnection::close
87+
LOG_WARN("Deleting stale connection from pool for "
88+
<< logicalAddress << " use_count: " << (cnx.use_count()) << " @ " << cnx.get());
8889
pool_.erase(logicalAddress);
8990
}
9091
}
@@ -94,7 +95,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
9495
ClientConnectionPtr cnx;
9596
try {
9697
cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(),
97-
clientConfiguration_, authentication_, clientVersion_));
98+
clientConfiguration_, authentication_, clientVersion_, *this));
9899
} catch (const std::runtime_error& e) {
99100
lock.unlock();
100101
LOG_ERROR("Failed to create connection: " << e.what())
@@ -114,4 +115,13 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
114115
return future;
115116
}
116117

118+
void ConnectionPool::remove(const std::string& key, ClientConnection* value) {
119+
std::lock_guard<std::recursive_mutex> lock(mutex_);
120+
auto it = pool_.find(key);
121+
if (it->second.get() == value) {
122+
LOG_INFO("Remove connection for " << key);
123+
pool_.erase(it);
124+
}
125+
}
126+
117127
} // namespace pulsar

lib/ConnectionPool.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ class PULSAR_PUBLIC ConnectionPool {
5151
*/
5252
bool close();
5353

54+
void remove(const std::string& key, ClientConnection* value);
55+
5456
/**
5557
* Get a connection from the pool.
5658
* <p>
@@ -78,11 +80,11 @@ class PULSAR_PUBLIC ConnectionPool {
7880
ClientConfiguration clientConfiguration_;
7981
ExecutorServiceProviderPtr executorProvider_;
8082
AuthenticationPtr authentication_;
81-
typedef std::map<std::string, ClientConnectionWeakPtr> PoolMap;
83+
typedef std::map<std::string, std::shared_ptr<ClientConnection>> PoolMap;
8284
PoolMap pool_;
8385
bool poolConnections_;
8486
const std::string clientVersion_;
85-
mutable std::mutex mutex_;
87+
mutable std::recursive_mutex mutex_;
8688
std::atomic_bool closed_{false};
8789

8890
friend class PulsarFriend;

lib/ConsumerImpl.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -315,13 +315,13 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
315315
if (consumerCreatedPromise_.isComplete()) {
316316
// Consumer had already been initially created, we need to retry connecting in any case
317317
LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
318-
scheduleReconnection(get_shared_this_ptr());
318+
scheduleReconnection();
319319
} else {
320320
// Consumer was not yet created, retry to connect to broker if it's possible
321321
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
322322
if (result == ResultRetryable) {
323323
LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result));
324-
scheduleReconnection(get_shared_this_ptr());
324+
scheduleReconnection();
325325
} else {
326326
LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result));
327327
consumerCreatedPromise_.setFailed(result);
@@ -1206,7 +1206,7 @@ void ConsumerImpl::negativeAcknowledge(const MessageId& messageId) {
12061206
void ConsumerImpl::disconnectConsumer() {
12071207
LOG_INFO("Broker notification of Closed consumer: " << consumerId_);
12081208
resetCnx();
1209-
scheduleReconnection(get_shared_this_ptr());
1209+
scheduleReconnection();
12101210
}
12111211

12121212
void ConsumerImpl::closeAsync(ResultCallback originalCallback) {

lib/HandlerBase.cc

Lines changed: 48 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -79,98 +79,92 @@ void HandlerBase::grabCnx() {
7979

8080
LOG_INFO(getName() << "Getting connection from pool");
8181
ClientImplPtr client = client_.lock();
82-
Future<Result, ClientConnectionWeakPtr> future = client->getConnection(*topic_);
83-
future.addListener(std::bind(&HandlerBase::handleNewConnection, std::placeholders::_1,
84-
std::placeholders::_2, get_weak_from_this()));
85-
}
86-
87-
void HandlerBase::handleNewConnection(Result result, ClientConnectionWeakPtr connection,
88-
HandlerBaseWeakPtr weakHandler) {
89-
HandlerBasePtr handler = weakHandler.lock();
90-
if (!handler) {
91-
LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
82+
if (!client) {
83+
LOG_WARN(getName() << "Client is invalid when calling grabCnx()");
84+
connectionFailed(ResultConnectError);
9285
return;
9386
}
94-
95-
handler->reconnectionPending_ = false;
96-
97-
if (result == ResultOk) {
98-
ClientConnectionPtr conn = connection.lock();
99-
if (conn) {
100-
LOG_DEBUG(handler->getName() << "Connected to broker: " << conn->cnxString());
101-
handler->connectionOpened(conn);
102-
return;
103-
}
104-
// TODO - look deeper into why the connection is null while the result is ResultOk
105-
LOG_INFO(handler->getName() << "ClientConnectionPtr is no longer valid");
106-
}
107-
handler->connectionFailed(result);
108-
scheduleReconnection(handler);
87+
auto weakSelf = get_weak_from_this();
88+
client->getConnection(*topic_).addListener(
89+
[this, weakSelf](Result result, const ClientConnectionPtr& cnx) {
90+
auto self = weakSelf.lock();
91+
if (!self) {
92+
LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
93+
return;
94+
}
95+
96+
reconnectionPending_ = false;
97+
98+
if (result == ResultOk) {
99+
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
100+
connectionOpened(cnx);
101+
} else {
102+
connectionFailed(result);
103+
scheduleReconnection();
104+
}
105+
});
109106
}
110107

111-
void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr connection,
112-
HandlerBaseWeakPtr weakHandler) {
113-
HandlerBasePtr handler = weakHandler.lock();
114-
if (!handler) {
115-
LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
116-
return;
117-
}
108+
void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr& cnx) {
109+
State state = state_;
118110

119-
State state = handler->state_;
120-
121-
ClientConnectionPtr currentConnection = handler->getCnx().lock();
122-
if (currentConnection && connection.lock().get() != currentConnection.get()) {
123-
LOG_WARN(handler->getName()
124-
<< "Ignoring connection closed since we are already attached to a newer connection");
111+
ClientConnectionPtr currentConnection = getCnx().lock();
112+
if (currentConnection && cnx.get() != currentConnection.get()) {
113+
LOG_WARN(
114+
getName() << "Ignoring connection closed since we are already attached to a newer connection");
125115
return;
126116
}
127117

128-
handler->resetCnx();
118+
resetCnx();
129119

130120
if (result == ResultRetryable) {
131-
scheduleReconnection(handler);
121+
scheduleReconnection();
132122
return;
133123
}
134124

135125
switch (state) {
136126
case Pending:
137127
case Ready:
138-
scheduleReconnection(handler);
128+
scheduleReconnection();
139129
break;
140130

141131
case NotStarted:
142132
case Closing:
143133
case Closed:
144134
case Producer_Fenced:
145135
case Failed:
146-
LOG_DEBUG(handler->getName()
147-
<< "Ignoring connection closed event since the handler is not used anymore");
136+
LOG_DEBUG(getName() << "Ignoring connection closed event since the handler is not used anymore");
148137
break;
149138
}
150139
}
151140

152-
void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
153-
const auto state = handler->state_.load();
141+
void HandlerBase::scheduleReconnection() {
142+
const auto state = state_.load();
154143

155144
if (state == Pending || state == Ready) {
156-
TimeDuration delay = handler->backoff_.next();
145+
TimeDuration delay = backoff_.next();
157146

158-
LOG_INFO(handler->getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0)
159-
<< " s");
160-
handler->timer_->expires_from_now(delay);
147+
LOG_INFO(getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0) << " s");
148+
timer_->expires_from_now(delay);
161149
// passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled
162150
// so we will not run into the case where grabCnx is invoked on out of scope handler
163-
handler->timer_->async_wait(std::bind(&HandlerBase::handleTimeout, std::placeholders::_1, handler));
151+
auto weakSelf = get_weak_from_this();
152+
timer_->async_wait([weakSelf](const boost::system::error_code& ec) {
153+
auto self = weakSelf.lock();
154+
if (self) {
155+
self->handleTimeout(ec);
156+
}
157+
});
164158
}
165159
}
166160

167-
void HandlerBase::handleTimeout(const boost::system::error_code& ec, HandlerBasePtr handler) {
161+
void HandlerBase::handleTimeout(const boost::system::error_code& ec) {
168162
if (ec) {
169-
LOG_DEBUG(handler->getName() << "Ignoring timer cancelled event, code[" << ec << "]");
163+
LOG_DEBUG(getName() << "Ignoring timer cancelled event, code[" << ec << "]");
170164
return;
171165
} else {
172-
handler->epoch_++;
173-
handler->grabCnx();
166+
epoch_++;
167+
grabCnx();
174168
}
175169
}
176170

lib/HandlerBase.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ class HandlerBase {
6767
/*
6868
* Schedule reconnection after backoff time
6969
*/
70-
static void scheduleReconnection(HandlerBasePtr handler);
70+
void scheduleReconnection();
7171

7272
/**
7373
* Do some cleanup work before changing `connection_` to `cnx`.
@@ -89,10 +89,9 @@ class HandlerBase {
8989
virtual const std::string& getName() const = 0;
9090

9191
private:
92-
static void handleNewConnection(Result result, ClientConnectionWeakPtr connection, HandlerBaseWeakPtr wp);
93-
static void handleDisconnection(Result result, ClientConnectionWeakPtr connection, HandlerBaseWeakPtr wp);
92+
void handleDisconnection(Result result, const ClientConnectionPtr& cnx);
9493

95-
static void handleTimeout(const boost::system::error_code& ec, HandlerBasePtr handler);
94+
void handleTimeout(const boost::system::error_code& ec);
9695

9796
protected:
9897
ClientImplWeakPtr client_;

0 commit comments

Comments
 (0)