Skip to content

Commit

Permalink
[columnar] Critical fixes in columnar storage
Browse files Browse the repository at this point in the history
* When parallel scan is executing we need to flush in-memory changes
  (stripe, row_mask) in beginning of custom scan node, because updates &
  deletes that happen cannot be done in parallel mode.

* alter_table_set_access_method need to flush changes after renaming is
  done.

* Difference of numbers in stripe id is not always 1. So assigning next
  stripe id for workes should work for this scenario.
  • Loading branch information
mkaruza authored and wuputah committed Feb 8, 2023
1 parent f33b0bd commit 4f939f4
Show file tree
Hide file tree
Showing 10 changed files with 463 additions and 62 deletions.
54 changes: 49 additions & 5 deletions columnar/src/backend/columnar/columnar_customscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "access/amapi.h"
#include "access/skey.h"
#include "access/xact.h"
#include "catalog/pg_am.h"
#include "catalog/pg_statistic.h"
#include "commands/defrem.h"
Expand Down Expand Up @@ -77,6 +78,10 @@ typedef struct ColumnarScanState
List *vectorizedQualList;
List *constructedVectorizedQualList;
} vectorization;

/* Scan snapshot*/
Snapshot snapshot;
bool snapshotRegisteredByUs;
} ColumnarScanState;

typedef bool (*PathPredicate)(Path *path);
Expand Down Expand Up @@ -1820,6 +1825,23 @@ ColumnarScan_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int ef
columnarScanState->attrNeeded =
ColumnarAttrNeeded(&cscanstate->ss, columnarScanState->vectorization.vectorizedQualList);

/*
* If we have pending changes that need to be flushed (row_mask after update/delete)
* or new stripe we need to to them here because sequential columnar scan
* can have parallel execution and updated are not allowed in parallel mode.
*/
columnarScanState->snapshot = estate->es_snapshot;
columnarScanState->snapshotRegisteredByUs = false;
Oid relationOid = cscanstate->ss.ss_currentRelation->rd_node.relNode;

if(!IsInParallelMode())
{
RowMaskFlushWriteStateForRelfilenode(relationOid, GetCurrentSubTransactionId());

FlushWriteStateWithNewSnapshot(relationOid, &columnarScanState->snapshot,
&columnarScanState->snapshotRegisteredByUs);
}

/* scan slot is already initialized */
}

Expand Down Expand Up @@ -2207,7 +2229,7 @@ ColumnarScanNext(ColumnarScanState *columnarScanState)
* executing a scan that was planned to be parallel.
*/
scandesc = columnar_beginscan_extended(node->ss.ss_currentRelation,
estate->es_snapshot,
columnarScanState->snapshot,
0, NULL, NULL, flags,
columnarScanState->attrNeeded,
columnarScanState->qual,
Expand Down Expand Up @@ -2274,10 +2296,8 @@ ColumnarScan_ExecCustomScan(CustomScanState *node)
static void
ColumnarScan_EndCustomScan(CustomScanState *node)
{
/*
* get information from node
*/
TableScanDesc scanDesc = node->ss.ss_currentScanDesc;
ColumnarScanState *columnarScanState = (ColumnarScanState *) node;

/*
* Cleanup BMS of selected scan attributes
Expand Down Expand Up @@ -2305,6 +2325,12 @@ ColumnarScan_EndCustomScan(CustomScanState *node)
{
table_endscan(scanDesc);
}

/* Unregister snapshot */
if (columnarScanState->snapshotRegisteredByUs)
{
UnregisterSnapshot(columnarScanState->snapshot);
}
}


Expand Down Expand Up @@ -2381,7 +2407,15 @@ static Size
Columnar_EstimateDSMCustomScan(CustomScanState *node,
ParallelContext *pcxt)
{
return sizeof(ParallelColumnarScanData);
Size nbytes;

ColumnarScanState *columnarScanState = (ColumnarScanState *) node;

nbytes = offsetof(ParallelColumnarScanData, snapshotData);
nbytes = add_size(nbytes, EstimateSnapshotSpace(columnarScanState->snapshot));
nbytes = MAXALIGN(nbytes);

return nbytes;
}


Expand All @@ -2393,6 +2427,15 @@ Columnar_InitializeDSMCustomScan(CustomScanState *node,
ParallelColumnarScan pscan = (ParallelColumnarScan) coordinate;
ColumnarScanState *columnarScanState = (ColumnarScanState *) node;

/*
* Serialize scan snapshot for workers so they see changes
* if we have flushed stripe / row_mask during this scan.
*/
SerializeSnapshot(columnarScanState->snapshot, pscan->snapshotData);

/* Initialize parallel scan mutex */
SpinLockInit(&pscan->mutex);

/* Stripe numbers are starting from index 1 */
pg_atomic_init_u64(&pscan->nextStripeId, 1);

Expand Down Expand Up @@ -2429,6 +2472,7 @@ Columnar_InitializeWorkerCustomScan(CustomScanState *node,
ColumnarScanState *columnarScanState = (ColumnarScanState *) node;
ParallelColumnarScan pscan = (ParallelColumnarScan) coordinate;
columnarScanState->parallelColumnarScan = pscan;
columnarScanState->snapshot = RestoreSnapshot(pscan->snapshotData);
}


Expand Down
10 changes: 9 additions & 1 deletion columnar/src/backend/columnar/columnar_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,8 @@ FindStripeWithMatchingFirstRowNumber(Relation relation, uint64 rowNumber,
StripeMetadata *
FindNextStripeForParallelWorker(Relation relation,
Snapshot snapshot,
uint32 nextStripeId)
uint64 nextStripeId,
uint64 * nextHigherStripeId)
{
StripeMetadata *foundStripeMetadata = NULL;

Expand All @@ -1066,8 +1067,15 @@ FindNextStripeForParallelWorker(Relation relation,
if (HeapTupleIsValid(heapTuple))
{
foundStripeMetadata = BuildStripeMetadata(columnarStripes, heapTuple);

if (foundStripeMetadata->id == nextStripeId)
break;

if (foundStripeMetadata->id > nextStripeId)
{
*nextHigherStripeId = foundStripeMetadata->id;
break;
}
}
else
{
Expand Down
93 changes: 38 additions & 55 deletions columnar/src/backend/columnar/columnar_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,19 +236,25 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
if (!randomAccess)
{
/*
* Flush any pending in-memory changes to row_mask metadata
* for next scan.
* If have parallel columnar scan we don't flush states.
*/
RowMaskFlushWriteStateForRelfilenode(readState->relation->rd_node.relNode,
GetCurrentSubTransactionId());
if (readState->parallelColumnarScan == NULL)
{
/*
* Flush any pending in-memory changes to row_mask metadata
* for next scan.
*/
RowMaskFlushWriteStateForRelfilenode(readState->relation->rd_node.relNode,
GetCurrentSubTransactionId());


/*
* When doing random access (i.e.: index scan), we don't need to flush
* pending writes until we need to read them.
* columnar_index_fetch_tuple would do so when needed.
*/
ColumnarReadFlushPendingWrites(readState);
/*
* When doing random access (i.e.: index scan), we don't need to flush
* pending writes until we need to read them.
* columnar_index_fetch_tuple would do so when needed.
*/
ColumnarReadFlushPendingWrites(readState);
}

/*
* AdvanceStripeRead sets currentStripeMetadata for the first stripe
Expand Down Expand Up @@ -290,49 +296,9 @@ ColumnarReadFlushPendingWrites(ColumnarReadState *readState)
Assert(!readState->snapshotRegisteredByUs);

Oid relfilenode = readState->relation->rd_node.relNode;
FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());

if (readState->snapshot == InvalidSnapshot || !IsMVCCSnapshot(readState->snapshot))
{
return;
}

/*
* If we flushed any pending writes, then we should guarantee that
* those writes are visible to us too. For this reason, if given
* snapshot is an MVCC snapshot, then we set its curcid to current
* command id.
*
* For simplicity, we do that even if we didn't flush any writes
* since we don't see any problem with that.
*
* XXX: We should either not update cid if we are executing a FETCH
* (from cursor) command, or we should have a better way to deal with
* pending writes, see the discussion in
* https://github.com/citusdata/citus/issues/5231.
*/
PushCopiedSnapshot(readState->snapshot);

/* now our snapshot is the active one */
UpdateActiveSnapshotCommandId();
Snapshot newSnapshot = GetActiveSnapshot();
RegisterSnapshot(newSnapshot);

/*
* To be able to use UpdateActiveSnapshotCommandId, we pushed the
* copied snapshot to the stack. However, we don't need to keep it
* there since we will anyway rely on ColumnarReadState->snapshot
* during read operation.
*
* Note that since we registered the snapshot already, we guarantee
* that PopActiveSnapshot won't free it.
*/
PopActiveSnapshot();

readState->snapshot = newSnapshot;

/* not forget to unregister it when finishing read operation */
readState->snapshotRegisteredByUs = true;
FlushWriteStateWithNewSnapshot(relfilenode, &readState->snapshot,
&readState->snapshotRegisteredByUs);
}


Expand Down Expand Up @@ -753,13 +719,30 @@ AdvanceStripeRead(ColumnarReadState *readState)
readState->stripeReadState->chunkGroupsFiltered;
}

SpinLockAcquire(&readState->parallelColumnarScan->mutex);

/* Fetch atomic next stripe id to be read by this scan. */
uint32 nextStripeId =
uint64 nextStripeId =
pg_atomic_fetch_add_u64(&readState->parallelColumnarScan->nextStripeId, 1);

uint64 nextHigherStripeId = nextStripeId;

readState->currentStripeMetadata = FindNextStripeForParallelWorker(readState->relation,
readState->snapshot,
nextStripeId);
readState->snapshot,
nextStripeId,
&nextHigherStripeId);

/*
* There exists higher stripe id than this one so adjust and
* add +1 for next workers.
*/
if (nextHigherStripeId != nextStripeId)
{
pg_atomic_write_u64(&readState->parallelColumnarScan->nextStripeId,
nextHigherStripeId + 1);
}

SpinLockRelease(&readState->parallelColumnarScan->mutex);
}

if (readState->currentStripeMetadata &&
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
-- columnar--11.1-4--11.1-5.sql

#include "udfs/alter_table_set_access_method/11.1-5.sql"

SET search_path TO columnar;

CREATE SEQUENCE row_mask_seq START WITH 1 INCREMENT BY 1;
Expand Down
Loading

0 comments on commit 4f939f4

Please sign in to comment.