-
Notifications
You must be signed in to change notification settings - Fork 408
Refactor rebalance callback to more pure implementation #31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
6ccb603
to
228a8e3
Compare
mszabo-wikia
added a commit
to mszabo-wikia/node-rdkafka
that referenced
this pull request
May 23, 2025
Producer::NodeProduce() creates a temporary buffer backed by a `new char[0]` for keys and payloads that were empty buffers, since node::Buffer::Data() returns NULL for these. The [NAN docs](https://github.com/nodejs/nan/blob/main/doc/buffers.md#nannewbuffer) indicate that the data managed by this buffer will be freed by free() in the absence of a custom free callback, which leads ASAN to complain: ``` ================================================================= ==28555==ERROR: AddressSanitizer: alloc-dealloc-mismatch (operator new [] vs free) on 0x60200010fc70 #0 0x7f6c0e98fb6f in __interceptor_free ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:123 Blizzard#1 0x127e3d4 in v8::internal::BackingStore::~BackingStore() (/usr/bin/node+0x127e3d4) Blizzard#2 0xc0934a in std::_Sp_counted_deleter<v8::BackingStore*, std::default_delete<v8::BackingStore>, std::allocator<void>, (__gnu_cxx::_Lock_policy)2>::_M_dispose() (/usr/bin/node+0xc0934a) Blizzard#3 0x10a9fce in v8::internal::ArrayBufferSweeper::SweepingJob::SweepYoung() (/usr/bin/node+0x10a9fce) Blizzard#4 0x10aab2e in std::_Function_handler<void (), v8::internal::ArrayBufferSweeper::RequestSweep(v8::internal::ArrayBufferSweeper::SweepingType, v8::internal::ArrayBufferSweeper::TreatAllYoungAsPromoted)::{lambda()Blizzard#1}>::_M_invoke(std::_Any_data const&) (/usr/bin/node+0x10aab2e) Blizzard#5 0xd49370 in node::(anonymous namespace)::PlatformWorkerThread(void*) (/usr/bin/node+0xd49370) Blizzard#6 0x7f6c0e591ea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) Blizzard#7 0x7f6c0e4afa6e in __clone (/lib/x86_64-linux-gnu/libc.so.6+0xfba6e) 0x60200010fc70 is located 0 bytes inside of 1-byte region [0x60200010fc70,0x60200010fc71) allocated by thread T0 here: #0 0x7f6c0e9917a7 in operator new[](unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:102 Blizzard#1 0x7f6963812e97 in NodeKafka::Producer::NodeProduce(Nan::FunctionCallbackInfo<v8::Value> const&) ../src/producer.cc:501 Blizzard#2 0x7f69637f18f7 in FunctionCallbackWrapper ../node_modules/nan/nan_callbacks_12_inl.h:177 Blizzard#3 0x7f6c0430d545 (/usr/bin/node+0x14e5545) Blizzard#4 0x7f6be45470df (<unknown module>) Blizzard#5 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#6 0x7f6be453ef57 (<unknown module>) Blizzard#7 0x7f6be454d668 (<unknown module>) Blizzard#8 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#9 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#10 0x7f6be453ce66 (<unknown module>) Blizzard#11 0x7f6be44fdf61 (<unknown module>) Blizzard#12 0x7f6be4543bee (<unknown module>) Blizzard#13 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#14 0x7f6be44fddea (<unknown module>) Blizzard#15 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#16 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#17 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#18 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#19 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#20 0x18e20db in Builtins_JSEntryTrampoline (/usr/bin/node+0x18e20db) Blizzard#21 0x18e1e02 in Builtins_JSEntry (/usr/bin/node+0x18e1e02) Blizzard#22 0x105f17a in v8::internal::(anonymous namespace)::Invoke(v8::internal::Isolate*, v8::internal::(anonymous namespace)::InvokeParams const&) (/usr/bin/node+0x105f17a) Blizzard#23 0x1060213 in v8::internal::Execution::Call(v8::internal::Isolate*, v8::internal::Handle<v8::internal::Object>, v8::internal::Handle<v8::internal::Object>, int, v8::internal::Handle<v8::internal::Object>*) (/usr/bin/node+0x1060213) Blizzard#24 0xf22a7c in v8::Function::Call(v8::Local<v8::Context>, v8::Local<v8::Value>, int, v8::Local<v8::Value>*) (/usr/bin/node+0xf22a7c) Blizzard#25 0xbc911a in node::InternalMakeCallback(node::Environment*, v8::Local<v8::Object>, v8::Local<v8::Object>, v8::Local<v8::Function>, int, v8::Local<v8::Value>*, node::async_context) (/usr/bin/node+0xbc911a) Blizzard#26 0xbc925d in node::MakeCallback(v8::Isolate*, v8::Local<v8::Object>, v8::Local<v8::Function>, int, v8::Local<v8::Value>*, node::async_context) (/usr/bin/node+0xbc925d) Blizzard#27 0x7f6963835c25 in Nan::AsyncResource::runInAsyncScope(v8::Local<v8::Object>, v8::Local<v8::Function>, int, v8::Local<v8::Value>*) ../node_modules/nan/nan.h:631 Blizzard#28 0x7f6963835c25 in Nan::Callback::Call_(v8::Isolate*, v8::Local<v8::Object>, int, v8::Local<v8::Value>*, Nan::AsyncResource*) const ../node_modules/nan/nan.h:1881 Blizzard#29 0x7f6963835c25 in Nan::Callback::Call(int, v8::Local<v8::Value>*) const ../node_modules/nan/nan.h:1825 Blizzard#30 0x7f6963835c25 in NodeKafka::Workers::ConnectionMetadata::HandleOKCallback() ../src/workers.cc:116 Blizzard#31 0x7f69637b6bac in Nan::AsyncWorker::WorkComplete() ../node_modules/nan/nan.h:2008 Blizzard#32 0x7f69637b6bac in Nan::AsyncExecuteComplete(uv_work_s*) ../node_modules/nan/nan.h:2365 Blizzard#33 0x7f69637b6bac in Nan::AsyncExecuteComplete(uv_work_s*, int) ../node_modules/nan/nan.h:2369 Blizzard#34 0x18bb75c in uv__work_done ../deps/uv/src/threadpool.c:329 Thread T4 created by T0 here: #0 0x7f6c0e93b2a2 in __interceptor_pthread_create ../../../../src/libsanitizer/asan/asan_interceptors.cpp:214 Blizzard#1 0x18cdf7b in uv_thread_create_ex ../deps/uv/src/unix/thread.c:172 Blizzard#2 0x18cdf7b in uv_thread_create ../deps/uv/src/unix/thread.c:126 Blizzard#3 0xd4c115 in node::WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int) (/usr/bin/node+0xd4c115) Blizzard#4 0xd4c3cb in node::NodePlatform::NodePlatform(int, v8::TracingController*, v8::PageAllocator*) (/usr/bin/node+0xd4c3cb) Blizzard#5 0xc6f785 in node::InitializeOncePerProcessInternal(std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, node::ProcessInitializationFlags::Flags) (/usr/bin/node+0xc6f785) Blizzard#6 0xc70c83 in node::Start(int, char**) (/usr/bin/node+0xc70c83) Blizzard#7 0x7f6c0e3d7d09 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x23d09) SUMMARY: AddressSanitizer: alloc-dealloc-mismatch ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:123 in __interceptor_free ``` Reproducer: ```js const { Producer } = require('.'); var producer = new Producer({ 'client.id': 'kafka-test', 'metadata.broker.list': '127.0.0.1:9092', 'dr_cb': true, 'debug': 'all' }); producer.connect({}, function(err) { if (err) { console.error('Error connecting to Kafka:', err); return; } producer.produce('test', null, Buffer.from(''), ''); producer.disconnect(); }); ``` There should not be a need to allocate a buffer in this scenario. Use a single static character array holding a null-terminator as the payload/key instead.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Rebalance callback can now be set like any other configuration value. If it is omitted, we rely on the internal
librdkafka
rebalance callback. If you set it totrue
, it will use a node implementation (and expose the event to you). If you provide a function, you will do the assignment yourself, and it will emit the event.