Skip to content

Commit

Permalink
[TDF] Clear TTreeReaderValues before exiting task
Browse files Browse the repository at this point in the history
This fixes a race condition in which a TTreeReader and its
TTreeReaderValues could be deleted concurrently:
Thread #1) a task ends and pushes back processing slot
Thread #2) a task starts and overwrites thread-local TTreeReaderValues
Thread #1) first task deletes TTreeReader
  • Loading branch information
eguiraud authored and dpiparo committed Aug 8, 2017
1 parent 2b1a31c commit 26e8ace
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
30 changes: 28 additions & 2 deletions tree/treeplayer/inc/ROOT/TDFNodes.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class TLoopManager : public std::enable_shared_from_this<TLoopManager> {
void InitNodeSlots(TTreeReader *r, unsigned int slot);
void InitNodes();
void CleanUpNodes();
void CleanUpTask(unsigned int slot);
void JitActions();
void EvalChildrenCounts();

Expand Down Expand Up @@ -164,7 +165,6 @@ public:

void MakeProxy(TTreeReader *r, const std::string &bn)
{
Reset();
bool useReaderValue = std::is_same<ProxyParam_t, T>::value;
if (useReaderValue)
fReaderValue.reset(new TTreeReaderValue<T>(*r, bn.c_str()));
Expand Down Expand Up @@ -213,6 +213,15 @@ struct TTDFValueTuple<TypeList<BranchTypes...>> {
template <typename BranchType>
using TDFValueTuple_t = typename TTDFValueTuple<BranchType>::type;

/// Clear the proxies of a tuple of TColumnValues
template<typename ValueTuple, int...S>
void ResetTDFValueTuple(ValueTuple& values, StaticSeq<S...>)
{
// hack to expand a parameter pack without c++17 fold expressions.
std::initializer_list<int> expander{(std::get<S>(values).Reset(), 0)...};
(void)expander; // avoid "unused variable" warnings
}

class TActionBase {
protected:
TLoopManager *fImplPtr; ///< A raw pointer to the TLoopManager at the root of this functional
Expand All @@ -230,6 +239,7 @@ public:
virtual void Run(unsigned int slot, Long64_t entry) = 0;
virtual void InitSlot(TTreeReader *r, unsigned int slot) = 0;
virtual void TriggerChildrenCount() = 0;
virtual void ClearValueReaders(unsigned int slot) = 0;
unsigned int GetNSlots() const { return fNSlots; }
};

Expand Down Expand Up @@ -275,6 +285,11 @@ public:
void TriggerChildrenCount() final { fPrevData.IncrChildrenCount(); }

~TAction() { fHelper.Finalize(); }

virtual void ClearValueReaders(unsigned int slot) final
{
ResetTDFValueTuple(fValues[slot], TypeInd_t());
}
};

} // end NS TDF
Expand Down Expand Up @@ -311,6 +326,7 @@ public:
virtual void Update(unsigned int slot, Long64_t entry) = 0;
virtual void IncrChildrenCount() = 0;
virtual void StopProcessing() = 0;
virtual void ClearValueReaders(unsigned int slot) = 0;
void ResetChildrenCount()
{
fNChildren = 0;
Expand Down Expand Up @@ -401,6 +417,11 @@ public:
if (fNChildren == 1)
fPrevData.IncrChildrenCount();
}

virtual void ClearValueReaders(unsigned int slot) final
{
ResetTDFValueTuple(fValues[slot], TypeInd_t());
}
};

class TFilterBase {
Expand Down Expand Up @@ -440,6 +461,7 @@ public:
virtual void TriggerChildrenCount() = 0;
unsigned int GetNSlots() const { return fNSlots; }
virtual void ResetReportCount() = 0;
virtual void ClearValueReaders(unsigned int slot) = 0;
};

template <typename FilterF, typename PrevDataFrame>
Expand Down Expand Up @@ -531,6 +553,11 @@ public:
std::fill(fAccepted.begin(), fAccepted.end(), 0);
std::fill(fRejected.begin(), fRejected.end(), 0);
}

virtual void ClearValueReaders(unsigned int slot) final
{
ResetTDFValueTuple(fValues[slot], TypeInd_t());
}
};

class TRangeBase {
Expand Down Expand Up @@ -641,7 +668,6 @@ template <typename T>
void ROOT::Internal::TDF::TColumnValue<T>::SetTmpColumn(unsigned int slot,
ROOT::Detail::TDF::TCustomColumnBase *tmpColumn)
{
Reset();
fTmpColumn = tmpColumn;
if (tmpColumn->GetTypeId() != typeid(T))
throw std::runtime_error(std::string("TColumnValue: type specified is ") + typeid(T).name() +
Expand Down
11 changes: 11 additions & 0 deletions tree/treeplayer/src/TDFNodes.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ void TLoopManager::RunEmptySourceMT()
for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
RunAndCheckFilters(slot, currEntry);
}
CleanUpTask(slot);
slotStack.Push(slot);
};

Expand Down Expand Up @@ -203,6 +204,7 @@ void TLoopManager::RunTreeProcessorMT()
while (r.Next()) {
RunAndCheckFilters(slot, r.GetCurrentEntry());
}
CleanUpTask(slot);
slotStack.Push(slot);
});
#endif // not implemented otherwise
Expand Down Expand Up @@ -273,6 +275,15 @@ void TLoopManager::CleanUpNodes()
for (auto &pair : fBookedBranches) pair.second->ResetChildrenCount();
}

/// Perform clean-up operations. To be called at the end of each task execution.
// TODO in case of interleaved task execution, each task must clear the readervalues in its task slot
void TLoopManager::CleanUpTask(unsigned int slot)
{
for (auto &ptr : fBookedActions) ptr->ClearValueReaders(slot);
for (auto &ptr : fBookedFilters) ptr->ClearValueReaders(slot);
for (auto &pair : fBookedBranches) pair.second->ClearValueReaders(slot);
}

/// Jit all actions that required runtime column type inference, and clean the `fToJit` member variable.
void TLoopManager::JitActions()
{
Expand Down

0 comments on commit 26e8ace

Please sign in to comment.