Skip to content

Do not change subscribed state from assign / unassign methods (#29) #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

ArtemisMucaj
Copy link
Contributor

@ArtemisMucaj ArtemisMucaj commented Sep 20, 2016

Changing m_is_subscribed state on assign and unassign causes unrecoverable RdKafka::ERR__STATE error when consuming.
#29

@ArtemisMucaj ArtemisMucaj changed the title Do not change subscribed behavior from assign / unassign methods (#29) Do not change subscribed state from assign / unassign methods (#29) Sep 20, 2016
@webmakersteve
Copy link
Contributor

webmakersteve commented Sep 21, 2016

The purpose of that variable is to ensure that when you consume, you are subscribed to a set of topics. This is the only way I can track that. How does this behave when you remove this piece?

Additionally, I believe there are other places where this variable is checked. If we are refactoring to get rid of it we should do it in all the other places as well.

@ArtemisMucaj
Copy link
Contributor Author

ArtemisMucaj commented Sep 21, 2016

Removing it from assign and unassign methods fixes the issue. I don't think it's a good practice to modify your m_subscribed state here as unassigning / assigning doesn't mean that you're subscribed / unsubscribed to a topic!

This is what happens :

  1. reblance causes unassign to be called
  2. Consume method will eventually throw ERR__STATE (https://github.com/Blizzard/node-rdkafka/blob/master/src/consumer.cc#L260) depending on wheter RdKafka::KafkaConsumer has queued a rebalance operation or not
  3. If no operation was queued then we'll never call assign again and m_subscribed state will be stuck to false even though librdkafka is subscribed

It makes sense to track subscribedstate but not to change it on assign / unassign.

If the purpose of this variable is to hack the lib in order to use the normal consumer (not the high level consumer) then maybe removing IsSubscribed() in Consume would be a better solution (https://github.com/Blizzard/node-rdkafka/blob/master/src/consumer.cc#L253)

@ArtemisMucaj
Copy link
Contributor Author

ArtemisMucaj commented Sep 21, 2016

subscribe ->https://github.com/edenhill/librdkafka/blob/62bef22212ad5218bd08d6231184c49d36a2d98b/src/rdkafka_subscription.c#L52
assign -> https://github.com/edenhill/librdkafka/blob/62bef22212ad5218bd08d6231184c49d36a2d98b/src/rdkafka_subscription.c#L70

I just checked and they seem to be two different operations, maybe we should maintain two states instead of only having m_is_subscribed.

Whaddya think?

@webmakersteve
Copy link
Contributor

I think if we stop assign and unassign from modifying that variable and not relying on it to check the inner state before consumption (relying on librdkafka to do that for us) is the best solution. We can still use the value to determine whether the user has actually called subscribe to stop it from being called subsequently instead of failing silently.

If you update the PR to do that I can merge it in after I test it.

@ArtemisMucaj
Copy link
Contributor Author

Will work on that asap.

Thanks

- Do not update m_is_subscribed variable from assign / unassign method
- Removed IsSubscribed call in Consumer::Consume & ConsumerConsumeLoop's
  execute method
@ArtemisMucaj ArtemisMucaj force-pushed the change-subscribed-state-behavior branch from 8ec02d6 to 30973e5 Compare September 22, 2016 19:13
@ArtemisMucaj
Copy link
Contributor Author

@webmakersteve Does this seem good to you ?

@webmakersteve webmakersteve merged commit 908a2fa into Blizzard:master Sep 25, 2016
mszabo-wikia added a commit to mszabo-wikia/node-rdkafka that referenced this pull request May 23, 2025
Producer::NodeProduce() creates a temporary buffer backed by
a `new char[0]` for keys and payloads that were empty buffers, since
node::Buffer::Data() returns NULL for these.

The [NAN docs](https://github.com/nodejs/nan/blob/main/doc/buffers.md#nannewbuffer)
indicate that the data managed by this buffer will be freed by free() in
the absence of a custom free callback, which leads ASAN to complain:
```
    =================================================================
==28555==ERROR: AddressSanitizer: alloc-dealloc-mismatch (operator new [] vs free) on 0x60200010fc70

    #0 0x7f6c0e98fb6f in __interceptor_free ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:123
    Blizzard#1 0x127e3d4 in v8::internal::BackingStore::~BackingStore() (/usr/bin/node+0x127e3d4)
    Blizzard#2 0xc0934a in std::_Sp_counted_deleter<v8::BackingStore*, std::default_delete<v8::BackingStore>, std::allocator<void>, (__gnu_cxx::_Lock_policy)2>::_M_dispose() (/usr/bin/node+0xc0934a)
    Blizzard#3 0x10a9fce in v8::internal::ArrayBufferSweeper::SweepingJob::SweepYoung() (/usr/bin/node+0x10a9fce)
    Blizzard#4 0x10aab2e in std::_Function_handler<void (), v8::internal::ArrayBufferSweeper::RequestSweep(v8::internal::ArrayBufferSweeper::SweepingType, v8::internal::ArrayBufferSweeper::TreatAllYoungAsPromoted)::{lambda()Blizzard#1}>::_M_invoke(std::_Any_data const&) (/usr/bin/node+0x10aab2e)
    Blizzard#5 0xd49370 in node::(anonymous namespace)::PlatformWorkerThread(void*) (/usr/bin/node+0xd49370)
    Blizzard#6 0x7f6c0e591ea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)
    Blizzard#7 0x7f6c0e4afa6e in __clone (/lib/x86_64-linux-gnu/libc.so.6+0xfba6e)

0x60200010fc70 is located 0 bytes inside of 1-byte region [0x60200010fc70,0x60200010fc71)
allocated by thread T0 here:
    #0 0x7f6c0e9917a7 in operator new[](unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:102
    Blizzard#1 0x7f6963812e97 in NodeKafka::Producer::NodeProduce(Nan::FunctionCallbackInfo<v8::Value> const&) ../src/producer.cc:501
    Blizzard#2 0x7f69637f18f7 in FunctionCallbackWrapper ../node_modules/nan/nan_callbacks_12_inl.h:177
    Blizzard#3 0x7f6c0430d545  (/usr/bin/node+0x14e5545)
    Blizzard#4 0x7f6be45470df  (<unknown module>)
    Blizzard#5 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#6 0x7f6be453ef57  (<unknown module>)
    Blizzard#7 0x7f6be454d668  (<unknown module>)
    Blizzard#8 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#9 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#10 0x7f6be453ce66  (<unknown module>)
    Blizzard#11 0x7f6be44fdf61  (<unknown module>)
    Blizzard#12 0x7f6be4543bee  (<unknown module>)
    Blizzard#13 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#14 0x7f6be44fddea  (<unknown module>)
    Blizzard#15 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#16 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#17 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#18 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#19 0x18e3d1b in Builtins_InterpreterEntryTrampoline (/usr/bin/node+0x18e3d1b)
    Blizzard#20 0x18e20db in Builtins_JSEntryTrampoline (/usr/bin/node+0x18e20db)
    Blizzard#21 0x18e1e02 in Builtins_JSEntry (/usr/bin/node+0x18e1e02)
    Blizzard#22 0x105f17a in v8::internal::(anonymous namespace)::Invoke(v8::internal::Isolate*, v8::internal::(anonymous namespace)::InvokeParams const&) (/usr/bin/node+0x105f17a)
    Blizzard#23 0x1060213 in v8::internal::Execution::Call(v8::internal::Isolate*, v8::internal::Handle<v8::internal::Object>, v8::internal::Handle<v8::internal::Object>, int, v8::internal::Handle<v8::internal::Object>*) (/usr/bin/node+0x1060213)
    Blizzard#24 0xf22a7c in v8::Function::Call(v8::Local<v8::Context>, v8::Local<v8::Value>, int, v8::Local<v8::Value>*) (/usr/bin/node+0xf22a7c)
    Blizzard#25 0xbc911a in node::InternalMakeCallback(node::Environment*, v8::Local<v8::Object>, v8::Local<v8::Object>, v8::Local<v8::Function>, int, v8::Local<v8::Value>*, node::async_context) (/usr/bin/node+0xbc911a)
    Blizzard#26 0xbc925d in node::MakeCallback(v8::Isolate*, v8::Local<v8::Object>, v8::Local<v8::Function>, int, v8::Local<v8::Value>*, node::async_context) (/usr/bin/node+0xbc925d)
    Blizzard#27 0x7f6963835c25 in Nan::AsyncResource::runInAsyncScope(v8::Local<v8::Object>, v8::Local<v8::Function>, int, v8::Local<v8::Value>*) ../node_modules/nan/nan.h:631
    Blizzard#28 0x7f6963835c25 in Nan::Callback::Call_(v8::Isolate*, v8::Local<v8::Object>, int, v8::Local<v8::Value>*, Nan::AsyncResource*) const ../node_modules/nan/nan.h:1881
    Blizzard#29 0x7f6963835c25 in Nan::Callback::Call(int, v8::Local<v8::Value>*) const ../node_modules/nan/nan.h:1825
    Blizzard#30 0x7f6963835c25 in NodeKafka::Workers::ConnectionMetadata::HandleOKCallback() ../src/workers.cc:116
    Blizzard#31 0x7f69637b6bac in Nan::AsyncWorker::WorkComplete() ../node_modules/nan/nan.h:2008
    Blizzard#32 0x7f69637b6bac in Nan::AsyncExecuteComplete(uv_work_s*) ../node_modules/nan/nan.h:2365
    Blizzard#33 0x7f69637b6bac in Nan::AsyncExecuteComplete(uv_work_s*, int) ../node_modules/nan/nan.h:2369
    Blizzard#34 0x18bb75c in uv__work_done ../deps/uv/src/threadpool.c:329

Thread T4 created by T0 here:
    #0 0x7f6c0e93b2a2 in __interceptor_pthread_create ../../../../src/libsanitizer/asan/asan_interceptors.cpp:214
    Blizzard#1 0x18cdf7b in uv_thread_create_ex ../deps/uv/src/unix/thread.c:172
    Blizzard#2 0x18cdf7b in uv_thread_create ../deps/uv/src/unix/thread.c:126
    Blizzard#3 0xd4c115 in node::WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int) (/usr/bin/node+0xd4c115)
    Blizzard#4 0xd4c3cb in node::NodePlatform::NodePlatform(int, v8::TracingController*, v8::PageAllocator*) (/usr/bin/node+0xd4c3cb)
    Blizzard#5 0xc6f785 in node::InitializeOncePerProcessInternal(std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, node::ProcessInitializationFlags::Flags) (/usr/bin/node+0xc6f785)
    Blizzard#6 0xc70c83 in node::Start(int, char**) (/usr/bin/node+0xc70c83)
    Blizzard#7 0x7f6c0e3d7d09 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x23d09)

SUMMARY: AddressSanitizer: alloc-dealloc-mismatch ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:123 in __interceptor_free
```

Reproducer:
```js
const { Producer } = require('.');
var producer = new Producer({
  'client.id': 'kafka-test',
  'metadata.broker.list': '127.0.0.1:9092',
  'dr_cb': true,
  'debug': 'all'
});
producer.connect({}, function(err) {
  if (err) {
    console.error('Error connecting to Kafka:', err);
    return;
  }
  producer.produce('test', null, Buffer.from(''), '');

  producer.disconnect();
});
```

There should not be a need to allocate a buffer in this scenario.
Use a single static character array holding a null-terminator as the
payload/key instead.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants