Skip to content

Commit

Permalink
Fix seek failure by timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Jan 22, 2024
1 parent 72b7311 commit 73e3089
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 79 deletions.
2 changes: 0 additions & 2 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,6 @@ class ConsumerImpl : public ConsumerImplBase {
const ClientConnectionPtr& cnx, MessageId& messageId);

friend class PulsarFriend;

// these two declared friend to access setNegativeAcknowledgeEnabledForTesting
friend class MultiTopicsConsumerImpl;

FRIEND_TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages);
Expand Down
51 changes: 0 additions & 51 deletions lib/MultiResultCallback.h

This file was deleted.

96 changes: 71 additions & 25 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "LookupService.h"
#include "MessageImpl.h"
#include "MessagesImpl.h"
#include "MultiResultCallback.h"
#include "MultiTopicsBrokerConsumerStatsImpl.h"
#include "TopicName.h"
#include "UnAckedMessageTrackerDisabled.h"
Expand Down Expand Up @@ -545,31 +544,26 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
return;
}

incomingMessages_.push(msg);
incomingMessagesSize_.fetch_add(msg.getLength());

// try trigger pending batch messages
Lock batchOptionLock(batchReceiveOptionMutex_);
if (hasEnoughMessagesForBatchReceive()) {
ConsumerImplBase::notifyBatchPendingReceivedCallback();
if (PULSAR_UNLIKELY(duringSeek_.load(std::memory_order_acquire))) {
// incomingMessages_ will be set after seek is done
return;
}
batchOptionLock.unlock();

if (messageListener_) {
listenerExecutor_->postWork(
std::bind(&MultiTopicsConsumerImpl::internalListener, get_shared_this_ptr(), consumer));
}
incomingMessages_.push(msg);
incomingMessagesSize_.fetch_add(msg.getLength());
notifyListeners();
}

void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
void MultiTopicsConsumerImpl::internalListener() {
Message m;
incomingMessages_.pop(m);
try {
Consumer self{get_shared_this_ptr()};
messageListener_(self, m);
messageProcessed(m);
} catch (const std::exception& e) {
LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what());
while (incomingMessages_.pop(m, std::chrono::milliseconds(0))) {
try {
Consumer self{get_shared_this_ptr()};
messageListener_(self, m);
messageProcessed(m);
} catch (const std::exception& e) {
LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what());
}
}
}

Expand Down Expand Up @@ -907,10 +901,43 @@ void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callb
return;
}

MultiResultCallback multiResultCallback(callback, consumers_.size());
consumers_.forEachValue([&timestamp, &multiResultCallback](ConsumerImplPtr consumer) {
consumer->seekAsync(timestamp, multiResultCallback);
});
duringSeek_.store(true, std::memory_order_release);
incomingMessages_.clear();
incomingMessagesSize_ = 0;

auto consumers = consumers_.move();
auto numConsumersLeft = std::make_shared<std::atomic_size_t>(consumers.size());
auto weakSelf = weak_from_this();
auto wrappedCallback = [this, weakSelf, callback, consumers, numConsumersLeft](Result result) {
auto self = weakSelf.lock();
if (!self) {
callback(result);
return;
}
if (result != ResultOk) {
duringSeek_.store(false, std::memory_order_release);
callback(result);
return;
}
if (--*numConsumersLeft > 0) {
return;
}
// All sub-consumers have completed the seek operation
for (auto&& kv : consumers) {
auto& consumer = kv.second;
consumer->incomingMessages_.drainTo(incomingMessages_);
incomingMessagesSize_ += consumer->incomingMessagesSize_;
consumer->incomingMessagesSize_ = 0;
consumers_.emplace(kv.first, consumer);
}
notifyListeners();
// Resume the normal messageReceived logic
duringSeek_.store(false, std::memory_order_release);
callback(ResultOk);
};
for (auto&& kv : consumers) {
kv.second->seekAsync(timestamp, wrappedCallback);
}
}

void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
Expand Down Expand Up @@ -1127,3 +1154,22 @@ void MultiTopicsConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallba
});
});
}

void MultiTopicsConsumerImpl::notifyListeners() {
// try trigger pending batch messages
Lock batchOptionLock(batchReceiveOptionMutex_);
if (hasEnoughMessagesForBatchReceive()) {
ConsumerImplBase::notifyBatchPendingReceivedCallback();
}
batchOptionLock.unlock();

if (messageListener_) {
auto weakSelf = weak_from_this();
listenerExecutor_->postWork([weakSelf]() {
auto self = weakSelf.lock();
if (self) {
self->internalListener();
}
});
}
}
6 changes: 5 additions & 1 deletion lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
void notifyResult(CloseCallback closeCallback);
void messageReceived(Consumer consumer, const Message& msg);
void messageProcessed(Message& msg);
void internalListener(Consumer consumer);
void internalListener();
void receiveMessages();
void failPendingReceiveCallback();
void notifyPendingReceivedCallback(Result result, const Message& message,
Expand Down Expand Up @@ -170,6 +170,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
friend class PulsarFriend;

private:
mutable std::atomic_bool duringSeek_{false};

std::shared_ptr<MultiTopicsConsumerImpl> get_shared_this_ptr();
void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
void cancelTimers() noexcept;
Expand All @@ -178,6 +180,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
return std::static_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this());
}

void notifyListeners();

FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
Expand Down
23 changes: 23 additions & 0 deletions lib/UnboundedBlockingQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,29 @@ class UnboundedBlockingQueue {
queueEmptyCondition_.notify_all();
}

void drainTo(UnboundedBlockingQueue<T>& other) {
if (this != &other) {
Lock lock(mutex_);
if (queue_.empty()) {
return;
}
Lock otherLock(other.mutex_);
if (other.queue_.empty()) {
queue_.swap(other.queue_);
otherLock.unlock();
other.queueEmptyCondition_.notify_all();
return;
}
do {
if (other.queue_.full()) {
other.queue_.set_capacity(other.queue_.size() * 2);
}
other.queue_.push_back(std::move(queue_.front()));
queue_.pop_front();
} while (!queue_.empty());
}
}

private:
bool isEmptyNoMutex() const { return queue_.empty(); }
bool isClosedNoMutex() const { return closed_; }
Expand Down
28 changes: 28 additions & 0 deletions tests/UnboundedBlockingQueueTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,31 @@ TEST(UnboundedBlockingQueueTest, testCloseInterruptOnEmpty) {
ASSERT_TRUE(wasUnblocked);
thread.join();
}

TEST(UnboundedBlockingQueueTest, testDrainTo) {
UnboundedBlockingQueue<int> queue(3);
queue.push(1);
queue.push(2);
queue.push(3);

UnboundedBlockingQueue<int> otherQueue(3);
std::thread([&queue, &otherQueue]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
queue.drainTo(otherQueue);
}).detach();
int x;
ASSERT_FALSE(queue.empty());
otherQueue.pop(x); // blocked until queue.drainTo(otherQueue) is called
ASSERT_TRUE(queue.empty());
ASSERT_EQ(x, 1);

queue.push(4);
queue.push(5);
queue.drainTo(otherQueue);
ASSERT_TRUE(queue.empty());
for (int i = 2; i <= 5; i++) {
otherQueue.pop(x);
ASSERT_EQ(x, i);
}
ASSERT_TRUE(otherQueue.empty());
}

0 comments on commit 73e3089

Please sign in to comment.