@@ -3682,6 +3682,44 @@ Y_UNIT_TEST_SUITE(Cdc) {
36823682 MustNotLoseSchemaSnapshot (true );
36833683 }
36843684
3685+ Y_UNIT_TEST (ShouldBreakLocksOnConcurrentSchemeTx) {
3686+ TPortManager portManager;
3687+ TServer::TPtr server = new TServer (TServerSettings (portManager.GetPort (2134 ), {}, DefaultPQConfig ())
3688+ .SetUseRealThreads (false )
3689+ .SetDomainName (" Root" )
3690+ );
3691+
3692+ auto & runtime = *server->GetRuntime ();
3693+ const auto edgeActor = runtime.AllocateEdgeActor ();
3694+
3695+ SetupLogging (runtime);
3696+ InitRoot (server, edgeActor);
3697+ CreateShardedTable (server, edgeActor, " /Root" , " Table" , SimpleTable ());
3698+
3699+ WaitTxNotification (server, edgeActor, AsyncAlterAddStream (server, " /Root" , " Table" ,
3700+ Updates (NKikimrSchemeOp::ECdcStreamFormatJson)));
3701+
3702+ ExecSQL (server, edgeActor, " UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);" );
3703+
3704+ TString sessionId;
3705+ TString txId;
3706+ KqpSimpleBegin (runtime, sessionId, txId, " UPSERT INTO `/Root/Table` (key, value) VALUES (1, 11);" );
3707+
3708+ UNIT_ASSERT_VALUES_EQUAL (
3709+ KqpSimpleContinue (runtime, sessionId, txId, " SELECT key, value FROM `/Root/Table`;" ),
3710+ " { items { uint32_value: 1 } items { uint32_value: 11 } }" );
3711+
3712+ WaitTxNotification (server, edgeActor, AsyncAlterAddExtraColumn (server, " /Root" , " Table" ));
3713+
3714+ UNIT_ASSERT_VALUES_EQUAL (
3715+ KqpSimpleCommit (runtime, sessionId, txId, " SELECT 1;" ),
3716+ " ERROR: ABORTED" );
3717+
3718+ WaitForContent (server, edgeActor, " /Root/Table/Stream" , {
3719+ R"( {"update":{"value":10},"key":[1]})" ,
3720+ });
3721+ }
3722+
36853723 Y_UNIT_TEST (ResolvedTimestampsContinueAfterMerge) {
36863724 TPortManager portManager;
36873725 TServer::TPtr server = new TServer (TServerSettings (portManager.GetPort (2134 ), {}, DefaultPQConfig ())
0 commit comments