Skip to content

Commit 963d227

Browse files
Merge of important bugfixes for PQ read proxy & kafka proxy (#15515)
Co-authored-by: Alek5andr-Kotov <akotov@ydb.tech>
1 parent 28066d8 commit 963d227

File tree

8 files changed

+69
-28
lines changed

8 files changed

+69
-28
lines changed

ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,8 @@ void TKafkaMetadataActor::AddTopicResponse(
230230
void TKafkaMetadataActor::HandleLocationResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx) {
231231
--PendingResponses;
232232

233-
auto* r = ev->Get();
234233
auto actorIter = TopicIndexes.find(ev->Sender);
234+
TSimpleSharedPtr<TEvLocationResponse> locationResponse{ev->Release()};
235235

236236
Y_DEBUG_ABORT_UNLESS(!actorIter.IsEnd());
237237
Y_DEBUG_ABORT_UNLESS(!actorIter->second.empty());
@@ -248,12 +248,12 @@ void TKafkaMetadataActor::HandleLocationResponse(TEvLocationResponse::TPtr ev, c
248248

249249
for (auto index : actorIter->second) {
250250
auto& topic = Response->Topics[index];
251-
if (r->Status == Ydb::StatusIds::SUCCESS) {
251+
if (locationResponse->Status == Ydb::StatusIds::SUCCESS) {
252252
KAFKA_LOG_D("Describe topic '" << topic.Name << "' location finishied successful");
253-
PendingTopicResponses.insert(std::make_pair(index, ev->Release()));
253+
PendingTopicResponses.emplace(index, locationResponse);
254254
} else {
255-
KAFKA_LOG_ERROR("Describe topic '" << topic.Name << "' location finishied with error: Code=" << r->Status << ", Issues=" << r->Issues.ToOneLineString());
256-
AddTopicError(topic, ConvertErrorCode(r->Status));
255+
KAFKA_LOG_ERROR("Describe topic '" << topic.Name << "' location finishied with error: Code=" << locationResponse->Status << ", Issues=" << locationResponse->Issues.ToOneLineString());
256+
AddTopicError(topic, ConvertErrorCode(locationResponse->Status));
257257
}
258258
}
259259
RespondIfRequired(ctx);
@@ -307,7 +307,7 @@ void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) {
307307

308308
if (NeedAllNodes) {
309309
for (const auto& [id, nodeInfo] : Nodes)
310-
AddBroker(id, nodeInfo.Host, nodeInfo.Port);
310+
AddBroker(id, nodeInfo.Host, nodeInfo.Port);
311311
}
312312

313313
Respond();

ydb/core/kafka_proxy/actors/kafka_metadata_actor.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,11 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
8585
bool NeedAllNodes = false;
8686
bool HaveError = false;
8787
bool FallbackToIcDiscovery = false;
88-
TMap<ui64, TAutoPtr<TEvLocationResponse>> PendingTopicResponses;
88+
TMap<ui64, TSimpleSharedPtr<TEvLocationResponse>> PendingTopicResponses;
8989

9090
THashMap<ui64, TNodeInfo> Nodes;
9191
THashMap<TString, TActorId> PartitionActors;
92+
THashSet<ui64> HaveBrokers;
9293

9394
};
9495

ydb/core/kafka_proxy/ut/port_discovery_ut.cpp

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,14 @@ namespace NKafka::NTests {
189189
}
190190

191191
void CreateMetarequestActor(
192-
const TActorId& edge, const TString& topicPath, auto* runtime, const auto& kafkaConfig, const TActorId& fakeCacheId = {}
192+
const TActorId& edge, const TVector<TString>& topics, auto* runtime, const auto& kafkaConfig, const TActorId& fakeCacheId = {}
193193
) {
194194
TMetadataRequestData::TPtr metaRequest = std::make_shared<TMetadataRequestData>();
195-
metaRequest->Topics.emplace_back();
196-
auto& topic = metaRequest->Topics[0];
197-
topic.Name = topicPath;
195+
for (const auto& topicPath : topics) {
196+
metaRequest->Topics.emplace_back();
197+
auto& topic = metaRequest->Topics.back();
198+
topic.Name = topicPath;
199+
}
198200

199201
auto context = std::make_shared<TContext>(kafkaConfig);
200202
context->ConnectionId = edge;
@@ -215,14 +217,16 @@ namespace NKafka::NTests {
215217
runtime->EnableScheduleForActor(actorId);
216218
}
217219

218-
void CheckKafkaMetaResponse(TTestActorRuntime* runtime, ui64 kafkaPort, bool error = false) {
220+
void CheckKafkaMetaResponse(TTestActorRuntime* runtime, ui64 kafkaPort, bool error = false, ui64 expectedCount = 1) {
219221
TAutoPtr<IEventHandle> handle;
220222
auto* ev = runtime->GrabEdgeEvent<TEvKafka::TEvResponse>(handle);
221223
UNIT_ASSERT(ev);
222224
auto response = dynamic_cast<TMetadataResponseData*>(ev->Response.get());
223-
UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 1);
225+
UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), expectedCount);
224226
if (!error) {
225-
UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::NONE_ERROR);
227+
for (const auto& topic : response->Topics) {
228+
UNIT_ASSERT(topic.ErrorCode == EKafkaErrors::NONE_ERROR);
229+
}
226230
} else {
227231
UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::LISTENER_NOT_FOUND);
228232
UNIT_ASSERT(ev->ErrorCode == EKafkaErrors::LISTENER_NOT_FOUND);
@@ -239,7 +243,7 @@ namespace NKafka::NTests {
239243
auto* runtime = server.GetRuntime();
240244
auto edge = runtime->AllocateEdgeActor();
241245

242-
CreateMetarequestActor(edge, NKikimr::JoinPath({"/Root/PQ/", topicName}), runtime,
246+
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
243247
config);
244248

245249
CheckKafkaMetaResponse(runtime, kafkaPort);
@@ -262,7 +266,7 @@ namespace NKafka::NTests {
262266
ep->set_node_id(9998);
263267
auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false));
264268
runtime->EnableScheduleForActor(fakeCache);
265-
CreateMetarequestActor(edge, NKikimr::JoinPath({"/Root/PQ/", topicName}), runtime,
269+
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
266270
config, fakeCache);
267271

268272
CheckKafkaMetaResponse(runtime, kafkaPort);
@@ -277,7 +281,7 @@ namespace NKafka::NTests {
277281
Ydb::Discovery::ListEndpointsResult leResult;
278282
auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, true));
279283
runtime->EnableScheduleForActor(fakeCache);
280-
CreateMetarequestActor(edge, NKikimr::JoinPath({"/Root/PQ/", topicName}), runtime,
284+
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
281285
config, fakeCache);
282286

283287
CheckKafkaMetaResponse(runtime, kafkaPort, true);
@@ -296,10 +300,23 @@ namespace NKafka::NTests {
296300
ep->set_node_id(runtime->GetNodeId(0));
297301
auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false));
298302
runtime->EnableScheduleForActor(fakeCache);
299-
CreateMetarequestActor(edge, NKikimr::JoinPath({"/Root/PQ/", topicName}), runtime,
303+
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
300304
config, fakeCache);
301305

302306
CheckKafkaMetaResponse(runtime, 12345);
303307
}
308+
309+
310+
Y_UNIT_TEST(MetadataActorDoubleTopic) {
311+
auto [server, kafkaPort, config, topicName] = SetupServer("topic1");
312+
313+
auto* runtime = server.GetRuntime();
314+
auto edge = runtime->AllocateEdgeActor();
315+
316+
auto path = NKikimr::JoinPath({"/Root/PQ/", topicName});
317+
CreateMetarequestActor(edge, {path, path}, runtime, config);
318+
319+
CheckKafkaMetaResponse(runtime, kafkaPort, false, 2);
320+
}
304321
}
305-
}
322+
}

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2963,6 +2963,7 @@ TPartition::EProcessResult TPartition::PreProcessUserAct(
29632963
void TPartition::CommitUserAct(TEvPQ::TEvSetClientInfo& act) {
29642964
const bool strictCommitOffset = (act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET && act.Strict);
29652965
const TString& user = act.ClientId;
2966+
RemoveUserAct(user);
29662967
const auto& ctx = ActorContext();
29672968
if (!PendingUsersInfo.contains(user) && AffectedUsers.contains(user)) {
29682969
switch (act.Type) {
@@ -3082,7 +3083,6 @@ void TPartition::CommitUserAct(TEvPQ::TEvSetClientInfo& act) {
30823083
return;
30833084
}
30843085

3085-
RemoveUserAct(act.ClientId);
30863086
return EmulatePostProcessUserAct(act, userInfo, ActorContext());
30873087
}
30883088

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ namespace NKikimr::NPQ {
2929

3030
static const ui32 MAX_BLOB_PART_SIZE = 500_KB;
3131
static const ui32 DEFAULT_BUCKET_COUNTER_MULTIPLIER = 20;
32+
static const ui32 MAX_USER_ACTS = 1000;
3233

3334
using TPartitionLabeledCounters = TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor>;
3435

ydb/core/persqueue/partition_read.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828

2929
namespace NKikimr::NPQ {
3030

31-
static const ui32 MAX_USER_ACTS = 1000;
32-
3331
TMaybe<TInstant> GetReadFrom(ui32 maxTimeLagMs, ui64 readTimestampMs, TInstant consumerReadFromTimestamp, const TActorContext& ctx) {
3432
if (!(maxTimeLagMs > 0 || readTimestampMs > 0 || consumerReadFromTimestamp > TInstant::MilliSeconds(1))) {
3533
return {};

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
225225
void SendSetOffset(ui64 cookie,
226226
const TString& clientId,
227227
ui64 offset,
228-
const TString& sessionId);
228+
const TString& sessionId,
229+
bool strict = false);
229230
void SendGetOffset(ui64 cookie,
230231
const TString& clientId);
231232
void WaitCmdWrite(const TCmdWriteMatcher& matcher = {});
@@ -488,7 +489,8 @@ void TPartitionFixture::SendCreateSession(ui64 cookie,
488489
void TPartitionFixture::SendSetOffset(ui64 cookie,
489490
const TString& clientId,
490491
ui64 offset,
491-
const TString& sessionId)
492+
const TString& sessionId,
493+
bool strict)
492494
{
493495
auto event = MakeHolder<TEvPQ::TEvSetClientInfo>(cookie,
494496
clientId,
@@ -498,6 +500,7 @@ void TPartitionFixture::SendSetOffset(ui64 cookie,
498500
0,
499501
0,
500502
TActorId{});
503+
event->Strict = strict;
501504
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
502505
}
503506

@@ -712,7 +715,6 @@ void TPartitionFixture::WaitErrorResponse(const TErrorMatcher& matcher)
712715
}
713716

714717
if (matcher.Error) {
715-
716718
UNIT_ASSERT_VALUES_EQUAL(*matcher.Error, event->Error);
717719
}
718720
}
@@ -1796,6 +1798,28 @@ void TPartitionTxTestHelper::WaitTxPredicateReplyImpl(ui64 userActId, bool statu
17961798
#endif
17971799
}
17981800

1801+
Y_UNIT_TEST_F(UserActCount, TPartitionFixture)
1802+
{
1803+
// In the test, we check that the reference count for `UserInfo` decreases in case of errors. To do this,
1804+
// we send a large number of requests to which the server will respond with an error.
1805+
1806+
CreatePartition();
1807+
1808+
SendCreateSession(1, "client", "session-id", 2, 3);
1809+
WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session="session-id", .Offset=0, .Generation=2, .Step=3}}}});
1810+
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
1811+
WaitProxyResponse({.Cookie=1});
1812+
1813+
for (ui64 k = 0; k <= MAX_USER_ACTS; ++k) {
1814+
const ui64 cookie = 2 + k;
1815+
// 1 > EndOffset
1816+
SendSetOffset(cookie, "client", 1, "session-id", true); // strict = true
1817+
WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session="session-id", .Offset=0, .Generation=2, .Step=3}}}});
1818+
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
1819+
WaitErrorResponse({.Cookie=cookie, .ErrorCode=NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_FUTURE});
1820+
}
1821+
}
1822+
17991823
Y_UNIT_TEST_F(Batching, TPartitionFixture)
18001824
{
18011825
CreatePartition();

ydb/services/persqueue_v1/actors/partition_actor.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,12 +1305,12 @@ void TPartitionActor::Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext&
13051305
const auto req = ev->Get();
13061306

13071307
auto request = MakeReadRequest(ReadOffset, 0, req->MaxCount, req->MaxSize, req->MaxTimeLagMs, req->ReadTimestampMs, DirectReadId);
1308-
1308+
RequestInfly = true;
1309+
CurrentRequest = request;
1310+
13091311
if (!PipeClient) //Pipe will be recreated soon
13101312
return;
13111313

1312-
RequestInfly = true;
1313-
CurrentRequest = request;
13141314
TAutoPtr<TEvPersQueue::TEvRequest> event(new TEvPersQueue::TEvRequest);
13151315
event->Record.Swap(&request);
13161316

0 commit comments

Comments
 (0)