Merge branch 'spec-insert' of github.com:hydradatabase/hydra into spec-insert
JerrySievert committed Oct 18, 2023
2 parents 0f56257 + 0f3acd4 commit a6e6d9e
Showing 1 changed file with 22 additions and 210 deletions.
232 changes: 22 additions & 210 deletions columnar/src/backend/columnar/columnar_tableam.c
@@ -1050,227 +1050,39 @@ columnar_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
                    LockWaitPolicy wait_policy, uint8 flags,
                    TM_FailureData *tmfd)
{
-    BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
-    TM_Result result;
-    Buffer buffer;
-    HeapTuple tuple = &bslot->base.tupdata;
-    bool follow_updates;
-    BlockNumber block;
-    follow_updates = (flags & TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS) != 0;
-    tmfd->traversed = false;
+    // copy/paste from columnar_fetch_row_version

-    return TM_Ok;
-    elog(WARNING, "before assert");
-    Assert(TTS_IS_BUFFERTUPLE(slot));
-    elog(WARNING, "after assert");
-
-tuple_lock_retry:
-    tuple->t_self = *tid;
-
-    buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
-    block = ItemPointerGetBlockNumber(tid);
-    elog(WARNING, "buffer = %d, block = %d", buffer, block);
-
-    result = columnar_tuple_satisfies_update(tuple, cid, buffer);
-    elog(WARNING, "result = %d", result);
-#if 0
-    result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
-                             follow_updates, &buffer, tmfd);
-
-    elog(WARNING, "result = %d", result);
-    if (result == TM_Updated &&
-        (flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION))
-    {
-        elog(WARNING, "in if");
-        /* Should not encounter speculative tuple on recheck */
-        Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data));
-
-        ReleaseBuffer(buffer);
-
-        if (!ItemPointerEquals(&tmfd->ctid, &tuple->t_self))
-        {
-            SnapshotData SnapshotDirty;
-            TransactionId priorXmax;
-
-            /* it was updated, so look at the updated version */
-            *tid = tmfd->ctid;
-            /* updated row should have xmin matching this xmax */
-            priorXmax = tmfd->xmax;
-
-            /* signal that a tuple later in the chain is getting locked */
-            tmfd->traversed = true;
-
-            /*
-             * fetch target tuple
-             *
-             * Loop here to deal with updated or busy tuples
-             */
-            InitDirtySnapshot(SnapshotDirty);
-            for (;;)
-            {
-                if (ItemPointerIndicatesMovedPartitions(tid))
-                    ereport(ERROR,
-                            (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                             errmsg("tuple to be locked was already moved to another partition due to concurrent update")));
-
-                tuple->t_self = *tid;
-                if (heap_fetch_extended(relation, &SnapshotDirty, tuple,
-                                        &buffer, true))
-                {
-                    /*
-                     * If xmin isn't what we're expecting, the slot must have
-                     * been recycled and reused for an unrelated tuple. This
-                     * implies that the latest version of the row was deleted,
-                     * so we need do nothing. (Should be safe to examine xmin
-                     * without getting buffer's content lock. We assume
-                     * reading a TransactionId to be atomic, and Xmin never
-                     * changes in an existing tuple, except to invalid or
-                     * frozen, and neither of those can match priorXmax.)
-                     */
-                    if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data),
-                                             priorXmax))
-                    {
-                        ReleaseBuffer(buffer);
-                        return TM_Deleted;
-                    }
+    uint64 rowNumber = tid_to_row_number(*tid);
+    ColumnarReadState *readState = NULL;

-                    /* otherwise xmin should not be dirty... */
-                    if (TransactionIdIsValid(SnapshotDirty.xmin))
-                        ereport(ERROR,
-                                (errcode(ERRCODE_DATA_CORRUPTED),
-                                 errmsg_internal("t_xmin %u is uncommitted in tuple (%u,%u) to be updated in table \"%s\"",
-                                                 SnapshotDirty.xmin,
-                                                 ItemPointerGetBlockNumber(&tuple->t_self),
-                                                 ItemPointerGetOffsetNumber(&tuple->t_self),
-                                                 RelationGetRelationName(relation))));
-
-                    /*
-                     * If tuple is being updated by other transaction then we
-                     * have to wait for its commit/abort, or die trying.
-                     */
-                    if (TransactionIdIsValid(SnapshotDirty.xmax))
-                    {
-                        ReleaseBuffer(buffer);
-                        switch (wait_policy)
-                        {
-                            case LockWaitBlock:
-                                XactLockTableWait(SnapshotDirty.xmax,
-                                                  relation, &tuple->t_self,
-                                                  XLTW_FetchUpdated);
-                                break;
-                            case LockWaitSkip:
-                                if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
-                                    /* skip instead of waiting */
-                                    return TM_WouldBlock;
-                                break;
-                            case LockWaitError:
-                                if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
-                                    ereport(ERROR,
-                                            (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-                                             errmsg("could not obtain lock on row in relation \"%s\"",
-                                                    RelationGetRelationName(relation))));
-                                break;
-                        }
-                        continue;    /* loop back to repeat heap_fetch */
-                    }
+    int natts = relation->rd_att->natts;
+    Bitmapset *attr_needed = bms_add_range(NULL, 0, natts - 1);

-                    /*
-                     * If tuple was inserted by our own transaction, we have
-                     * to check cmin against cid: cmin >= current CID means
-                     * our command cannot see the tuple, so we should ignore
-                     * it. Otherwise heap_lock_tuple() will throw an error,
-                     * and so would any later attempt to update or delete the
-                     * tuple. (We need not check cmax because
-                     * HeapTupleSatisfiesDirty will consider a tuple deleted
-                     * by our transaction dead, regardless of cmax.) We just
-                     * checked that priorXmax == xmin, so we can test that
-                     * variable instead of doing HeapTupleHeaderGetXmin again.
-                     */
-                    if (TransactionIdIsCurrentTransactionId(priorXmax) &&
-                        HeapTupleHeaderGetCmin(tuple->t_data) >= cid)
-                    {
-                        tmfd->xmax = priorXmax;
-
-                        /*
-                         * Cmin is the problematic value, so store that. See
-                         * above.
-                         */
-                        tmfd->cmax = HeapTupleHeaderGetCmin(tuple->t_data);
-                        ReleaseBuffer(buffer);
-                        return TM_SelfModified;
-                    }
+    List *scanQual = NIL;

-                    /*
-                     * This is a live tuple, so try to lock it again.
-                     */
-                    ReleaseBuffer(buffer);
-                    goto tuple_lock_retry;
-                }
+    bool randomAccess = true;

-                /*
-                 * If the referenced slot was actually empty, the latest
-                 * version of the row must have been deleted, so we need do
-                 * nothing.
-                 */
-                if (tuple->t_data == NULL)
-                {
-                    Assert(!BufferIsValid(buffer));
-                    return TM_Deleted;
-                }
+    readState = init_columnar_read_state(relation,
+                                         slot->tts_tupleDescriptor,
+                                         attr_needed, scanQual,
+                                         CurrentMemoryContext, // to be checked
+                                         snapshot, randomAccess,
+                                         NULL);

-                /*
-                 * As above, if xmin isn't what we're expecting, do nothing.
-                 */
-                if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data),
-                                         priorXmax))
-                {
-                    ReleaseBuffer(buffer);
-                    return TM_Deleted;
-                }
+    // MemoryContext oldContext = MemoryContextSwitchTo(GetColumnarReadStateCache());

-                /*
-                 * If we get here, the tuple was found but failed
-                 * SnapshotDirty. Assuming the xmin is either a committed xact
-                 * or our own xact (as it certainly should be if we're trying
-                 * to modify the tuple), this must mean that the row was
-                 * updated or deleted by either a committed xact or our own
-                 * xact. If it was deleted, we can ignore it; if it was
-                 * updated then chain up to the next version and repeat the
-                 * whole process.
-                 *
-                 * As above, it should be safe to examine xmax and t_ctid
-                 * without the buffer content lock, because they can't be
-                 * changing. We'd better hold a buffer pin though.
-                 */
-                if (ItemPointerEquals(&tuple->t_self, &tuple->t_data->t_ctid))
-                {
-                    /* deleted, so forget about it */
-                    ReleaseBuffer(buffer);
-                    return TM_Deleted;
-                }
+    ColumnarReadRowByRowNumber(readState, rowNumber,
+                               slot->tts_values, slot->tts_isnull);

-                /* updated, so look at the updated row */
-                *tid = tuple->t_data->t_ctid;
-                /* updated row should have xmin matching this xmax */
-                priorXmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
-                ReleaseBuffer(buffer);
-                /* loop back to fetch next in chain */
-            }
-        }
-        else
-        {
-            /* tuple was deleted, so give up */
-            return TM_Deleted;
-        }
-    }
+    // MemoryContextSwitchTo(oldContext);

    slot->tts_tableOid = RelationGetRelid(relation);
-    tuple->t_tableOid = slot->tts_tableOid;
    slot->tts_tid = *tid;

-    /* store in slot, transferring existing pin */
-    ExecStorePinnedBufferHeapTuple(tuple, slot, buffer);
-#endif
-    return result;
+    if (TTS_EMPTY(slot))
+        ExecStoreVirtualTuple(slot);

+    return TM_Ok;
}
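
For reference, the new columnar_tuple_lock body introduced here is shown assembled below. This is a readability sketch pieced together from the added lines of the diff: blank lines and indentation are approximate, the surrounding signature is reconstructed from PostgreSQL's standard tuple_lock table AM callback (the middle parameter line is not visible in the hunk context above), and the /* ... */ annotations are explanatory additions, not part of the committed code.

static TM_Result
columnar_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
                    TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
                    LockWaitPolicy wait_policy, uint8 flags,
                    TM_FailureData *tmfd)
{
    // copy/paste from columnar_fetch_row_version

    /* Map the item pointer onto the columnar row number it encodes. */
    uint64 rowNumber = tid_to_row_number(*tid);
    ColumnarReadState *readState = NULL;

    /* Read all attributes of the row, with no scan qualifiers. */
    int natts = relation->rd_att->natts;
    Bitmapset *attr_needed = bms_add_range(NULL, 0, natts - 1);
    List *scanQual = NIL;
    bool randomAccess = true;

    readState = init_columnar_read_state(relation,
                                         slot->tts_tupleDescriptor,
                                         attr_needed, scanQual,
                                         CurrentMemoryContext, // to be checked
                                         snapshot, randomAccess,
                                         NULL);

    // MemoryContext oldContext = MemoryContextSwitchTo(GetColumnarReadStateCache());

    /* Fetch the row's datums and null flags directly into the slot arrays. */
    ColumnarReadRowByRowNumber(readState, rowNumber,
                               slot->tts_values, slot->tts_isnull);

    // MemoryContextSwitchTo(oldContext);

    slot->tts_tableOid = RelationGetRelid(relation);
    slot->tts_tid = *tid;

    /* Mark the virtual slot as holding a valid tuple. */
    if (TTS_EMPTY(slot))
        ExecStoreVirtualTuple(slot);

    /* The row is simply re-read and reported as locked. */
    return TM_Ok;
}

In PostgreSQL's table access method interface, tuple_lock is the callback that table_tuple_lock() dispatches to, so this path is exercised by row-locking operations such as SELECT ... FOR UPDATE and the row rechecks performed for concurrent UPDATE/DELETE on a columnar table.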

