Skip to content

Commit f69d0ce

Browse files
authored
Fix deadlock when closing the partitioned producer (#187)
1 parent 39183d3 commit f69d0ce

File tree

3 files changed

+40
-0
lines changed

3 files changed

+40
-0
lines changed

lib/PartitionedProducerImpl.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
148148
const auto numPartitions = getNumPartitionsWithLock();
149149
assert(numProducersCreated_ <= numPartitions && partitionIndex <= numPartitions);
150150

151+
if (state_ == Closing) {
152+
return;
153+
}
154+
151155
if (state_ == Failed) {
152156
// We have already informed client that producer creation failed
153157
if (++numProducersCreated_ == numPartitions) {

tests/ProducerTest.cc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <thread>
2323

2424
#include "HttpHelper.h"
25+
#include "PulsarFriend.h"
2526
#include "lib/Future.h"
2627
#include "lib/Latch.h"
2728
#include "lib/LogUtils.h"
@@ -467,4 +468,27 @@ TEST(ProducerTest, testCloseProducerBeforeCreated) {
467468
client.close();
468469
}
469470

471+
TEST(ProducerTest, testNoDeadlockWhenClosingPartitionedProducerAfterPartitionsUpdate) {
472+
const std::string topic = "public/default/partition-test" + std::to_string(time(nullptr));
473+
std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topic + "/partitions";
474+
475+
int res = makePutRequest(topicOperateUrl, "2");
476+
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
477+
478+
ClientConfiguration clientConf;
479+
clientConf.setPartititionsUpdateInterval(1);
480+
Client client(serviceUrl, clientConf);
481+
ProducerConfiguration conf;
482+
Producer producer;
483+
client.createProducer(topic, conf, producer);
484+
PartitionedProducerImpl& partitionedProducer = PulsarFriend::getPartitionedProducerImpl(producer);
485+
486+
// TODO: Replace by producer interceptor to reproduce the issue then we can remove
487+
// PulsarFriend::updatePartitions
488+
PulsarFriend::updatePartitions(partitionedProducer, 3);
489+
490+
producer.close();
491+
client.close();
492+
}
493+
470494
INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

tests/PulsarFriend.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,18 @@ class PulsarFriend {
187187
long unAckedMessagesTimeoutMs) {
188188
consumerConfiguration.impl_->unAckedMessagesTimeoutMs = unAckedMessagesTimeoutMs;
189189
}
190+
191+
static PartitionedProducerImpl& getPartitionedProducerImpl(Producer producer) {
192+
PartitionedProducerImpl* partitionedProducer =
193+
static_cast<PartitionedProducerImpl*>(producer.impl_.get());
194+
return *partitionedProducer;
195+
}
196+
197+
static void updatePartitions(PartitionedProducerImpl& partitionedProducer, int newPartitions) {
198+
LookupDataResultPtr lookupData = std::make_shared<LookupDataResult>();
199+
lookupData->setPartitions(newPartitions);
200+
partitionedProducer.handleGetPartitions(ResultOk, lookupData);
201+
}
190202
};
191203
} // namespace pulsar
192204

0 commit comments

Comments
 (0)