Skip to content

Commit

Permalink
[columnar] JOIN should work with parallel execution
Browse files Browse the repository at this point in the history
* Columnar is collecting all JOIN clauses and uses them in their non
  parallel scan. For parallel execution this should be discarded. JOIN
  clauses will be used in nodes that are responsible for them.

* Rather than assign each worker an id, we are now assigning stripe id
  to be used based on order in which they arrive.
  • Loading branch information
mkaruza authored and wuputah committed Nov 30, 2022
1 parent d0db6a2 commit 3370bf9
Show file tree
Hide file tree
Showing 12 changed files with 460 additions and 286 deletions.
76 changes: 38 additions & 38 deletions columnar/src/backend/columnar/columnar_customscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
* ColumnarScanState represents the state for a columnar scan. It's a
* CustomScanState with additional fields specific to columnar scans.
*/

typedef struct ColumnarScanState
{
CustomScanState custom_scanstate; /* must be first field */
Expand All @@ -66,8 +65,7 @@ typedef struct ColumnarScanState
List *qual;

/* Parallel execution */
uint32 nWorkers;
uint32 workerId;
ParallelColumnarScan parallelColumnarScan;

/* Vectorization */
struct
Expand All @@ -81,14 +79,6 @@ typedef struct ColumnarScanState
} vectorization;
} ColumnarScanState;

typedef struct ParallelColumnarTableScanData
{
pg_atomic_uint32 nextWorkerID; /* fetch and increment for next worker id */
uint32 numberOfWorkers; /* Total number of workers */
} ParallelColumnarTableScanData;
typedef struct ParallelColumnarTableScanData *ParallelColumnarTableScan;


typedef bool (*PathPredicate)(Path *path);


Expand Down Expand Up @@ -161,7 +151,7 @@ static void Columnar_ReinitializeDSMCustomScan(CustomScanState *node,
void *coordinate);
static void Columnar_InitializeWorkerCustomScan(CustomScanState *node,
shm_toc *toc,
void *coordinate);
void *coordinate);

/* helper functions to build strings for EXPLAIN */
static const char * ColumnarPushdownClausesStr(List *context, List *clauses);
Expand Down Expand Up @@ -1217,13 +1207,12 @@ AddColumnarScanPathsRec(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte,
Assert(!bms_overlap(paramRelids, candidateRelids));

Path *columnarScanPath = AddColumnarScanPath(root, rel, rte, paramRelids);

add_path(rel, columnarScanPath);
if (columnar_enable_parallel_execution)
columnarScanPath->total_cost += columnarScanPath->rows * 0.1;

/* For columnar custom scan we should also do planning for parallel exeuction
* if it is enabled.
* if it is enabled with GUC.
*/
if (columnar_enable_parallel_execution)
{
Expand All @@ -1232,19 +1221,31 @@ AddColumnarScanPathsRec(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte,
if (columnar_min_parallel_processes > max_parallel_workers)
{
elog(DEBUG1, "columnar.min_parallel_proceses is set higher than max_parallel_workers.");
elog(DEBUG1, "Using max_parallel_workers instead for columnar scan.");
elog(DEBUG1, "Using max_parallel_workers instead for parallel columnar scan.");
columnar_min_parallel_process_running =
Min(max_parallel_workers, columnar_min_parallel_processes);
}

if (parallel_leader_participation)
columnar_min_parallel_process_running--;

/*
* We know how many workers are we going to run. Since workers are using single stripe
* probably it doesn't have much sense to spawn more workers than stripes in table.
*/
columnar_min_parallel_process_running = Min(columnar_min_parallel_process_running,
parallel_leader_participation ?
ColumnarTableStripeCount(rte->relid) - 1 :
ColumnarTableStripeCount(rte->relid));

if (rel->consider_parallel && rel->lateral_relids == NULL &&
columnar_min_parallel_process_running > 0)
{
/*
* Passing NULL for JOIN quals in parallel execution.
*/
Path *parallelColumnarScanPath =
AddColumnarScanPath(root, rel, rte, paramRelids);
AddColumnarScanPath(root, rel, rte, NULL);

parallelColumnarScanPath->parallel_workers = columnar_min_parallel_process_running;
parallelColumnarScanPath->parallel_aware = true;
Expand Down Expand Up @@ -2193,8 +2194,7 @@ ColumnarScanNext(ColumnarScanState *columnarScanState)
0, NULL, NULL, flags,
columnarScanState->attrNeeded,
columnarScanState->qual,
columnarScanState->workerId,
columnarScanState->nWorkers,
columnarScanState->parallelColumnarScan,
vectorizationEnabled);

node->ss.ss_currentScanDesc = scandesc;
Expand Down Expand Up @@ -2363,7 +2363,7 @@ static Size
Columnar_EstimateDSMCustomScan(CustomScanState *node,
ParallelContext *pcxt)
{
return sizeof(ParallelColumnarTableScanData);
return sizeof(ParallelColumnarScanData);
}


Expand All @@ -2372,24 +2372,16 @@ Columnar_InitializeDSMCustomScan(CustomScanState *node,
ParallelContext *pcxt,
void *coordinate)
{
ParallelColumnarTableScan pscan = (ParallelColumnarTableScan) coordinate;
ParallelColumnarScan pscan = (ParallelColumnarScan) coordinate;
ColumnarScanState *columnarScanState = (ColumnarScanState *) node;

/* Stripe numbers are starting from index 1 */
pg_atomic_init_u64(&pscan->nextStripeId, 1);

/* Check if leader is also doing in participation in
* query execution and adjust workers
*/
if (parallel_leader_participation)
{
columnarScanState->nWorkers = pcxt->nworkers + 1;
columnarScanState->workerId = 0;
pscan->numberOfWorkers = pcxt->nworkers + 1;
pg_atomic_init_u32(&pscan->nextWorkerID, 1);
}
if(parallel_leader_participation)
columnarScanState->parallelColumnarScan = pscan;
else
{
pscan->numberOfWorkers = pcxt->nworkers;
pg_atomic_init_u32(&pscan->nextWorkerID, 0);
}
columnarScanState->parallelColumnarScan = NULL;
}


Expand All @@ -2398,7 +2390,16 @@ Columnar_ReinitializeDSMCustomScan(CustomScanState *node,
ParallelContext *pcxt,
void *coordinate)
{
elog(DEBUG1, "ReinitializeDSMCustomScan not supported.");
ParallelColumnarScan pscan = (ParallelColumnarScan) coordinate;
ColumnarScanState *columnarScanState = (ColumnarScanState *) node;

/* Reset atomic nextStripeId to initial value */
pg_atomic_init_u64(&pscan->nextStripeId, 1);

if(parallel_leader_participation)
columnarScanState->parallelColumnarScan = pscan;
else
columnarScanState->parallelColumnarScan = NULL;
}


Expand All @@ -2408,9 +2409,8 @@ Columnar_InitializeWorkerCustomScan(CustomScanState *node,
void *coordinate)
{
ColumnarScanState *columnarScanState = (ColumnarScanState *) node;
ParallelColumnarTableScan pscan = (ParallelColumnarTableScan) coordinate;
columnarScanState->workerId = pg_atomic_fetch_add_u32(&pscan->nextWorkerID, 1);
columnarScanState->nWorkers = pscan->numberOfWorkers;
ParallelColumnarScan pscan = (ParallelColumnarScan) coordinate;
columnarScanState->parallelColumnarScan = pscan;
}


Expand Down
18 changes: 3 additions & 15 deletions columnar/src/backend/columnar/columnar_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -714,9 +714,7 @@ FindStripeWithMatchingFirstRowNumber(Relation relation, uint64 rowNumber,
StripeMetadata *
FindNextStripeForParallelWorker(Relation relation,
Snapshot snapshot,
uint32 workerId,
uint32 nWorkers,
uint32 lastWorkerStripeModuloRowIdx)
uint32 nextStripeId)
{
StripeMetadata *foundStripeMetadata = NULL;

Expand All @@ -735,25 +733,15 @@ FindNextStripeForParallelWorker(Relation relation,
snapshot, 1,
&scanKey);

uint64 modCount = 0;

while(true)
{

HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection);

if (HeapTupleIsValid(heapTuple))
{
foundStripeMetadata = BuildStripeMetadata(columnarStripes, heapTuple);
int mod = foundStripeMetadata->id % nWorkers;

if (mod == workerId)
{
modCount++;

if (modCount == lastWorkerStripeModuloRowIdx)
break;
}
if (foundStripeMetadata->id == nextStripeId)
break;
}
else
{
Expand Down
22 changes: 9 additions & 13 deletions columnar/src/backend/columnar/columnar_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ struct ColumnarReadState
bool snapshotRegisteredByUs;

/* Parallel exeuction */

uint32 workerId;
uint32 nWorkers;
uint64 lastReadStripeRowId;
ParallelColumnarScan parallelColumnarScan;
};

/* static function declarations */
Expand Down Expand Up @@ -194,7 +191,7 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
List *projectedColumnList, List *whereClauseList,
MemoryContext scanContext, Snapshot snapshot,
bool randomAccess,
uint32 workerId, uint32 nWorkers)
ParallelColumnarScan parallelColumnarScan)
{
/*
* We allocate all stripe specific data in the stripeReadContext, and reset
Expand Down Expand Up @@ -222,8 +219,7 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
readState->snapshotRegisteredByUs = false;

/* Parallel execution */
readState->nWorkers = nWorkers;
readState->workerId = workerId;
readState->parallelColumnarScan = parallelColumnarScan;

if (!randomAccess)
{
Expand Down Expand Up @@ -707,7 +703,7 @@ AdvanceStripeRead(ColumnarReadState *readState)
{
MemoryContext oldContext = MemoryContextSwitchTo(readState->scanContext);

if (readState->nWorkers == 0)
if (readState->parallelColumnarScan == 0)
{
/* if not read any stripes yet, start from the first one .. */
uint64 lastReadRowNumber = COLUMNAR_INVALID_ROW_NUMBER;
Expand All @@ -726,19 +722,19 @@ AdvanceStripeRead(ColumnarReadState *readState)
}
else
{
readState->lastReadStripeRowId++;

if (StripeReadInProgress(readState))
{
readState->chunkGroupsFiltered +=
readState->stripeReadState->chunkGroupsFiltered;
}

/* Fetch atomic next stripe id to be read by this scan. */
uint32 nextStripeId =
pg_atomic_fetch_add_u64(&readState->parallelColumnarScan->nextStripeId, 1);

readState->currentStripeMetadata = FindNextStripeForParallelWorker(readState->relation,
readState->snapshot,
readState->workerId,
readState->nWorkers,
readState->lastReadStripeRowId);
nextStripeId);
}

if (readState->currentStripeMetadata &&
Expand Down
25 changes: 12 additions & 13 deletions columnar/src/backend/columnar/columnar_tableam.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,8 @@ typedef struct ColumnarScanDescData
Bitmapset *attr_needed;
List *scanQual;

/* Parallel Scan */
uint32 workerId;
uint32 nWorkers;
/* Parallel Scan Data */
ParallelColumnarScan parallelColumnarScan;

/* Vectorization */
bool returnVectorizedTuple;
Expand Down Expand Up @@ -206,7 +205,8 @@ columnar_beginscan(Relation relation, Snapshot snapshot,
TableScanDesc scandesc = columnar_beginscan_extended(relation, snapshot, nkeys, key,
parallel_scan,
flags, attr_needed, NULL,
0, 0, false);
NULL,
false);

bms_free(attr_needed);

Expand All @@ -219,7 +219,7 @@ columnar_beginscan_extended(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key,
ParallelTableScanDesc parallel_scan,
uint32 flags, Bitmapset *attr_needed, List *scanQual,
uint32 workerId, uint32 nWorkers,
ParallelColumnarScan parallelColumnarScan,
bool returnVectorizedTuple)
{
CheckCitusColumnarVersion(ERROR);
Expand Down Expand Up @@ -253,9 +253,8 @@ columnar_beginscan_extended(Relation relation, Snapshot snapshot,
scan->scanQual = copyObject(scanQual);
scan->scanContext = scanContext;

/* Parallel execution */;
scan->workerId = workerId;
scan->nWorkers = nWorkers;
/* Parallel execution scan data */;
scan->parallelColumnarScan = parallelColumnarScan;

/* Vectorized result */
scan->returnVectorizedTuple = returnVectorizedTuple;
Expand Down Expand Up @@ -292,15 +291,15 @@ static ColumnarReadState *
init_columnar_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_needed,
List *scanQual, MemoryContext scanContext, Snapshot snapshot,
bool randomAccess,
uint32 workerId, uint32 nWorkers)
ParallelColumnarScan parallelColumnarScan)
{
MemoryContext oldContext = MemoryContextSwitchTo(scanContext);

List *neededColumnList = NeededColumnsList(tupdesc, attr_needed);
ColumnarReadState *readState = ColumnarBeginRead(relation, tupdesc, neededColumnList,
scanQual, scanContext, snapshot,
randomAccess,
workerId, nWorkers);
parallelColumnarScan);

MemoryContextSwitchTo(oldContext);

Expand Down Expand Up @@ -357,7 +356,7 @@ columnar_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlo
scan->attr_needed, scan->scanQual,
scan->scanContext, scan->cs_base.rs_snapshot,
randomAccess,
scan->workerId, scan->nWorkers);
scan->parallelColumnarScan);
}

ExecClearTuple(slot);
Expand Down Expand Up @@ -567,7 +566,7 @@ columnar_index_fetch_tuple(struct IndexFetchTableData *sscan,
attr_needed, scanQual,
scan->scanContext,
snapshot, randomAccess,
0, 0);
NULL);
}

uint64 rowNumber = tid_to_row_number(*tid);
Expand Down Expand Up @@ -1030,7 +1029,7 @@ columnar_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
attr_needed, scanQual,
scanContext, snapshot,
randomAccess,
0, 0);
NULL);

Datum *values = palloc0(sourceDesc->natts * sizeof(Datum));
bool *nulls = palloc0(sourceDesc->natts * sizeof(bool));
Expand Down
Loading

0 comments on commit 3370bf9

Please sign in to comment.