@@ -2326,6 +2326,100 @@ Y_UNIT_TEST(TestTabletRestoreEventsOrder) {
23262326 });
23272327}
23282328
2329+ Y_UNIT_TEST (TestReadAndDeleteConsumer) {
2330+ TTestContext tc;
2331+ RunTestWithReboots (tc.TabletIds , [&]() {
2332+ return tc.InitialEventsFilter .Prepare ();
2333+ }, [&](const TString& dispatchName, std::function<void (TTestActorRuntime&)> setup, bool & activeZone) {
2334+ TFinalizer finalizer (tc);
2335+ tc.Prepare (dispatchName, setup, activeZone);
2336+ activeZone = false ;
2337+ tc.Runtime ->SetScheduledLimit (2000 );
2338+ tc.Runtime ->SetScheduledEventFilter (&tc.ImmediateLogFlushAndRequestTimeoutFilter );
2339+
2340+ TVector<std::pair<ui64, TString>> data;
2341+ TString msg;
2342+ msg.resize (102400 , ' a' );
2343+ for (ui64 i = 1 ; i <= 1000 ; ++i) {
2344+ data.emplace_back (i, msg);
2345+ }
2346+
2347+ PQTabletPrepare ({.maxCountInPartition =100 , .deleteTime =TDuration::Days (2 ).Seconds (), .partitions =1 },
2348+ {{" user1" , true }, {" user2" , true }}, tc);
2349+ CmdWrite (0 , " sourceid1" , data, tc, false , {}, true );
2350+
2351+ // Reset tablet cache
2352+ PQTabletRestart (tc);
2353+
2354+ TAutoPtr<IEventHandle> handle;
2355+ TEvPersQueue::TEvResponse* readResult = nullptr ;
2356+ THolder<TEvPersQueue::TEvRequest> readRequest;
2357+ TEvPersQueue::TEvUpdateConfigResponse* consumerDeleteResult = nullptr ;
2358+ THolder<TEvPersQueue::TEvUpdateConfig> consumerDeleteRequest;
2359+
2360+ // Read request
2361+ {
2362+ readRequest.Reset (new TEvPersQueue::TEvRequest);
2363+ auto req = readRequest->Record .MutablePartitionRequest ();
2364+ req->SetPartition (0 );
2365+ auto read = req->MutableCmdRead ();
2366+ read->SetOffset (1 );
2367+ read->SetClientId (" user1" );
2368+ read->SetCount (1 );
2369+ read->SetBytes (1'000'000 );
2370+ read->SetTimeoutMs (5000 );
2371+ }
2372+
2373+ // Consumer delete request
2374+ {
2375+ consumerDeleteRequest.Reset (new TEvPersQueue::TEvUpdateConfig ());
2376+ consumerDeleteRequest->MutableRecord ()->SetTxId (42 );
2377+ auto & cfg = *consumerDeleteRequest->MutableRecord ()->MutableTabletConfig ();
2378+ cfg.SetVersion (42 );
2379+ cfg.AddPartitionIds (0 );
2380+ cfg.AddPartitions ()->SetPartitionId (0 );
2381+ cfg.SetLocalDC (true );
2382+ cfg.SetTopic (" topic" );
2383+ auto & cons = *cfg.AddConsumers ();
2384+ cons.SetName (" user2" );
2385+ cons.SetImportant (true );
2386+ }
2387+
2388+ TActorId edge = tc.Runtime ->AllocateEdgeActor ();
2389+
2390+ // Delete consumer during read request
2391+ tc.Runtime ->SendToPipe (tc.TabletId , tc.Edge , readRequest.Release (), 0 , GetPipeConfigWithRetries ());
2392+
2393+ // Intercept TEvPQ::TEvBlobResponse event
2394+ std::vector<TEvPQ::TEvBlobResponse::TPtr> capturedBlobResponses;
2395+ auto captureBlobResponsesObserver = tc.Runtime ->AddObserver <TEvPQ::TEvBlobResponse>([&](TEvPQ::TEvBlobResponse::TPtr& ev) {
2396+ capturedBlobResponses.emplace_back ().Swap (ev);
2397+ });
2398+
2399+ // Delete consumer while read request is still in progress
2400+ tc.Runtime ->SendToPipe (tc.TabletId , edge, consumerDeleteRequest.Release (), 0 , GetPipeConfigWithRetries ());
2401+ consumerDeleteResult = tc.Runtime ->GrabEdgeEvent <TEvPersQueue::TEvUpdateConfigResponse>(handle);
2402+ {
2403+ // Cerr << "Got consumer delete response: " << consumerDeleteResult->Record << Endl;
2404+ UNIT_ASSERT (consumerDeleteResult->Record .HasStatus ());
2405+ UNIT_ASSERT_EQUAL (consumerDeleteResult->Record .GetStatus (), NKikimrPQ::EStatus::OK);
2406+ }
2407+
2408+ // Resend intercepted blob responses and wait for read result
2409+ captureBlobResponsesObserver.Remove ();
2410+ for (auto & ev : capturedBlobResponses) {
2411+ tc.Runtime ->Send (ev.Release (), 0 , true );
2412+ }
2413+
2414+ readResult = tc.Runtime ->GrabEdgeEvent <TEvPersQueue::TEvResponse>(handle);
2415+ {
2416+ // Cerr << "Got read response: " << readResult->Record << Endl;
2417+ UNIT_ASSERT (readResult->Record .HasStatus ());
2418+ UNIT_ASSERT_EQUAL (readResult->Record .GetErrorCode (), NPersQueue::NErrorCode::BAD_REQUEST);
2419+ UNIT_ASSERT_STRING_CONTAINS_C (readResult->Record .GetErrorReason (), " Consumer user1 is gone from partition" , readResult->Record .Utf8DebugString ());
2420+ }
2421+ });
2422+ }
23292423
23302424
23312425} // Y_UNIT_TEST_SUITE(TPQTest)
0 commit comments