Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,11 @@ bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, const TWriteId writeId
}
LongTxWrites.erase(writeId);
return true;
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_remove_prepared_tx_insertion")("write_id", (ui64)writeId)("tx_id", txId);
}
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_remove_removed_tx_insertion")("write_id", (ui64)writeId)("tx_id", txId);
}
return false;
}
Expand All @@ -283,9 +287,10 @@ void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTabl
}
}
if (failedAborts.size()) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "failed_aborts")("count", failedAborts.size())("writes_count", writesToAbort.size());
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "failed_aborts")("count", failedAborts.size())("writes_count", writesToAbort.size());
}
for (auto& writeId : failedAborts) {
InsertTable->MarkAsNotAbortable(writeId);
writesToAbort.erase(writeId);
}
if (!writesToAbort.empty()) {
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/tx/columnshard/engines/insert_table/data.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@ struct TInsertedData {

private:
YDB_READONLY(ui64, SchemaVersion, 0);
YDB_READONLY_FLAG(NotAbortable, false);

public:
void MarkAsNotAbortable() {
NotAbortableFlag = true;
}

std::optional<TString> GetBlobData() const {
if (BlobDataGuard) {
return BlobDataGuard->GetData();
Expand Down Expand Up @@ -104,7 +109,7 @@ struct TInsertedData {
/// One of them wins and becomes committed. Original DedupId would be lost then.
/// After commit we use original Initiator:WriteId as DedupId of inserted blob inside {PlanStep, TxId}.
/// pathId, initiator, {writeId}, {dedupId} -> pathId, planStep, txId, {dedupId}
void Commit(ui64 planStep, ui64 txId) {
void Commit(const ui64 planStep, const ui64 txId) {
DedupId = ToString(PlanStep) + ":" + ToString((ui64)WriteTxId);
PlanStep = planStep;
WriteTxId = txId;
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/engines/insert_table/insert_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ class TInsertTable: public TInsertTableAccessor {
TInsertionSummary::TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId,
const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists);
void Abort(IDbWrapper& dbTable, const THashSet<TWriteId>& writeIds);
void MarkAsNotAbortable(const TWriteId writeId) {
Summary.MarkAsNotAbortable(writeId);
}
THashSet<TWriteId> OldWritesToAbort(const TInstant& now) const;
THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetExpiredInsertions(const
TInstant newMin = TInstant::Max();
for (auto& [writeId, data] : Inserted) {
const TInstant dataInsertTs = data.GetMeta().GetDirtyWriteTime();
if (data.IsNotAbortable()) {
continue;
}
if (dataInsertTs < timeBorder && toAbort.size() < limit) {
toAbort.insert(writeId);
} else {
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ class TInsertionSummary {
void OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept;
static TAtomicCounter CriticalInserted;
public:
void MarkAsNotAbortable(const TWriteId writeId) {
auto it = Inserted.find(writeId);
if (it == Inserted.end()) {
return;
}
it->second.MarkAsNotAbortable();
}

THashSet<TWriteId> GetInsertedByPathId(const ui64 pathId) const;

THashSet<TWriteId> GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const;
Expand Down