Skip to content

Commit bc145a8

Browse files
authored
Fix topic read crash (ydb-platform#11043)
1 parent 9e46884 commit bc145a8

File tree

4 files changed

+128
-8
lines changed

4 files changed

+128
-8
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,18 +1233,25 @@ void TPartition::Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorCont
12331233

12341234
void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx) {
12351235
const ui64 cookie = ev->Get()->GetCookie();
1236-
Y_ABORT_UNLESS(ReadInfo.contains(cookie));
1237-
12381236
auto it = ReadInfo.find(cookie);
1239-
Y_ABORT_UNLESS(it != ReadInfo.end());
1237+
1238+
// If there is no such cookie, then read was canceled.
1239+
// For example, it can be after consumer deletion
1240+
if (it == ReadInfo.end()) {
1241+
return;
1242+
}
12401243

12411244
TReadInfo info = std::move(it->second);
12421245
ReadInfo.erase(it);
12431246

1244-
//make readinfo class
1245-
auto& userInfo = UsersInfoStorage->GetOrCreate(info.User, ctx);
1247+
auto* userInfo = UsersInfoStorage->GetIfExists(info.User);
1248+
if (!userInfo) {
1249+
ReplyError(ctx, info.Destination, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(info.User));
1250+
OnReadRequestFinished(info.Destination, 0, info.User, ctx);
1251+
}
1252+
12461253
TReadAnswer answer(info.FormAnswer(
1247-
ctx, *ev->Get(), EndOffset, Partition, &userInfo,
1254+
ctx, *ev->Get(), EndOffset, Partition, userInfo,
12481255
info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode(), IsActive()
12491256
));
12501257
const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response;
@@ -2434,6 +2441,20 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx)
24342441
}
24352442

24362443
UsersInfoStorage->Remove(user, ctx);
2444+
2445+
// Finish all ongoing reads
2446+
std::unordered_set<ui64> readCookies;
2447+
for (auto& [cookie, info] : ReadInfo) {
2448+
if (info.User == user) {
2449+
readCookies.insert(cookie);
2450+
ReplyError(ctx, info.Destination, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(user));
2451+
OnReadRequestFinished(info.Destination, 0, user, ctx);
2452+
}
2453+
}
2454+
for (ui64 cookie : readCookies) {
2455+
ReadInfo.erase(cookie);
2456+
}
2457+
24372458
Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user));
24382459
}
24392460
}

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
623623
static void RemoveMessages(TMessageQueue& src, TMessageQueue& dst);
624624
void RemovePendingRequests(TMessageQueue& requests);
625625
void RemoveMessagesToQueue(TMessageQueue& requests);
626+
static TString GetConsumerDeletedMessage(TStringBuf consumerName);
626627

627628
private:
628629
ui64 TabletID;

ydb/core/persqueue/partition_read.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -754,8 +754,8 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim
754754
auto* read = readEvent->Get();
755755
const TString& user = read->ClientId;
756756
auto userInfo = UsersInfoStorage->GetIfExists(user);
757-
if(!userInfo) {
758-
ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, TStringBuilder() << "cannot finish read request. Consumer " << read->ClientId << " is gone from partition");
757+
if (!userInfo) {
758+
ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(read->ClientId));
759759
Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user));
760760
OnReadRequestFinished(read->Cookie, 0, user, ctx);
761761
return;
@@ -1037,4 +1037,8 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
10371037
ctx.Send(BlobCache, request.Release());
10381038
}
10391039

1040+
TString TPartition::GetConsumerDeletedMessage(TStringBuf consumerName) {
1041+
return TStringBuilder() << "cannot finish read request. Consumer " << consumerName << " is gone from partition";
1042+
}
1043+
10401044
} // namespace NKikimr::NPQ

ydb/core/persqueue/ut/pq_ut.cpp

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2326,6 +2326,100 @@ Y_UNIT_TEST(TestTabletRestoreEventsOrder) {
23262326
});
23272327
}
23282328

2329+
Y_UNIT_TEST(TestReadAndDeleteConsumer) {
2330+
TTestContext tc;
2331+
RunTestWithReboots(tc.TabletIds, [&]() {
2332+
return tc.InitialEventsFilter.Prepare();
2333+
}, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
2334+
TFinalizer finalizer(tc);
2335+
tc.Prepare(dispatchName, setup, activeZone);
2336+
activeZone = false;
2337+
tc.Runtime->SetScheduledLimit(2000);
2338+
tc.Runtime->SetScheduledEventFilter(&tc.ImmediateLogFlushAndRequestTimeoutFilter);
2339+
2340+
TVector<std::pair<ui64, TString>> data;
2341+
TString msg;
2342+
msg.resize(102400, 'a');
2343+
for (ui64 i = 1; i <= 1000; ++i) {
2344+
data.emplace_back(i, msg);
2345+
}
2346+
2347+
PQTabletPrepare({.maxCountInPartition=100, .deleteTime=TDuration::Days(2).Seconds(), .partitions=1},
2348+
{{"user1", true}, {"user2", true}}, tc);
2349+
CmdWrite(0, "sourceid1", data, tc, false, {}, true);
2350+
2351+
// Reset tablet cache
2352+
PQTabletRestart(tc);
2353+
2354+
TAutoPtr<IEventHandle> handle;
2355+
TEvPersQueue::TEvResponse* readResult = nullptr;
2356+
THolder<TEvPersQueue::TEvRequest> readRequest;
2357+
TEvPersQueue::TEvUpdateConfigResponse* consumerDeleteResult = nullptr;
2358+
THolder<TEvPersQueue::TEvUpdateConfig> consumerDeleteRequest;
2359+
2360+
// Read request
2361+
{
2362+
readRequest.Reset(new TEvPersQueue::TEvRequest);
2363+
auto req = readRequest->Record.MutablePartitionRequest();
2364+
req->SetPartition(0);
2365+
auto read = req->MutableCmdRead();
2366+
read->SetOffset(1);
2367+
read->SetClientId("user1");
2368+
read->SetCount(1);
2369+
read->SetBytes(1'000'000);
2370+
read->SetTimeoutMs(5000);
2371+
}
2372+
2373+
// Consumer delete request
2374+
{
2375+
consumerDeleteRequest.Reset(new TEvPersQueue::TEvUpdateConfig());
2376+
consumerDeleteRequest->MutableRecord()->SetTxId(42);
2377+
auto& cfg = *consumerDeleteRequest->MutableRecord()->MutableTabletConfig();
2378+
cfg.SetVersion(42);
2379+
cfg.AddPartitionIds(0);
2380+
cfg.AddPartitions()->SetPartitionId(0);
2381+
cfg.SetLocalDC(true);
2382+
cfg.SetTopic("topic");
2383+
auto& cons = *cfg.AddConsumers();
2384+
cons.SetName("user2");
2385+
cons.SetImportant(true);
2386+
}
2387+
2388+
TActorId edge = tc.Runtime->AllocateEdgeActor();
2389+
2390+
// Delete consumer during read request
2391+
tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, readRequest.Release(), 0, GetPipeConfigWithRetries());
2392+
2393+
// Intercept TEvPQ::TEvBlobResponse event
2394+
std::vector<TEvPQ::TEvBlobResponse::TPtr> capturedBlobResponses;
2395+
auto captureBlobResponsesObserver = tc.Runtime->AddObserver<TEvPQ::TEvBlobResponse>([&](TEvPQ::TEvBlobResponse::TPtr& ev) {
2396+
capturedBlobResponses.emplace_back().Swap(ev);
2397+
});
2398+
2399+
// Delete consumer while read request is still in progress
2400+
tc.Runtime->SendToPipe(tc.TabletId, edge, consumerDeleteRequest.Release(), 0, GetPipeConfigWithRetries());
2401+
consumerDeleteResult = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvUpdateConfigResponse>(handle);
2402+
{
2403+
//Cerr << "Got consumer delete response: " << consumerDeleteResult->Record << Endl;
2404+
UNIT_ASSERT(consumerDeleteResult->Record.HasStatus());
2405+
UNIT_ASSERT_EQUAL(consumerDeleteResult->Record.GetStatus(), NKikimrPQ::EStatus::OK);
2406+
}
2407+
2408+
// Resend intercepted blob responses and wait for read result
2409+
captureBlobResponsesObserver.Remove();
2410+
for (auto& ev : capturedBlobResponses) {
2411+
tc.Runtime->Send(ev.Release(), 0, true);
2412+
}
2413+
2414+
readResult = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
2415+
{
2416+
//Cerr << "Got read response: " << readResult->Record << Endl;
2417+
UNIT_ASSERT(readResult->Record.HasStatus());
2418+
UNIT_ASSERT_EQUAL(readResult->Record.GetErrorCode(), NPersQueue::NErrorCode::BAD_REQUEST);
2419+
UNIT_ASSERT_STRING_CONTAINS_C(readResult->Record.GetErrorReason(), "Consumer user1 is gone from partition", readResult->Record.Utf8DebugString());
2420+
}
2421+
});
2422+
}
23292423

23302424

23312425
} // Y_UNIT_TEST_SUITE(TPQTest)

0 commit comments

Comments
 (0)