Skip to content

Commit 0789c57

Browse files
Merge 465ceae into 5a0f4ed
2 parents 5a0f4ed + 465ceae commit 0789c57

File tree

7 files changed

+66
-25
lines changed

7 files changed

+66
-25
lines changed

ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,8 @@ void TKafkaMetadataActor::AddTopicResponse(
230230
void TKafkaMetadataActor::HandleLocationResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx) {
231231
--PendingResponses;
232232

233-
auto* r = ev->Get();
234233
auto actorIter = TopicIndexes.find(ev->Sender);
234+
TSimpleSharedPtr<TEvLocationResponse> locationResponse{ev->Release()};
235235

236236
Y_DEBUG_ABORT_UNLESS(!actorIter.IsEnd());
237237
Y_DEBUG_ABORT_UNLESS(!actorIter->second.empty());
@@ -248,12 +248,12 @@ void TKafkaMetadataActor::HandleLocationResponse(TEvLocationResponse::TPtr ev, c
248248

249249
for (auto index : actorIter->second) {
250250
auto& topic = Response->Topics[index];
251-
if (r->Status == Ydb::StatusIds::SUCCESS) {
251+
if (locationResponse->Status == Ydb::StatusIds::SUCCESS) {
252252
KAFKA_LOG_D("Describe topic '" << topic.Name << "' location finishied successful");
253-
PendingTopicResponses.insert(std::make_pair(index, ev->Release()));
253+
PendingTopicResponses.emplace(index, locationResponse);
254254
} else {
255-
KAFKA_LOG_ERROR("Describe topic '" << topic.Name << "' location finishied with error: Code=" << r->Status << ", Issues=" << r->Issues.ToOneLineString());
256-
AddTopicError(topic, ConvertErrorCode(r->Status));
255+
KAFKA_LOG_ERROR("Describe topic '" << topic.Name << "' location finishied with error: Code=" << locationResponse->Status << ", Issues=" << locationResponse->Issues.ToOneLineString());
256+
AddTopicError(topic, ConvertErrorCode(locationResponse->Status));
257257
}
258258
}
259259
RespondIfRequired(ctx);
@@ -307,7 +307,7 @@ void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) {
307307

308308
if (NeedAllNodes) {
309309
for (const auto& [id, nodeInfo] : Nodes)
310-
AddBroker(id, nodeInfo.Host, nodeInfo.Port);
310+
AddBroker(id, nodeInfo.Host, nodeInfo.Port);
311311
}
312312

313313
Respond();

ydb/core/kafka_proxy/actors/kafka_metadata_actor.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,11 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
8585
bool NeedAllNodes = false;
8686
bool HaveError = false;
8787
bool FallbackToIcDiscovery = false;
88-
TMap<ui64, TAutoPtr<TEvLocationResponse>> PendingTopicResponses;
88+
TMap<ui64, TSimpleSharedPtr<TEvLocationResponse>> PendingTopicResponses;
8989

9090
THashMap<ui64, TNodeInfo> Nodes;
9191
THashMap<TString, TActorId> PartitionActors;
92+
THashSet<ui64> HaveBrokers;
9293

9394
};
9495

ydb/core/kafka_proxy/ut/port_discovery_ut.cpp

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,14 @@ namespace NKafka::NTests {
189189
}
190190

191191
void CreateMetarequestActor(
192-
const TActorId& edge, const TString& topicPath, auto* runtime, const auto& kafkaConfig, const TActorId& fakeCacheId = {}
192+
const TActorId& edge, const TVector<TString>& topics, auto* runtime, const auto& kafkaConfig, const TActorId& fakeCacheId = {}
193193
) {
194194
TMetadataRequestData::TPtr metaRequest = std::make_shared<TMetadataRequestData>();
195-
metaRequest->Topics.emplace_back();
196-
auto& topic = metaRequest->Topics[0];
197-
topic.Name = topicPath;
195+
for (const auto& topicPath : topics) {
196+
metaRequest->Topics.emplace_back();
197+
auto& topic = metaRequest->Topics.back();
198+
topic.Name = topicPath;
199+
}
198200

199201
auto context = std::make_shared<TContext>(kafkaConfig);
200202
context->ConnectionId = edge;
@@ -215,14 +217,16 @@ namespace NKafka::NTests {
215217
runtime->EnableScheduleForActor(actorId);
216218
}
217219

218-
void CheckKafkaMetaResponse(TTestActorRuntime* runtime, ui64 kafkaPort, bool error = false) {
220+
void CheckKafkaMetaResponse(TTestActorRuntime* runtime, ui64 kafkaPort, bool error = false, ui64 expectedCount = 1) {
219221
TAutoPtr<IEventHandle> handle;
220222
auto* ev = runtime->GrabEdgeEvent<TEvKafka::TEvResponse>(handle);
221223
UNIT_ASSERT(ev);
222224
auto response = dynamic_cast<TMetadataResponseData*>(ev->Response.get());
223-
UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 1);
225+
UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), expectedCount);
224226
if (!error) {
225-
UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::NONE_ERROR);
227+
for (const auto& topic : response->Topics) {
228+
UNIT_ASSERT(topic.ErrorCode == EKafkaErrors::NONE_ERROR);
229+
}
226230
} else {
227231
UNIT_ASSERT(response->Topics[0].ErrorCode == EKafkaErrors::LISTENER_NOT_FOUND);
228232
UNIT_ASSERT(ev->ErrorCode == EKafkaErrors::LISTENER_NOT_FOUND);
@@ -239,7 +243,7 @@ namespace NKafka::NTests {
239243
auto* runtime = server.GetRuntime();
240244
auto edge = runtime->AllocateEdgeActor();
241245

242-
CreateMetarequestActor(edge, NKikimr::JoinPath({"/Root/PQ/", topicName}), runtime,
246+
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
243247
config);
244248

245249
CheckKafkaMetaResponse(runtime, kafkaPort);
@@ -262,7 +266,7 @@ namespace NKafka::NTests {
262266
ep->set_node_id(9998);
263267
auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false));
264268
runtime->EnableScheduleForActor(fakeCache);
265-
CreateMetarequestActor(edge, NKikimr::JoinPath({"/Root/PQ/", topicName}), runtime,
269+
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
266270
config, fakeCache);
267271

268272
CheckKafkaMetaResponse(runtime, kafkaPort);
@@ -277,7 +281,7 @@ namespace NKafka::NTests {
277281
Ydb::Discovery::ListEndpointsResult leResult;
278282
auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, true));
279283
runtime->EnableScheduleForActor(fakeCache);
280-
CreateMetarequestActor(edge, NKikimr::JoinPath({"/Root/PQ/", topicName}), runtime,
284+
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
281285
config, fakeCache);
282286

283287
CheckKafkaMetaResponse(runtime, kafkaPort, true);
@@ -296,10 +300,23 @@ namespace NKafka::NTests {
296300
ep->set_node_id(runtime->GetNodeId(0));
297301
auto fakeCache = runtime->Register(new TFakeDiscoveryCache(leResult, false));
298302
runtime->EnableScheduleForActor(fakeCache);
299-
CreateMetarequestActor(edge, NKikimr::JoinPath({"/Root/PQ/", topicName}), runtime,
303+
CreateMetarequestActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime,
300304
config, fakeCache);
301305

302306
CheckKafkaMetaResponse(runtime, 12345);
303307
}
308+
309+
310+
Y_UNIT_TEST(MetadataActorDoubleTopic) {
311+
auto [server, kafkaPort, config, topicName] = SetupServer("topic1");
312+
313+
auto* runtime = server.GetRuntime();
314+
auto edge = runtime->AllocateEdgeActor();
315+
316+
auto path = NKikimr::JoinPath({"/Root/PQ/", topicName});
317+
CreateMetarequestActor(edge, {path, path}, runtime, config);
318+
319+
CheckKafkaMetaResponse(runtime, kafkaPort, false, 2);
320+
}
304321
}
305-
}
322+
}

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2890,6 +2890,7 @@ TPartition::EProcessResult TPartition::PreProcessUserAct(
28902890
void TPartition::CommitUserAct(TEvPQ::TEvSetClientInfo& act) {
28912891
const bool strictCommitOffset = (act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET && act.Strict);
28922892
const TString& user = act.ClientId;
2893+
RemoveUserAct(user);
28932894
const auto& ctx = ActorContext();
28942895
if (!PendingUsersInfo.contains(user) && AffectedUsers.contains(user)) {
28952896
switch (act.Type) {
@@ -3016,7 +3017,6 @@ void TPartition::CommitUserAct(TEvPQ::TEvSetClientInfo& act) {
30163017
return;
30173018
}
30183019

3019-
RemoveUserAct(act.ClientId);
30203020
return EmulatePostProcessUserAct(act, userInfo, ActorContext());
30213021
}
30223022

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ namespace NKikimr::NPQ {
2929

3030
static const ui32 MAX_BLOB_PART_SIZE = 500_KB;
3131
static const ui32 DEFAULT_BUCKET_COUNTER_MULTIPLIER = 20;
32+
static const ui32 MAX_USER_ACTS = 1000;
3233

3334
using TPartitionLabeledCounters = TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor>;
3435

ydb/core/persqueue/partition_read.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727

2828
namespace NKikimr::NPQ {
2929

30-
static const ui32 MAX_USER_ACTS = 1000;
31-
3230
TMaybe<TInstant> GetReadFrom(ui32 maxTimeLagMs, ui64 readTimestampMs, TInstant consumerReadFromTimestamp, const TActorContext& ctx) {
3331
if (!(maxTimeLagMs > 0 || readTimestampMs > 0 || consumerReadFromTimestamp > TInstant::MilliSeconds(1))) {
3432
return {};

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
225225
void SendSetOffset(ui64 cookie,
226226
const TString& clientId,
227227
ui64 offset,
228-
const TString& sessionId);
228+
const TString& sessionId,
229+
bool strict = false);
229230
void SendGetOffset(ui64 cookie,
230231
const TString& clientId);
231232
void WaitCmdWrite(const TCmdWriteMatcher& matcher = {});
@@ -486,7 +487,8 @@ void TPartitionFixture::SendCreateSession(ui64 cookie,
486487
void TPartitionFixture::SendSetOffset(ui64 cookie,
487488
const TString& clientId,
488489
ui64 offset,
489-
const TString& sessionId)
490+
const TString& sessionId,
491+
bool strict)
490492
{
491493
auto event = MakeHolder<TEvPQ::TEvSetClientInfo>(cookie,
492494
clientId,
@@ -496,6 +498,7 @@ void TPartitionFixture::SendSetOffset(ui64 cookie,
496498
0,
497499
0,
498500
TActorId{});
501+
event->Strict = strict;
499502
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
500503
}
501504

@@ -710,7 +713,6 @@ void TPartitionFixture::WaitErrorResponse(const TErrorMatcher& matcher)
710713
}
711714

712715
if (matcher.Error) {
713-
714716
UNIT_ASSERT_VALUES_EQUAL(*matcher.Error, event->Error);
715717
}
716718
}
@@ -1697,6 +1699,28 @@ void TPartitionTxTestHelper::WaitTxPredicateReplyImpl(ui64 userActId, bool statu
16971699
UNIT_ASSERT_VALUES_EQUAL(event->Predicate, status);
16981700
}
16991701

1702+
Y_UNIT_TEST_F(UserActCount, TPartitionFixture)
1703+
{
1704+
// In the test, we check that the reference count for `UserInfo` decreases in case of errors. To do this,
1705+
// we send a large number of requests to which the server will respond with an error.
1706+
1707+
CreatePartition();
1708+
1709+
SendCreateSession(1, "client", "session-id", 2, 3);
1710+
WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session="session-id", .Offset=0, .Generation=2, .Step=3}}}});
1711+
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
1712+
WaitProxyResponse({.Cookie=1});
1713+
1714+
for (ui64 k = 0; k <= MAX_USER_ACTS; ++k) {
1715+
const ui64 cookie = 2 + k;
1716+
// 1 > EndOffset
1717+
SendSetOffset(cookie, "client", 1, "session-id", true); // strict = true
1718+
WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session="session-id", .Offset=0, .Generation=2, .Step=3}}}});
1719+
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
1720+
WaitErrorResponse({.Cookie=cookie, .ErrorCode=NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_FUTURE});
1721+
}
1722+
}
1723+
17001724
Y_UNIT_TEST_F(Batching, TPartitionFixture)
17011725
{
17021726
CreatePartition();

0 commit comments

Comments
 (0)