diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index b2e5f034417..a2fc290cf8c 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -859,7 +859,11 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl } if (aborted) break; - const void *row = input->ungroupedNextRow(); + const void *row; + { + LookAheadTimer t(owner.activity->getActivityTimerAccumulator(), owner.activity->queryTimeActivities()); + row = input->ungroupedNextRow(); + } if (!row) break; @@ -1060,7 +1064,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl ::ActPrintLogEx(&activity->queryContainer(), e, thorlog_all, MCexception(e), "%s", msg.str()); } protected: - CActivityBase *activity; + CSlaveActivity *activity; size32_t inputBufferSize, pullBufferSize; unsigned writerPoolSize; unsigned self; @@ -1078,7 +1082,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl public: IMPLEMENT_IINTERFACE_USING(CInterface); - CDistributorBase(CActivityBase *_activity, bool _doDedup, bool _isall, IStopInput *_istop, const char *_id) + CDistributorBase(CSlaveActivity *_activity, bool _doDedup, bool _isall, IStopInput *_istop, const char *_id) : activity(_activity), recvthread(this), sendthread(this), sender(*this), id(_id) { aborted = connected = false; @@ -1530,7 +1534,7 @@ class CRowDistributor: public CDistributorBase ICommunicator &comm; bool stopping; public: - CRowDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, bool isAll, IStopInput *istop, const char *id) + CRowDistributor(CSlaveActivity *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, bool isAll, IStopInput *istop, const char *id) : CDistributorBase(activity, doDedup, isAll, istop, id), comm(_comm), tag(_tag) { stopping = false; @@ -1791,7 +1795,7 @@ class CRowPullDistributor: public CDistributorBase selfdone.reinit(); } public: - CRowPullDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop, const char *id) + CRowPullDistributor(CSlaveActivity *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop, const char *id) : CDistributorBase(activity, doDedup, false, istop, id), comm(_comm), tag(_tag) { pull = true; @@ -2086,12 +2090,12 @@ class CRowPullDistributor: public CDistributorBase //================================================================================================== -IHashDistributor *createHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, bool isAll, IStopInput *istop, const char *id) +IHashDistributor *createHashDistributor(CSlaveActivity *activity, ICommunicator &comm, mptag_t tag, bool doDedup, bool isAll, IStopInput *istop, const char *id) { return new CRowDistributor(activity, comm, tag, doDedup, isAll, istop, id); } -IHashDistributor *createPullHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop, const char *id=NULL) +IHashDistributor *createPullHashDistributor(CSlaveActivity *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop, const char *id=NULL) { return new CRowPullDistributor(activity, comm, tag, doDedup, istop, id); } diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.ipp b/thorlcr/activities/hashdistrib/thhashdistribslave.ipp index 862ec32bdd9..900d2f0bf11 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.ipp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.ipp @@ -36,7 +36,7 @@ interface IHashDistributor : extends IInterface interface IStopInput; IHashDistributor *createHashDistributor( - CActivityBase *activity, + CSlaveActivity *activity, ICommunicator &comm, mptag_t tag, bool dedup,