Skip to content

Commit 1daf3fe

Browse files
committed
[fix] Avoid resource leakage of AckGroupingTracker
1 parent dd649f5 commit 1daf3fe

File tree

3 files changed

+22
-1
lines changed

3 files changed

+22
-1
lines changed

lib/AckGroupingTrackerEnabled.cc

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr,
5151
const HandlerBasePtr& handlerPtr, uint64_t consumerId,
5252
long ackGroupingTimeMs, long ackGroupingMaxSize)
5353
: AckGroupingTracker(),
54+
state_(NotStarted),
5455
handlerWeakPtr_(handlerPtr),
5556
consumerId_(consumerId),
5657
nextCumulativeAckMsgId_(MessageId::earliest()),
@@ -67,7 +68,10 @@ AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr,
6768
<< ackGroupingMaxSize);
6869
}
6970

70-
void AckGroupingTrackerEnabled::start() { this->scheduleTimer(); }
71+
void AckGroupingTrackerEnabled::start() {
72+
state_ = Ready;
73+
this->scheduleTimer();
74+
}
7175

7276
bool AckGroupingTrackerEnabled::isDuplicate(const MessageId& msgId) {
7377
{
@@ -110,6 +114,7 @@ void AckGroupingTrackerEnabled::addAcknowledgeCumulative(const MessageId& msgId)
110114
}
111115

112116
void AckGroupingTrackerEnabled::close() {
117+
state_ = Closed;
113118
this->flush();
114119
std::lock_guard<std::mutex> lock(this->mutexTimer_);
115120
if (this->timer_) {
@@ -164,6 +169,10 @@ void AckGroupingTrackerEnabled::flushAndClean() {
164169
}
165170

166171
void AckGroupingTrackerEnabled::scheduleTimer() {
172+
if (state_ != Ready) {
173+
return;
174+
}
175+
167176
std::lock_guard<std::mutex> lock(this->mutexTimer_);
168177
this->timer_ = this->executor_->createDeadlineTimer();
169178
this->timer_->expires_from_now(boost::posix_time::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));

lib/AckGroupingTrackerEnabled.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,15 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
7171
//! Method for scheduling grouping timer.
7272
void scheduleTimer();
7373

74+
//! State
75+
enum State
76+
{
77+
NotStarted,
78+
Ready,
79+
Closed,
80+
};
81+
std::atomic<State> state_;
82+
7483
//! The connection handler.
7584
HandlerBaseWeakPtr handlerWeakPtr_;
7685

lib/ConsumerImpl.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,6 +1212,9 @@ void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
12121212
const std::string& ConsumerImpl::getName() const { return consumerStr_; }
12131213

12141214
void ConsumerImpl::shutdown() {
1215+
if (ackGroupingTrackerPtr_) {
1216+
ackGroupingTrackerPtr_->close();
1217+
}
12151218
incomingMessages_.clear();
12161219
possibleSendToDeadLetterTopicMessages_.clear();
12171220
resetCnx();

0 commit comments

Comments
 (0)