Skip to content

Commit

Permalink
speculative insert checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
JerrySievert committed Oct 17, 2023
1 parent 50e38d4 commit 0f56257
Show file tree
Hide file tree
Showing 9 changed files with 435 additions and 33 deletions.
2 changes: 1 addition & 1 deletion columnar/src/backend/columnar/columnar.control
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
comment = 'Hydra Columnar extension'
default_version = '11.1-9'
default_version = '11.1-10'
module_pathname = '$libdir/columnar'
relocatable = false
91 changes: 77 additions & 14 deletions columnar/src/backend/columnar/columnar_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/


#include "c.h"
#include "postgres.h"

#include "safe_lib.h"
Expand All @@ -29,6 +30,7 @@
#include "columnar/columnar_metadata.h"
#include "columnar/columnar_storage.h"
#include "columnar/columnar_version_compat.h"
#include "columnar/columnar_write_state_row_mask.h"
#include "columnar/utils/listutils.h"

#include <sys/stat.h>
Expand All @@ -46,6 +48,7 @@
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "lib/stringinfo.h"
Expand Down Expand Up @@ -208,7 +211,7 @@ typedef FormData_columnar_options *Form_columnar_options;
#define Anum_columnar_chunk_value_count 14

/* constants for columnar.row_mask */
#define Natts_columnar_row_mask 8
#define Natts_columnar_row_mask 9
#define Anum_columnar_row_mask_id 1
#define Anum_columnar_row_mask_storage_id 2
#define Anum_columnar_row_mask_stripe_id 3
Expand All @@ -217,6 +220,7 @@ typedef FormData_columnar_options *Form_columnar_options;
#define Anum_columnar_row_mask_end_row_number 6
#define Anum_columnar_row_mask_deleted_rows 7
#define Anum_columnar_row_mask_mask 8
#define Anum_columnar_row_mask_spec_mask 9


/*
Expand Down Expand Up @@ -619,6 +623,9 @@ SaveEmptyRowMask(uint64 storageId, uint64 stripeId,
bytea *initialLookupRecord = (bytea *) palloc0(maskSize + VARHDRSZ);
SET_VARSIZE(initialLookupRecord, maskSize + VARHDRSZ);

bytea *initialSpecRecord = (bytea *) palloc0(maskSize + VARHDRSZ);
SET_VARSIZE(initialSpecRecord, maskSize + VARHDRSZ);

int64 nextSeqId = nextval_internal(columnarRowMaskSeq, false);

Datum values[Natts_columnar_row_mask] = {
Expand All @@ -630,11 +637,15 @@ SaveEmptyRowMask(uint64 storageId, uint64 stripeId,
Int64GetDatum(chunkIterationEndRowNumber),
Int32GetDatum(0),
0, /* to be filled below */
0,
};

values[Anum_columnar_row_mask_mask - 1] =
PointerGetDatum(initialLookupRecord);

values[Anum_columnar_row_mask_spec_mask - 1] =
PointerGetDatum(initialSpecRecord);

bool nulls[Natts_columnar_row_mask] = { false };

/*
Expand Down Expand Up @@ -798,7 +809,7 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
/*
* ReadChunkRowMask fetches the deleted-row mask and the speculative-insert
* mask of a chunk for a columnar relation.
*/
bytea *
ChunkRowMasks *
ReadChunkRowMask(RelFileNode relfilenode, Snapshot snapshot,
MemoryContext cxt,
uint64 stripeFirstRowNumber, int rowCount)
Expand All @@ -820,7 +831,12 @@ ReadChunkRowMask(RelFileNode relfilenode, Snapshot snapshot,
(rowCount / 8);

bytea *chunkRowMaskBytea = (bytea *) palloc0(chunkMaskSize + VARHDRSZ);
/*
* Zero-fill the speculative mask exactly like the deleted-row mask above:
* any byte the scan below does not copy must read as "no bit set" rather
* than as uninitialized memory.
*/
bytea *chunkSpecMaskBytea = (bytea *) palloc0(chunkMaskSize + VARHDRSZ);

SET_VARSIZE(chunkRowMaskBytea, chunkMaskSize + VARHDRSZ);
SET_VARSIZE(chunkSpecMaskBytea, chunkMaskSize + VARHDRSZ);

ChunkRowMasks *masks = palloc0(sizeof(ChunkRowMasks));

ScanKeyInit(&scanKey[0], Anum_columnar_row_mask_storage_id,
BTEqualStrategyNumber, F_INT8EQ, UInt64GetDatum(storageId));
Expand All @@ -847,6 +863,12 @@ ReadChunkRowMask(RelFileNode relfilenode, Snapshot snapshot,
VARDATA(currentRowMask),
VARSIZE_ANY_EXHDR(currentRowMask));

bytea * currentSpecMask = DatumGetByteaP(datumArray[Anum_columnar_row_mask_spec_mask - 1]);

memcpy(VARDATA(chunkSpecMaskBytea) + pos,
VARDATA(currentSpecMask),
VARSIZE_ANY_EXHDR(currentSpecMask));

pos += VARSIZE_ANY_EXHDR(currentRowMask);
}

Expand All @@ -856,15 +878,19 @@ ReadChunkRowMask(RelFileNode relfilenode, Snapshot snapshot,
index_close(index, AccessShareLock);
table_close(columnarRowMask, AccessShareLock);

return chunkRowMaskBytea;
masks->chunkRowDeletedMaskBytea = chunkRowMaskBytea;
masks->chunkRowSpecMaskBytea = chunkSpecMaskBytea;

return masks;
}


bool
UpdateRowMask(RelFileNode relfilenode, uint64 storageId,
Snapshot snapshot, uint64 rowNumber)
Snapshot snapshot, uint64 rowNumber, enum UpdateMaskType updateType)
{
bytea *rowMask = NULL;
bytea *specMask = NULL;

RowMaskWriteStateEntry *rowMaskEntry =
RowMaskFindWriteState(relfilenode.relNode, GetCurrentSubTransactionId(), rowNumber);
Expand Down Expand Up @@ -901,10 +927,15 @@ UpdateRowMask(RelFileNode relfilenode, uint64 storageId,
fastgetattr(rowMaskHeapTuple, Anum_columnar_row_mask_mask,
tupleDescriptor, &isnull));

Pointer sMask = DatumGetPointer(
fastgetattr(rowMaskHeapTuple, Anum_columnar_row_mask_spec_mask,
tupleDescriptor, &isnull));

rowMaskEntry = RowMaskInitWriteState(relfilenode.relNode,
storageId,
GetCurrentSubTransactionId(),
DatumGetByteaP(mask));
DatumGetByteaP(mask),
DatumGetByteaP(sMask));
/*
* Populate row mask cache with values from heap table
*/
Expand Down Expand Up @@ -937,6 +968,7 @@ UpdateRowMask(RelFileNode relfilenode, uint64 storageId,
tupleDescriptor, &isnull));

rowMask = rowMaskEntry->mask;
specMask = rowMaskEntry->specMask;
}

systable_endscan_ordered(scanDescriptor);
Expand All @@ -946,22 +978,49 @@ UpdateRowMask(RelFileNode relfilenode, uint64 storageId,
else
{
rowMask = rowMaskEntry->mask;
specMask = rowMaskEntry->specMask;
}

int16 rowByteMask = rowNumber - rowMaskEntry->startRowNumber;

/*
* If we were blocked by the advisory lock for this storage, the row
* may have been deleted by another transaction in the meantime.
*/
if (VARDATA(rowMask)[rowByteMask / 8] & (1 << (rowByteMask % 8)))
return false;
if (updateType == Delete || updateType == ClearSpeculativeInsertAndDelete)
{
/*
* If we were blocked by the advisory lock for this storage, the row
* may have been deleted by another transaction in the meantime.
*/
if (VARDATA(rowMask)[rowByteMask / 8] & (1 << (rowByteMask % 8)))
return false;

VARDATA(rowMask)[rowByteMask / 8] |= 1 << (rowByteMask % 8);
VARDATA(rowMask)[rowByteMask / 8] |= 1 << (rowByteMask % 8);

rowMaskEntry->deletedRows++;
rowMaskEntry->deletedRows++;

CommandCounterIncrement();
if (updateType == ClearSpeculativeInsertAndDelete)
{
/*
* The row has just been marked deleted, so also drop its
* speculative-insert flag. Clearing a bit needs AND with the
* inverted bit mask; OR-ing in (0 << n) leaves the byte unchanged.
*/
VARDATA(specMask)[rowByteMask / 8] &= ~(1 << (rowByteMask % 8));
}

CommandCounterIncrement();

return true;
}
else if (updateType == ClearSpeculativeInsert)
{
/*
* Clear this row's bit in the speculative mask. Clearing a bit needs
* AND with the inverted bit mask; OR-ing in (0 << n) leaves the byte
* unchanged, so the flag set by SpeculativeInsert would never be removed.
*/
VARDATA(specMask)[rowByteMask / 8] &= ~(1 << (rowByteMask % 8));

CommandCounterIncrement();

return true;
}
else if (updateType == SpeculativeInsert)
{
VARDATA(specMask)[rowByteMask / 8] |= 1 << (rowByteMask % 8);

CommandCounterIncrement();

return true;
}

return true;
}
Expand Down Expand Up @@ -1005,6 +1064,10 @@ void FlushRowMaskCache(RowMaskWriteStateEntry *rowMaskEntry)
update[Anum_columnar_row_mask_mask - 1] = true;
values[Anum_columnar_row_mask_mask - 1] = PointerGetDatum(rowMaskEntry->mask);

// Update the speculative mask array
update[Anum_columnar_row_mask_spec_mask - 1] = true;
values[Anum_columnar_row_mask_spec_mask - 1] = PointerGetDatum(rowMaskEntry->specMask);

HeapTuple newHeapTuple = heap_modify_tuple(oldHeapTuple, tupleDescriptor,
values, nulls, update);

Expand Down
22 changes: 16 additions & 6 deletions columnar/src/backend/columnar/columnar_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "utils/rel.h"

#include "columnar/columnar.h"
#include "columnar/columnar_metadata.h"
#include "columnar/columnar_storage.h"
#include "columnar/columnar_tableam.h"
#include "columnar/columnar_version_compat.h"
Expand All @@ -54,6 +55,7 @@ typedef struct ChunkGroupReadState
List *projectedColumnList; /* borrowed reference */
ChunkData *chunkGroupData;
bytea *rowMask;
bytea *specMask;
bool rowMaskCached; /* If rowMask metadata is cached and borrowed */
uint32 chunkStripeRowOffset;
uint32 chunkGroupDeletedRows;
Expand Down Expand Up @@ -590,18 +592,21 @@ ReadStripeRowByRowNumber(ColumnarReadState *readState,
}
else if (stripeReadState->chunkGroupReadState->chunkGroupDeletedRows > 0)
{
stripeReadState->chunkGroupReadState->rowMask =
ReadChunkRowMask(stripeReadState->relation->rd_node,
ChunkRowMasks *masks = ReadChunkRowMask(stripeReadState->relation->rd_node,
readState->snapshot,
stripeReadState->stripeReadContext,
chunkFirstRowNumber,
stripeReadState->chunkGroupReadState->rowCount);
stripeReadState->chunkGroupReadState->rowMask = masks->chunkRowDeletedMaskBytea;
stripeReadState->chunkGroupReadState->specMask = masks->chunkRowSpecMaskBytea;

stripeReadState->chunkGroupReadState->rowMaskCached = false;
}
}
else
{
stripeReadState->chunkGroupReadState->rowMask = NULL;
stripeReadState->chunkGroupReadState->specMask = NULL;
}
}

Expand Down Expand Up @@ -936,17 +941,20 @@ ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
stripeFirstRowNumber +
stripeReadState->chunkGroupReadState->chunkStripeRowOffset;

stripeReadState->chunkGroupReadState->rowMask =
ReadChunkRowMask(stripeReadState->relation->rd_node,
ChunkRowMasks *masks = ReadChunkRowMask(stripeReadState->relation->rd_node,
snapshot,
stripeReadState->stripeReadContext,
chunkFirstRowNumber,
stripeReadState->chunkGroupReadState->rowCount);
stripeReadState->chunkGroupReadState->rowMask = masks->chunkRowDeletedMaskBytea;
stripeReadState->chunkGroupReadState->specMask = masks->chunkRowSpecMaskBytea;

stripeReadState->chunkGroupReadState->rowMaskCached = false;
}
else
{
stripeReadState->chunkGroupReadState->rowMask = NULL;
stripeReadState->chunkGroupReadState->specMask = NULL;
}
}

Expand Down Expand Up @@ -2035,17 +2043,19 @@ ReadStripeNextVector(StripeReadState *stripeReadState, Datum *columnValues,
if (columnar_enable_dml &&
stripeReadState->chunkGroupReadState->chunkGroupDeletedRows != 0)
{
stripeReadState->chunkGroupReadState->rowMask =
ReadChunkRowMask(stripeReadState->relation->rd_node,
ChunkRowMasks *masks = ReadChunkRowMask(stripeReadState->relation->rd_node,
snapshot,
stripeReadState->stripeReadContext,
chunkFirstRowNumber,
stripeReadState->chunkGroupReadState->rowCount);

stripeReadState->chunkGroupReadState->rowMask = masks->chunkRowDeletedMaskBytea;
stripeReadState->chunkGroupReadState->specMask = masks->chunkRowSpecMaskBytea;
}
else
{
stripeReadState->chunkGroupReadState->rowMask = NULL;
stripeReadState->chunkGroupReadState->specMask = NULL;
}
}
else
Expand Down
Loading

0 comments on commit 0f56257

Please sign in to comment.