66#include < ydb/core/kqp/ut/common/kqp_ut_common.h>
77#include < ydb/core/protos/schemeshard/operations.pb.h>
88#include < ydb/core/tablet/resource_broker.h>
9+ #include < ydb/core/testlib/actors/block_events.h>
910#include < ydb/core/tx/schemeshard/ut_helpers/helpers.h>
1011#include < ydb/core/tx/schemeshard/ut_helpers/auditlog_helpers.h>
1112#include < ydb/core/tx/schemeshard/schemeshard_private.h>
@@ -1208,30 +1209,21 @@ value {
12081209 UpdateRow (runtime, " Original" , 2 , " valueB" , secondTablet);
12091210
12101211 // Add delay after copying tables
1211- bool dropNotification = false ;
1212- THolder<IEventHandle> delayed;
1212+ ui64 copyTablesTxId;
12131213 auto prevObserver = runtime.SetObserverFunc ([&](TAutoPtr<IEventHandle>& ev) {
1214- switch (ev->GetTypeRewrite ()) {
1215- case TEvSchemeShard::EvModifySchemeTransaction:
1216- break ;
1217- case TEvSchemeShard::EvNotifyTxCompletionResult:
1218- if (dropNotification) {
1219- delayed.Reset (ev.Release ());
1220- return TTestActorRuntime::EEventAction::DROP;
1214+ if (ev->GetTypeRewrite () == TEvSchemeShard::EvModifySchemeTransaction) {
1215+ const auto * msg = ev->Get <TEvSchemeShard::TEvModifySchemeTransaction>();
1216+ if (msg->Record .GetTransaction (0 ).GetOperationType () == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) {
1217+ copyTablesTxId = msg->Record .GetTxId ();
12211218 }
1222- return TTestActorRuntime::EEventAction::PROCESS;
1223- default :
1224- return TTestActorRuntime::EEventAction::PROCESS;
1225- }
1226-
1227- const auto * msg = ev->Get <TEvSchemeShard::TEvModifySchemeTransaction>();
1228- if (msg->Record .GetTransaction (0 ).GetOperationType () == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) {
1229- dropNotification = true ;
12301219 }
1231-
12321220 return TTestActorRuntime::EEventAction::PROCESS;
12331221 });
12341222
1223+ TBlockEvents<TEvSchemeShard::TEvNotifyTxCompletionResult> delay (runtime, [&](auto & ev) {
1224+ return ev->Get ()->Record .GetTxId () == copyTablesTxId;
1225+ });
1226+
12351227 // Start exporting table
12361228 TestExport (runtime, ++txId, " /MyRoot" , Sprintf (R"(
12371229 ExportToS3Settings {
@@ -1246,14 +1238,7 @@ value {
12461238 const ui64 exportId = txId;
12471239
12481240 // Wait for delay after copying tables
1249- if (!delayed) {
1250- TDispatchOptions opts;
1251- opts.FinalEvents .emplace_back ([&delayed](IEventHandle&) -> bool {
1252- return bool (delayed);
1253- });
1254- runtime.DispatchEvents (opts);
1255- }
1256- runtime.SetObserverFunc (prevObserver);
1241+ runtime.WaitFor (" delay after copying tables" , [&]{ return delay.size () >= 1 ; });
12571242
12581243 // Merge 2 tablets in 1 during the delay
12591244 TestAlterTable (runtime, ++txId, " /MyRoot" , R"(
@@ -1274,7 +1259,7 @@ value {
12741259 env.TestWaitNotification (runtime, txId);
12751260
12761261 // Finish the delay and continue exporting
1277- runtime. Send (delayed. Release (), 0 , true );
1262+ delay. Unblock ( );
12781263 env.TestWaitNotification (runtime, exportId);
12791264
12801265 // Check export
@@ -1373,30 +1358,21 @@ value {
13731358 UpdateRow (runtime, " Original" , 2 , " valueB" , firstTablet);
13741359
13751360 // Add delay after copying tables
1376- bool dropNotification = false ;
1377- THolder<IEventHandle> delayed;
1361+ ui64 copyTablesTxId;
13781362 auto prevObserver = runtime.SetObserverFunc ([&](TAutoPtr<IEventHandle>& ev) {
1379- switch (ev->GetTypeRewrite ()) {
1380- case TEvSchemeShard::EvModifySchemeTransaction:
1381- break ;
1382- case TEvSchemeShard::EvNotifyTxCompletionResult:
1383- if (dropNotification) {
1384- delayed.Reset (ev.Release ());
1385- return TTestActorRuntime::EEventAction::DROP;
1363+ if (ev->GetTypeRewrite () == TEvSchemeShard::EvModifySchemeTransaction) {
1364+ const auto * msg = ev->Get <TEvSchemeShard::TEvModifySchemeTransaction>();
1365+ if (msg->Record .GetTransaction (0 ).GetOperationType () == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) {
1366+ copyTablesTxId = msg->Record .GetTxId ();
13861367 }
1387- return TTestActorRuntime::EEventAction::PROCESS;
1388- default :
1389- return TTestActorRuntime::EEventAction::PROCESS;
1390- }
1391-
1392- const auto * msg = ev->Get <TEvSchemeShard::TEvModifySchemeTransaction>();
1393- if (msg->Record .GetTransaction (0 ).GetOperationType () == NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables) {
1394- dropNotification = true ;
13951368 }
1396-
13971369 return TTestActorRuntime::EEventAction::PROCESS;
13981370 });
13991371
1372+ TBlockEvents<TEvSchemeShard::TEvNotifyTxCompletionResult> delay (runtime, [&](auto & ev) {
1373+ return ev->Get ()->Record .GetTxId () == copyTablesTxId;
1374+ });
1375+
14001376 // Start exporting table
14011377 TestExport (runtime, ++txId, " /MyRoot" , Sprintf (R"(
14021378 ExportToS3Settings {
@@ -1410,15 +1386,8 @@ value {
14101386 )" , port));
14111387 const ui64 exportId = txId;
14121388
1413- // Wait for delay after copying tables
1414- if (!delayed) {
1415- TDispatchOptions opts;
1416- opts.FinalEvents .emplace_back ([&delayed](IEventHandle&) -> bool {
1417- return bool (delayed);
1418- });
1419- runtime.DispatchEvents (opts);
1420- }
1421- runtime.SetObserverFunc (prevObserver);
1389+ // Wait for delay after copying tables
1390+ runtime.WaitFor (" delay after copying tables" , [&]{ return delay.size () >= 1 ; });
14221391
14231392 // Split 2 tablets in 3 during the delay
14241393 TestAlterTable (runtime, ++txId, " /MyRoot" , R"(
@@ -1443,7 +1412,7 @@ value {
14431412 env.TestWaitNotification (runtime, txId);
14441413
14451414 // Finish the delay and continue exporting
1446- runtime. Send (delayed. Release (), 0 , true );
1415+ delay. Unblock ( );
14471416 env.TestWaitNotification (runtime, exportId);
14481417
14491418 // Check export
@@ -4521,30 +4490,21 @@ Y_UNIT_TEST_SUITE(TImportTests) {
45214490 UNIT_ASSERT (s3Mock.Start ());
45224491
45234492 // Add delay after creating table
4524- bool dropNotification = false ;
4525- THolder<IEventHandle> delayed;
4493+ ui64 createTableTxId;
45264494 auto prevObserver = runtime.SetObserverFunc ([&](TAutoPtr<IEventHandle>& ev) {
4527- switch (ev->GetTypeRewrite ()) {
4528- case TEvSchemeShard::EvModifySchemeTransaction:
4529- break ;
4530- case TEvSchemeShard::EvNotifyTxCompletionResult:
4531- if (dropNotification) {
4532- delayed.Reset (ev.Release ());
4533- return TTestActorRuntime::EEventAction::DROP;
4495+ if (ev->GetTypeRewrite () == TEvSchemeShard::EvModifySchemeTransaction) {
4496+ const auto * msg = ev->Get <TEvSchemeShard::TEvModifySchemeTransaction>();
4497+ if (msg->Record .GetTransaction (0 ).GetOperationType () == NKikimrSchemeOp::ESchemeOpCreateIndexedTable) {
4498+ createTableTxId = msg->Record .GetTxId ();
45344499 }
4535- return TTestActorRuntime::EEventAction::PROCESS;
4536- default :
4537- return TTestActorRuntime::EEventAction::PROCESS;
45384500 }
4539-
4540- const auto * msg = ev->Get <TEvSchemeShard::TEvModifySchemeTransaction>();
4541- if (msg->Record .GetTransaction (0 ).GetOperationType () == NKikimrSchemeOp::ESchemeOpCreateIndexedTable) {
4542- dropNotification = true ;
4543- }
4544-
45454501 return TTestActorRuntime::EEventAction::PROCESS;
45464502 });
45474503
4504+ TBlockEvents<TEvSchemeShard::TEvNotifyTxCompletionResult> delay (runtime, [&](auto & ev) {
4505+ return ev->Get ()->Record .GetTxId () == createTableTxId;
4506+ });
4507+
45484508 // Start importing table
45494509 TestImport (runtime, ++txId, " /MyRoot" , Sprintf (R"(
45504510 ImportFromS3Settings {
@@ -4559,14 +4519,7 @@ Y_UNIT_TEST_SUITE(TImportTests) {
45594519 const ui64 importId = txId;
45604520
45614521 // Wait for delay after creating table
4562- if (!delayed) {
4563- TDispatchOptions opts;
4564- opts.FinalEvents .emplace_back ([&delayed](IEventHandle&) -> bool {
4565- return bool (delayed);
4566- });
4567- runtime.DispatchEvents (opts);
4568- }
4569- runtime.SetObserverFunc (prevObserver);
4522+ runtime.WaitFor (" delay after creating table" , [&]{ return delay.size () >= 1 ; });
45704523
45714524 // Merge tablets during the delay should be blocked
45724525 const TVector<TExpectedResult> expectedError = {{ NKikimrScheme::StatusInvalidParameter }};
@@ -4576,7 +4529,7 @@ Y_UNIT_TEST_SUITE(TImportTests) {
45764529 )" , TTestTxConfig::FakeHiveTablets, TTestTxConfig::FakeHiveTablets + 1 ), expectedError);
45774530
45784531 // Finish the delay and continue importing
4579- runtime. Send (delayed. Release (), 0 , true );
4532+ delay. Unblock ( );
45804533 env.TestWaitNotification (runtime, importId);
45814534
45824535 // Check import
@@ -4623,30 +4576,21 @@ Y_UNIT_TEST_SUITE(TImportTests) {
46234576 UNIT_ASSERT (s3Mock.Start ());
46244577
46254578 // Add delay after creating table
4626- bool dropNotification = false ;
4627- THolder<IEventHandle> delayed;
4579+ ui64 createTableTxId;
46284580 auto prevObserver = runtime.SetObserverFunc ([&](TAutoPtr<IEventHandle>& ev) {
4629- switch (ev->GetTypeRewrite ()) {
4630- case TEvSchemeShard::EvModifySchemeTransaction:
4631- break ;
4632- case TEvSchemeShard::EvNotifyTxCompletionResult:
4633- if (dropNotification) {
4634- delayed.Reset (ev.Release ());
4635- return TTestActorRuntime::EEventAction::DROP;
4581+ if (ev->GetTypeRewrite () == TEvSchemeShard::EvModifySchemeTransaction) {
4582+ const auto * msg = ev->Get <TEvSchemeShard::TEvModifySchemeTransaction>();
4583+ if (msg->Record .GetTransaction (0 ).GetOperationType () == NKikimrSchemeOp::ESchemeOpCreateIndexedTable) {
4584+ createTableTxId = msg->Record .GetTxId ();
46364585 }
4637- return TTestActorRuntime::EEventAction::PROCESS;
4638- default :
4639- return TTestActorRuntime::EEventAction::PROCESS;
46404586 }
4641-
4642- const auto * msg = ev->Get <TEvSchemeShard::TEvModifySchemeTransaction>();
4643- if (msg->Record .GetTransaction (0 ).GetOperationType () == NKikimrSchemeOp::ESchemeOpCreateIndexedTable) {
4644- dropNotification = true ;
4645- }
4646-
46474587 return TTestActorRuntime::EEventAction::PROCESS;
46484588 });
46494589
4590+ TBlockEvents<TEvSchemeShard::TEvNotifyTxCompletionResult> delay (runtime, [&](auto & ev) {
4591+ return ev->Get ()->Record .GetTxId () == createTableTxId;
4592+ });
4593+
46504594 // Start importing table
46514595 TestImport (runtime, ++txId, " /MyRoot" , Sprintf (R"(
46524596 ImportFromS3Settings {
@@ -4661,14 +4605,7 @@ Y_UNIT_TEST_SUITE(TImportTests) {
46614605 const ui64 importId = txId;
46624606
46634607 // Wait for delay after creating table
4664- if (!delayed) {
4665- TDispatchOptions opts;
4666- opts.FinalEvents .emplace_back ([&delayed](IEventHandle&) -> bool {
4667- return bool (delayed);
4668- });
4669- runtime.DispatchEvents (opts);
4670- }
4671- runtime.SetObserverFunc (prevObserver);
4608+ runtime.WaitFor (" delay after creating table" , [&]{ return delay.size () >= 1 ; });
46724609
46734610 // Split tablet during the delay should be blocked
46744611 const TVector<TExpectedResult> expectedError = {{ NKikimrScheme::StatusInvalidParameter }};
@@ -4682,7 +4619,7 @@ Y_UNIT_TEST_SUITE(TImportTests) {
46824619 )" , TTestTxConfig::FakeHiveTablets), expectedError);
46834620
46844621 // Finish the delay and continue importing
4685- runtime. Send (delayed. Release (), 0 , true );
4622+ delay. Unblock ( );
46864623 env.TestWaitNotification (runtime, importId);
46874624
46884625 // Check import
0 commit comments