Skip to content

Refactor rebalance callback to more pure implementation #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 27, 2016

Conversation

webmakersteve
Copy link
Contributor

Rebalance callback can now be set like any other configuration value. If it is omitted, we rely on the internal librdkafka rebalance callback. If you set it to true, it will use a node implementation (and expose the event to you). If you provide a function, you will do the assignment yourself, and it will emit the event.

@webmakersteve webmakersteve merged commit f709184 into master Sep 27, 2016
@webmakersteve webmakersteve deleted the node-rebalance branch September 27, 2016 23:14
mszabo-wikia added a commit to mszabo-wikia/node-rdkafka that referenced this pull request May 23, 2025
Producer::NodeProduce() creates a temporary buffer backed by
a `new char[0]` for keys and payloads that were empty buffers, since
node::Buffer::Data() returns NULL for these.

The [NAN docs](https://github.com/nodejs/nan/blob/main/doc/buffers.md#nannewbuffer)
indicate that the data managed by this buffer will be freed by free() in
the absence of a custom free callback, which leads ASAN to complain:
```
    =================================================================
==28555==ERROR: AddressSanitizer: alloc-dealloc-mismatch (operator new [] vs free) on 0x60200010fc70

    #0 0x7f6c0e98fb6f in __interceptor_free ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:123
    Blizzard#1 0x127e3d4 in v8::internal::BackingStore::~BackingStore() (/usr/bin/node+0x127e3d4)
    Blizzard#2 0xc0934a in std::_Sp_counted_deleter<v8::BackingStore*, std::default_delete<v8::BackingStore>, std::allocator<void>, (__gnu_cxx::_Lock_policy)2>::_M_dispose() (/usr/bin/node+0xc0934a)
    Blizzard#3 0x10a9fce in v8::internal::ArrayBufferSweeper::SweepingJob::SweepYoung() (/usr/bin/node+0x10a9fce)
    Blizzard#4 0x10aab2e in std::_Function_handler<void (), v8::internal::ArrayBufferSweeper::RequestSweep(v8::internal::ArrayBufferSweeper::SweepingType, v8::internal::ArrayBufferSweeper::TreatAllYoungAsPromoted)::{lambda()Blizzard#1}>::_M_invoke(std::_Any_data const&) (/usr/bin/node+0x10aab2e)
    Blizzard#5 0xd49370 in node::(anonymous namespace)::PlatformWorkerThread(void*) (/usr/bin/node+0xd49370)
    Blizzard#6 0x7f6c0e591ea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)
    Blizzard#7 0x7f6c0e4afa6e in __clone (/lib/x86_64-linux-gnu/libc.so.6+0xfba6e)

0x60200010fc70 is located 0 bytes inside of 1-byte region [0x60200010fc70,0x60200010fc71)
allocated by thread T0 here:
    #0 0x7f6c0e9917a7 in operator new[](unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:102
    Blizzard#1 0x7f6963812e97 in NodeKafka::Producer::NodeProduce(Nan::FunctionCallbackInfo<v8::Value> const&) ../src/producer.cc:501
    Blizzard#2 0x7f69637f18f7 in FunctionCallbackWrapper ../node_modules/nan/nan_callbacks_12_inl.h:177
    Blizzard#3 0x7f6c0430d545  (/usr/bin/node+0x14e5545)
    Blizzard#4 0x7f6be45470df  (<unknown module>)
    Blizzard#5 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#6 0x7f6be453ef57  (<unknown module>)
    Blizzard#7 0x7f6be454d668  (<unknown module>)
    Blizzard#8 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#9 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#10 0x7f6be453ce66  (<unknown module>)
    Blizzard#11 0x7f6be44fdf61  (<unknown module>)
    Blizzard#12 0x7f6be4543bee  (<unknown module>)
    Blizzard#13 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#14 0x7f6be44fddea  (<unknown module>)
    Blizzard#15 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#16 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#17 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#18 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#19 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#20 0x18e20db in Builtins_JSEntryTrampoline (/usr/bin/node+0x18e20db)
    Blizzard#21 0x18e1e02 in Builtins_JSEntry (/usr/bin/node+0x18e1e02)
    Blizzard#22 0x105f17a in v8::internal::(anonymous namespace)::Invoke(v8::internal::Isolate*, v8::internal::(anonymous namespace)::InvokeParams const&) (/usr/bin/node+0x105f17a)
    Blizzard#23 0x1060213 in v8::internal::Execution::Call(v8::internal::Isolate*, v8::internal::Handle<v8::internal::Object>, v8::internal::Handle<v8::internal::Object>, int, v8::internal::Handle<v8::internal::Object>*) (/usr/bin/node+0x1060213)
    Blizzard#24 0xf22a7c in v8::Function::Call(v8::Local<v8::Context>, v8::Local<v8::Value>, int, v8::Local<v8::Value>*) (/usr/bin/node+0xf22a7c)
    Blizzard#25 0xbc911a in node::InternalMakeCallback(node::Environment*, v8::Local<v8::Object>, v8::Local<v8::Object>, v8::Local<v8::Function>, int, v8::Local<v8::Value>*, node::async_context) (/usr/bin/node+0xbc911a)
    Blizzard#26 0xbc925d in node::MakeCallback(v8::Isolate*, v8::Local<v8::Object>, v8::Local<v8::Function>, int, v8::Local<v8::Value>*, node::async_context) (/usr/bin/node+0xbc925d)
    Blizzard#27 0x7f6963835c25 in Nan::AsyncResource::runInAsyncScope(v8::Local<v8::Object>, v8::Local<v8::Function>, int, v8::Local<v8::Value>*) ../node_modules/nan/nan.h:631
    Blizzard#28 0x7f6963835c25 in Nan::Callback::Call_(v8::Isolate*, v8::Local<v8::Object>, int, v8::Local<v8::Value>*, Nan::AsyncResource*) const ../node_modules/nan/nan.h:1881
    Blizzard#29 0x7f6963835c25 in Nan::Callback::Call(int, v8::Local<v8::Value>*) const ../node_modules/nan/nan.h:1825
    Blizzard#30 0x7f6963835c25 in NodeKafka::Workers::ConnectionMetadata::HandleOKCallback() ../src/workers.cc:116
    Blizzard#31 0x7f69637b6bac in Nan::AsyncWorker::WorkComplete() ../node_modules/nan/nan.h:2008
    Blizzard#32 0x7f69637b6bac in Nan::AsyncExecuteComplete(uv_work_s*) ../node_modules/nan/nan.h:2365
    Blizzard#33 0x7f69637b6bac in Nan::AsyncExecuteComplete(uv_work_s*, int) ../node_modules/nan/nan.h:2369
    Blizzard#34 0x18bb75c in uv__work_done ../deps/uv/src/threadpool.c:329

Thread T4 created by T0 here:
    #0 0x7f6c0e93b2a2 in __interceptor_pthread_create ../../../../src/libsanitizer/asan/asan_interceptors.cpp:214
    Blizzard#1 0x18cdf7b in uv_thread_create_ex ../deps/uv/src/unix/thread.c:172
    Blizzard#2 0x18cdf7b in uv_thread_create ../deps/uv/src/unix/thread.c:126
    Blizzard#3 0xd4c115 in node::WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int) (/usr/bin/node+0xd4c115)
    Blizzard#4 0xd4c3cb in node::NodePlatform::NodePlatform(int, v8::TracingController*, v8::PageAllocator*) (/usr/bin/node+0xd4c3cb)
    Blizzard#5 0xc6f785 in node::InitializeOncePerProcessInternal(std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, node::ProcessInitializationFlags::Flags) (/usr/bin/node+0xc6f785)
    Blizzard#6 0xc70c83 in node::Start(int, char**) (/usr/bin/node+0xc70c83)
    Blizzard#7 0x7f6c0e3d7d09 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x23d09)

SUMMARY: AddressSanitizer: alloc-dealloc-mismatch ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:123 in __interceptor_free
```

Reproducer:
```js
const { Producer } = require('.');
var producer = new Producer({
  'client.id': 'kafka-test',
  'metadata.broker.list': '127.0.0.1:9092',
  'dr_cb': true,
  'debug': 'all'
});
producer.connect({}, function(err) {
  if (err) {
    console.error('Error connecting to Kafka:', err);
    return;
  }
  producer.produce('test', null, Buffer.from(''), '');

  producer.disconnect();
});
```

There should not be a need to allocate a buffer in this scenario.
Use a single static character array holding a null-terminator as the
payload/key instead.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant