[Bug](runtime-filter) add lock for RuntimeFilterConsumer::acquire_expr/signal to avoid mult…#49739
Merged
BiteTheDDDDt merged 1 commit intoapache:masterfrom Apr 7, 2025
Merged
Conversation
Contributor
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
Contributor
Author
|
run buildall |
Gabriel39
previously approved these changes
Apr 2, 2025
Contributor
|
PR approved by at least one committer and no changes requested. |
Contributor
|
PR approved by anyone and no changes requested. |
Contributor
Author
|
run buildall |
Contributor
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
Gabriel39
previously approved these changes
Apr 3, 2025
Contributor
|
PR approved by at least one committer and no changes requested. |
HappenLee
reviewed
Apr 3, 2025
| DorisMetrics::instance()->runtime_filter_consumer_timeout_num->increment(1); | ||
| _profile->add_info_string("ReachTimeoutLimit", "true"); | ||
| std::unique_lock<std::mutex> l(_mtx); | ||
| _set_state(State::TIMEOUT); |
Contributor
There was a problem hiding this comment.
maybe set TIMEOUT/ after set READY
Contributor
Author
|
run buildall |
Contributor
Author
|
run buildall |
Contributor
Author
|
run buildall |
1 similar comment
Contributor
Author
|
run buildall |
…ithread read and write wrapper add comment update update ut update update comment update fix update update
Contributor
Author
|
run buildall |
Contributor
|
PR approved by at least one committer and no changes requested. |
yiguolei
approved these changes
Apr 7, 2025
koarz
pushed a commit
to koarz/doris
that referenced
this pull request
Jun 4, 2025
…r/signal to avoid mult… (apache#49739) …ithread read and write wrapper ### What problem does this PR solve? ```cpp ================================================================= ==28655==ERROR: AddressSanitizer: heap-use-after-free on address 0x61000331f350 at pc 0x5577863cea6c bp 0x7fac1d2a9ef0 sp 0x7fac1d2a9ee8 READ of size 1 at 0x61000331f350 thread T1933 (Pipe_normal [wo) #0 0x5577863cea6b in doris::RuntimeFilterWrapper::debug_string[abi:cxx11]() /root/doris/be/src/runtime_filter/runtime_filter_wrapper.cpp:585:46 apache#1 0x55778632f8f5 in doris::RuntimeFilter::_debug_string[abi:cxx11]() const /root/doris/be/src/runtime_filter/runtime_filter.cpp:128:61 apache#2 0x55778638d946 in doris::RuntimeFilterConsumer::debug_string[abi:cxx11]() const /root/doris/be/src/runtime_filter/runtime_filter_consumer.h:61:57 apache#3 0x55778634ed24 in doris::RuntimeFilterConsumer::_set_state(doris::RuntimeFilterConsumer::State) /root/doris/be/src/runtime_filter/runtime_filter_consumer.h:125:43 apache#4 0x55778634c6fa in doris::RuntimeFilterConsumer::acquire_expr(std::vector<std::shared_ptr<doris::vectorized::VRuntimeFilterWrapper>, std::allocator<std::shared_ptr<doris::vectorized::VRuntimeFilterWrapper> > >&) /root/doris/be/src/runtime_filter/runtime_filter_consumer.cpp:67:9 apache#5 0x5577a1fbdd1b in doris::RuntimeFilterConsumerHelper::acquire_runtime_filter(std::vector<std::shared_ptr<doris::vectorized::VExprContext>, std::allocator<std::shared_ptr<doris::vectorized::VExprContext> > >&) /root/doris/be/src/runtime_filter/runtime_filter_consumer_helper.cpp:90:9 apache#6 0x5577b9aec26c in doris::pipeline::ScanLocalState<doris::pipeline::OlapScanLocalState>::open(doris::RuntimeState*) /root/doris/be/src/pipeline/exec/scan_operator.cpp:100:5 apache#7 0x5577ba853e05 in doris::pipeline::PipelineTask::_open() /root/doris/be/src/pipeline/pipeline_task.cpp:217:32 apache#8 0x5577ba8608e0 in doris::pipeline::PipelineTask::execute(bool*) /root/doris/be/src/pipeline/pipeline_task.cpp:407:9 apache#9 0x5577ba8b4bb5 in doris::pipeline::TaskScheduler::_do_work(int) /root/doris/be/src/pipeline/task_scheduler.cpp:147:9 apache#10 0x5577869c537c in doris::ThreadPool::dispatch_thread() /root/doris/be/src/util/threadpool.cpp:619:24 apache#11 0x55778699ba1e in std::function<void ()>::operator()() const /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:560:9 apache#12 0x55778699ba1e in doris::Thread::supervise_thread(void*) /root/doris/be/src/util/thread.cpp:496:5 apache#13 0x7fb57bef3608 in start_thread /build/glibc-SzIz7B/glibc-2.31/nptl/pthread_create.c:477:8 apache#14 0x7fb57c1a0132 in __clone /build/glibc-SzIz7B/glibc-2.31/misc/../sysdeps/unix/sysv/linux/x86_64/clone.S:95 0x61000331f350 is located 16 bytes inside of 192-byte region [0x61000331f340,0x61000331f400) freed by thread T817 (brpc_light) here: #0 0x557781e1c80d in operator delete(void*) (/mnt/ssd01/pipline/OpenSourceDoris/clusterEnv/P0/Cluster0/be/lib/doris_be+0x3395280d) (BuildId: 45c9f8cc52f91b28) apache#1 0x557781e778b8 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::operator=(std::__shared_count<(__gnu_cxx::_Lock_policy)2> const&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:721:15 apache#2 0x55778634d1d1 in std::__shared_ptr<doris::RuntimeFilterWrapper, (__gnu_cxx::_Lock_policy)2>::operator=(std::__shared_ptr<doris::RuntimeFilterWrapper, (__gnu_cxx::_Lock_policy)2> const&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:1148:69 apache#3 0x55778634d1d1 in std::shared_ptr<doris::RuntimeFilterWrapper>::operator=(std::shared_ptr<doris::RuntimeFilterWrapper> const&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr.h:359:65 apache#4 0x55778634d1d1 in doris::RuntimeFilterConsumer::signal(doris::RuntimeFilter*) /root/doris/be/src/runtime_filter/runtime_filter_consumer.cpp:76:14 apache#5 0x557785dd52c3 in auto doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0::operator()<std::shared_ptr<doris::RuntimeFilterConsumer> >(std::shared_ptr<doris::RuntimeFilterConsumer>&) const /root/doris/be/src/runtime/fragment_mgr.cpp:1295:72 apache#6 0x557785dd52c3 in void std::__invoke_impl<void, doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0&, std::shared_ptr<doris::RuntimeFilterConsumer>&>(std::__invoke_other, doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0&, std::shared_ptr<doris::RuntimeFilterConsumer>&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:61:14 apache#7 0x557785dd52c3 in std::__invoke_result<doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0&, std::shared_ptr<doris::RuntimeFilterConsumer>&>::type std::__invoke<doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0&, std::shared_ptr<doris::RuntimeFilterConsumer>&>(doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0&, std::shared_ptr<doris::RuntimeFilterConsumer>&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:96:14 apache#8 0x557785dd52c3 in std::ranges::in_fun_result<__gnu_cxx::__normal_iterator<std::shared_ptr<doris::RuntimeFilterConsumer>*, std::vector<std::shared_ptr<doris::RuntimeFilterConsumer>, std::allocator<std::shared_ptr<doris::RuntimeFilterConsumer> > > >, doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0> std::ranges::__for_each_fn::operator()<__gnu_cxx::__normal_iterator<std::shared_ptr<doris::RuntimeFilterConsumer>*, std::vector<std::shared_ptr<doris::RuntimeFilterConsumer>, std::allocator<std::shared_ptr<doris::RuntimeFilterConsumer> > > >, __gnu_cxx::__normal_iterator<std::shared_ptr<doris::RuntimeFilterConsumer>*, std::vector<std::shared_ptr<doris::RuntimeFilterConsumer>, std::allocator<std::shared_ptr<doris::RuntimeFilterConsumer> > > >, std::identity, doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0>(__gnu_cxx::__normal_iterator<std::shared_ptr<doris::RuntimeFilterConsumer>*, std::vector<std::shared_ptr<doris::RuntimeFilterConsumer>, std::allocator<std::shared_ptr<doris::RuntimeFilterConsumer> > > >, __gnu_cxx::__normal_iterator<std::shared_ptr<doris::RuntimeFilterConsumer>*, std::vector<std::shared_ptr<doris::RuntimeFilterConsumer>, std::allocator<std::shared_ptr<doris::RuntimeFilterConsumer> > > >, doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*)::$_0, std::identity) const /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/ranges_algo.h:187:4 apache#9 0x557785dd52c3 in _ZNKSt6ranges13__for_each_fnclIRSt6vectorISt10shared_ptrIN5doris21RuntimeFilterConsumerEESaIS6_EESt8identityZNS4_11FragmentMgr14apply_filterv2EPKNS4_23PPublishFilterRequestV2EPN5butil26IOBufAsZeroCopyInputStreamEE3$_0EENS_13in_fun_resultINSt11conditionalIL_ZNS_14borrowed_rangeIT_EEEDTclsr8__detailE14__ranges_beginclsr3stdE7declvalIRSM_EEEENS_8danglingEE4typeET1_EEOSM_SS_T0_ /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/ranges_algo.h:197:9 apache#10 0x557785dd52c3 in doris::FragmentMgr::apply_filterv2(doris::PPublishFilterRequestV2 const*, butil::IOBufAsZeroCopyInputStream*) /root/doris/be/src/runtime/fragment_mgr.cpp:1295:13 apache#11 0x5577865a517e in doris::PInternalService::apply_filterv2(google::protobuf::RpcController*, doris::PPublishFilterRequestV2 const*, doris::PPublishFilterResponse*, google::protobuf::Closure*)::$_0::operator()() const /root/doris/be/src/service/internal_service.cpp:1444:48 apache#12 0x5577865a517e in void std::__invoke_impl<void, doris::PInternalService::apply_filterv2(google::protobuf::RpcController*, doris::PPublishFilterRequestV2 const*, doris::PPublishFilterResponse*, google::protobuf::Closure*)::$_0&>(std::__invoke_other, doris::PInternalService::apply_filterv2(google::protobuf::RpcController*, doris::PPublishFilterRequestV2 const*, doris::PPublishFilterResponse*, google::protobuf::Closure*)::$_0&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:61:14 apache#13 0x5577865a517e in std::enable_if<is_invocable_r_v<void, doris::PInternalService::apply_filterv2(google::protobuf::RpcController*, doris::PPublishFilterRequestV2 const*, doris::PPublishFilterResponse*, google::protobuf::Closure*)::$_0&>, void>::type std::__invoke_r<void, doris::PInternalService::apply_filterv2(google::protobuf::RpcController*, doris::PPublishFilterRequestV2 const*, doris::PPublishFilterResponse*, google::protobuf::Closure*)::$_0&>(doris::PInternalService::apply_filterv2(google::protobuf::RpcController*, doris::PPublishFilterRequestV2 const*, doris::PPublishFilterResponse*, google::protobuf::Closure*)::$_0&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:111:2 apache#14 0x5577865a517e in std::_Function_handler<void (), doris::PInternalService::apply_filterv2(google::protobuf::RpcController*, doris::PPublishFilterRequestV2 const*, doris::PPublishFilterResponse*, google::protobuf::Closure*)::$_0>::_M_invoke(std::_Any_data const&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:291:9 apache#15 0x5577865f07c1 in std::function<void ()>::operator()() const /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:560:9 apache#16 0x5577865f07c1 in doris::WorkThreadPool<false>::work_thread(int) /root/doris/be/src/util/work_thread_pool.hpp:158:17 apache#17 0x5577bddb233f in execute_native_thread_routine /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/src/c++11/../../../../../libstdc++-v3/src/c++11/thread.cc:82:18 previously allocated by thread T781 (brpc_light) here: #0 0x557781e1bfad in operator new(unsigned long) (/mnt/ssd01/pipline/OpenSourceDoris/clusterEnv/P0/Cluster0/be/lib/doris_be+0x33951fad) (BuildId: 45c9f8cc52f91b28) apache#1 0x5577863411ae in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long, void const*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/ext/new_allocator.h:121:27 apache#2 0x5577863411ae in std::allocator<std::_Sp_counted_ptr_inplace<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/allocator.h:173:32 apache#3 0x5577863411ae in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, (__gnu_cxx::_Lock_policy)2> > >::allocate(std::allocator<std::_Sp_counted_ptr_inplace<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, (__gnu_cxx::_Lock_policy)2> >&, unsigned long) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/alloc_traits.h:460:20 apache#4 0x5577863411ae in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, (__gnu_cxx::_Lock_policy)2> > > std::__allocate_guarded<std::allocator<std::_Sp_counted_ptr_inplace<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, (__gnu_cxx::_Lock_policy)2> > >(std::allocator<std::_Sp_counted_ptr_inplace<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, (__gnu_cxx::_Lock_policy)2> >&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/allocated_ptr.h:97:21 apache#5 0x5577863411ae in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, doris::RuntimeFilterParams*>(doris::RuntimeFilterWrapper*&, std::_Sp_alloc_shared_tag<std::allocator<doris::RuntimeFilterWrapper> >, doris::RuntimeFilterParams*&&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:648:19 apache#6 0x55778632e555 in std::__shared_ptr<doris::RuntimeFilterWrapper, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<doris::RuntimeFilterWrapper>, doris::RuntimeFilterParams*>(std::_Sp_alloc_shared_tag<std::allocator<doris::RuntimeFilterWrapper> >, doris::RuntimeFilterParams*&&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:1337:14 apache#7 0x55778632e555 in std::shared_ptr<doris::RuntimeFilterWrapper>::shared_ptr<std::allocator<doris::RuntimeFilterWrapper>, doris::RuntimeFilterParams*>(std::_Sp_alloc_shared_tag<std::allocator<doris::RuntimeFilterWrapper> >, doris::RuntimeFilterParams*&&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr.h:409:4 apache#8 0x55778632e555 in std::shared_ptr<doris::RuntimeFilterWrapper> std::allocate_shared<doris::RuntimeFilterWrapper, std::allocator<doris::RuntimeFilterWrapper>, doris::RuntimeFilterParams*>(std::allocator<doris::RuntimeFilterWrapper> const&, doris::RuntimeFilterParams*&&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr.h:860:14 apache#9 0x55778632e555 in std::shared_ptr<doris::RuntimeFilterWrapper> std::make_shared<doris::RuntimeFilterWrapper, doris::RuntimeFilterParams*>(doris::RuntimeFilterParams*&&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr.h:876:14 apache#10 0x55778632e555 in doris::RuntimeFilter::_init_with_desc(doris::TRuntimeFilterDesc const*, doris::TQueryOptions const*) /root/doris/be/src/runtime_filter/runtime_filter.cpp:123:16 apache#11 0x5577863794de in doris::RuntimeFilterConsumer::create(doris::RuntimeFilterParamsContext*, doris::TRuntimeFilterDesc const*, int, std::shared_ptr<doris::RuntimeFilterConsumer>*, doris::RuntimeProfile*) /root/doris/be/src/runtime_filter/runtime_filter_consumer.h:46:9 apache#12 0x55778636bb9f in doris::RuntimeFilterMgr::register_consumer_filter(doris::TRuntimeFilterDesc const&, int, std::shared_ptr<doris::RuntimeFilterConsumer>*, doris::RuntimeProfile*) /root/doris/be/src/runtime_filter/runtime_filter_mgr.cpp:80:5 apache#13 0x557786147c10 in doris::RuntimeState::register_consumer_runtime_filter(doris::TRuntimeFilterDesc const&, bool, int, std::shared_ptr<doris::RuntimeFilterConsumer>*, doris::RuntimeProfile*) /root/doris/be/src/runtime/runtime_state.cpp:514:17 apache#14 0x5577a1fbc3e1 in doris::RuntimeFilterConsumerHelper::_register_runtime_filter(bool) /root/doris/be/src/runtime_filter/runtime_filter_consumer_helper.cpp:51:9 ``` ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [x] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [x] Other reason <!-- Add your reason? --> - Behavior changed: - [x] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [x] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
…ithread read and write wrapper
What problem does this PR solve?
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)