Skip to content

Commit

Permalink
[columnar] Add deleted rows count in columnar.chunk_group and columna…
Browse files Browse the repository at this point in the history
…r.row_mask

* Add deleted rows count in `chunk_group` and `row_mask` metadata table
* Skip loading of row mask for chunk if there is no deleted rows
  • Loading branch information
mkaruza authored Feb 17, 2023
1 parent 209fbbc commit c987c6e
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 57 deletions.
165 changes: 131 additions & 34 deletions columnar/src/backend/columnar/columnar_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,11 @@ static StripeMetadata * UpdateStripeMetadataRow(uint64 storageId, uint64 stripeI
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
static StripeMetadata * BuildStripeMetadata(Relation columnarStripes,
HeapTuple heapTuple);
static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32
chunkGroupCount, Snapshot snapshot);
static void ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe,
uint32 chunkGroupCount,
uint32 **chunkGroupRowCounts,
uint32 **chunkGroupDeletedRows,
Snapshot snapshot);
static Oid ColumnarStorageIdSequenceRelationId(void);
static Oid ColumnarStripeRelationId(void);
static Oid ColumnarStripePKeyIndexRelationId(void);
Expand Down Expand Up @@ -171,11 +174,12 @@ typedef FormData_columnar_options *Form_columnar_options;
#define Anum_columnar_stripe_first_row_number 9

/* constants for columnar.chunk_group */
#define Natts_columnar_chunkgroup 4
#define Natts_columnar_chunkgroup 5
#define Anum_columnar_chunkgroup_storageid 1
#define Anum_columnar_chunkgroup_stripe 2
#define Anum_columnar_chunkgroup_chunk 3
#define Anum_columnar_chunkgroup_row_count 4
#define Anum_columnar_chunkgroup_deleted_rows 5

/* constants for columnar.chunk */
#define Natts_columnar_chunk 14
Expand All @@ -195,12 +199,16 @@ typedef FormData_columnar_options *Form_columnar_options;
#define Anum_columnar_chunk_value_count 14

/* constants for columnar.row_mask */
#define Natts_columnar_row_mask 5
#define Natts_columnar_row_mask 8
#define Anum_columnar_row_mask_id 1
#define Anum_columnar_row_mask_storage_id 2
#define Anum_columnar_row_mask_start_row_number 3
#define Anum_columnar_row_mask_end_row_number 4
#define Anum_columnar_row_mask_mask 5
#define Anum_columnar_row_mask_stripe_id 3
#define Anum_columnar_row_mask_chunk_id 4
#define Anum_columnar_row_mask_start_row_number 5
#define Anum_columnar_row_mask_end_row_number 6
#define Anum_columnar_row_mask_deleted_rows 7
#define Anum_columnar_row_mask_mask 8


/*
* InitColumnarOptions initialized the columnar table options. Meaning it writes the
Expand Down Expand Up @@ -532,7 +540,8 @@ SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
UInt64GetDatum(storageId),
Int64GetDatum(stripe),
Int32GetDatum(chunkId),
Int64GetDatum(rowCount)
Int64GetDatum(rowCount),
Int32GetDatum(0)
};

bool nulls[Natts_columnar_chunkgroup] = { false };
Expand All @@ -550,8 +559,8 @@ SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
* SaveEmptyRowMask saves the metadata for inserted rows in columnar.mask_row
*/
extern bool
SaveEmptyRowMask(uint64 storageId, uint64 stripeStartRowNumber,
List *chunkGroupRowCounts)
SaveEmptyRowMask(uint64 storageId, uint64 stripeId,
uint64 stripeStartRowNumber, List *chunkGroupRowCounts)
{
Oid columnarRowMaskOid = ColumnarRowMaskRelationId();
Oid columnarRowMaskSeq = ColumnarRowMaskSeqId();
Expand All @@ -562,6 +571,7 @@ SaveEmptyRowMask(uint64 storageId, uint64 stripeStartRowNumber,
uint64 chunkIterationEndRowNumber = stripeStartRowNumber - 1;

ListCell *lc = NULL;
int chunkId = 0;

bool chunkInserted = true;

Expand Down Expand Up @@ -604,9 +614,12 @@ SaveEmptyRowMask(uint64 storageId, uint64 stripeStartRowNumber,

Datum values[Natts_columnar_row_mask] = {
Int64GetDatum(nextSeqId),
UInt64GetDatum(storageId),
Int64GetDatum(storageId),
Int64GetDatum(stripeId),
Int32GetDatum(chunkId),
Int64GetDatum(chunkIterationStartRowNumber),
Int64GetDatum(chunkIterationEndRowNumber),
Int32GetDatum(0),
0, /* to be filled below */
};

Expand Down Expand Up @@ -638,6 +651,7 @@ SaveEmptyRowMask(uint64 storageId, uint64 stripeStartRowNumber,
}

chunkIterationStartRowNumber = chunkIterationEndRowNumber + 1;
chunkId++;
}

FinishModifyRelation(modifyState);
Expand Down Expand Up @@ -755,8 +769,10 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
index_close(index, AccessShareLock);
table_close(columnarChunk, AccessShareLock);

chunkList->chunkGroupRowCounts =
ReadChunkGroupRowCounts(storageId, stripe, chunkCount, snapshot);
ReadChunkGroupRowCounts(storageId, stripe, chunkCount,
&chunkList->chunkGroupRowCounts,
&chunkList->chunkGroupDeletedRows,
snapshot);

chunkList->chunkGroupRowOffset = palloc0(chunkCount * sizeof(uint32));

Expand Down Expand Up @@ -883,20 +899,32 @@ UpdateRowMask(RelFileNode relfilenode, uint64 storageId,
/*
* Populate row mask cache with values from heap table
*/
rowMaskEntry->id = DatumGetInt64(
fastgetattr(rowMaskHeapTuple, Anum_columnar_row_mask_id,
rowMaskEntry->id = DatumGetUInt64(
fastgetattr(rowMaskHeapTuple, Anum_columnar_row_mask_id,
tupleDescriptor, &isnull));

rowMaskEntry->storageId = DatumGetInt64(
fastgetattr(rowMaskHeapTuple, Anum_columnar_row_mask_storage_id,
rowMaskEntry->storageId = DatumGetUInt64(
fastgetattr(rowMaskHeapTuple, Anum_columnar_row_mask_storage_id,
tupleDescriptor, &isnull));

rowMaskEntry->stripeId = DatumGetUInt64(
fastgetattr(rowMaskHeapTuple, Anum_columnar_row_mask_stripe_id,
tupleDescriptor, &isnull));

rowMaskEntry->chunkId = DatumGetInt32(
fastgetattr(rowMaskHeapTuple, Anum_columnar_row_mask_chunk_id,
tupleDescriptor, &isnull));

rowMaskEntry->startRowNumber = DatumGetInt64(
fastgetattr(rowMaskHeapTuple, Anum_columnar_row_mask_start_row_number,
fastgetattr(rowMaskHeapTuple, Anum_columnar_row_mask_start_row_number,
tupleDescriptor, &isnull));

rowMaskEntry->deletedRows = DatumGetInt32(
fastgetattr(rowMaskHeapTuple, Anum_columnar_row_mask_deleted_rows,
tupleDescriptor, &isnull));

rowMaskEntry->endRowNumber = DatumGetInt64(
fastgetattr(rowMaskHeapTuple, Anum_columnar_row_mask_end_row_number,
fastgetattr(rowMaskHeapTuple, Anum_columnar_row_mask_end_row_number,
tupleDescriptor, &isnull));

rowMask = rowMaskEntry->mask;
Expand All @@ -922,6 +950,8 @@ UpdateRowMask(RelFileNode relfilenode, uint64 storageId,

VARDATA(rowMask)[rowByteMask / 8] |= 1 << (rowByteMask % 8);

rowMaskEntry->deletedRows++;

CommandCounterIncrement();

return true;
Expand Down Expand Up @@ -958,12 +988,16 @@ void FlushRowMaskCache(RowMaskWriteStateEntry *rowMaskEntry)
bool nulls[Natts_columnar_row_mask] = { 0 };
Datum values[Natts_columnar_row_mask] = { 0 };

update[Natts_columnar_row_mask - 1] = true;
// Update deleted row count
update[Anum_columnar_row_mask_deleted_rows - 1] = true;
values[Anum_columnar_row_mask_deleted_rows - 1] = PointerGetDatum(rowMaskEntry->deletedRows);

// Update mask byte array
update[Anum_columnar_row_mask_mask - 1] = true;
values[Anum_columnar_row_mask_mask - 1] = PointerGetDatum(rowMaskEntry->mask);

HeapTuple newHeapTuple = heap_modify_tuple(oldHeapTuple, tupleDescriptor,
values, nulls, update);
values, nulls, update);

CatalogTupleUpdate(columnarChunkGroupMask, &oldHeapTuple->t_self, newHeapTuple);

Expand Down Expand Up @@ -1296,8 +1330,9 @@ FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot)
* ReadChunkGroupRowCounts returns an array of row counts of chunk groups for the
* given stripe.
*/
static uint32 *
static void
ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,
uint32 **chunkGroupRowCounts, uint32 **chunkGroupDeletedRows,
Snapshot snapshot)
{
Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId();
Expand All @@ -1313,13 +1348,23 @@ ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,
SysScanDesc scanDescriptor =
systable_beginscan_ordered(columnarChunkGroup, index, snapshot, 2, scanKey);

uint32 chunkGroupIndex = 0;
HeapTuple heapTuple = NULL;
uint32 *chunkGroupRowCounts = palloc0(chunkGroupCount * sizeof(uint32));

*chunkGroupRowCounts = palloc0(chunkGroupCount * sizeof(uint32));
*chunkGroupDeletedRows = palloc(chunkGroupCount * sizeof(uint32));

/*
* Since we have now updates of `chunk_group`, there could be multiple tuples
* retrieved with changed only deleted_row count. We expect that last modified
* version will be retrieved last so it we just update information based on tuple
* chunk group index.
*/
while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
ForwardScanDirection)))
{
if (HeapTupleHeaderIsHotUpdated(heapTuple->t_data))
continue;

Datum datumArray[Natts_columnar_chunkgroup];
bool isNullArray[Natts_columnar_chunkgroup];

Expand All @@ -1329,27 +1374,78 @@ ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,

uint32 tupleChunkGroupIndex =
DatumGetUInt32(datumArray[Anum_columnar_chunkgroup_chunk - 1]);
if (chunkGroupIndex >= chunkGroupCount ||
tupleChunkGroupIndex != chunkGroupIndex)

if (tupleChunkGroupIndex > chunkGroupCount)
{
elog(ERROR, "unexpected chunk group");
elog(ERROR, "Tuple chunk group higher than chunk group count");
}

chunkGroupRowCounts[chunkGroupIndex] =
(*chunkGroupRowCounts)[tupleChunkGroupIndex] =
(uint32) DatumGetUInt64(datumArray[Anum_columnar_chunkgroup_row_count - 1]);
chunkGroupIndex++;

(*chunkGroupDeletedRows)[tupleChunkGroupIndex] =
(uint32) DatumGetUInt64(datumArray[Anum_columnar_chunkgroup_deleted_rows - 1]);
}

if (chunkGroupIndex != chunkGroupCount)
systable_endscan_ordered(scanDescriptor);
index_close(index, AccessShareLock);
table_close(columnarChunkGroup, AccessShareLock);
}


/*
* ReadChunkGroupRowCounts returns an array of row counts of chunk groups for the
* given stripe.
*/
void
UpdateChunkGroupDeletedRows(uint64 storageId, uint64 stripe,
uint32 chunkGroupId, uint32 deletedRowNumber)
{

HeapTuple oldHeapTuple = NULL;

Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId();
Relation columnarChunkGroup = table_open(columnarChunkGroupOid, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(columnarChunkGroup);
Relation index = index_open(ColumnarChunkGroupIndexRelationId(), AccessShareLock);

ScanKeyData scanKey[3];
ScanKeyInit(&scanKey[0], Anum_columnar_chunkgroup_storageid,
BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(storageId));
ScanKeyInit(&scanKey[1], Anum_columnar_chunkgroup_stripe,
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
ScanKeyInit(&scanKey[2], Anum_columnar_chunkgroup_chunk,
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(chunkGroupId));

SysScanDesc scanDescriptor =
systable_beginscan_ordered(columnarChunkGroup, index, NULL, 3, scanKey);

oldHeapTuple = systable_getnext_ordered(scanDescriptor, BackwardScanDirection);

index_close(index, AccessShareLock);

if (HeapTupleIsValid(oldHeapTuple))
{
elog(ERROR, "unexpected chunk group count");
bool update[Natts_columnar_chunkgroup] = { 0 };
bool nulls[Natts_columnar_chunkgroup] = { 0 };
Datum values[Natts_columnar_chunkgroup] = { 0 };

// Update deleted row count
update[Anum_columnar_chunkgroup_deleted_rows - 1] = true;
values[Anum_columnar_chunkgroup_deleted_rows - 1] = (deletedRowNumber);

HeapTuple newHeapTuple = heap_modify_tuple(oldHeapTuple, tupleDescriptor,
values, nulls, update);

CatalogTupleUpdate(columnarChunkGroup, &oldHeapTuple->t_self, newHeapTuple);

heap_freetuple(newHeapTuple);
}

systable_endscan_ordered(scanDescriptor);
index_close(index, AccessShareLock);
table_close(columnarChunkGroup, AccessShareLock);

return chunkGroupRowCounts;
CommandCounterIncrement();
}


Expand Down Expand Up @@ -2205,7 +2301,8 @@ create_table_row_mask(PG_FUNCTION_ARGS)
chunkGroupRowCount =
lappend_int(chunkGroupRowCount, lastChunkRowCount);

if(!SaveEmptyRowMask(storageId, stripeMetadata->firstRowNumber, chunkGroupRowCount))
if(!SaveEmptyRowMask(storageId, stripeMetadata->id,
stripeMetadata->firstRowNumber, chunkGroupRowCount))
{
elog(WARNING, "relation \"%s\" already has columnar.row_mask populated.",
RelationGetRelationName(relation));
Expand Down
Loading

0 comments on commit c987c6e

Please sign in to comment.