-
Notifications
You must be signed in to change notification settings - Fork 408
Do not change subscribed state from assign / unassign methods (#29) #30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Do not change subscribed state from assign / unassign methods (#29) #30
Conversation
The purpose of that variable is to ensure that when you consume, you are subscribed to a set of topics. This is the only way I can track that. How does this behave when you remove this piece? Additionally, I believe there are other places where this variable is checked. If we are refactoring to get rid of it we should do it in all the other places as well. |
Removing it from This is what happens :
It makes sense to track If the purpose of this variable is to hack the lib in order to use the normal consumer (not the high level consumer) then maybe removing |
I just checked and they seem to be two different operations, maybe we should maintain two states instead of only having Whaddya think? |
I think if we stop If you update the PR to do that I can merge it in after I test it. |
Will work on that asap. Thanks |
- Do not update m_is_subscribed variable from assign / unassign method - Removed IsSubscribed call in Consumer::Consume & ConsumerConsumeLoop's execute method
8ec02d6
to
30973e5
Compare
@webmakersteve Does this seem good to you ? |
Producer::NodeProduce() creates a temporary buffer backed by a `new char[0]` for keys and payloads that were empty buffers, since node::Buffer::Data() returns NULL for these. The [NAN docs](https://github.com/nodejs/nan/blob/main/doc/buffers.md#nannewbuffer) indicate that the data managed by this buffer will be freed by free() in the absence of a custom free callback, which leads ASAN to complain: ``` ================================================================= ==28555==ERROR: AddressSanitizer: alloc-dealloc-mismatch (operator new [] vs free) on 0x60200010fc70 #0 0x7f6c0e98fb6f in __interceptor_free ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:123 Blizzard#1 0x127e3d4 in v8::internal::BackingStore::~BackingStore() (/usr/bin/node+0x127e3d4) Blizzard#2 0xc0934a in std::_Sp_counted_deleter<v8::BackingStore*, std::default_delete<v8::BackingStore>, std::allocator<void>, (__gnu_cxx::_Lock_policy)2>::_M_dispose() (/usr/bin/node+0xc0934a) Blizzard#3 0x10a9fce in v8::internal::ArrayBufferSweeper::SweepingJob::SweepYoung() (/usr/bin/node+0x10a9fce) Blizzard#4 0x10aab2e in std::_Function_handler<void (), v8::internal::ArrayBufferSweeper::RequestSweep(v8::internal::ArrayBufferSweeper::SweepingType, v8::internal::ArrayBufferSweeper::TreatAllYoungAsPromoted)::{lambda()Blizzard#1}>::_M_invoke(std::_Any_data const&) (/usr/bin/node+0x10aab2e) Blizzard#5 0xd49370 in node::(anonymous namespace)::PlatformWorkerThread(void*) (/usr/bin/node+0xd49370) Blizzard#6 0x7f6c0e591ea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) Blizzard#7 0x7f6c0e4afa6e in __clone (/lib/x86_64-linux-gnu/libc.so.6+0xfba6e) 0x60200010fc70 is located 0 bytes inside of 1-byte region [0x60200010fc70,0x60200010fc71) allocated by thread T0 here: #0 0x7f6c0e9917a7 in operator new[](unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:102 Blizzard#1 0x7f6963812e97 in NodeKafka::Producer::NodeProduce(Nan::FunctionCallbackInfo<v8::Value> const&) ../src/producer.cc:501 Blizzard#2 0x7f69637f18f7 in FunctionCallbackWrapper ../node_modules/nan/nan_callbacks_12_inl.h:177 Blizzard#3 0x7f6c0430d545 (/usr/bin/node+0x14e5545) Blizzard#4 0x7f6be45470df (<unknown module>) Blizzard#5 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#6 0x7f6be453ef57 (<unknown module>) Blizzard#7 0x7f6be454d668 (<unknown module>) Blizzard#8 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#9 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#10 0x7f6be453ce66 (<unknown module>) Blizzard#11 0x7f6be44fdf61 (<unknown module>) Blizzard#12 0x7f6be4543bee (<unknown module>) Blizzard#13 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#14 0x7f6be44fddea (<unknown module>) Blizzard#15 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#16 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#17 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#18 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#19 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b) Blizzard#20 0x18e20db in Builtins_JSEntryTrampoline (/usr/bin/node+0x18e20db) Blizzard#21 0x18e1e02 in Builtins_JSEntry (/usr/bin/node+0x18e1e02) Blizzard#22 0x105f17a in v8::internal::(anonymous namespace)::Invoke(v8::internal::Isolate*, v8::internal::(anonymous namespace)::InvokeParams const&) (/usr/bin/node+0x105f17a) Blizzard#23 0x1060213 in v8::internal::Execution::Call(v8::internal::Isolate*, v8::internal::Handle<v8::internal::Object>, v8::internal::Handle<v8::internal::Object>, int, v8::internal::Handle<v8::internal::Object>*) (/usr/bin/node+0x1060213) Blizzard#24 0xf22a7c in v8::Function::Call(v8::Local<v8::Context>, v8::Local<v8::Value>, int, v8::Local<v8::Value>*) (/usr/bin/node+0xf22a7c) Blizzard#25 0xbc911a in node::InternalMakeCallback(node::Environment*, v8::Local<v8::Object>, v8::Local<v8::Object>, v8::Local<v8::Function>, int, v8::Local<v8::Value>*, node::async_context) (/usr/bin/node+0xbc911a) Blizzard#26 0xbc925d in node::MakeCallback(v8::Isolate*, v8::Local<v8::Object>, v8::Local<v8::Function>, int, v8::Local<v8::Value>*, node::async_context) (/usr/bin/node+0xbc925d) Blizzard#27 0x7f6963835c25 in Nan::AsyncResource::runInAsyncScope(v8::Local<v8::Object>, v8::Local<v8::Function>, int, v8::Local<v8::Value>*) ../node_modules/nan/nan.h:631 Blizzard#28 0x7f6963835c25 in Nan::Callback::Call_(v8::Isolate*, v8::Local<v8::Object>, int, v8::Local<v8::Value>*, Nan::AsyncResource*) const ../node_modules/nan/nan.h:1881 Blizzard#29 0x7f6963835c25 in Nan::Callback::Call(int, v8::Local<v8::Value>*) const ../node_modules/nan/nan.h:1825 Blizzard#30 0x7f6963835c25 in NodeKafka::Workers::ConnectionMetadata::HandleOKCallback() ../src/workers.cc:116 Blizzard#31 0x7f69637b6bac in Nan::AsyncWorker::WorkComplete() ../node_modules/nan/nan.h:2008 Blizzard#32 0x7f69637b6bac in Nan::AsyncExecuteComplete(uv_work_s*) ../node_modules/nan/nan.h:2365 Blizzard#33 0x7f69637b6bac in Nan::AsyncExecuteComplete(uv_work_s*, int) ../node_modules/nan/nan.h:2369 Blizzard#34 0x18bb75c in uv__work_done ../deps/uv/src/threadpool.c:329 Thread T4 created by T0 here: #0 0x7f6c0e93b2a2 in __interceptor_pthread_create ../../../../src/libsanitizer/asan/asan_interceptors.cpp:214 Blizzard#1 0x18cdf7b in uv_thread_create_ex ../deps/uv/src/unix/thread.c:172 Blizzard#2 0x18cdf7b in uv_thread_create ../deps/uv/src/unix/thread.c:126 Blizzard#3 0xd4c115 in node::WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int) (/usr/bin/node+0xd4c115) Blizzard#4 0xd4c3cb in node::NodePlatform::NodePlatform(int, v8::TracingController*, v8::PageAllocator*) (/usr/bin/node+0xd4c3cb) Blizzard#5 0xc6f785 in node::InitializeOncePerProcessInternal(std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, node::ProcessInitializationFlags::Flags) (/usr/bin/node+0xc6f785) Blizzard#6 0xc70c83 in node::Start(int, char**) (/usr/bin/node+0xc70c83) Blizzard#7 0x7f6c0e3d7d09 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x23d09) SUMMARY: AddressSanitizer: alloc-dealloc-mismatch ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:123 in __interceptor_free ``` Reproducer: ```js const { Producer } = require('.'); var producer = new Producer({ 'client.id': 'kafka-test', 'metadata.broker.list': '127.0.0.1:9092', 'dr_cb': true, 'debug': 'all' }); producer.connect({}, function(err) { if (err) { console.error('Error connecting to Kafka:', err); return; } producer.produce('test', null, Buffer.from(''), ''); producer.disconnect(); }); ``` There should not be a need to allocate a buffer in this scenario. Use a single static character array holding a null-terminator as the payload/key instead.
Changing m_is_subscribed state on
assign
andunassign
causes unrecoverableRdKafka::ERR__STATE
error when consuming.#29