66#include < ydb/core/persqueue/events/global.h>
77#include < ydb/core/persqueue/user_info.h>
88#include < ydb/core/persqueue/write_meta.h>
9+ #include < ydb/core/testlib/actors/block_events.h>
910#include < ydb/core/tx/scheme_board/events.h>
1011#include < ydb/core/tx/scheme_board/events_internal.h>
1112#include < ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
@@ -2065,7 +2066,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
20652066 return result;
20662067 }
20672068
2068- void WaitForContent (TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
2069+ TVector<NJson::TJsonValue> WaitForContent (TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
20692070 while (true ) {
20702071 const auto records = GetRecords (*server->GetRuntime (), sender, path, 0 );
20712072 for (ui32 i = 0 ; i < std::min (records.size (), expected.size ()); ++i) {
@@ -2075,7 +2076,12 @@ Y_UNIT_TEST_SUITE(Cdc) {
20752076 if (records.size () >= expected.size ()) {
20762077 UNIT_ASSERT_VALUES_EQUAL_C (records.size (), expected.size (),
20772078 " Unexpected record: " << records.at (expected.size ()).second );
2078- break ;
2079+ TVector<NJson::TJsonValue> values;
2080+ for (const auto & pr : records) {
2081+ bool ok = NJson::ReadJsonTree (pr.second , &values.emplace_back ());
2082+ Y_ABORT_UNLESS (ok);
2083+ }
2084+ return values;
20792085 }
20802086
20812087 SimulateSleep (server, TDuration::Seconds (1 ));
@@ -3779,6 +3785,141 @@ Y_UNIT_TEST_SUITE(Cdc) {
37793785 });
37803786 }
37813787
3788+ Y_UNIT_TEST (ResolvedTimestampForDisplacedUpsert) {
3789+ TPortManager portManager;
3790+ TServer::TPtr server = new TServer (TServerSettings (portManager.GetPort (2134 ), {}, DefaultPQConfig ())
3791+ .SetUseRealThreads (false )
3792+ .SetDomainName (" Root" )
3793+ );
3794+
3795+ TDisableDataShardLogBatching disableDataShardLogBatching;
3796+
3797+ auto & runtime = *server->GetRuntime ();
3798+ const auto edgeActor = runtime.AllocateEdgeActor ();
3799+
3800+ SetupLogging (runtime);
3801+ InitRoot (server, edgeActor);
3802+ SetSplitMergePartCountLimit (&runtime, -1 );
3803+ CreateShardedTable (server, edgeActor, " /Root" , " Table" , SimpleTable ());
3804+
3805+ WaitTxNotification (server, edgeActor, AsyncAlterAddStream (server, " /Root" , " Table" ,
3806+ WithVirtualTimestamps (WithResolvedTimestamps (
3807+ TDuration::Seconds (3 ), Updates (NKikimrSchemeOp::ECdcStreamFormatJson)))));
3808+
3809+ Cerr << " ... prepare" << Endl;
3810+ WaitForContent (server, edgeActor, " /Root/Table/Stream" , {
3811+ R"( {"resolved":"***"})" ,
3812+ });
3813+
3814+ KqpSimpleExec (runtime, R"(
3815+ UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);
3816+ )" );
3817+
3818+ auto records = WaitForContent (server, edgeActor, " /Root/Table/Stream" , {
3819+ R"( {"resolved":"***"})" ,
3820+ R"( {"update":{"value":10},"key":[1],"ts":"***"})" ,
3821+ R"( {"resolved":"***"})" ,
3822+ });
3823+
3824+ // Take the final step
3825+ ui64 lastStep = records.back ()[" resolved" ][0 ].GetUInteger ();
3826+ Cerr << " ... last heartbeat at " << lastStep << Endl;
3827+
3828+ const auto tableId = ResolveTableId (server, edgeActor, " /Root/Table" );
3829+ const auto shards = GetTableShards (server, edgeActor, " /Root/Table" );
3830+ UNIT_ASSERT_VALUES_EQUAL (shards.size (), 1u );
3831+
3832+ ui64 coordinator = ChangeStateStorage (Coordinator, server->GetSettings ().Domain );
3833+ ui64 snapshotStep = lastStep + 3000 - 1 ;
3834+ ForwardToTablet (runtime, coordinator, edgeActor, new TEvTxProxy::TEvRequirePlanSteps (coordinator, snapshotStep));
3835+
3836+ TBlockEvents<TEvMediatorTimecast::TEvGranularUpdate> blockedGranularUpdates (runtime,
3837+ [&](auto & ev) {
3838+ return ev->Get ()->Record .GetLatestStep () > snapshotStep;
3839+ });
3840+ TBlockEvents<TEvMediatorTimecast::TEvUpdate> blockedUpdates (runtime,
3841+ [&](auto & ev) {
3842+ return ev->Get ()->Record .GetTimeBarrier () > snapshotStep;
3843+ });
3844+
3845+ Cerr << " ... performing a read from snapshot just before the next heartbeat" << Endl;
3846+ {
3847+ auto req = std::make_unique<TEvDataShard::TEvRead>();
3848+ {
3849+ auto & record = req->Record ;
3850+ record.SetReadId (1 );
3851+ record.MutableTableId ()->SetOwnerId (tableId.PathId .OwnerId );
3852+ record.MutableTableId ()->SetTableId (tableId.PathId .LocalPathId );
3853+ record.AddColumns (1 );
3854+ record.AddColumns (2 );
3855+ record.SetResultFormat (NKikimrDataEvents::FORMAT_CELLVEC);
3856+ ui32 key = 1 ;
3857+ TVector<TCell> keys;
3858+ keys.push_back (TCell::Make (key));
3859+ req->Keys .push_back (TSerializedCellVec (TSerializedCellVec::Serialize (keys)));
3860+ record.MutableSnapshot ()->SetStep (snapshotStep);
3861+ record.MutableSnapshot ()->SetTxId (Max<ui64>());
3862+ }
3863+ ForwardToTablet (runtime, shards.at (0 ), edgeActor, req.release ());
3864+ auto ev = runtime.GrabEdgeEventRethrow <TEvDataShard::TEvReadResult>(edgeActor);
3865+ auto * res = ev->Get ();
3866+ UNIT_ASSERT_VALUES_EQUAL (res->Record .GetStatus ().GetCode (), Ydb::StatusIds::SUCCESS);
3867+ UNIT_ASSERT_VALUES_EQUAL (res->Record .GetFinished (), true );
3868+ Cerr << " ... read finished" << Endl;
3869+ }
3870+ for (int i = 0 ; i < 10 ; ++i) {
3871+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3872+ }
3873+
3874+ Cerr << " ... starting upsert 1 (expected to displace)" << Endl;
3875+ auto upsert1 = KqpSimpleSend (runtime, R"(
3876+ UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);
3877+ )" );
3878+ for (int i = 0 ; i < 10 ; ++i) {
3879+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3880+ }
3881+
3882+ Cerr << " ... starting upsert 2 (expected to displace)" << Endl;
3883+ auto upsert2 = KqpSimpleSend (runtime, R"(
3884+ UPSERT INTO `/Root/Table` (key, value) VALUES (3, 30);
3885+ )" );
3886+ for (int i = 0 ; i < 10 ; ++i) {
3887+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3888+ }
3889+
3890+ Cerr << " ... unblocking updates" << Endl;
3891+ blockedGranularUpdates.Unblock ().Stop ();
3892+ blockedUpdates.Unblock ().Stop ();
3893+ for (int i = 0 ; i < 10 ; ++i) {
3894+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3895+ }
3896+
3897+ Cerr << " ... checking the update is logged before the new resolved timestamp" << Endl;
3898+ records = WaitForContent (server, edgeActor, " /Root/Table/Stream" , {
3899+ R"( {"resolved":"***"})" ,
3900+ R"( {"update":{"value":10},"key":[1],"ts":"***"})" ,
3901+ R"( {"resolved":"***"})" ,
3902+ R"( {"update":{"value":20},"key":[2],"ts":"***"})" ,
3903+ R"( {"update":{"value":30},"key":[3],"ts":"***"})" ,
3904+ R"( {"resolved":"***"})" ,
3905+ });
3906+
3907+ TRowVersion resolved (0 , 0 );
3908+ for (auto & record : records) {
3909+ if (record.Has (" resolved" )) {
3910+ resolved.Step = record[" resolved" ][0 ].GetUInteger ();
3911+ resolved.TxId = record[" resolved" ][1 ].GetUInteger ();
3912+ }
3913+ if (record.Has (" ts" )) {
3914+ TRowVersion ts (
3915+ record[" ts" ][0 ].GetUInteger (),
3916+ record[" ts" ][1 ].GetUInteger ());
3917+ UNIT_ASSERT_C (resolved < ts,
3918+ " Record with ts " << ts << " after resolved " << resolved);
3919+ }
3920+ }
3921+ }
3922+
37823923} // Cdc
37833924
37843925} // NKikimr
0 commit comments