@@ -1662,129 +1662,6 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) {
1662
1662
}
1663
1663
}
1664
1664
1665
- Y_UNIT_TEST (MvccTestOutOfOrderRestartLocksSingleWithoutBarrier) {
1666
- TPortManager pm;
1667
- NKikimrConfig::TAppConfig app;
1668
- app.MutableTableServiceConfig ()->SetEnableKqpDataQueryStreamLookup (false );
1669
- TServerSettings serverSettings (pm.GetPort (2134 ));
1670
- serverSettings.SetDomainName (" Root" )
1671
- .SetAppConfig (app)
1672
- .SetUseRealThreads (false );
1673
-
1674
- Tests::TServer::TPtr server = new TServer (serverSettings);
1675
- auto &runtime = *server->GetRuntime ();
1676
- auto sender = runtime.AllocateEdgeActor ();
1677
-
1678
- // This test requires barrier to be disabled
1679
- runtime.GetAppData ().FeatureFlags .SetDisableDataShardBarrier (true );
1680
-
1681
- runtime.SetLogPriority (NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
1682
- runtime.SetLogPriority (NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
1683
-
1684
- const bool usesVolatileTxs = runtime.GetAppData (0 ).FeatureFlags .GetEnableDataShardVolatileTransactions ();
1685
-
1686
- InitRoot (server, sender);
1687
-
1688
- CreateShardedTable (server, sender, " /Root" , " table-1" , 1 );
1689
- CreateShardedTable (server, sender, " /Root" , " table-2" , 1 );
1690
- auto table1shards = GetTableShards (server, sender, " /Root/table-1" );
1691
- auto table2shards = GetTableShards (server, sender, " /Root/table-2" );
1692
-
1693
- ExecSQL (server, sender, Q_ (" UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);" ));
1694
- ExecSQL (server, sender, Q_ (" UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);" ));
1695
-
1696
- TString sessionId = CreateSessionRPC (runtime);
1697
-
1698
- TString txId;
1699
- {
1700
- auto result = KqpSimpleBegin (runtime, sessionId, txId, Q_ (R"(
1701
- SELECT * FROM `/Root/table-1` WHERE key = 1
1702
- UNION ALL
1703
- SELECT * FROM `/Root/table-2` WHERE key = 2
1704
- ORDER BY key)" ));
1705
- UNIT_ASSERT_VALUES_EQUAL (
1706
- result,
1707
- " { items { uint32_value: 1 } items { uint32_value: 1 } }, "
1708
- " { items { uint32_value: 2 } items { uint32_value: 1 } }" );
1709
- }
1710
-
1711
- // Capture and block all readset messages
1712
- TVector<THolder<IEventHandle>> readSets;
1713
- auto captureRS = [&](TAutoPtr<IEventHandle> &event) -> auto {
1714
- if (event->GetTypeRewrite () == TEvTxProcessing::EvReadSet) {
1715
- readSets.push_back (std::move (event));
1716
- return TTestActorRuntime::EEventAction::DROP;
1717
- }
1718
- return TTestActorRuntime::EEventAction::PROCESS;
1719
- };
1720
- auto prevObserverFunc = runtime.SetObserverFunc (captureRS);
1721
-
1722
- // Send a commit request, it would block on readset exchange
1723
- SendRequest (runtime, MakeSimpleRequestRPC (Q_ (R"(
1724
- UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2);
1725
- UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2))" ), sessionId, txId, true ));
1726
-
1727
- // Wait until we captured both readsets
1728
- const size_t expectedReadSets = usesVolatileTxs ? 4 : 2 ;
1729
- if (readSets.size () < expectedReadSets) {
1730
- TDispatchOptions options;
1731
- options.FinalEvents .emplace_back (
1732
- [&](IEventHandle &) -> bool {
1733
- return readSets.size () >= expectedReadSets;
1734
- });
1735
- runtime.DispatchEvents (options);
1736
- }
1737
- UNIT_ASSERT_VALUES_EQUAL (readSets.size (), expectedReadSets);
1738
-
1739
- // Reboot table-1 tablet
1740
- readSets.clear ();
1741
- RebootTablet (runtime, table1shards[0 ], sender);
1742
-
1743
- // Wait until we captured both readsets again
1744
- if (readSets.size () < expectedReadSets) {
1745
- TDispatchOptions options;
1746
- options.FinalEvents .emplace_back (
1747
- [&](IEventHandle &) -> bool {
1748
- return readSets.size () >= expectedReadSets;
1749
- });
1750
- runtime.DispatchEvents (options);
1751
- }
1752
- UNIT_ASSERT_VALUES_EQUAL (readSets.size (), expectedReadSets);
1753
-
1754
- // Select keys 1 and 3, we expect this immediate tx to succeed
1755
- // Note that key 3 is not written yet, but we pretend immediate tx
1756
- // executes before that waiting transaction (no key 3 yet).
1757
- // Note: volatile transactions block reads until they are resolved, so this read is skipped
1758
- if (!usesVolatileTxs) {
1759
- auto result = KqpSimpleExec (runtime, Q_ (" SELECT key, value FROM `/Root/table-1` WHERE key = 1 OR key = 3;" ));
1760
- UNIT_ASSERT_VALUES_EQUAL (result, " { items { uint32_value: 1 } items { uint32_value: 1 } }" );
1761
- }
1762
-
1763
- // Upsert key 1, we expect this immediate tx to be executed successfully because it lies to the right on the global timeline
1764
- {
1765
- auto result = KqpSimpleExec (runtime, Q_ (" UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 3);" ));
1766
- UNIT_ASSERT_VALUES_EQUAL (result, " <empty>" );
1767
- }
1768
-
1769
- // Upsert key 5, this immediate tx should be executed successfully too
1770
- {
1771
- auto result = KqpSimpleExec (runtime, Q_ (" UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3);" ));
1772
- UNIT_ASSERT_VALUES_EQUAL (result, " <empty>" );
1773
- }
1774
-
1775
- // Release readsets allowing tx to progress
1776
- runtime.SetObserverFunc (prevObserverFunc);
1777
- for (auto & ev : readSets) {
1778
- runtime.Send (ev.Release (), 0 , /* viaActorSystem */ true );
1779
- }
1780
-
1781
- // Select key 3, we expect a success
1782
- {
1783
- auto result = KqpSimpleExec (runtime, Q_ (" SELECT key, value FROM `/Root/table-1` WHERE key = 3;" ));
1784
- UNIT_ASSERT_VALUES_EQUAL (result, " { items { uint32_value: 3 } items { uint32_value: 2 } }" );
1785
- }
1786
- }
1787
-
1788
1665
Y_UNIT_TEST_TWIN (TestOutOfOrderRestartLocksReorderedWithoutBarrier, StreamLookup) {
1789
1666
TPortManager pm;
1790
1667
NKikimrConfig::TAppConfig app;
0 commit comments