Skip to content

Commit 69e5680

Browse files
authored
[fix] Flush no batch message when call producer.flush (#98)
1 parent 94b909e commit 69e5680

File tree

3 files changed

+65
-10
lines changed

3 files changed

+65
-10
lines changed

lib/OpSendMsg.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ struct OpSendMsg {
3939
boost::posix_time::ptime timeout_;
4040
uint32_t messagesCount_;
4141
uint64_t messagesSize_;
42+
std::vector<std::function<void(Result)>> trackerCallbacks_;
4243

4344
OpSendMsg() = default;
4445

@@ -59,6 +60,13 @@ struct OpSendMsg {
5960
if (sendCallback_) {
6061
sendCallback_(result, messageId);
6162
}
63+
for (const auto& trackerCallback : trackerCallbacks_) {
64+
trackerCallback(result);
65+
}
66+
}
67+
68+
void addTrackerCallback(std::function<void(Result)> trackerCallback) {
69+
trackerCallbacks_.emplace_back(trackerCallback);
6270
}
6371
};
6472

lib/ProducerImpl.cc

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -332,17 +332,25 @@ void ProducerImpl::setMessageMetadata(const Message& msg, const uint64_t& sequen
332332
}
333333

334334
void ProducerImpl::flushAsync(FlushCallback callback) {
335+
if (state_ != Ready) {
336+
callback(ResultAlreadyClosed);
337+
return;
338+
}
335339
if (batchMessageContainer_) {
336-
if (state_ == Ready) {
337-
Lock lock(mutex_);
338-
auto failures = batchMessageAndSend(callback);
340+
Lock lock(mutex_);
341+
auto failures = batchMessageAndSend(callback);
342+
lock.unlock();
343+
failures.complete();
344+
} else {
345+
Lock lock(mutex_);
346+
if (!pendingMessagesQueue_.empty()) {
347+
auto& opSendMsg = pendingMessagesQueue_.back();
339348
lock.unlock();
340-
failures.complete();
349+
opSendMsg.addTrackerCallback(callback);
341350
} else {
342-
callback(ResultAlreadyClosed);
351+
lock.unlock();
352+
callback(ResultOk);
343353
}
344-
} else {
345-
callback(ResultOk);
346354
}
347355
}
348356

tests/ProducerTest.cc

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,11 +245,10 @@ TEST_P(ProducerTest, testMaxMessageSize) {
245245
client.close();
246246
}
247247

248-
TEST_P(ProducerTest, testChunkingMaxMessageSize) {
248+
TEST(ProducerTest, testChunkingMaxMessageSize) {
249249
Client client(serviceUrl);
250250

251-
const auto topic = std::string("ProducerTest-ChunkingMaxMessageSize-") +
252-
(GetParam() ? "batch-" : "no-batch-") + std::to_string(time(nullptr));
251+
const auto topic = std::string("ProducerTest-ChunkingMaxMessageSize-") + std::to_string(time(nullptr));
253252

254253
Consumer consumer;
255254
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
@@ -297,4 +296,44 @@ TEST(ProducerTest, testExclusiveProducer) {
297296
ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, producerConfiguration3, producer3));
298297
}
299298

299+
TEST_P(ProducerTest, testFlushNoBatch) {
300+
Client client(serviceUrl);
301+
302+
auto partitioned = GetParam();
303+
const auto topicName = std::string("testFlushNoBatch") +
304+
(partitioned ? "partitioned-" : "-no-partitioned-") +
305+
std::to_string(time(nullptr));
306+
307+
if (partitioned) {
308+
// call admin api to make it partitioned
309+
std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
310+
int res = makePutRequest(url, "5");
311+
LOG_INFO("res = " << res);
312+
ASSERT_FALSE(res != 204 && res != 409);
313+
}
314+
315+
ProducerConfiguration producerConfiguration;
316+
producerConfiguration.setBatchingEnabled(false);
317+
318+
Producer producer;
319+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer));
320+
321+
std::atomic_int needCallBack(100);
322+
auto cb = [&needCallBack](Result code, const MessageId& msgId) {
323+
ASSERT_EQ(code, ResultOk);
324+
needCallBack.fetch_sub(1);
325+
};
326+
327+
for (int i = 0; i < 100; ++i) {
328+
Message msg = MessageBuilder().setContent("content").build();
329+
producer.sendAsync(msg, cb);
330+
}
331+
332+
producer.flush();
333+
ASSERT_EQ(needCallBack.load(), 0);
334+
producer.close();
335+
336+
client.close();
337+
}
338+
300339
INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

0 commit comments

Comments
 (0)