@@ -4551,6 +4551,130 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
45514551 }
45524552 }
45534553
4554+ void CompactBorrowed (TTestActorRuntime& runtime, ui64 shardId, const TTableId& tableId) {
4555+ auto msg = MakeHolder<TEvDataShard::TEvCompactBorrowed>(tableId.PathId );
4556+ auto sender = runtime.AllocateEdgeActor ();
4557+ runtime.SendToPipe (shardId, sender, msg.Release (), 0 , GetPipeConfigWithRetries ());
4558+ runtime.GrabEdgeEventRethrow <TEvDataShard::TEvCompactBorrowedResult>(sender);
4559+ }
4560+
4561+ Y_UNIT_TEST (PostMergeNotCompactedTooEarly) {
4562+ TPortManager pm;
4563+ TServerSettings serverSettings (pm.GetPort (2134 ));
4564+ serverSettings.SetDomainName (" Root" )
4565+ .SetUseRealThreads (false )
4566+ .SetDomainPlanResolution (100 );
4567+
4568+ Tests::TServer::TPtr server = new TServer (serverSettings);
4569+ auto &runtime = *server->GetRuntime ();
4570+ auto sender = runtime.AllocateEdgeActor ();
4571+
4572+ runtime.SetLogPriority (NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
4573+
4574+ InitRoot (server, sender);
4575+
4576+ TDisableDataShardLogBatching disableDataShardLogBatching;
4577+
4578+ KqpSchemeExec (runtime, R"(
4579+ CREATE TABLE `/Root/table` (key int, value bytes, PRIMARY KEY (key))
4580+ WITH (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
4581+ PARTITION_AT_KEYS = (5));
4582+ )" );
4583+
4584+ const auto shards = GetTableShards (server, sender, " /Root/table" );
4585+ UNIT_ASSERT_VALUES_EQUAL (shards.size (), 2u );
4586+ const auto tableId = ResolveTableId (server, sender, " /Root/table" );
4587+
4588+ for (int i = 0 ; i < 20 ; ++i) {
4589+ Cerr << " ... upserting key " << i << Endl;
4590+ auto query = Sprintf (R"(
4591+ UPSERT INTO `/Root/table` (key, value) VALUES (%d, '%s');
4592+ )" , i, TString (128 * 1024 , ' x' ).c_str ());
4593+ ExecSQL (server, sender, query);
4594+ if (i >= 5 ) {
4595+ Cerr << " ... compacting shard " << shards.at (1 ) << Endl;
4596+ CompactTable (runtime, shards.at (1 ), tableId, false );
4597+ } else if (i == 4 ) {
4598+ Cerr << " ... compacting shard " << shards.at (0 ) << Endl;
4599+ CompactTable (runtime, shards.at (0 ), tableId, false );
4600+ }
4601+ }
4602+
4603+ // Read (and snapshot) current data, so it doesn't go away on compaction
4604+ UNIT_ASSERT_VALUES_EQUAL (
4605+ KqpSimpleExec (runtime, " SELECT COUNT(*) FROM `/Root/table`;" ),
4606+ " { items { uint64_value: 20 } }" );
4607+
4608+ // Delete all the data in shard 0, this is small and will stay in memtable
4609+ // But when borrowed dst compaction will have pressure to compact it all
4610+ ExecSQL (server, sender, " DELETE FROM `/Root/table` WHERE key < 5" );
4611+
4612+ std::vector<TEvDataShard::TEvSplitTransferSnapshot::TPtr> snapshots;
4613+ auto captureSnapshots = runtime.AddObserver <TEvDataShard::TEvSplitTransferSnapshot>(
4614+ [&](TEvDataShard::TEvSplitTransferSnapshot::TPtr& ev) {
4615+ auto * msg = ev->Get ();
4616+ Cerr << " ... captured snapshot from " << msg->Record .GetSrcTabletId () << Endl;
4617+ snapshots.emplace_back (ev.Release ());
4618+ });
4619+
4620+ Cerr << " ... merging table" << Endl;
4621+ SetSplitMergePartCountLimit (server->GetRuntime (), -1 );
4622+ ui64 txId = AsyncMergeTable (server, sender, " /Root/table" , shards);
4623+ Cerr << " ... started merge " << txId << Endl;
4624+ WaitFor (runtime, [&]{ return snapshots.size () >= 2 ; }, " both src tablet snapshots" );
4625+
4626+ std::vector<TEvBlobStorage::TEvGet::TPtr> gets;
4627+ auto captureGets = runtime.AddObserver <TEvBlobStorage::TEvGet>(
4628+ [&](TEvBlobStorage::TEvGet::TPtr& ev) {
4629+ auto * msg = ev->Get ();
4630+ if (msg->Queries [0 ].Id .TabletID () == shards.at (1 )) {
4631+ Cerr << " ... blocking blob get of " << msg->Queries [0 ].Id << Endl;
4632+ gets.emplace_back (ev.Release ());
4633+ }
4634+ });
4635+
4636+ // Release snapshot for shard 0 then shard 1
4637+ captureSnapshots.Remove ();
4638+ Cerr << " ... unlocking snapshots from tablet " << shards.at (0 ) << Endl;
4639+ for (auto & ev : snapshots) {
4640+ if (ev && ev->Get ()->Record .GetSrcTabletId () == shards.at (0 )) {
4641+ runtime.Send (ev.Release (), 0 , true );
4642+ }
4643+ }
4644+ Cerr << " ... unblocking snapshots from tablet " << shards.at (1 ) << Endl;
4645+ for (auto & ev : snapshots) {
4646+ if (ev && ev->Get ()->Record .GetSrcTabletId () == shards.at (1 )) {
4647+ runtime.Send (ev.Release (), 0 , true );
4648+ }
4649+ }
4650+
4651+ // Let it commit above snapshots and incorrectly compact after the first one is loaded and merged
4652+ runtime.SimulateSleep (TDuration::Seconds (1 ));
4653+ UNIT_ASSERT (gets.size () > 0 );
4654+
4655+ Cerr << " ... unblocking blob gets" << Endl;
4656+ captureGets.Remove ();
4657+ for (auto & ev : gets) {
4658+ runtime.Send (ev.Release (), 0 , true );
4659+ }
4660+
4661+ // Let it finish loading the second snapshot
4662+ runtime.SimulateSleep (TDuration::Seconds (1 ));
4663+
4664+ // Wait for merge to complete and start a borrowed compaction
4665+ // When bug is present it will cause newly compacted to part to have epoch larger than previously compacted
4666+ WaitTxNotification (server, sender, txId);
4667+ const auto merged = GetTableShards (server, sender, " /Root/table" );
4668+ UNIT_ASSERT_VALUES_EQUAL (merged.size (), 1u );
4669+ Cerr << " ... compacting borrowed parts in shard " << merged.at (0 ) << Endl;
4670+ CompactBorrowed (runtime, merged.at (0 ), tableId);
4671+
4672+ // Validate we have an expected number of rows
4673+ UNIT_ASSERT_VALUES_EQUAL (
4674+ KqpSimpleExec (runtime, " SELECT COUNT(*) FROM `/Root/table`;" ),
4675+ " { items { uint64_value: 15 } }" );
4676+ }
4677+
45544678}
45554679
45564680} // namespace NKikimr
0 commit comments