66#include < ydb/core/persqueue/events/global.h>
77#include < ydb/core/persqueue/user_info.h>
88#include < ydb/core/persqueue/write_meta.h>
9+ #include < ydb/core/testlib/actors/block_events.h>
910#include < ydb/core/tx/scheme_board/events.h>
1011#include < ydb/core/tx/scheme_board/events_internal.h>
1112#include < ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
@@ -1985,7 +1986,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
19851986 return result;
19861987 }
19871988
1988- void WaitForContent (TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
1989+ TVector<NJson::TJsonValue> WaitForContent (TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
19891990 while (true ) {
19901991 const auto records = GetRecords (*server->GetRuntime (), sender, path, 0 );
19911992 for (ui32 i = 0 ; i < std::min (records.size (), expected.size ()); ++i) {
@@ -1995,7 +1996,12 @@ Y_UNIT_TEST_SUITE(Cdc) {
19951996 if (records.size () >= expected.size ()) {
19961997 UNIT_ASSERT_VALUES_EQUAL_C (records.size (), expected.size (),
19971998 " Unexpected record: " << records.at (expected.size ()).second );
1998- break ;
1999+ TVector<NJson::TJsonValue> values;
2000+ for (const auto & pr : records) {
2001+ bool ok = NJson::ReadJsonTree (pr.second , &values.emplace_back ());
2002+ Y_ABORT_UNLESS (ok);
2003+ }
2004+ return values;
19992005 }
20002006
20012007 SimulateSleep (server, TDuration::Seconds (1 ));
@@ -3692,6 +3698,136 @@ Y_UNIT_TEST_SUITE(Cdc) {
36923698 });
36933699 }
36943700
3701+ Y_UNIT_TEST (ResolvedTimestampForDisplacedUpsert) {
3702+ TPortManager portManager;
3703+ TServer::TPtr server = new TServer (TServerSettings (portManager.GetPort (2134 ), {}, DefaultPQConfig ())
3704+ .SetUseRealThreads (false )
3705+ .SetDomainName (" Root" )
3706+ );
3707+
3708+ TDisableDataShardLogBatching disableDataShardLogBatching;
3709+
3710+ auto & runtime = *server->GetRuntime ();
3711+ const auto edgeActor = runtime.AllocateEdgeActor ();
3712+
3713+ SetupLogging (runtime);
3714+ InitRoot (server, edgeActor);
3715+ SetSplitMergePartCountLimit (&runtime, -1 );
3716+ CreateShardedTable (server, edgeActor, " /Root" , " Table" , SimpleTable ());
3717+
3718+ WaitTxNotification (server, edgeActor, AsyncAlterAddStream (server, " /Root" , " Table" ,
3719+ WithVirtualTimestamps (WithResolvedTimestamps (
3720+ TDuration::Seconds (3 ), Updates (NKikimrSchemeOp::ECdcStreamFormatJson)))));
3721+
3722+ Cerr << " ... prepare" << Endl;
3723+ WaitForContent (server, edgeActor, " /Root/Table/Stream" , {
3724+ R"( {"resolved":"***"})" ,
3725+ });
3726+
3727+ KqpSimpleExec (runtime, R"(
3728+ UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);
3729+ )" );
3730+
3731+ auto records = WaitForContent (server, edgeActor, " /Root/Table/Stream" , {
3732+ R"( {"resolved":"***"})" ,
3733+ R"( {"update":{"value":10},"key":[1],"ts":"***"})" ,
3734+ R"( {"resolved":"***"})" ,
3735+ });
3736+
3737+ // Take the final step
3738+ ui64 lastStep = records.back ()[" resolved" ][0 ].GetUInteger ();
3739+ Cerr << " ... last heartbeat at " << lastStep << Endl;
3740+
3741+ const auto tableId = ResolveTableId (server, edgeActor, " /Root/Table" );
3742+ const auto shards = GetTableShards (server, edgeActor, " /Root/Table" );
3743+ UNIT_ASSERT_VALUES_EQUAL (shards.size (), 1u );
3744+
3745+ ui64 coordinator = ChangeStateStorage (Coordinator, server->GetSettings ().Domain );
3746+ ui64 snapshotStep = lastStep + 3000 - 1 ;
3747+ ForwardToTablet (runtime, coordinator, edgeActor, new TEvTxProxy::TEvRequirePlanSteps (coordinator, snapshotStep));
3748+
3749+ TBlockEvents<TEvMediatorTimecast::TEvUpdate> blockedUpdates (runtime,
3750+ [&](auto & ev) {
3751+ return ev->Get ()->Record .GetTimeBarrier () > snapshotStep;
3752+ });
3753+
3754+ Cerr << " ... performing a read from snapshot just before the next heartbeat" << Endl;
3755+ {
3756+ auto req = std::make_unique<TEvDataShard::TEvRead>();
3757+ {
3758+ auto & record = req->Record ;
3759+ record.SetReadId (1 );
3760+ record.MutableTableId ()->SetOwnerId (tableId.PathId .OwnerId );
3761+ record.MutableTableId ()->SetTableId (tableId.PathId .LocalPathId );
3762+ record.AddColumns (1 );
3763+ record.AddColumns (2 );
3764+ record.SetResultFormat (NKikimrDataEvents::FORMAT_CELLVEC);
3765+ ui32 key = 1 ;
3766+ TVector<TCell> keys;
3767+ keys.push_back (TCell::Make (key));
3768+ req->Keys .push_back (TSerializedCellVec (TSerializedCellVec::Serialize (keys)));
3769+ record.MutableSnapshot ()->SetStep (snapshotStep);
3770+ record.MutableSnapshot ()->SetTxId (Max<ui64>());
3771+ }
3772+ ForwardToTablet (runtime, shards.at (0 ), edgeActor, req.release ());
3773+ auto ev = runtime.GrabEdgeEventRethrow <TEvDataShard::TEvReadResult>(edgeActor);
3774+ auto * res = ev->Get ();
3775+ UNIT_ASSERT_VALUES_EQUAL (res->Record .GetStatus ().GetCode (), Ydb::StatusIds::SUCCESS);
3776+ UNIT_ASSERT_VALUES_EQUAL (res->Record .GetFinished (), true );
3777+ Cerr << " ... read finished" << Endl;
3778+ }
3779+ for (int i = 0 ; i < 10 ; ++i) {
3780+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3781+ }
3782+
3783+ Cerr << " ... starting upsert 1 (expected to displace)" << Endl;
3784+ auto upsert1 = KqpSimpleSend (runtime, R"(
3785+ UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);
3786+ )" );
3787+ for (int i = 0 ; i < 10 ; ++i) {
3788+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3789+ }
3790+
3791+ Cerr << " ... starting upsert 2 (expected to displace)" << Endl;
3792+ auto upsert2 = KqpSimpleSend (runtime, R"(
3793+ UPSERT INTO `/Root/Table` (key, value) VALUES (3, 30);
3794+ )" );
3795+ for (int i = 0 ; i < 10 ; ++i) {
3796+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3797+ }
3798+
3799+ Cerr << " ... unblocking updates" << Endl;
3800+ blockedUpdates.Unblock ().Stop ();
3801+ for (int i = 0 ; i < 10 ; ++i) {
3802+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3803+ }
3804+
3805+ Cerr << " ... checking the update is logged before the new resolved timestamp" << Endl;
3806+ records = WaitForContent (server, edgeActor, " /Root/Table/Stream" , {
3807+ R"( {"resolved":"***"})" ,
3808+ R"( {"update":{"value":10},"key":[1],"ts":"***"})" ,
3809+ R"( {"resolved":"***"})" ,
3810+ R"( {"update":{"value":20},"key":[2],"ts":"***"})" ,
3811+ R"( {"update":{"value":30},"key":[3],"ts":"***"})" ,
3812+ R"( {"resolved":"***"})" ,
3813+ });
3814+
3815+ TRowVersion resolved (0 , 0 );
3816+ for (auto & record : records) {
3817+ if (record.Has (" resolved" )) {
3818+ resolved.Step = record[" resolved" ][0 ].GetUInteger ();
3819+ resolved.TxId = record[" resolved" ][1 ].GetUInteger ();
3820+ }
3821+ if (record.Has (" ts" )) {
3822+ TRowVersion ts (
3823+ record[" ts" ][0 ].GetUInteger (),
3824+ record[" ts" ][1 ].GetUInteger ());
3825+ UNIT_ASSERT_C (resolved < ts,
3826+ " Record with ts " << ts << " after resolved " << resolved);
3827+ }
3828+ }
3829+ }
3830+
36953831} // Cdc
36963832
36973833} // NKikimr
0 commit comments