Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 20 additions & 25 deletions ydb/core/tablet_flat/flat_mem_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,18 +243,20 @@ namespace NTable {
NTable::ITransactionObserverSimplePtr transactionObserver,
const NTable::ITransactionSet& decidedTransactions) noexcept
{
// Temporary: we don't cache erases when there are uncompacted deltas
Y_UNUSED(decidedTransactions);

Y_DEBUG_ABORT_UNLESS(IsValid(), "Attempt to access an invalid row");

auto* chain = GetCurrentVersion();
Y_DEBUG_ABORT_UNLESS(chain, "Unexpected empty chain");

// Skip uncommitted deltas
while (chain->RowVersion.Step == Max<ui64>() && !committedTransactions.Find(chain->RowVersion.TxId)) {
// We cannot cache when there are uncompacted deltas
stats.UncertainErase = true;

transactionObserver.OnSkipUncommitted(chain->RowVersion.TxId);
if (chain->Rop != ERowOp::Erase && !decidedTransactions.Contains(chain->RowVersion.TxId)) {
// This change may commit and change the iteration result
stats.UncertainErase = true;
}
if (!(chain = chain->Next)) {
CurrentVersion = nullptr;
return false;
Expand All @@ -268,24 +270,23 @@ namespace NTable {
return true;
}
transactionObserver.OnSkipCommitted(chain->RowVersion);
if (chain->Rop != ERowOp::Erase) {
// We are skipping non-erase op, so any erase below cannot be trusted
stats.UncertainErase = true;
}
} else {
// We cannot cache when there are uncompacted deltas
stats.UncertainErase = true;

auto* commitVersion = committedTransactions.Find(chain->RowVersion.TxId);
Y_ABORT_UNLESS(commitVersion);
if (*commitVersion <= rowVersion) {
if (!decidedTransactions.Contains(chain->RowVersion.TxId)) {
// This change may rollback and change the iteration result
stats.UncertainErase = true;
}
return true;
}
transactionObserver.OnSkipCommitted(*commitVersion, chain->RowVersion.TxId);
}

stats.InvisibleRowSkips++;
if (chain->Rop != ERowOp::Erase) {
// We are skipping non-erase op, so any erase below cannot be trusted
stats.UncertainErase = true;
}

while ((chain = chain->Next)) {
if (chain->RowVersion.Step != Max<ui64>()) {
Expand All @@ -296,13 +297,16 @@ namespace NTable {

transactionObserver.OnSkipCommitted(chain->RowVersion);
stats.InvisibleRowSkips++;
if (chain->Rop != ERowOp::Erase) {
// We are skipping non-erase op, so any erase below cannot be trusted
stats.UncertainErase = true;
}
} else {
// We cannot cache when there are uncompacted deltas
stats.UncertainErase = true;

auto* commitVersion = committedTransactions.Find(chain->RowVersion.TxId);
if (commitVersion && *commitVersion <= rowVersion) {
if (!decidedTransactions.Contains(chain->RowVersion.TxId)) {
// This change may rollback and change the iteration result
stats.UncertainErase = true;
}
CurrentVersion = chain;
return true;
}
Expand All @@ -312,17 +316,8 @@ namespace NTable {
stats.InvisibleRowSkips++;
} else {
transactionObserver.OnSkipUncommitted(chain->RowVersion.TxId);
if (decidedTransactions.Contains(chain->RowVersion.TxId)) {
// This is a decided uncommitted change and will never be committed
// Make sure we don't mark possible erase below as uncertain
continue;
}
}
}
if (chain->Rop != ERowOp::Erase) {
// We are skipping non-erase op, so any erase below cannot be trusted
stats.UncertainErase = true;
}
}

CurrentVersion = nullptr;
Expand Down
18 changes: 6 additions & 12 deletions ydb/core/tablet_flat/flat_part_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,9 @@ namespace NTable {
NTable::ITransactionObserverSimplePtr transactionObserver,
const NTable::ITransactionSet& decidedTransactions) noexcept
{
// Temporary: we don't cache erases when there are uncompacted deltas
Y_UNUSED(decidedTransactions);

Y_DEBUG_ABORT_UNLESS(Main.IsValid(), "Attempt to use an invalid iterator");

// We cannot use min/max hints when part has uncommitted deltas
Expand Down Expand Up @@ -962,31 +965,22 @@ namespace NTable {
const auto* data = Main.GetRecord()->GetAltRecord(SkipMainDeltas);

while (data->IsDelta()) {
// We cannot cache when there are uncompacted deltas
stats.UncertainErase = true;

ui64 txId = data->GetDeltaTxId(info);
const auto* commitVersion = committedTransactions.Find(txId);
if (commitVersion && *commitVersion <= rowVersion) {
// Already committed and correct version
if (!decidedTransactions.Contains(txId)) {
// This change may rollback and change the iteration result
stats.UncertainErase = true;
}
return EReady::Data;
}
if (commitVersion) {
// Skipping a newer committed delta
transactionObserver.OnSkipCommitted(*commitVersion, txId);
stats.InvisibleRowSkips++;
if (data->GetRop() != ERowOp::Erase) {
// Skipping non-erase delta, so any erase below cannot be trusted
stats.UncertainErase = true;
}
} else {
// Skipping an uncommitted delta
transactionObserver.OnSkipUncommitted(txId);
if (data->GetRop() != ERowOp::Erase && !decidedTransactions.Contains(txId)) {
// This change may commit and change the iteration result
stats.UncertainErase = true;
}
}
data = Main.GetRecord()->GetAltRecord(++SkipMainDeltas);
if (!data) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/flat_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,7 @@ void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMe
auto& memTable = MemTable();
bool hadTxRef = memTable.GetTxIdStats().contains(txId);

if (ErasedKeysCache && rop != ERowOp::Erase) {
if (ErasedKeysCache) {
const TCelled cells(key, *Scheme->Keys, true);
auto res = ErasedKeysCache->FindKey(cells);
if (res.second) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/ut/ut_db_iface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ Y_UNIT_TEST_SUITE(DBase) {
.Next().Is(*me.SchemedCookRow(table).Col(18_u64, 18_u64))
.Next().Is(EReady::Gone);

UNIT_ASSERT_VALUES_EQUAL(dumpCache(), "TKeyRangeCache{ [{1}, {16}] }");
UNIT_ASSERT_VALUES_EQUAL(dumpCache(), "TKeyRangeCache{ [{1}, {9}), [{10}, {16}] }");
}

Y_UNIT_TEST(EraseCacheWithUncommittedChanges) {
Expand Down
176 changes: 176 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_volatile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3282,6 +3282,182 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
"{ items { uint32_value: 11 } items { uint32_value: 11 } }");
}

// Regression test for KIKIMR-22506
//
// Scenario exercised below:
//   1. A two-shard table is filled with keys 1..20 and key 1 is deleted.
//   2. An open transaction deletes all keys < 20 (uncommitted) and reads the table.
//   3. A concurrent upsert of key 200 is issued so that the open transaction's
//      commit is expected to fail with ABORTED.
//   4. While the commit is in flight, blob writes of the first shard are held
//      back and a concurrent read is started.
//   5. The read must still return rows 2..20, i.e. the deletes made by the
//      aborting transaction must not hide any rows from it.
Y_UNIT_TEST(NotCachingAbortingDeletes) {
// Test server without real threads and with volatile datashard transactions enabled
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

// Trace-level logging for the datashard and pipe client components
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);

InitRoot(server, sender);

// NOTE(review): presumably a scoped guard that turns off datashard log
// batching for the lifetime of this test — confirm against its definition
TDisableDataShardLogBatching disableDataShardLogBatching;

// The table is partitioned at key 100, so exactly two shards are expected
Cerr << "========= Creating table =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
WITH (PARTITION_AT_KEYS = (100));
)"),
"SUCCESS");

const auto shards = GetTableShards(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);

// We need to fill table with some data
// (all keys 1..20 are below the split key 100; presumably they land in shards.at(0))
Cerr << "========= Upserting initial values =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/table` (key, value)
VALUES
(1, 1), (2, 2), (3, 3), (4, 4), (5, 5),
(6, 6), (7, 7), (8, 8), (9, 9), (10, 10),
(11, 11), (12, 12), (13, 13), (14, 14), (15, 15),
(16, 16), (17, 17), (18, 18), (19, 19), (20, 20);
)"),
"<empty>");

// We need to delete the first key (will be the trigger)
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
DELETE FROM `/Root/table` WHERE key = 1;
)"),
"<empty>");

// Start transaction that deletes many rows and reads the result
// It is not committed yet, so should not be cached
// Inside the transaction only key 20 remains visible, as asserted below
Cerr << "========= Deleting rows (uncommitted) =========" << Endl;
TString sessionId, txId;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleBegin(runtime, sessionId, txId, R"(
DELETE FROM `/Root/table` WHERE key < 20;
SELECT key, value FROM `/Root/table` ORDER BY key;
)"),
"{ items { uint32_value: 20 } items { uint32_value: 20 } }");

// Make sure the lock is broken at the second shard
// (key 200 is above the split key 100)
Cerr << "========= Upserting key 200 (breaking lock) =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/table` (key, value)
VALUES (200, 200);
)"),
"<empty>");

// Outside the open transaction its deletes are not visible:
// rows 2..20 plus the freshly upserted 200 are expected
Cerr << "========= Validating table contents =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
SELECT key, value FROM `/Root/table` ORDER BY key;
)"),
"{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 3 } }, "
"{ items { uint32_value: 4 } items { uint32_value: 4 } }, "
"{ items { uint32_value: 5 } items { uint32_value: 5 } }, "
"{ items { uint32_value: 6 } items { uint32_value: 6 } }, "
"{ items { uint32_value: 7 } items { uint32_value: 7 } }, "
"{ items { uint32_value: 8 } items { uint32_value: 8 } }, "
"{ items { uint32_value: 9 } items { uint32_value: 9 } }, "
"{ items { uint32_value: 10 } items { uint32_value: 10 } }, "
"{ items { uint32_value: 11 } items { uint32_value: 11 } }, "
"{ items { uint32_value: 12 } items { uint32_value: 12 } }, "
"{ items { uint32_value: 13 } items { uint32_value: 13 } }, "
"{ items { uint32_value: 14 } items { uint32_value: 14 } }, "
"{ items { uint32_value: 15 } items { uint32_value: 15 } }, "
"{ items { uint32_value: 16 } items { uint32_value: 16 } }, "
"{ items { uint32_value: 17 } items { uint32_value: 17 } }, "
"{ items { uint32_value: 18 } items { uint32_value: 18 } }, "
"{ items { uint32_value: 19 } items { uint32_value: 19 } }, "
"{ items { uint32_value: 20 } items { uint32_value: 20 } }, "
"{ items { uint32_value: 200 } items { uint32_value: 200 } }");

// Hold back every propose result except PREPARED ones, so that neither the
// commit nor the concurrent read can complete until explicitly released
TBlockEvents<TEvDataShard::TEvProposeTransactionResult> blockedResults(runtime,
[&](const auto& ev) {
auto* msg = ev->Get();
if (msg->Record.GetStatus() == NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED) {
return false;
}
return true;
});

// Hold back readsets addressed to the first shard; readsets for any other
// recipient are let through and only counted
size_t otherReadSets = 0;
TBlockEvents<TEvTxProcessing::TEvReadSet> blockedReadSets(runtime,
[&otherReadSets, actor = ResolveTablet(runtime, shards.at(0))](const auto& ev) {
if (ev->GetRecipientRewrite() == actor) {
return true;
}
++otherReadSets;
return false;
});

// The commit is sent asynchronously; its outcome is checked at the end
Cerr << "========= Starting commit =========" << Endl;
auto commitFuture = KqpSimpleSendCommit(runtime, sessionId, txId, "SELECT 1");

// Wait until exactly one readset is held for the first shard and exactly
// one readset has passed through to another recipient
runtime.WaitFor("blocked readsets", [&]{ return blockedReadSets.size() >= 1 && otherReadSets >= 1; });
UNIT_ASSERT_VALUES_EQUAL(blockedReadSets.size(), 1u);
UNIT_ASSERT_VALUES_EQUAL(otherReadSets, 1u);
// NOTE(review): presumably gives already scheduled events a chance to be
// processed before blob writes start being intercepted — confirm
runtime.SimulateSleep(TDuration::MilliSeconds(1));

// Hold back blob storage puts whose blob id belongs to the first shard
TBlockEvents<TEvBlobStorage::TEvPut> blockedCommits(runtime,
[&](const auto& ev) {
auto* msg = ev->Get();
if (msg->Id.TabletID() == shards.at(0)) {
Cerr << "... blocking put " << msg->Id << Endl;
return true;
}
return false;
});

// Unblock readsets, but block commits, so abort can't commit
blockedReadSets.Stop().Unblock();
runtime.WaitFor("blocked commit", [&]{ return blockedCommits.size() >= 1; });

// The read is started while the first shard's put is still held back;
// key <= 30 limits it to the keys inserted below the split point
Cerr << "========= Starting a concurrent read =========" << Endl;
auto readFuture = KqpSimpleSend(runtime, R"(
SELECT key, value FROM `/Root/table` WHERE key <= 30 ORDER BY key;
)");
runtime.SimulateSleep(TDuration::Seconds(1));

Cerr << "========= Unblocking commits and checking results =========" << Endl;
blockedCommits.Stop().Unblock();

// Two held propose results are awaited before releasing them
// NOTE(review): presumably one per shard for the commit — confirm
runtime.WaitFor("both results", [&]{ return blockedResults.size() >= 2; });
blockedResults.Stop().Unblock();

// The commit must fail: the transaction is expected to be aborted
UNIT_ASSERT_VALUES_EQUAL(
FormatResult(runtime.WaitFuture(std::move(commitFuture))),
"ERROR: ABORTED");

// The concurrent read must see every row 2..20: none of the deletes made by
// the aborted transaction may affect its result
UNIT_ASSERT_VALUES_EQUAL(
FormatResult(runtime.WaitFuture(std::move(readFuture))),
"{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 3 } }, "
"{ items { uint32_value: 4 } items { uint32_value: 4 } }, "
"{ items { uint32_value: 5 } items { uint32_value: 5 } }, "
"{ items { uint32_value: 6 } items { uint32_value: 6 } }, "
"{ items { uint32_value: 7 } items { uint32_value: 7 } }, "
"{ items { uint32_value: 8 } items { uint32_value: 8 } }, "
"{ items { uint32_value: 9 } items { uint32_value: 9 } }, "
"{ items { uint32_value: 10 } items { uint32_value: 10 } }, "
"{ items { uint32_value: 11 } items { uint32_value: 11 } }, "
"{ items { uint32_value: 12 } items { uint32_value: 12 } }, "
"{ items { uint32_value: 13 } items { uint32_value: 13 } }, "
"{ items { uint32_value: 14 } items { uint32_value: 14 } }, "
"{ items { uint32_value: 15 } items { uint32_value: 15 } }, "
"{ items { uint32_value: 16 } items { uint32_value: 16 } }, "
"{ items { uint32_value: 17 } items { uint32_value: 17 } }, "
"{ items { uint32_value: 18 } items { uint32_value: 18 } }, "
"{ items { uint32_value: 19 } items { uint32_value: 19 } }, "
"{ items { uint32_value: 20 } items { uint32_value: 20 } }");
}

} // Y_UNIT_TEST_SUITE(DataShardVolatile)

} // namespace NKikimr
Loading