Skip to content

Commit a1e2575

Browse files
authored
24-3: Preliminary fix for erase cache consistency problems (#13944)
1 parent cbc5af1 commit a1e2575

File tree

5 files changed

+204
-39
lines changed

5 files changed

+204
-39
lines changed

ydb/core/tablet_flat/flat_mem_iter.h

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -243,18 +243,20 @@ namespace NTable {
243243
NTable::ITransactionObserverSimplePtr transactionObserver,
244244
const NTable::ITransactionSet& decidedTransactions) noexcept
245245
{
246+
// Temporary: we don't cache erases when there are uncompacted deltas
247+
Y_UNUSED(decidedTransactions);
248+
246249
Y_DEBUG_ABORT_UNLESS(IsValid(), "Attempt to access an invalid row");
247250

248251
auto* chain = GetCurrentVersion();
249252
Y_DEBUG_ABORT_UNLESS(chain, "Unexpected empty chain");
250253

251254
// Skip uncommitted deltas
252255
while (chain->RowVersion.Step == Max<ui64>() && !committedTransactions.Find(chain->RowVersion.TxId)) {
256+
// We cannot cache when there are uncompacted deltas
257+
stats.UncertainErase = true;
258+
253259
transactionObserver.OnSkipUncommitted(chain->RowVersion.TxId);
254-
if (chain->Rop != ERowOp::Erase && !decidedTransactions.Contains(chain->RowVersion.TxId)) {
255-
// This change may commit and change the iteration result
256-
stats.UncertainErase = true;
257-
}
258260
if (!(chain = chain->Next)) {
259261
CurrentVersion = nullptr;
260262
return false;
@@ -268,24 +270,23 @@ namespace NTable {
268270
return true;
269271
}
270272
transactionObserver.OnSkipCommitted(chain->RowVersion);
273+
if (chain->Rop != ERowOp::Erase) {
274+
// We are skipping non-erase op, so any erase below cannot be trusted
275+
stats.UncertainErase = true;
276+
}
271277
} else {
278+
// We cannot cache when there are uncompacted deltas
279+
stats.UncertainErase = true;
280+
272281
auto* commitVersion = committedTransactions.Find(chain->RowVersion.TxId);
273282
Y_ABORT_UNLESS(commitVersion);
274283
if (*commitVersion <= rowVersion) {
275-
if (!decidedTransactions.Contains(chain->RowVersion.TxId)) {
276-
// This change may rollback and change the iteration result
277-
stats.UncertainErase = true;
278-
}
279284
return true;
280285
}
281286
transactionObserver.OnSkipCommitted(*commitVersion, chain->RowVersion.TxId);
282287
}
283288

284289
stats.InvisibleRowSkips++;
285-
if (chain->Rop != ERowOp::Erase) {
286-
// We are skipping non-erase op, so any erase below cannot be trusted
287-
stats.UncertainErase = true;
288-
}
289290

290291
while ((chain = chain->Next)) {
291292
if (chain->RowVersion.Step != Max<ui64>()) {
@@ -296,13 +297,16 @@ namespace NTable {
296297

297298
transactionObserver.OnSkipCommitted(chain->RowVersion);
298299
stats.InvisibleRowSkips++;
300+
if (chain->Rop != ERowOp::Erase) {
301+
// We are skipping non-erase op, so any erase below cannot be trusted
302+
stats.UncertainErase = true;
303+
}
299304
} else {
305+
// We cannot cache when there are uncompacted deltas
306+
stats.UncertainErase = true;
307+
300308
auto* commitVersion = committedTransactions.Find(chain->RowVersion.TxId);
301309
if (commitVersion && *commitVersion <= rowVersion) {
302-
if (!decidedTransactions.Contains(chain->RowVersion.TxId)) {
303-
// This change may rollback and change the iteration result
304-
stats.UncertainErase = true;
305-
}
306310
CurrentVersion = chain;
307311
return true;
308312
}
@@ -312,17 +316,8 @@ namespace NTable {
312316
stats.InvisibleRowSkips++;
313317
} else {
314318
transactionObserver.OnSkipUncommitted(chain->RowVersion.TxId);
315-
if (decidedTransactions.Contains(chain->RowVersion.TxId)) {
316-
// This is a decided uncommitted change and will never be committed
317-
// Make sure we don't mark possible erase below as uncertain
318-
continue;
319-
}
320319
}
321320
}
322-
if (chain->Rop != ERowOp::Erase) {
323-
// We are skipping non-erase op, so any erase below cannot be trusted
324-
stats.UncertainErase = true;
325-
}
326321
}
327322

328323
CurrentVersion = nullptr;

ydb/core/tablet_flat/flat_part_iter.h

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -926,6 +926,9 @@ namespace NTable {
926926
NTable::ITransactionObserverSimplePtr transactionObserver,
927927
const NTable::ITransactionSet& decidedTransactions) noexcept
928928
{
929+
// Temporary: we don't cache erases when there are uncompacted deltas
930+
Y_UNUSED(decidedTransactions);
931+
929932
Y_DEBUG_ABORT_UNLESS(Main.IsValid(), "Attempt to use an invalid iterator");
930933

931934
// We cannot use min/max hints when part has uncommitted deltas
@@ -962,31 +965,22 @@ namespace NTable {
962965
const auto* data = Main.GetRecord()->GetAltRecord(SkipMainDeltas);
963966

964967
while (data->IsDelta()) {
968+
// We cannot cache when there are uncompacted deltas
969+
stats.UncertainErase = true;
970+
965971
ui64 txId = data->GetDeltaTxId(info);
966972
const auto* commitVersion = committedTransactions.Find(txId);
967973
if (commitVersion && *commitVersion <= rowVersion) {
968974
// Already committed and correct version
969-
if (!decidedTransactions.Contains(txId)) {
970-
// This change may rollback and change the iteration result
971-
stats.UncertainErase = true;
972-
}
973975
return EReady::Data;
974976
}
975977
if (commitVersion) {
976978
// Skipping a newer committed delta
977979
transactionObserver.OnSkipCommitted(*commitVersion, txId);
978980
stats.InvisibleRowSkips++;
979-
if (data->GetRop() != ERowOp::Erase) {
980-
// Skipping non-erase delta, so any erase below cannot be trusted
981-
stats.UncertainErase = true;
982-
}
983981
} else {
984982
// Skipping an uncommitted delta
985983
transactionObserver.OnSkipUncommitted(txId);
986-
if (data->GetRop() != ERowOp::Erase && !decidedTransactions.Contains(txId)) {
987-
// This change may commit and change the iteration result
988-
stats.UncertainErase = true;
989-
}
990984
}
991985
data = Main.GetRecord()->GetAltRecord(++SkipMainDeltas);
992986
if (!data) {

ydb/core/tablet_flat/flat_table.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -908,7 +908,7 @@ void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMe
908908
auto& memTable = MemTable();
909909
bool hadTxRef = memTable.GetTxIdStats().contains(txId);
910910

911-
if (ErasedKeysCache && rop != ERowOp::Erase) {
911+
if (ErasedKeysCache) {
912912
const TCelled cells(key, *Scheme->Keys, true);
913913
auto res = ErasedKeysCache->FindKey(cells);
914914
if (res.second) {

ydb/core/tablet_flat/ut/ut_db_iface.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -929,7 +929,7 @@ Y_UNIT_TEST_SUITE(DBase) {
929929
.Next().Is(*me.SchemedCookRow(table).Col(18_u64, 18_u64))
930930
.Next().Is(EReady::Gone);
931931

932-
UNIT_ASSERT_VALUES_EQUAL(dumpCache(), "TKeyRangeCache{ [{1}, {16}] }");
932+
UNIT_ASSERT_VALUES_EQUAL(dumpCache(), "TKeyRangeCache{ [{1}, {9}), [{10}, {16}] }");
933933
}
934934

935935
Y_UNIT_TEST(EraseCacheWithUncommittedChanges) {

ydb/core/tx/datashard/datashard_ut_volatile.cpp

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3282,6 +3282,182 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
32823282
"{ items { uint32_value: 11 } items { uint32_value: 11 } }");
32833283
}
32843284

3285+
// Regression test for KIKIMR-22506
3286+
Y_UNIT_TEST(NotCachingAbortingDeletes) {
3287+
TPortManager pm;
3288+
TServerSettings serverSettings(pm.GetPort(2134));
3289+
serverSettings.SetDomainName("Root")
3290+
.SetUseRealThreads(false)
3291+
.SetEnableDataShardVolatileTransactions(true);
3292+
3293+
Tests::TServer::TPtr server = new TServer(serverSettings);
3294+
auto &runtime = *server->GetRuntime();
3295+
auto sender = runtime.AllocateEdgeActor();
3296+
3297+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
3298+
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
3299+
3300+
InitRoot(server, sender);
3301+
3302+
TDisableDataShardLogBatching disableDataShardLogBatching;
3303+
3304+
Cerr << "========= Creating table =========" << Endl;
3305+
UNIT_ASSERT_VALUES_EQUAL(
3306+
KqpSchemeExec(runtime, R"(
3307+
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
3308+
WITH (PARTITION_AT_KEYS = (100));
3309+
)"),
3310+
"SUCCESS");
3311+
3312+
const auto shards = GetTableShards(server, sender, "/Root/table");
3313+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
3314+
3315+
// We need to fill table with some data
3316+
Cerr << "========= Upserting initial values =========" << Endl;
3317+
UNIT_ASSERT_VALUES_EQUAL(
3318+
KqpSimpleExec(runtime, R"(
3319+
UPSERT INTO `/Root/table` (key, value)
3320+
VALUES
3321+
(1, 1), (2, 2), (3, 3), (4, 4), (5, 5),
3322+
(6, 6), (7, 7), (8, 8), (9, 9), (10, 10),
3323+
(11, 11), (12, 12), (13, 13), (14, 14), (15, 15),
3324+
(16, 16), (17, 17), (18, 18), (19, 19), (20, 20);
3325+
)"),
3326+
"<empty>");
3327+
3328+
// We need to delete the first key (will be the trigger)
3329+
UNIT_ASSERT_VALUES_EQUAL(
3330+
KqpSimpleExec(runtime, R"(
3331+
DELETE FROM `/Root/table` WHERE key = 1;
3332+
)"),
3333+
"<empty>");
3334+
3335+
// Start transaction that deletes many rows and reads the result
3336+
// It is not committed yet, so should not be cached
3337+
Cerr << "========= Deleting rows (uncommitted) =========" << Endl;
3338+
TString sessionId, txId;
3339+
UNIT_ASSERT_VALUES_EQUAL(
3340+
KqpSimpleBegin(runtime, sessionId, txId, R"(
3341+
DELETE FROM `/Root/table` WHERE key < 20;
3342+
SELECT key, value FROM `/Root/table` ORDER BY key;
3343+
)"),
3344+
"{ items { uint32_value: 20 } items { uint32_value: 20 } }");
3345+
3346+
// Make sure the lock is broken at the second shard
3347+
Cerr << "========= Upserting key 200 (breaking lock) =========" << Endl;
3348+
UNIT_ASSERT_VALUES_EQUAL(
3349+
KqpSimpleExec(runtime, R"(
3350+
UPSERT INTO `/Root/table` (key, value)
3351+
VALUES (200, 200);
3352+
)"),
3353+
"<empty>");
3354+
3355+
Cerr << "========= Validating table contents =========" << Endl;
3356+
UNIT_ASSERT_VALUES_EQUAL(
3357+
KqpSimpleExec(runtime, R"(
3358+
SELECT key, value FROM `/Root/table` ORDER BY key;
3359+
)"),
3360+
"{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
3361+
"{ items { uint32_value: 3 } items { uint32_value: 3 } }, "
3362+
"{ items { uint32_value: 4 } items { uint32_value: 4 } }, "
3363+
"{ items { uint32_value: 5 } items { uint32_value: 5 } }, "
3364+
"{ items { uint32_value: 6 } items { uint32_value: 6 } }, "
3365+
"{ items { uint32_value: 7 } items { uint32_value: 7 } }, "
3366+
"{ items { uint32_value: 8 } items { uint32_value: 8 } }, "
3367+
"{ items { uint32_value: 9 } items { uint32_value: 9 } }, "
3368+
"{ items { uint32_value: 10 } items { uint32_value: 10 } }, "
3369+
"{ items { uint32_value: 11 } items { uint32_value: 11 } }, "
3370+
"{ items { uint32_value: 12 } items { uint32_value: 12 } }, "
3371+
"{ items { uint32_value: 13 } items { uint32_value: 13 } }, "
3372+
"{ items { uint32_value: 14 } items { uint32_value: 14 } }, "
3373+
"{ items { uint32_value: 15 } items { uint32_value: 15 } }, "
3374+
"{ items { uint32_value: 16 } items { uint32_value: 16 } }, "
3375+
"{ items { uint32_value: 17 } items { uint32_value: 17 } }, "
3376+
"{ items { uint32_value: 18 } items { uint32_value: 18 } }, "
3377+
"{ items { uint32_value: 19 } items { uint32_value: 19 } }, "
3378+
"{ items { uint32_value: 20 } items { uint32_value: 20 } }, "
3379+
"{ items { uint32_value: 200 } items { uint32_value: 200 } }");
3380+
3381+
TBlockEvents<TEvDataShard::TEvProposeTransactionResult> blockedResults(runtime,
3382+
[&](const auto& ev) {
3383+
auto* msg = ev->Get();
3384+
if (msg->Record.GetStatus() == NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED) {
3385+
return false;
3386+
}
3387+
return true;
3388+
});
3389+
3390+
size_t otherReadSets = 0;
3391+
TBlockEvents<TEvTxProcessing::TEvReadSet> blockedReadSets(runtime,
3392+
[&otherReadSets, actor = ResolveTablet(runtime, shards.at(0))](const auto& ev) {
3393+
if (ev->GetRecipientRewrite() == actor) {
3394+
return true;
3395+
}
3396+
++otherReadSets;
3397+
return false;
3398+
});
3399+
3400+
Cerr << "========= Starting commit =========" << Endl;
3401+
auto commitFuture = KqpSimpleSendCommit(runtime, sessionId, txId, "SELECT 1");
3402+
3403+
runtime.WaitFor("blocked readsets", [&]{ return blockedReadSets.size() >= 1 && otherReadSets >= 1; });
3404+
UNIT_ASSERT_VALUES_EQUAL(blockedReadSets.size(), 1u);
3405+
UNIT_ASSERT_VALUES_EQUAL(otherReadSets, 1u);
3406+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3407+
3408+
TBlockEvents<TEvBlobStorage::TEvPut> blockedCommits(runtime,
3409+
[&](const auto& ev) {
3410+
auto* msg = ev->Get();
3411+
if (msg->Id.TabletID() == shards.at(0)) {
3412+
Cerr << "... blocking put " << msg->Id << Endl;
3413+
return true;
3414+
}
3415+
return false;
3416+
});
3417+
3418+
// Unblock readsets, but block commits, so abort can't commit
3419+
blockedReadSets.Stop().Unblock();
3420+
runtime.WaitFor("blocked commit", [&]{ return blockedCommits.size() >= 1; });
3421+
3422+
Cerr << "========= Starting a concurrent read =========" << Endl;
3423+
auto readFuture = KqpSimpleSend(runtime, R"(
3424+
SELECT key, value FROM `/Root/table` WHERE key <= 30 ORDER BY key;
3425+
)");
3426+
runtime.SimulateSleep(TDuration::Seconds(1));
3427+
3428+
Cerr << "========= Unblocking commits and checking results =========" << Endl;
3429+
blockedCommits.Stop().Unblock();
3430+
3431+
runtime.WaitFor("both results", [&]{ return blockedResults.size() >= 2; });
3432+
blockedResults.Stop().Unblock();
3433+
3434+
UNIT_ASSERT_VALUES_EQUAL(
3435+
FormatResult(runtime.WaitFuture(std::move(commitFuture))),
3436+
"ERROR: ABORTED");
3437+
3438+
UNIT_ASSERT_VALUES_EQUAL(
3439+
FormatResult(runtime.WaitFuture(std::move(readFuture))),
3440+
"{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
3441+
"{ items { uint32_value: 3 } items { uint32_value: 3 } }, "
3442+
"{ items { uint32_value: 4 } items { uint32_value: 4 } }, "
3443+
"{ items { uint32_value: 5 } items { uint32_value: 5 } }, "
3444+
"{ items { uint32_value: 6 } items { uint32_value: 6 } }, "
3445+
"{ items { uint32_value: 7 } items { uint32_value: 7 } }, "
3446+
"{ items { uint32_value: 8 } items { uint32_value: 8 } }, "
3447+
"{ items { uint32_value: 9 } items { uint32_value: 9 } }, "
3448+
"{ items { uint32_value: 10 } items { uint32_value: 10 } }, "
3449+
"{ items { uint32_value: 11 } items { uint32_value: 11 } }, "
3450+
"{ items { uint32_value: 12 } items { uint32_value: 12 } }, "
3451+
"{ items { uint32_value: 13 } items { uint32_value: 13 } }, "
3452+
"{ items { uint32_value: 14 } items { uint32_value: 14 } }, "
3453+
"{ items { uint32_value: 15 } items { uint32_value: 15 } }, "
3454+
"{ items { uint32_value: 16 } items { uint32_value: 16 } }, "
3455+
"{ items { uint32_value: 17 } items { uint32_value: 17 } }, "
3456+
"{ items { uint32_value: 18 } items { uint32_value: 18 } }, "
3457+
"{ items { uint32_value: 19 } items { uint32_value: 19 } }, "
3458+
"{ items { uint32_value: 20 } items { uint32_value: 20 } }");
3459+
}
3460+
32853461
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
32863462

32873463
} // namespace NKikimr

0 commit comments

Comments
 (0)