Skip to content

Commit 9d710be

Browse files
committed
Avoid leaking uv_async_t in Dispatcher::Deactivate()
On disconnect, Dispatcher::Deactivate() calls uv_close() to close its active uv_async_t instance and nulls out the corresponding field, but never frees the underlying memory. To reproduce, run the following with node-rdkafka and librdkafka compiled with ASAN: ```js const { KafkaConsumer } = require('../'); const consumer = new KafkaConsumer({ 'group.id': 'kafka', 'metadata.broker.list': 'localhost:9092', }, {}); consumer.connect({ timeout: 2000 }, function (err) { if (err) { console.error('Error connecting to Kafka:', err); return; } consumer.disconnect(); }) ``` This should report: ``` Direct leak of 128 byte(s) in 1 object(s) allocated from: #0 0x7f049c3aa647 in operator new(unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:99 Blizzard#1 0x7f0471d872a1 in NodeKafka::Callbacks::Dispatcher::Activate() ../src/callbacks.cc:69 Blizzard#2 0x7f0471e75c06 in NodeKafka::Workers::KafkaConsumerConnect::HandleOKCallback() ../src/workers.cc:579 Blizzard#3 0x7f0471dd679c in Nan::AsyncWorker::WorkComplete() ../node_modules/nan/nan.h:2008 Blizzard#4 0x7f0471dd679c in Nan::AsyncExecuteComplete(uv_work_s*) ../node_modules/nan/nan.h:2365 Blizzard#5 0x7f0471dd679c in Nan::AsyncExecuteComplete(uv_work_s*, int) ../node_modules/nan/nan.h:2369 Blizzard#6 0x18bb75c in uv__work_done ../deps/uv/src/threadpool.c:329 Blizzard#7 0x18bf0b2 in uv__async_io ../deps/uv/src/unix/async.c:176 Blizzard#8 0x18d3b2a in uv__io_poll ../deps/uv/src/unix/linux.c:1485 Blizzard#9 0x18bfdd6 in uv_run ../deps/uv/src/unix/core.c:447 Blizzard#10 0xbc9be5 in node::SpinEventLoopInternal(node::Environment*) (/usr/bin/node+0xbc9be5) Blizzard#11 0xd1d920 in node::NodeMainInstance::Run(node::ExitCode*, node::Environment*) [clone .part.0] (/usr/bin/node+0xd1d920) Blizzard#12 0xd1e38c in node::NodeMainInstance::Run() (/usr/bin/node+0xd1e38c) Blizzard#13 0xc710be in node::Start(int, char**) (/usr/bin/node+0xc710be) Blizzard#14 0x7f049bdf0d09 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x23d09) ``` The [libuv documentation](https://docs.libuv.org/en/v1.x/handle.html#c.uv_close) says it's only safe to free the underlying memory in a close callback passed to uv_close(), or after such a callback has returned. So, use a unique_ptr with a custom deleter to accomplish this.
1 parent 24e6e0c commit 9d710be

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

src/callbacks.cc

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ v8::Local<v8::Array> TopicPartitionListToV8Array(
4848
return tp_array;
4949
}
5050

51-
Dispatcher::Dispatcher() {
52-
async = NULL;
51+
Dispatcher::Dispatcher(): async(nullptr, async_deleter) {
5352
uv_mutex_init(&async_lock);
5453
}
5554

@@ -66,19 +65,17 @@ Dispatcher::~Dispatcher() {
6665
// Only run this if we aren't already listening
6766
void Dispatcher::Activate() {
6867
if (!async) {
69-
async = new uv_async_t;
70-
uv_async_init(uv_default_loop(), async, AsyncMessage_);
68+
async = std::unique_ptr<uv_async_t, decltype(async_deleter)*>(
69+
new uv_async_t(), async_deleter);
70+
uv_async_init(uv_default_loop(), async.get(), AsyncMessage_);
7171

7272
async->data = this;
7373
}
7474
}
7575

7676
// Should be able to run this regardless of whether it is active or not
7777
void Dispatcher::Deactivate() {
78-
if (async) {
79-
uv_close(reinterpret_cast<uv_handle_t*>(async), NULL);
80-
async = NULL;
81-
}
78+
async.reset();
8279
}
8380

8481
bool Dispatcher::HasCallbacks() {
@@ -87,7 +84,7 @@ bool Dispatcher::HasCallbacks() {
8784

8885
void Dispatcher::Execute() {
8986
if (async) {
90-
uv_async_send(async);
87+
uv_async_send(async.get());
9188
}
9289
}
9390

@@ -119,6 +116,17 @@ void Dispatcher::RemoveCallback(const v8::Local<v8::Function> &cb) {
119116
}
120117
}
121118

119+
// Custom deleter for uv_async_t smart pointers
120+
void Dispatcher::async_deleter(uv_async_t* async) {
121+
uv_close(
122+
reinterpret_cast<uv_handle_t*>(async),
123+
// Release memory after uv_close() has finished.
124+
[](uv_handle_t* handle) {
125+
delete reinterpret_cast<uv_async_t*>(handle);
126+
}
127+
);
128+
}
129+
122130
event_t::event_t(const RdKafka::Event &event) {
123131
message = "";
124132
fac = "";

src/callbacks.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <uv.h>
1313
#include <nan.h>
1414

15+
#include <memory>
1516
#include <vector>
1617
#include <deque>
1718

@@ -49,7 +50,9 @@ class Dispatcher {
4950
dispatcher->Flush();
5051
}
5152

52-
uv_async_t *async;
53+
static inline void async_deleter(uv_async_t* async);
54+
55+
std::unique_ptr<uv_async_t, decltype(async_deleter)*> async;
5356
};
5457

5558
struct event_t {

0 commit comments

Comments
 (0)