@@ -3282,6 +3282,182 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
32823282 " { items { uint32_value: 11 } items { uint32_value: 11 } }" );
32833283 }
32843284
3285+ // Regression test for KIKIMR-22506
3286+ Y_UNIT_TEST (NotCachingAbortingDeletes) {
3287+ TPortManager pm;
3288+ TServerSettings serverSettings (pm.GetPort (2134 ));
3289+ serverSettings.SetDomainName (" Root" )
3290+ .SetUseRealThreads (false )
3291+ .SetEnableDataShardVolatileTransactions (true );
3292+
3293+ Tests::TServer::TPtr server = new TServer (serverSettings);
3294+ auto &runtime = *server->GetRuntime ();
3295+ auto sender = runtime.AllocateEdgeActor ();
3296+
3297+ runtime.SetLogPriority (NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
3298+ runtime.SetLogPriority (NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
3299+
3300+ InitRoot (server, sender);
3301+
3302+ TDisableDataShardLogBatching disableDataShardLogBatching;
3303+
3304+ Cerr << " ========= Creating table =========" << Endl;
3305+ UNIT_ASSERT_VALUES_EQUAL (
3306+ KqpSchemeExec (runtime, R"(
3307+ CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
3308+ WITH (PARTITION_AT_KEYS = (100));
3309+ )" ),
3310+ " SUCCESS" );
3311+
3312+ const auto shards = GetTableShards (server, sender, " /Root/table" );
3313+ UNIT_ASSERT_VALUES_EQUAL (shards.size (), 2u );
3314+
3315+ // We need to fill table with some data
3316+ Cerr << " ========= Upserting initial values =========" << Endl;
3317+ UNIT_ASSERT_VALUES_EQUAL (
3318+ KqpSimpleExec (runtime, R"(
3319+ UPSERT INTO `/Root/table` (key, value)
3320+ VALUES
3321+ (1, 1), (2, 2), (3, 3), (4, 4), (5, 5),
3322+ (6, 6), (7, 7), (8, 8), (9, 9), (10, 10),
3323+ (11, 11), (12, 12), (13, 13), (14, 14), (15, 15),
3324+ (16, 16), (17, 17), (18, 18), (19, 19), (20, 20);
3325+ )" ),
3326+ " <empty>" );
3327+
3328+ // We need to delete the first key (will be the trigger)
3329+ UNIT_ASSERT_VALUES_EQUAL (
3330+ KqpSimpleExec (runtime, R"(
3331+ DELETE FROM `/Root/table` WHERE key = 1;
3332+ )" ),
3333+ " <empty>" );
3334+
3335+ // Start transaction that deletes many rows and reads the result
3336+ // It is not committed yet, so should not be cached
3337+ Cerr << " ========= Deleting rows (uncommitted) =========" << Endl;
3338+ TString sessionId, txId;
3339+ UNIT_ASSERT_VALUES_EQUAL (
3340+ KqpSimpleBegin (runtime, sessionId, txId, R"(
3341+ DELETE FROM `/Root/table` WHERE key < 20;
3342+ SELECT key, value FROM `/Root/table` ORDER BY key;
3343+ )" ),
3344+ " { items { uint32_value: 20 } items { uint32_value: 20 } }" );
3345+
3346+ // Make sure the lock is broken at the second shard
3347+ Cerr << " ========= Upserting key 200 (breaking lock) =========" << Endl;
3348+ UNIT_ASSERT_VALUES_EQUAL (
3349+ KqpSimpleExec (runtime, R"(
3350+ UPSERT INTO `/Root/table` (key, value)
3351+ VALUES (200, 200);
3352+ )" ),
3353+ " <empty>" );
3354+
3355+ Cerr << " ========= Validating table contents =========" << Endl;
3356+ UNIT_ASSERT_VALUES_EQUAL (
3357+ KqpSimpleExec (runtime, R"(
3358+ SELECT key, value FROM `/Root/table` ORDER BY key;
3359+ )" ),
3360+ " { items { uint32_value: 2 } items { uint32_value: 2 } }, "
3361+ " { items { uint32_value: 3 } items { uint32_value: 3 } }, "
3362+ " { items { uint32_value: 4 } items { uint32_value: 4 } }, "
3363+ " { items { uint32_value: 5 } items { uint32_value: 5 } }, "
3364+ " { items { uint32_value: 6 } items { uint32_value: 6 } }, "
3365+ " { items { uint32_value: 7 } items { uint32_value: 7 } }, "
3366+ " { items { uint32_value: 8 } items { uint32_value: 8 } }, "
3367+ " { items { uint32_value: 9 } items { uint32_value: 9 } }, "
3368+ " { items { uint32_value: 10 } items { uint32_value: 10 } }, "
3369+ " { items { uint32_value: 11 } items { uint32_value: 11 } }, "
3370+ " { items { uint32_value: 12 } items { uint32_value: 12 } }, "
3371+ " { items { uint32_value: 13 } items { uint32_value: 13 } }, "
3372+ " { items { uint32_value: 14 } items { uint32_value: 14 } }, "
3373+ " { items { uint32_value: 15 } items { uint32_value: 15 } }, "
3374+ " { items { uint32_value: 16 } items { uint32_value: 16 } }, "
3375+ " { items { uint32_value: 17 } items { uint32_value: 17 } }, "
3376+ " { items { uint32_value: 18 } items { uint32_value: 18 } }, "
3377+ " { items { uint32_value: 19 } items { uint32_value: 19 } }, "
3378+ " { items { uint32_value: 20 } items { uint32_value: 20 } }, "
3379+ " { items { uint32_value: 200 } items { uint32_value: 200 } }" );
3380+
3381+ TBlockEvents<TEvDataShard::TEvProposeTransactionResult> blockedResults (runtime,
3382+ [&](const auto & ev) {
3383+ auto * msg = ev->Get ();
3384+ if (msg->Record .GetStatus () == NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED) {
3385+ return false ;
3386+ }
3387+ return true ;
3388+ });
3389+
3390+ size_t otherReadSets = 0 ;
3391+ TBlockEvents<TEvTxProcessing::TEvReadSet> blockedReadSets (runtime,
3392+ [&otherReadSets, actor = ResolveTablet (runtime, shards.at (0 ))](const auto & ev) {
3393+ if (ev->GetRecipientRewrite () == actor) {
3394+ return true ;
3395+ }
3396+ ++otherReadSets;
3397+ return false ;
3398+ });
3399+
3400+ Cerr << " ========= Starting commit =========" << Endl;
3401+ auto commitFuture = KqpSimpleSendCommit (runtime, sessionId, txId, " SELECT 1" );
3402+
3403+ runtime.WaitFor (" blocked readsets" , [&]{ return blockedReadSets.size () >= 1 && otherReadSets >= 1 ; });
3404+ UNIT_ASSERT_VALUES_EQUAL (blockedReadSets.size (), 1u );
3405+ UNIT_ASSERT_VALUES_EQUAL (otherReadSets, 1u );
3406+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3407+
3408+ TBlockEvents<TEvBlobStorage::TEvPut> blockedCommits (runtime,
3409+ [&](const auto & ev) {
3410+ auto * msg = ev->Get ();
3411+ if (msg->Id .TabletID () == shards.at (0 )) {
3412+ Cerr << " ... blocking put " << msg->Id << Endl;
3413+ return true ;
3414+ }
3415+ return false ;
3416+ });
3417+
3418+ // Unblock readsets, but block commits, so abort can't commit
3419+ blockedReadSets.Stop ().Unblock ();
3420+ runtime.WaitFor (" blocked commit" , [&]{ return blockedCommits.size () >= 1 ; });
3421+
3422+ Cerr << " ========= Starting a concurrent read =========" << Endl;
3423+ auto readFuture = KqpSimpleSend (runtime, R"(
3424+ SELECT key, value FROM `/Root/table` WHERE key <= 30 ORDER BY key;
3425+ )" );
3426+ runtime.SimulateSleep (TDuration::Seconds (1 ));
3427+
3428+ Cerr << " ========= Unblocking commits and checking results =========" << Endl;
3429+ blockedCommits.Stop ().Unblock ();
3430+
3431+ runtime.WaitFor (" both results" , [&]{ return blockedResults.size () >= 2 ; });
3432+ blockedResults.Stop ().Unblock ();
3433+
3434+ UNIT_ASSERT_VALUES_EQUAL (
3435+ FormatResult (runtime.WaitFuture (std::move (commitFuture))),
3436+ " ERROR: ABORTED" );
3437+
3438+ UNIT_ASSERT_VALUES_EQUAL (
3439+ FormatResult (runtime.WaitFuture (std::move (readFuture))),
3440+ " { items { uint32_value: 2 } items { uint32_value: 2 } }, "
3441+ " { items { uint32_value: 3 } items { uint32_value: 3 } }, "
3442+ " { items { uint32_value: 4 } items { uint32_value: 4 } }, "
3443+ " { items { uint32_value: 5 } items { uint32_value: 5 } }, "
3444+ " { items { uint32_value: 6 } items { uint32_value: 6 } }, "
3445+ " { items { uint32_value: 7 } items { uint32_value: 7 } }, "
3446+ " { items { uint32_value: 8 } items { uint32_value: 8 } }, "
3447+ " { items { uint32_value: 9 } items { uint32_value: 9 } }, "
3448+ " { items { uint32_value: 10 } items { uint32_value: 10 } }, "
3449+ " { items { uint32_value: 11 } items { uint32_value: 11 } }, "
3450+ " { items { uint32_value: 12 } items { uint32_value: 12 } }, "
3451+ " { items { uint32_value: 13 } items { uint32_value: 13 } }, "
3452+ " { items { uint32_value: 14 } items { uint32_value: 14 } }, "
3453+ " { items { uint32_value: 15 } items { uint32_value: 15 } }, "
3454+ " { items { uint32_value: 16 } items { uint32_value: 16 } }, "
3455+ " { items { uint32_value: 17 } items { uint32_value: 17 } }, "
3456+ " { items { uint32_value: 18 } items { uint32_value: 18 } }, "
3457+ " { items { uint32_value: 19 } items { uint32_value: 19 } }, "
3458+ " { items { uint32_value: 20 } items { uint32_value: 20 } }" );
3459+ }
3460+
32853461} // Y_UNIT_TEST_SUITE(DataShardVolatile)
32863462
32873463} // namespace NKikimr
0 commit comments