Skip to content

Commit 0b893a1

Browse files
Fix the seek method could be blocked forever when subscribe RPC is slower than seek RPC (#533)
1 parent d040039 commit 0b893a1

File tree

10 files changed

+420
-200
lines changed

10 files changed

+420
-200
lines changed

lib/ClientConnection.cc

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <openssl/x509.h>
2222
#include <pulsar/MessageIdBuilder.h>
2323

24+
#include <chrono>
2425
#include <fstream>
2526

2627
#include "AsioDefines.h"
@@ -31,6 +32,7 @@
3132
#include "ConsumerImpl.h"
3233
#include "ExecutorService.h"
3334
#include "LogUtils.h"
35+
#include "MockServer.h"
3436
#include "OpSendMsg.h"
3537
#include "ProducerImpl.h"
3638
#include "PulsarApi.pb.h"
@@ -1005,15 +1007,17 @@ Future<Result, BrokerConsumerStatsImpl> ClientConnection::newConsumerStats(uint6
10051007
void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative,
10061008
const std::string& listenerName, uint64_t requestId,
10071009
const LookupDataResultPromisePtr& promise) {
1008-
newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, promise);
1010+
newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, "LOOKUP",
1011+
promise);
10091012
}
10101013

10111014
void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, uint64_t requestId,
10121015
const LookupDataResultPromisePtr& promise) {
1013-
newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, promise);
1016+
newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, "PARTITIONED_METADATA",
1017+
promise);
10141018
}
10151019

1016-
void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
1020+
void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType,
10171021
const LookupDataResultPromisePtr& promise) {
10181022
Lock lock(mutex_);
10191023
std::shared_ptr<LookupDataResultPtr> lookupDataResult;
@@ -1042,6 +1046,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
10421046
pendingLookupRequests_.insert(std::make_pair(requestId, requestData));
10431047
numOfPendingLookupRequest_++;
10441048
lock.unlock();
1049+
LOG_DEBUG(cnxString_ << "Inserted lookup request " << requestType << " (req_id: " << requestId << ")");
10451050
sendCommand(cmd);
10461051
}
10471052

@@ -1158,12 +1163,15 @@ void ClientConnection::sendPendingCommands() {
11581163
}
11591164
}
11601165

1161-
Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId) {
1166+
Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId,
1167+
const char* requestType) {
11621168
Lock lock(mutex_);
11631169

11641170
if (isClosed()) {
11651171
lock.unlock();
11661172
Promise<Result, ResponseData> promise;
1173+
LOG_DEBUG(cnxString_ << "Fail " << requestType << "(req_id: " << requestId
1174+
<< ") to a closed connection");
11671175
promise.setFailed(ResultNotConnected);
11681176
return promise.getFuture();
11691177
}
@@ -1182,7 +1190,17 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuf
11821190
pendingRequests_.insert(std::make_pair(requestId, requestData));
11831191
lock.unlock();
11841192

1185-
sendCommand(cmd);
1193+
LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " << requestId << ")");
1194+
if (mockingRequests_.load(std::memory_order_acquire)) {
1195+
if (mockServer_ == nullptr) {
1196+
LOG_WARN(cnxString_ << "Mock server is unexpectedly null when processing " << requestType);
1197+
sendCommand(cmd);
1198+
} else if (!mockServer_->sendRequest(requestType, requestId)) {
1199+
sendCommand(cmd);
1200+
}
1201+
} else {
1202+
sendCommand(cmd);
1203+
}
11861204
return requestData.promise.getFuture();
11871205
}
11881206

@@ -1625,9 +1643,6 @@ void ClientConnection::handleConsumerStatsResponse(
16251643

16261644
void ClientConnection::handleLookupTopicRespose(
16271645
const proto::CommandLookupTopicResponse& lookupTopicResponse) {
1628-
LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: "
1629-
<< lookupTopicResponse.request_id());
1630-
16311646
Lock lock(mutex_);
16321647
auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
16331648
if (it != pendingLookupRequests_.end()) {

lib/ClientConnection.h

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ struct ResponseData {
115115

116116
typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
117117

118+
class MockServer;
118119
class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<ClientConnection> {
119120
enum State : uint8_t
120121
{
@@ -123,6 +124,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
123124
Ready,
124125
Disconnected
125126
};
127+
using RequestDelayType =
128+
std::unordered_map<std::string /* request type */, long /* delay in milliseconds */>;
126129

127130
public:
128131
typedef std::shared_ptr<ASIO::ip::tcp::socket> SocketPtr;
@@ -185,7 +188,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
185188
* Send a request with a specific Id over the connection. The future will be
186189
* triggered when the response for this request is received
187190
*/
188-
Future<Result, ResponseData> sendRequestWithId(const SharedBuffer& cmd, int requestId);
191+
Future<Result, ResponseData> sendRequestWithId(const SharedBuffer& cmd, int requestId,
192+
const char* requestType);
189193

190194
const std::string& brokerAddress() const;
191195

@@ -208,6 +212,13 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
208212
Future<Result, SchemaInfo> newGetSchema(const std::string& topicName, const std::string& version,
209213
uint64_t requestId);
210214

215+
void attachMockServer(const std::shared_ptr<MockServer>& mockServer) {
216+
mockServer_ = mockServer;
217+
// Mark that requests will first go through the mock server, if the mock server cannot process it,
218+
// fall back to the normal logic
219+
mockingRequests_.store(true, std::memory_order_release);
220+
}
221+
211222
private:
212223
struct PendingRequestData {
213224
Promise<Result, ResponseData> promise;
@@ -264,7 +275,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
264275
void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
265276
void handleSendPair(const ASIO_ERROR& err);
266277
void sendPendingCommands();
267-
void newLookup(const SharedBuffer& cmd, uint64_t requestId, const LookupDataResultPromisePtr& promise);
278+
void newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType,
279+
const LookupDataResultPromisePtr& promise);
268280

269281
void handleRequestTimeout(const ASIO_ERROR& ec, const PendingRequestData& pendingRequestData);
270282

@@ -308,6 +320,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
308320
}
309321
}
310322

323+
void mockSendCommand(const char* requestType, uint64_t requestId, const SharedBuffer& cmd);
324+
311325
std::atomic<State> state_{Pending};
312326
TimeDuration operationsTimeout_;
313327
AuthenticationPtr authentication_;
@@ -391,6 +405,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
391405
DeadlineTimerPtr keepAliveTimer_;
392406
DeadlineTimerPtr consumerStatsRequestTimer_;
393407

408+
std::atomic_bool mockingRequests_{false};
409+
std::shared_ptr<MockServer> mockServer_;
410+
394411
void handleConsumerStatsTimeout(const ASIO_ERROR& ec, const std::vector<uint64_t>& consumerStatsRequests);
395412

396413
void startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests);
@@ -405,6 +422,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
405422

406423
friend class PulsarFriend;
407424
friend class ConsumerTest;
425+
friend class MockServer;
408426

409427
void checkServerError(ServerError error, const std::string& message);
410428

0 commit comments

Comments
 (0)