Skip to content

Commit

Permalink
HPCC-32480 Capture "look ahead" timings for unordered concat (paralle…
Browse files Browse the repository at this point in the history
…l funnel)

Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
  • Loading branch information
shamser committed Oct 30, 2024
1 parent 9d3734f commit bf09241
Showing 1 changed file with 23 additions and 9 deletions.
32 changes: 23 additions & 9 deletions thorlcr/activities/funnel/thfunnelslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,24 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
unsigned numRows = 0;
try
{
funnel.activity.startInput(inputIndex);
{
LookAheadTimer timer(funnel.activity.getActivityTimerAccumulator(), funnel.activity.queryTimeActivities());
funnel.activity.startInput(inputIndex);
}
started = true;
inputStream = funnel.activity.queryInputStream(inputIndex);
while (!stopping)
{
numRows = 0;
for (;numRows < chunkSize; numRows++)
{
const void * row = inputStream->ungroupedNextRow();
if (!row)
break;
rows[numRows] = row;
LookAheadTimer timer(funnel.activity.getActivityTimerAccumulator(), funnel.activity.queryTimeActivities());
for (;numRows < chunkSize; numRows++)
{
const void * row = inputStream->ungroupedNextRow();
if (!row)
break;
rows[numRows] = row;
}
}

if (numRows == 0) break;
Expand Down Expand Up @@ -141,13 +147,12 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
Linked<IOutputRowSerializer> serializer;

void push(const void *row)
{
{
size32_t rowSize = thorRowMemoryFootprint(serializer, row);

bool waitForSpace = false;
// only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time
{

CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit.
if (stopped)
{
Expand Down Expand Up @@ -376,7 +381,16 @@ class FunnelSlaveActivity : public CSlaveActivity

auto startInputNFunc = [&](unsigned i)
{
try { startInput(i); }
try
{
if (i == 0) // 1st input is started synchronously, so time already included in start() timing.
startInput(i);
else
{
LookAheadTimer timer(slaveTimerStats, timeActivities);
startInput(i);
}
}
catch (CATCHALL)
{
ActPrintLog("FUNNEL(%" ACTPF "d): Error staring input %d", container.queryId(), i);
Expand Down

0 comments on commit bf09241

Please sign in to comment.