[C++][Acero] Threading issues in the groupby node #46227

@jtanx

Description

Describe the bug, including details regarding any error messages, version, and platform.

We have been intermittently hitting deadlocks in a fairly simple graph that uses the aggregation node, and have had difficulty reliably reproducing them (the main thread ends up waiting indefinitely on the exec plan's finished future while all the other threads are idle).

In trying to construct a smaller test case, I have noticed what appear to be some pretty significant threading issues in the implementation of the GroupByNode.

Referring to the attached test case (test.zip), built against Arrow at revision 18e8f50:

This performs the following aggregation:

ac::TableSourceNodeOptions srcOpts(tb);
ac::AggregateNodeOptions aggOpts(
    {{"hash_min", nullptr, "avg_d_kbps", "min_avg_d_kbps"}}, {"year", "devices"});

auto decl = ac::Declaration::Sequence({
    {"table_source", std::move(srcOpts)},
    {"aggregate", std::move(aggOpts)},
});
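For reference, a self-contained sketch of the surrounding program (the actual reproducer is in test.zip; MakeTestTable() below is a hypothetical stand-in for however tb is built there, and the DeclarationToTable call matches the frames at exec_plan.cc:789 in the trace):

#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/result.h>
#include <arrow/table.h>

namespace ac = arrow::acero;

arrow::Status RunTest() {
  std::shared_ptr<arrow::Table> tb = MakeTestTable();  // hypothetical helper

  ac::TableSourceNodeOptions srcOpts(tb);
  ac::AggregateNodeOptions aggOpts(
      {{"hash_min", nullptr, "avg_d_kbps", "min_avg_d_kbps"}}, {"year", "devices"});

  auto decl = ac::Declaration::Sequence({
      {"table_source", std::move(srcOpts)},
      {"aggregate", std::move(aggOpts)},
  });

  // DeclarationToTable runs the plan to completion on the default
  // multi-threaded CPU executor, which is where the race shows up.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> out,
                        ac::DeclarationToTable(std::move(decl)));
  return arrow::Status::OK();
}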

Running this under helgrind (valgrind --log-file=helgrind.log --tool=helgrind --history-backtrace-size=100 ./testit) reports the following data race, among many others:

==42024== ----------------------------------------------------------------
==42024== 
==42024== Possible data race during read of size 8 at 0x57DEA30 by thread #4
==42024== Locks held: none
==42024==    at 0x406E71: _M_ptr (unique_ptr.h:199)
==42024==    by 0x406E71: get (unique_ptr.h:470)
==42024==    by 0x406E71: operator bool (unique_ptr.h:487)
==42024==    by 0x406E71: arrow::acero::aggregate::GroupByNode::OutputResult(bool) (groupby_aggregate_node.cc:342)
==42024==    by 0x4073E7: arrow::acero::aggregate::GroupByNode::InputFinished(arrow::acero::ExecNode*, int) (groupby_aggregate_node.cc:396)
==42024==    by 0x38CEAF: operator() (source_node.cc:226)
==42024==    by 0x38CEAF: __invoke_impl<arrow::Status, arrow::acero::(anonymous namespace)::SourceNode::StartProducing()::<lambda(arrow::Result<int>)> mutable::<lambda()>&> (invoke.h:61)
==42024==    by 0x38CEAF: __invoke_r<arrow::Status, arrow::acero::(anonymous namespace)::SourceNode::StartProducing()::<lambda(arrow::Result<int>)> mutable::<lambda()>&> (invoke.h:116)
==42024==    by 0x38CEAF: std::_Function_handler<arrow::Status (), arrow::acero::(anonymous namespace)::SourceNode::StartProducing()::{lambda(arrow::Result<int>)#1}::operator()(arrow::Result<int>)::{lambda()#1}>::_M_invoke(std::_Any_data const&) (std_function.h:291)
==42024==    by 0x3730AF: operator() (std_function.h:591)
==42024==    by 0x3730AF: operator()<std::function<arrow::Status()>&> (future.h:150)
==42024==    by 0x3730AF: __invoke_impl<void, arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status()>&> (invoke.h:61)
==42024==    by 0x3730AF: __invoke<arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status()>&> (invoke.h:96)
==42024==    by 0x3730AF: __call<void, 0, 1> (functional:506)
==42024==    by 0x3730AF: operator()<> (functional:591)
==42024==    by 0x3730AF: arrow::internal::FnOnce<void ()>::FnImpl<std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, std::function<arrow::Status ()>)> >::invoke() (functional.h:152)
==42024==    by 0x14B8F6C: operator() (functional.h:140)
==42024==    by 0x14B8F6C: WorkerLoop (thread_pool.cc:478)
==42024==    by 0x14B8F6C: operator() (thread_pool.cc:643)
==42024==    by 0x14B8F6C: __invoke_impl<void, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > (invoke.h:61)
==42024==    by 0x14B8F6C: __invoke<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > (invoke.h:96)
==42024==    by 0x14B8F6C: _M_invoke<0> (std_thread.h:292)
==42024==    by 0x14B8F6C: operator() (std_thread.h:299)
==42024==    by 0x14B8F6C: std::thread::_State_impl<std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::{lambda()#1}> > >::_M_run() (std_thread.h:244)
==42024==    by 0x4B69DB3: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.33)
==42024==    by 0x4854B7A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
==42024==    by 0x4EAEAA3: start_thread (pthread_create.c:447)
==42024==    by 0x4F3BA33: clone (clone.S:100)
==42024== 
==42024== This conflicts with a previous write of size 8 by thread #3
==42024== Locks held: none
==42024==    at 0x405C8A: reset (unique_ptr.h:209)
==42024==    by 0x405C8A: operator= (unique_ptr.h:191)
==42024==    by 0x405C8A: operator= (unique_ptr.h:243)
==42024==    by 0x405C8A: operator= (unique_ptr.h:414)
==42024==    by 0x405C8A: arrow::acero::aggregate::GroupByNode::InitLocalStateIfNeeded(arrow::acero::aggregate::GroupByNode::ThreadLocalState*) (groupby_aggregate_node.cc:428)
==42024==    by 0x407C45: arrow::acero::aggregate::GroupByNode::Consume(arrow::compute::ExecSpan) (groupby_aggregate_node.cc:218)
==42024==    by 0x408A2C: operator() (groupby_aggregate_node.cc:376)
==42024==    by 0x408A2C: HandleSegments<arrow::acero::aggregate::GroupByNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch)::<lambda(const arrow::compute::ExecBatch&, const arrow::compute::Segment&)> > (aggregate_internal.h:139)
==42024==    by 0x408A2C: arrow::acero::aggregate::GroupByNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch) (groupby_aggregate_node.cc:382)
==42024==    by 0x3943D5: operator() (source_node.cc:158)
==42024==    by 0x3943D5: __invoke_impl<arrow::Status, arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(const arrow::compute::ExecBatch&)::<lambda()>&> (invoke.h:61)
==42024==    by 0x3943D5: __invoke_r<arrow::Status, arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(const arrow::compute::ExecBatch&)::<lambda()>&> (invoke.h:116)
==42024==    by 0x3943D5: std::_Function_handler<arrow::Status (), arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::{lambda()#1}>::_M_invoke(std::_Any_data const&) (std_function.h:291)
==42024==    by 0x3730AF: operator() (std_function.h:591)
==42024==    by 0x3730AF: operator()<std::function<arrow::Status()>&> (future.h:150)
==42024==    by 0x3730AF: __invoke_impl<void, arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status()>&> (invoke.h:61)
==42024==    by 0x3730AF: __invoke<arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status()>&> (invoke.h:96)
==42024==    by 0x3730AF: __call<void, 0, 1> (functional:506)
==42024==    by 0x3730AF: operator()<> (functional:591)
==42024==    by 0x3730AF: arrow::internal::FnOnce<void ()>::FnImpl<std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, std::function<arrow::Status ()>)> >::invoke() (functional.h:152)
==42024==    by 0x14B8F6C: operator() (functional.h:140)
==42024==    by 0x14B8F6C: WorkerLoop (thread_pool.cc:478)
==42024==    by 0x14B8F6C: operator() (thread_pool.cc:643)
==42024==    by 0x14B8F6C: __invoke_impl<void, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > (invoke.h:61)
==42024==    by 0x14B8F6C: __invoke<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > (invoke.h:96)
==42024==    by 0x14B8F6C: _M_invoke<0> (std_thread.h:292)
==42024==    by 0x14B8F6C: operator() (std_thread.h:299)
==42024==    by 0x14B8F6C: std::thread::_State_impl<std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::{lambda()#1}> > >::_M_run() (std_thread.h:244)
==42024==    by 0x4B69DB3: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.33)
==42024==    by 0x4854B7A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
==42024==    by 0x4EAEAA3: start_thread (pthread_create.c:447)
==42024==    by 0x4F3BA33: clone (clone.S:100)
==42024==  Address 0x57dea30 is 0 bytes inside a block of size 352 alloc'd
==42024==    at 0x4849023: operator new(unsigned long) (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
==42024==    by 0x40CF76: allocate (new_allocator.h:151)
==42024==    by 0x40CF76: allocate (alloc_traits.h:482)
==42024==    by 0x40CF76: _M_allocate (stl_vector.h:381)
==42024==    by 0x40CF76: _M_allocate (stl_vector.h:378)
==42024==    by 0x40CF76: std::vector<arrow::acero::aggregate::GroupByNode::ThreadLocalState, std::allocator<arrow::acero::aggregate::GroupByNode::ThreadLocalState> >::_M_default_append(unsigned long) (vector.tcc:663)
==42024==    by 0x40D1D2: resize (stl_vector.h:1016)
==42024==    by 0x40D1D2: arrow::acero::aggregate::GroupByNode::StartProducing() (aggregate_internal.h:289)
==42024==    by 0x32B8EA: operator() (exec_plan.cc:175)
==42024==    by 0x32B8EA: arrow::internal::FnOnce<arrow::Status (arrow::util::AsyncTaskScheduler*)>::FnImpl<arrow::acero::(anonymous namespace)::ExecPlanImpl::StartProducing()::{lambda(arrow::util::AsyncTaskScheduler*)#1}>::invoke(arrow::util::AsyncTaskScheduler*&&) (functional.h:152)
==42024==    by 0x143A479: operator() (functional.h:140)
==42024==    by 0x143A479: arrow::util::AsyncTaskScheduler::Make(arrow::internal::FnOnce<arrow::Status (arrow::util::AsyncTaskScheduler*)>, arrow::internal::FnOnce<void (arrow::Status const&)>, arrow::StopToken) (async_util.cc:471)
==42024==    by 0x329640: arrow::acero::(anonymous namespace)::ExecPlanImpl::StartProducing() (exec_plan.cc:193)
==42024==    by 0x332AAB: StartProducing (exec_plan.cc:439)
==42024==    by 0x332AAB: arrow::acero::(anonymous namespace)::DeclarationToTableImpl(arrow::acero::Declaration, arrow::acero::QueryOptions, arrow::internal::Executor*) (exec_plan.cc:662)
==42024==    by 0x333433: operator() (exec_plan.cc:787)
==42024==    by 0x333433: arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>::FnImpl<arrow::acero::DeclarationToTable(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*)::{lambda(arrow::internal::Executor*)#1}>::invoke(arrow::internal::Executor*&&) (functional.h:152)
==42024==    by 0x342B55: operator() (functional.h:140)
==42024==    by 0x342B55: arrow::Future<std::shared_ptr<arrow::Table> >::SyncType arrow::internal::RunSynchronously<arrow::Future<std::shared_ptr<arrow::Table> >, std::shared_ptr<arrow::Table> >(arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>, bool) (thread_pool.h:587)
==42024==    by 0x334F63: arrow::acero::DeclarationToTable(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*) (exec_plan.cc:789)
==42024==    by 0x320A8F: RunTest() (testit.cc:32)
==42024==    by 0x30B824: main (testit.cc:39)
==42024==  Block was alloc'd by thread #1
==42024== 
==42024== ----------------------------------------------------------------

Inspecting how InputReceived and InputFinished are implemented in the groupby node, along with what the source node does (it uses the task scheduler to call InputReceived() on the downstream node, and likewise for InputFinished()), I can see how this race can happen.

Presumably the task scheduler does not guarantee any ordering between InputReceived() and InputFinished(); furthermore, they can be called simultaneously from different threads. This implies that both of these methods in the groupby node must be thread-safe, which in turn implies that OutputResult must also be thread-safe. But the current implementation of OutputResult clearly is not: it first modifies the thread-local state (potentially swapping entries, and those entries could also be populated mid-iteration if other threads have only just spawned) and then merges the thread-local states.
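In heavily simplified form (a paraphrase of the pattern in groupby_aggregate_node.cc matching the frames in the trace, not the literal source), the two conflicting paths look roughly like this:

// Paraphrased pattern; details elided.
Status GroupByNode::Consume(ExecSpan batch) {   // called from InputReceived
  ThreadLocalState* state =
      &local_states_[plan_->query_context()->GetThreadIndex()];
  InitLocalStateIfNeeded(state);  // resets state->grouper (a unique_ptr):
                                  // the "previous write" in the report
  // ... group and aggregate into this thread's local state ...
  return Status::OK();
}

Status GroupByNode::InputFinished(ExecNode* input, int total_batches) {
  // ...
  return OutputResult(false);     // walks *all* thread-local states, reading
                                  // grouper pointers that another thread may
                                  // be initializing at that very moment: the
                                  // conflicting "read" in the report
}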

I think this explains the above race: InputFinished is being called from one thread while another thread is still in the middle of InputReceived, and both end up in OutputResult when they should not. This also causes other issues, such as the execution plan being marked as finished before the source node has called InputFinished on the aggregation node, which means valgrind also observes InputFinished being called well after things have been destroyed.

So it looks like there is a clear issue here, but I am not familiar enough with the intended execution/threading model to say what the fix should be. Suggestions here would be appreciated.
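One possible direction, offered purely as a sketch of the shape of a fix rather than a claim about the intended design: gate the merge behind an atomic batch count, so that exactly one thread runs OutputResult, and only after every batch has been consumed. The member names below (batches_seen_, total_batches_, output_started_) are hypothetical:

// Sketch only; hypothetical members, not the existing fields.
std::atomic<int> batches_seen_{0};
std::atomic<int> total_batches_{-1};
std::atomic<bool> output_started_{false};

// Returns true for exactly one caller, and only once all batches are consumed.
bool GroupByNode::ShouldOutput() {
  int total = total_batches_.load();
  if (total < 0 || batches_seen_.load() < total) return false;
  return !output_started_.exchange(true);  // first caller past the gate wins
}

Status GroupByNode::InputReceived(ExecNode* input, ExecBatch batch) {
  RETURN_NOT_OK(Consume(ExecSpan(batch)));  // finish mutating local state...
  batches_seen_.fetch_add(1);               // ...then publish it as consumed
  return ShouldOutput() ? OutputResult(false) : Status::OK();
}

Status GroupByNode::InputFinished(ExecNode* input, int total_batches) {
  total_batches_.store(total_batches);
  return ShouldOutput() ? OutputResult(false) : Status::OK();
}

With a gate like this, OutputResult can only start after every Consume() has returned, so merging the thread-local states would no longer race against InitLocalStateIfNeeded, regardless of whether InputFinished arrives before, after, or concurrently with the last batch.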

Component(s)

C++
