From 0f562577edba0da2bffa21f2c283b19fec0568ed Mon Sep 17 00:00:00 2001 From: Jerry Sievert Date: Tue, 17 Oct 2023 12:41:55 -0700 Subject: [PATCH] speculative insert checkpoint --- .../src/backend/columnar/columnar.control | 2 +- .../src/backend/columnar/columnar_metadata.c | 91 ++++- .../src/backend/columnar/columnar_reader.c | 22 +- .../src/backend/columnar/columnar_tableam.c | 316 +++++++++++++++++- .../sql/columnar--11.1-9--11.1-10.sql | 3 + .../backend/columnar/write_state_row_mask.c | 13 +- columnar/src/include/columnar/columnar.h | 7 +- .../src/include/columnar/columnar_metadata.h | 9 + .../columnar/columnar_write_state_row_mask.h | 5 + 9 files changed, 435 insertions(+), 33 deletions(-) create mode 100644 columnar/src/backend/columnar/sql/columnar--11.1-9--11.1-10.sql diff --git a/columnar/src/backend/columnar/columnar.control b/columnar/src/backend/columnar/columnar.control index aee7c034..4fed3c92 100644 --- a/columnar/src/backend/columnar/columnar.control +++ b/columnar/src/backend/columnar/columnar.control @@ -1,4 +1,4 @@ comment = 'Hydra Columnar extension' -default_version = '11.1-9' +default_version = '11.1-10' module_pathname = '$libdir/columnar' relocatable = false diff --git a/columnar/src/backend/columnar/columnar_metadata.c b/columnar/src/backend/columnar/columnar_metadata.c index 90b2e8d1..7143e835 100644 --- a/columnar/src/backend/columnar/columnar_metadata.c +++ b/columnar/src/backend/columnar/columnar_metadata.c @@ -20,6 +20,7 @@ */ +#include "c.h" #include "postgres.h" #include "safe_lib.h" @@ -29,6 +30,7 @@ #include "columnar/columnar_metadata.h" #include "columnar/columnar_storage.h" #include "columnar/columnar_version_compat.h" +#include "columnar/columnar_write_state_row_mask.h" #include "columnar/utils/listutils.h" #include @@ -46,6 +48,7 @@ #include "commands/trigger.h" #include "executor/executor.h" #include "executor/spi.h" +#include "fmgr.h" #include "miscadmin.h" #include "nodes/execnodes.h" #include "lib/stringinfo.h" @@ -208,7 
+211,7 @@ typedef FormData_columnar_options *Form_columnar_options; #define Anum_columnar_chunk_value_count 14 /* constants for columnar.row_mask */ -#define Natts_columnar_row_mask 8 +#define Natts_columnar_row_mask 9 #define Anum_columnar_row_mask_id 1 #define Anum_columnar_row_mask_storage_id 2 #define Anum_columnar_row_mask_stripe_id 3 @@ -217,6 +220,7 @@ typedef FormData_columnar_options *Form_columnar_options; #define Anum_columnar_row_mask_end_row_number 6 #define Anum_columnar_row_mask_deleted_rows 7 #define Anum_columnar_row_mask_mask 8 +#define Anum_columnar_row_mask_spec_mask 9 /* @@ -619,6 +623,9 @@ SaveEmptyRowMask(uint64 storageId, uint64 stripeId, bytea *initialLookupRecord = (bytea *) palloc0(maskSize + VARHDRSZ); SET_VARSIZE(initialLookupRecord, maskSize + VARHDRSZ); + bytea *initialSpecRecord = (bytea *) palloc0(maskSize + VARHDRSZ); + SET_VARSIZE(initialSpecRecord, maskSize + VARHDRSZ); + int64 nextSeqId = nextval_internal(columnarRowMaskSeq, false); Datum values[Natts_columnar_row_mask] = { @@ -630,11 +637,15 @@ SaveEmptyRowMask(uint64 storageId, uint64 stripeId, Int64GetDatum(chunkIterationEndRowNumber), Int32GetDatum(0), 0, /* to be filled below */ + 0, }; values[Anum_columnar_row_mask_mask - 1] = PointerGetDatum(initialLookupRecord); + values[Anum_columnar_row_mask_spec_mask - 1] = + PointerGetDatum(initialSpecRecord); + bool nulls[Natts_columnar_row_mask] = { false }; /* @@ -798,7 +809,7 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri /* * ReadChunkRowMask fetches chunk row mask for columnar relation. 
*/ -bytea * +ChunkRowMasks * ReadChunkRowMask(RelFileNode relfilenode, Snapshot snapshot, MemoryContext cxt, uint64 stripeFirstRowNumber, int rowCount) @@ -820,7 +831,12 @@ ReadChunkRowMask(RelFileNode relfilenode, Snapshot snapshot, (rowCount / 8); bytea *chunkRowMaskBytea = (bytea *) palloc0(chunkMaskSize + VARHDRSZ); + bytea *chunkSpecMaskBytea = (bytea *) palloc0(chunkMaskSize + VARHDRSZ); + SET_VARSIZE(chunkRowMaskBytea, chunkMaskSize + VARHDRSZ); + SET_VARSIZE(chunkSpecMaskBytea, chunkMaskSize + VARHDRSZ); + + ChunkRowMasks *masks = palloc0(sizeof(ChunkRowMasks)); ScanKeyInit(&scanKey[0], Anum_columnar_row_mask_storage_id, BTEqualStrategyNumber, F_INT8EQ, UInt64GetDatum(storageId)); @@ -847,6 +863,12 @@ ReadChunkRowMask(RelFileNode relfilenode, Snapshot snapshot, VARDATA(currentRowMask), VARSIZE_ANY_EXHDR(currentRowMask)); + bytea * currentSpecMask = DatumGetByteaP(datumArray[Anum_columnar_row_mask_spec_mask - 1]); + + memcpy(VARDATA(chunkSpecMaskBytea) + pos, + VARDATA(currentSpecMask), + VARSIZE_ANY_EXHDR(currentSpecMask)); + + pos += VARSIZE_ANY_EXHDR(currentRowMask); } @@ -856,15 +878,19 @@ ReadChunkRowMask(RelFileNode relfilenode, Snapshot snapshot, index_close(index, AccessShareLock); table_close(columnarRowMask, AccessShareLock); - return chunkRowMaskBytea; + masks->chunkRowDeletedMaskBytea = chunkRowMaskBytea; + masks->chunkRowSpecMaskBytea = chunkSpecMaskBytea; + + return masks; } bool UpdateRowMask(RelFileNode relfilenode, uint64 storageId, - Snapshot snapshot, uint64 rowNumber) + Snapshot snapshot, uint64 rowNumber, enum UpdateMaskType updateType) { bytea *rowMask = NULL; + bytea *specMask = NULL; RowMaskWriteStateEntry *rowMaskEntry = RowMaskFindWriteState(relfilenode.relNode, GetCurrentSubTransactionId(), rowNumber); @@ -901,10 +927,15 @@ UpdateRowMask(RelFileNode relfilenode, uint64 storageId, fastgetattr(rowMaskHeapTuple, Anum_columnar_row_mask_mask, tupleDescriptor, &isnull)); + Pointer sMask = DatumGetPointer( + fastgetattr(rowMaskHeapTuple, 
Anum_columnar_row_mask_spec_mask, + tupleDescriptor, &isnull)); + rowMaskEntry = RowMaskInitWriteState(relfilenode.relNode, storageId, GetCurrentSubTransactionId(), - DatumGetByteaP(mask)); + DatumGetByteaP(mask), + DatumGetByteaP(sMask)); /* * Populate row mask cache with values from heap table */ @@ -937,6 +968,7 @@ UpdateRowMask(RelFileNode relfilenode, uint64 storageId, tupleDescriptor, &isnull)); rowMask = rowMaskEntry->mask; + specMask = rowMaskEntry->specMask; } systable_endscan_ordered(scanDescriptor); @@ -946,22 +978,49 @@ UpdateRowMask(RelFileNode relfilenode, uint64 storageId, else { rowMask = rowMaskEntry->mask; + specMask = rowMaskEntry->specMask; } int16 rowByteMask = rowNumber - rowMaskEntry->startRowNumber; - /* - * IF we have been blocked by advisory lock for storage, maybe row - * was delete by some other transaction. - */ - if (VARDATA(rowMask)[rowByteMask / 8] & (1 << (rowByteMask % 8))) - return false; + if (updateType == Delete || updateType == ClearSpeculativeInsertAndDelete) + { + /* + * IF we have been blocked by advisory lock for storage, maybe row + * was delete by some other transaction. 
+ */ + if (VARDATA(rowMask)[rowByteMask / 8] & (1 << (rowByteMask % 8))) + return false; - VARDATA(rowMask)[rowByteMask / 8] |= 1 << (rowByteMask % 8); + VARDATA(rowMask)[rowByteMask / 8] |= 1 << (rowByteMask % 8); - rowMaskEntry->deletedRows++; + rowMaskEntry->deletedRows++; - CommandCounterIncrement(); + if (updateType == ClearSpeculativeInsertAndDelete) + { + VARDATA(specMask)[rowByteMask / 8] &= ~(1 << (rowByteMask % 8)); + } + + CommandCounterIncrement(); + + return true; + } + else if (updateType == ClearSpeculativeInsert) + { + VARDATA(specMask)[rowByteMask / 8] &= ~(1 << (rowByteMask % 8)); + + CommandCounterIncrement(); + + return true; + } + else if (updateType == SpeculativeInsert) + { + VARDATA(specMask)[rowByteMask / 8] |= 1 << (rowByteMask % 8); + + CommandCounterIncrement(); + + return true; + } return true; } @@ -1005,6 +1064,10 @@ void FlushRowMaskCache(RowMaskWriteStateEntry *rowMaskEntry) update[Anum_columnar_row_mask_mask - 1] = true; values[Anum_columnar_row_mask_mask - 1] = PointerGetDatum(rowMaskEntry->mask); + // Update the speculative mask array + update[Anum_columnar_row_mask_spec_mask - 1] = true; + values[Anum_columnar_row_mask_spec_mask - 1] = PointerGetDatum(rowMaskEntry->specMask); + HeapTuple newHeapTuple = heap_modify_tuple(oldHeapTuple, tupleDescriptor, values, nulls, update); diff --git a/columnar/src/backend/columnar/columnar_reader.c b/columnar/src/backend/columnar/columnar_reader.c index 4aa963f2..f93bdd29 100644 --- a/columnar/src/backend/columnar/columnar_reader.c +++ b/columnar/src/backend/columnar/columnar_reader.c @@ -38,6 +38,7 @@ #include "utils/rel.h" #include "columnar/columnar.h" +#include "columnar/columnar_metadata.h" #include "columnar/columnar_storage.h" #include "columnar/columnar_tableam.h" #include "columnar/columnar_version_compat.h" @@ -54,6 +55,7 @@ typedef struct ChunkGroupReadState List *projectedColumnList; /* borrowed reference */ ChunkData *chunkGroupData; bytea *rowMask; + bytea *specMask; bool 
rowMaskCached; /* If rowMask metadata is cached and borrowed */ uint32 chunkStripeRowOffset; uint32 chunkGroupDeletedRows; @@ -590,18 +592,21 @@ ReadStripeRowByRowNumber(ColumnarReadState *readState, } else if (stripeReadState->chunkGroupReadState->chunkGroupDeletedRows > 0) { - stripeReadState->chunkGroupReadState->rowMask = - ReadChunkRowMask(stripeReadState->relation->rd_node, + ChunkRowMasks *masks = ReadChunkRowMask(stripeReadState->relation->rd_node, readState->snapshot, stripeReadState->stripeReadContext, chunkFirstRowNumber, stripeReadState->chunkGroupReadState->rowCount); + stripeReadState->chunkGroupReadState->rowMask = masks->chunkRowDeletedMaskBytea; + stripeReadState->chunkGroupReadState->specMask = masks->chunkRowSpecMaskBytea; + stripeReadState->chunkGroupReadState->rowMaskCached = false; } } else { stripeReadState->chunkGroupReadState->rowMask = NULL; + stripeReadState->chunkGroupReadState->specMask = NULL; } } @@ -936,17 +941,20 @@ ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues, stripeFirstRowNumber + stripeReadState->chunkGroupReadState->chunkStripeRowOffset; - stripeReadState->chunkGroupReadState->rowMask = - ReadChunkRowMask(stripeReadState->relation->rd_node, + ChunkRowMasks *masks = ReadChunkRowMask(stripeReadState->relation->rd_node, snapshot, stripeReadState->stripeReadContext, chunkFirstRowNumber, stripeReadState->chunkGroupReadState->rowCount); + stripeReadState->chunkGroupReadState->rowMask = masks->chunkRowDeletedMaskBytea; + stripeReadState->chunkGroupReadState->specMask = masks->chunkRowSpecMaskBytea; + stripeReadState->chunkGroupReadState->rowMaskCached = false; } else { stripeReadState->chunkGroupReadState->rowMask = NULL; + stripeReadState->chunkGroupReadState->specMask = NULL; } } @@ -2035,17 +2043,19 @@ ReadStripeNextVector(StripeReadState *stripeReadState, Datum *columnValues, if (columnar_enable_dml && stripeReadState->chunkGroupReadState->chunkGroupDeletedRows != 0) { - 
stripeReadState->chunkGroupReadState->rowMask = - ReadChunkRowMask(stripeReadState->relation->rd_node, + ChunkRowMasks *masks = ReadChunkRowMask(stripeReadState->relation->rd_node, snapshot, stripeReadState->stripeReadContext, chunkFirstRowNumber, stripeReadState->chunkGroupReadState->rowCount); + stripeReadState->chunkGroupReadState->rowMask = masks->chunkRowDeletedMaskBytea; + stripeReadState->chunkGroupReadState->specMask = masks->chunkRowSpecMaskBytea; } else { stripeReadState->chunkGroupReadState->rowMask = NULL; + stripeReadState->chunkGroupReadState->specMask = NULL; } } else diff --git a/columnar/src/backend/columnar/columnar_tableam.c b/columnar/src/backend/columnar/columnar_tableam.c index f0f56e32..4b2a4495 100644 --- a/columnar/src/backend/columnar/columnar_tableam.c +++ b/columnar/src/backend/columnar/columnar_tableam.c @@ -861,7 +861,36 @@ columnar_tuple_insert_speculative(Relation relation, TupleTableSlot *slot, CommandId cid, int options, BulkInsertState bistate, uint32 specToken) { - elog(ERROR, "columnar_tuple_insert_speculative not implemented"); + previousCacheEnabledState = columnar_enable_page_cache; + columnar_enable_page_cache = false; + + /* + * columnar_init_write_state allocates the write state in a longer + * lasting context, so no need to worry about it. 
+ */ + ColumnarWriteState *writeState = columnar_init_write_state(relation, + RelationGetDescr(relation), + slot->tts_tableOid, + GetCurrentSubTransactionId()); + MemoryContext oldContext = MemoryContextSwitchTo(ColumnarWritePerTupleContext( + writeState)); + + ColumnarCheckLogicalReplication(relation); + + slot_getallattrs(slot); + + Datum *values = detoast_values(slot->tts_tupleDescriptor, + slot->tts_values, slot->tts_isnull); + + uint64 storageId = LookupStorageId(relation->rd_node); + + uint64 writtenRowNumber = ColumnarWriteRow(writeState, values, slot->tts_isnull); + UpdateRowMask(relation->rd_node, storageId, NULL, writtenRowNumber, SpeculativeInsert); + + slot->tts_tid = row_number_to_tid(writtenRowNumber); + + MemoryContextSwitchTo(oldContext); + MemoryContextReset(ColumnarWritePerTupleContext(writeState)); } @@ -869,7 +898,27 @@ static void columnar_tuple_complete_speculative(Relation relation, TupleTableSlot *slot, uint32 specToken, bool succeeded) { - elog(ERROR, "columnar_tuple_complete_speculative not implemented"); + uint64 rowNumber = tid_to_row_number(slot->tts_tid); + + uint64 storageId = LookupStorageId(relation->rd_node); + + /* Set lock for relation until transaction ends */ + DirectFunctionCall1(pg_advisory_xact_lock_int8, + Int64GetDatum((int64) storageId)); + + + if (succeeded) + { + UpdateRowMask(relation->rd_node, storageId, NULL, rowNumber, ClearSpeculativeInsert); + pgstat_count_heap_insert(relation, 1); + } + else + { + UpdateRowMask(relation->rd_node, storageId, NULL, rowNumber, ClearSpeculativeInsertAndDelete); + pgstat_count_heap_delete(relation); + } + + columnar_enable_page_cache = previousCacheEnabledState; } @@ -922,7 +971,7 @@ columnar_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, DirectFunctionCall1(pg_advisory_xact_lock_int8, Int64GetDatum((int64) storageId)); - if (!UpdateRowMask(relation->rd_node, storageId, snapshot, rowNumber)) + if (!UpdateRowMask(relation->rd_node, storageId, snapshot, rowNumber, 
Delete)) return TM_Deleted; pgstat_count_heap_delete(relation); @@ -945,7 +994,7 @@ columnar_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, DirectFunctionCall1(pg_advisory_xact_lock_int8, Int64GetDatum((int64) storageId)); - if (!UpdateRowMask(relation->rd_node, storageId, snapshot, rowNumber)) + if (!UpdateRowMask(relation->rd_node, storageId, snapshot, rowNumber, Delete)) return TM_Deleted; columnar_tuple_insert(relation, slot, cid, 0, NULL); @@ -958,13 +1007,270 @@ columnar_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, } +static TM_Result +columnar_tuple_satisfies_update(HeapTuple htup, CommandId curcid, + Buffer buffer) +{ + HeapTupleHeader tuple = htup->t_data; + + Assert(ItemPointerIsValid(&htup->t_self)); + Assert(htup->t_tableOid != InvalidOid); +#if 0 + if (!HeapTupleHeaderXminCommitted(tuple)) + { + if (HeapTupleHeaderXminInvalid(tuple)) + return TM_Invisible; + + /* Used by pre-9.0 binary upgrades */ + if (tuple->t_infomask & HEAP_MOVED_OFF) + { + TransactionId xvac = HeapTupleHeaderGetXvac(tuple); + + if (TransactionIdIsCurrentTransactionId(xvac)) + return TM_Invisible; + if (!TransactionIdIsInProgress(xvac)) + { + if (TransactionIdDidCommit(xvac)) + { + return TM_Invisible; + } + } + } + + } +#endif + return TM_Ok; +} + + + static TM_Result columnar_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd) { - elog(ERROR, "columnar_tuple_lock not implemented"); + BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot; + TM_Result result; + Buffer buffer; + HeapTuple tuple = &bslot->base.tupdata; + bool follow_updates; + BlockNumber block; + follow_updates = (flags & TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS) != 0; + tmfd->traversed = false; + + return TM_Ok; +elog(WARNING, "before assert"); + Assert(TTS_IS_BUFFERTUPLE(slot)); +elog(WARNING, "after assert"); + 
+tuple_lock_retry: + tuple->t_self = *tid; + + buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); + block = ItemPointerGetBlockNumber(tid); + elog(WARNING, "buffer = %d, block = %d", buffer, block); + + result = columnar_tuple_satisfies_update(tuple, cid, buffer); + elog(WARNING, "result = %d", result); +#if 0 + result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy, + follow_updates, &buffer, tmfd); + + elog(WARNING, "result = %d", result); + if (result == TM_Updated && + (flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION)) + { + elog(WARNING, "in if"); + /* Should not encounter speculative tuple on recheck */ + Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data)); + + ReleaseBuffer(buffer); + + if (!ItemPointerEquals(&tmfd->ctid, &tuple->t_self)) + { + SnapshotData SnapshotDirty; + TransactionId priorXmax; + + /* it was updated, so look at the updated version */ + *tid = tmfd->ctid; + /* updated row should have xmin matching this xmax */ + priorXmax = tmfd->xmax; + + /* signal that a tuple later in the chain is getting locked */ + tmfd->traversed = true; + + /* + * fetch target tuple + * + * Loop here to deal with updated or busy tuples + */ + InitDirtySnapshot(SnapshotDirty); + for (;;) + { + if (ItemPointerIndicatesMovedPartitions(tid)) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("tuple to be locked was already moved to another partition due to concurrent update"))); + + tuple->t_self = *tid; + if (heap_fetch_extended(relation, &SnapshotDirty, tuple, + &buffer, true)) + { + /* + * If xmin isn't what we're expecting, the slot must have + * been recycled and reused for an unrelated tuple. This + * implies that the latest version of the row was deleted, + * so we need do nothing. (Should be safe to examine xmin + * without getting buffer's content lock. 
We assume + * reading a TransactionId to be atomic, and Xmin never + * changes in an existing tuple, except to invalid or + * frozen, and neither of those can match priorXmax.) + */ + if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data), + priorXmax)) + { + ReleaseBuffer(buffer); + return TM_Deleted; + } + + /* otherwise xmin should not be dirty... */ + if (TransactionIdIsValid(SnapshotDirty.xmin)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("t_xmin %u is uncommitted in tuple (%u,%u) to be updated in table \"%s\"", + SnapshotDirty.xmin, + ItemPointerGetBlockNumber(&tuple->t_self), + ItemPointerGetOffsetNumber(&tuple->t_self), + RelationGetRelationName(relation)))); + + /* + * If tuple is being updated by other transaction then we + * have to wait for its commit/abort, or die trying. + */ + if (TransactionIdIsValid(SnapshotDirty.xmax)) + { + ReleaseBuffer(buffer); + switch (wait_policy) + { + case LockWaitBlock: + XactLockTableWait(SnapshotDirty.xmax, + relation, &tuple->t_self, + XLTW_FetchUpdated); + break; + case LockWaitSkip: + if (!ConditionalXactLockTableWait(SnapshotDirty.xmax)) + /* skip instead of waiting */ + return TM_WouldBlock; + break; + case LockWaitError: + if (!ConditionalXactLockTableWait(SnapshotDirty.xmax)) + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on row in relation \"%s\"", + RelationGetRelationName(relation)))); + break; + } + continue; /* loop back to repeat heap_fetch */ + } + + /* + * If tuple was inserted by our own transaction, we have + * to check cmin against cid: cmin >= current CID means + * our command cannot see the tuple, so we should ignore + * it. Otherwise heap_lock_tuple() will throw an error, + * and so would any later attempt to update or delete the + * tuple. (We need not check cmax because + * HeapTupleSatisfiesDirty will consider a tuple deleted + * by our transaction dead, regardless of cmax.) 
We just + * checked that priorXmax == xmin, so we can test that + * variable instead of doing HeapTupleHeaderGetXmin again. + */ + if (TransactionIdIsCurrentTransactionId(priorXmax) && + HeapTupleHeaderGetCmin(tuple->t_data) >= cid) + { + tmfd->xmax = priorXmax; + + /* + * Cmin is the problematic value, so store that. See + * above. + */ + tmfd->cmax = HeapTupleHeaderGetCmin(tuple->t_data); + ReleaseBuffer(buffer); + return TM_SelfModified; + } + + /* + * This is a live tuple, so try to lock it again. + */ + ReleaseBuffer(buffer); + goto tuple_lock_retry; + } + + /* + * If the referenced slot was actually empty, the latest + * version of the row must have been deleted, so we need do + * nothing. + */ + if (tuple->t_data == NULL) + { + Assert(!BufferIsValid(buffer)); + return TM_Deleted; + } + + /* + * As above, if xmin isn't what we're expecting, do nothing. + */ + if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data), + priorXmax)) + { + ReleaseBuffer(buffer); + return TM_Deleted; + } + + /* + * If we get here, the tuple was found but failed + * SnapshotDirty. Assuming the xmin is either a committed xact + * or our own xact (as it certainly should be if we're trying + * to modify the tuple), this must mean that the row was + * updated or deleted by either a committed xact or our own + * xact. If it was deleted, we can ignore it; if it was + * updated then chain up to the next version and repeat the + * whole process. + * + * As above, it should be safe to examine xmax and t_ctid + * without the buffer content lock, because they can't be + * changing. We'd better hold a buffer pin though. 
+ */ + if (ItemPointerEquals(&tuple->t_self, &tuple->t_data->t_ctid)) + { + /* deleted, so forget about it */ + ReleaseBuffer(buffer); + return TM_Deleted; + } + + /* updated, so look at the updated row */ + *tid = tuple->t_data->t_ctid; + /* updated row should have xmin matching this xmax */ + priorXmax = HeapTupleHeaderGetUpdateXid(tuple->t_data); + ReleaseBuffer(buffer); + /* loop back to fetch next in chain */ + } + } + else + { + /* tuple was deleted, so give up */ + return TM_Deleted; + } + } + + slot->tts_tableOid = RelationGetRelid(relation); + tuple->t_tableOid = slot->tts_tableOid; + + /* store in slot, transferring existing pin */ + ExecStorePinnedBufferHeapTuple(tuple, slot, buffer); +#endif + return result; } diff --git a/columnar/src/backend/columnar/sql/columnar--11.1-9--11.1-10.sql b/columnar/src/backend/columnar/sql/columnar--11.1-9--11.1-10.sql new file mode 100644 index 00000000..daea0b1c --- /dev/null +++ b/columnar/src/backend/columnar/sql/columnar--11.1-9--11.1-10.sql @@ -0,0 +1,3 @@ +-- columnar--11.1-9--11.1-10.sql + +ALTER TABLE columnar.row_mask ADD spec_mask BYTEA; diff --git a/columnar/src/backend/columnar/write_state_row_mask.c b/columnar/src/backend/columnar/write_state_row_mask.c index ce831f6a..3ea65222 100644 --- a/columnar/src/backend/columnar/write_state_row_mask.c +++ b/columnar/src/backend/columnar/write_state_row_mask.c @@ -25,7 +25,7 @@ #include "columnar/columnar_tableam.h" static RowMaskWriteStateEntry *InitRowMaskEntry(uint64 storageId, - bytea *rowMask); + bytea *rowMask, bytea *specMask); static void RowMaskFlushPendingWriteState(List *rowMaskWriteStateList); /* @@ -92,7 +92,8 @@ RowMaskWriteStateEntry * RowMaskInitWriteState(Oid relfilenode, uint64 storageId, SubTransactionId currentSubXid, - bytea *rowMask) + bytea *rowMask, + bytea *specMask) { bool found; @@ -156,7 +157,7 @@ RowMaskInitWriteState(Oid relfilenode, hashEntry->writeStateStack = stackEntry; } - RowMaskWriteStateEntry *RowMaskWriteState = 
InitRowMaskEntry(storageId, rowMask); + RowMaskWriteStateEntry *RowMaskWriteState = InitRowMaskEntry(storageId, rowMask, specMask); stackEntry->rowMaskWriteStateEntryList = lappend(stackEntry->rowMaskWriteStateEntryList, RowMaskWriteState); @@ -167,13 +168,17 @@ RowMaskInitWriteState(Oid relfilenode, } static RowMaskWriteStateEntry * -InitRowMaskEntry(uint64 storageId, bytea *mask) +InitRowMaskEntry(uint64 storageId, bytea *mask, bytea *specMask) { RowMaskWriteStateEntry *rowMask = palloc0(sizeof(RowMaskWriteStateEntry)); rowMask->storageId = storageId; rowMask->mask = (bytea *)palloc0(VARSIZE(mask) + VARHDRSZ); memcpy(rowMask->mask, mask, VARSIZE(mask) + VARHDRSZ); + + rowMask->specMask = (bytea *)palloc0(VARSIZE(specMask) + VARHDRSZ); + memcpy(rowMask->specMask, specMask, VARSIZE(specMask) + VARHDRSZ); + // rest of structure members needs be populated where RowMaskState was created return rowMask; diff --git a/columnar/src/include/columnar/columnar.h b/columnar/src/include/columnar/columnar.h index 463e6e23..1af3b342 100644 --- a/columnar/src/include/columnar/columnar.h +++ b/columnar/src/include/columnar/columnar.h @@ -369,9 +369,9 @@ extern Datum columnar_relation_storageid(PG_FUNCTION_ARGS); extern bool SaveEmptyRowMask(uint64 storageId, uint64 stripeId, uint64 stripeStartRowNumber, List *chunkGroupRowCounts); extern bool UpdateRowMask(RelFileNode relfilenode, uint64 storageId, - Snapshot snapshot, uint64 rowNumber); + Snapshot snapshot, uint64 rowNumber, enum UpdateMaskType updateType); extern void FlushRowMaskCache(RowMaskWriteStateEntry *rowMaskEntry); -extern bytea * ReadChunkRowMask(RelFileNode relfilenode, Snapshot snapshot, +extern ChunkRowMasks * ReadChunkRowMask(RelFileNode relfilenode, Snapshot snapshot, MemoryContext ctx, uint64 stripeFirstRowNumber, int rowCount); extern Datum create_table_row_mask(PG_FUNCTION_ARGS); @@ -413,7 +413,8 @@ extern MemoryContext GetColumnarWriteContextForDebug(void); extern RowMaskWriteStateEntry * 
RowMaskInitWriteState(Oid relfilenode, uint64 storageId, SubTransactionId currentSubXid, - bytea *rowMask); + bytea *rowMask, + bytea *specMask); extern void RowMaskFlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId currentSubXid); extern RowMaskWriteStateEntry * RowMaskFindWriteState(Oid relfilenode, diff --git a/columnar/src/include/columnar/columnar_metadata.h b/columnar/src/include/columnar/columnar_metadata.h index 16d9641d..9b40c6e2 100644 --- a/columnar/src/include/columnar/columnar_metadata.h +++ b/columnar/src/include/columnar/columnar_metadata.h @@ -51,6 +51,15 @@ typedef struct EmptyStripeReservation uint64 stripeFirstRowNumber; } EmptyStripeReservation; +/* + * Contains any row masks, currently deletion and speculative. + */ +typedef struct ChunkRowMasks +{ + bytea *chunkRowDeletedMaskBytea; + bytea *chunkRowSpecMaskBytea; +} ChunkRowMasks; + extern List * StripesForRelfilenode(RelFileNode relfilenode, ScanDirection scanDirection); extern uint32 DeletedRowsForStripe(RelFileNode relfilenode, uint32 chunkCount, diff --git a/columnar/src/include/columnar/columnar_write_state_row_mask.h b/columnar/src/include/columnar/columnar_write_state_row_mask.h index 9cb48302..e3f1aef6 100644 --- a/columnar/src/include/columnar/columnar_write_state_row_mask.h +++ b/columnar/src/include/columnar/columnar_write_state_row_mask.h @@ -22,6 +22,11 @@ struct RowMaskWriteStateEntry int64 endRowNumber; int32 deletedRows; bytea *mask; + bytea *specMask; }; +enum +UpdateMaskType +{ Delete, SpeculativeInsert, ClearSpeculativeInsert, ClearSpeculativeInsertAndDelete }; + #endif \ No newline at end of file