@@ -1662,129 +1662,6 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) {
16621662 }
16631663}
16641664
1665- Y_UNIT_TEST (MvccTestOutOfOrderRestartLocksSingleWithoutBarrier) {
1666- TPortManager pm;
1667- NKikimrConfig::TAppConfig app;
1668- app.MutableTableServiceConfig ()->SetEnableKqpDataQueryStreamLookup (false );
1669- TServerSettings serverSettings (pm.GetPort (2134 ));
1670- serverSettings.SetDomainName (" Root" )
1671- .SetAppConfig (app)
1672- .SetUseRealThreads (false );
1673-
1674- Tests::TServer::TPtr server = new TServer (serverSettings);
1675- auto &runtime = *server->GetRuntime ();
1676- auto sender = runtime.AllocateEdgeActor ();
1677-
1678- // This test requires barrier to be disabled
1679- runtime.GetAppData ().FeatureFlags .SetDisableDataShardBarrier (true );
1680-
1681- runtime.SetLogPriority (NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
1682- runtime.SetLogPriority (NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
1683-
1684- const bool usesVolatileTxs = runtime.GetAppData (0 ).FeatureFlags .GetEnableDataShardVolatileTransactions ();
1685-
1686- InitRoot (server, sender);
1687-
1688- CreateShardedTable (server, sender, " /Root" , " table-1" , 1 );
1689- CreateShardedTable (server, sender, " /Root" , " table-2" , 1 );
1690- auto table1shards = GetTableShards (server, sender, " /Root/table-1" );
1691- auto table2shards = GetTableShards (server, sender, " /Root/table-2" );
1692-
1693- ExecSQL (server, sender, Q_ (" UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);" ));
1694- ExecSQL (server, sender, Q_ (" UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);" ));
1695-
1696- TString sessionId = CreateSessionRPC (runtime);
1697-
1698- TString txId;
1699- {
1700- auto result = KqpSimpleBegin (runtime, sessionId, txId, Q_ (R"(
1701- SELECT * FROM `/Root/table-1` WHERE key = 1
1702- UNION ALL
1703- SELECT * FROM `/Root/table-2` WHERE key = 2
1704- ORDER BY key)" ));
1705- UNIT_ASSERT_VALUES_EQUAL (
1706- result,
1707- " { items { uint32_value: 1 } items { uint32_value: 1 } }, "
1708- " { items { uint32_value: 2 } items { uint32_value: 1 } }" );
1709- }
1710-
1711- // Capture and block all readset messages
1712- TVector<THolder<IEventHandle>> readSets;
1713- auto captureRS = [&](TAutoPtr<IEventHandle> &event) -> auto {
1714- if (event->GetTypeRewrite () == TEvTxProcessing::EvReadSet) {
1715- readSets.push_back (std::move (event));
1716- return TTestActorRuntime::EEventAction::DROP;
1717- }
1718- return TTestActorRuntime::EEventAction::PROCESS;
1719- };
1720- auto prevObserverFunc = runtime.SetObserverFunc (captureRS);
1721-
1722- // Send a commit request, it would block on readset exchange
1723- SendRequest (runtime, MakeSimpleRequestRPC (Q_ (R"(
1724- UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2);
1725- UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2))" ), sessionId, txId, true ));
1726-
1727- // Wait until we captured both readsets
1728- const size_t expectedReadSets = usesVolatileTxs ? 4 : 2 ;
1729- if (readSets.size () < expectedReadSets) {
1730- TDispatchOptions options;
1731- options.FinalEvents .emplace_back (
1732- [&](IEventHandle &) -> bool {
1733- return readSets.size () >= expectedReadSets;
1734- });
1735- runtime.DispatchEvents (options);
1736- }
1737- UNIT_ASSERT_VALUES_EQUAL (readSets.size (), expectedReadSets);
1738-
1739- // Reboot table-1 tablet
1740- readSets.clear ();
1741- RebootTablet (runtime, table1shards[0 ], sender);
1742-
1743- // Wait until we captured both readsets again
1744- if (readSets.size () < expectedReadSets) {
1745- TDispatchOptions options;
1746- options.FinalEvents .emplace_back (
1747- [&](IEventHandle &) -> bool {
1748- return readSets.size () >= expectedReadSets;
1749- });
1750- runtime.DispatchEvents (options);
1751- }
1752- UNIT_ASSERT_VALUES_EQUAL (readSets.size (), expectedReadSets);
1753-
1754- // Select keys 1 and 3, we expect this immediate tx to succeed
1755- // Note that key 3 is not written yet, but we pretend immediate tx
1756- // executes before that waiting transaction (no key 3 yet).
1757- // Note: volatile transactions block reads until they are resolved, so this read is skipped
1758- if (!usesVolatileTxs) {
1759- auto result = KqpSimpleExec (runtime, Q_ (" SELECT key, value FROM `/Root/table-1` WHERE key = 1 OR key = 3;" ));
1760- UNIT_ASSERT_VALUES_EQUAL (result, " { items { uint32_value: 1 } items { uint32_value: 1 } }" );
1761- }
1762-
1763- // Upsert key 1, we expect this immediate tx to be executed successfully because it lies to the right on the global timeline
1764- {
1765- auto result = KqpSimpleExec (runtime, Q_ (" UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 3);" ));
1766- UNIT_ASSERT_VALUES_EQUAL (result, " <empty>" );
1767- }
1768-
1769- // Upsert key 5, this immediate tx should be executed successfully too
1770- {
1771- auto result = KqpSimpleExec (runtime, Q_ (" UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3);" ));
1772- UNIT_ASSERT_VALUES_EQUAL (result, " <empty>" );
1773- }
1774-
1775- // Release readsets allowing tx to progress
1776- runtime.SetObserverFunc (prevObserverFunc);
1777- for (auto & ev : readSets) {
1778- runtime.Send (ev.Release (), 0 , /* viaActorSystem */ true );
1779- }
1780-
1781- // Select key 3, we expect a success
1782- {
1783- auto result = KqpSimpleExec (runtime, Q_ (" SELECT key, value FROM `/Root/table-1` WHERE key = 3;" ));
1784- UNIT_ASSERT_VALUES_EQUAL (result, " { items { uint32_value: 3 } items { uint32_value: 2 } }" );
1785- }
1786- }
1787-
17881665Y_UNIT_TEST_TWIN (TestOutOfOrderRestartLocksReorderedWithoutBarrier, StreamLookup) {
17891666 TPortManager pm;
17901667 NKikimrConfig::TAppConfig app;
0 commit comments