Skip to content

Commit 6ee6db6

Browse files
List all topics in kafka metaRequest (#14110)
1 parent 97e1be1 commit 6ee6db6

File tree

10 files changed

+396
-145
lines changed

10 files changed

+396
-145
lines changed

ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <ydb/services/persqueue_v1/actors/schema_actors.h>
55
#include <ydb/core/grpc_services/grpc_endpoint.h>
66
#include <ydb/core/base/statestorage.h>
7+
#include <ydb/core/persqueue/list_all_topics_actor.h>
78

89
namespace NKafka {
910
using namespace NKikimr;
@@ -32,12 +33,16 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
3233
SendDiscoveryRequest();
3334

3435
if (Message->Topics.size() == 0) {
35-
NeedCurrentNode = true;
36+
ctx.Register(NKikimr::NPersQueue::MakeListAllTopicsActor(
37+
SelfId(), Context->DatabasePath, GetUserSerializedToken(Context), true, {}, {}));
38+
39+
PendingResponses++;
40+
NeedAllNodes = true;
3641
}
3742
}
3843

3944
if (Message->Topics.size() != 0) {
40-
ProcessTopics();
45+
ProcessTopicsFromRequest();
4146
}
4247

4348
Become(&TKafkaMetadataActor::StateWork);
@@ -111,39 +116,54 @@ void TKafkaMetadataActor::HandleNodesResponse(
111116
RespondIfRequired(ctx);
112117
}
113118

114-
void TKafkaMetadataActor::AddProxyNodeToBrokers() {
115-
AddBroker(ProxyNodeId, Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort());
116-
}
117-
118-
void TKafkaMetadataActor::ProcessTopics() {
119-
THashMap<TString, TActorId> partitionActors;
119+
void TKafkaMetadataActor::ProcessTopicsFromRequest() {
120+
TVector<TString> topicsToRequest;
120121
for (size_t i = 0; i < Message->Topics.size(); ++i) {
121-
Response->Topics[i] = TMetadataResponseData::TMetadataResponseTopic{};
122122
auto& reqTopic = Message->Topics[i];
123-
Response->Topics[i].Name = reqTopic.Name.value_or("");
124-
125123
if (!reqTopic.Name.value_or("")) {
126124
AddTopicError(Response->Topics[i], EKafkaErrors::INVALID_TOPIC_EXCEPTION);
127125
continue;
128126
}
129-
const auto& topicName = reqTopic.Name.value();
130-
TActorId child;
131-
auto namesIter = partitionActors.find(topicName);
132-
if (namesIter.IsEnd()) {
133-
child = SendTopicRequest(reqTopic);
134-
partitionActors[topicName] = child;
135-
} else {
136-
child = namesIter->second;
137-
}
138-
TopicIndexes[child].push_back(i);
127+
AddTopic(reqTopic.Name.value_or(""), i);
139128
}
140129
}
141130

142-
TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest) {
143-
KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user '" << GetUsernameOrAnonymous(Context) << "'");
131+
void TKafkaMetadataActor::HandleListTopics(NKikimr::TEvPQ::TEvListAllTopicsResponse::TPtr& ev) {
132+
Y_ABORT_UNLESS(PendingResponses > 0);
133+
PendingResponses--;
134+
auto topics = std::move(ev->Get()->Topics);
135+
Response->Topics.resize(topics.size());
136+
for (size_t i = 0; i < topics.size(); ++i) {
137+
AddTopic(topics[i], i);
138+
}
139+
RespondIfRequired(ActorContext());
140+
}
141+
142+
void TKafkaMetadataActor::AddProxyNodeToBrokers() {
143+
AddBroker(ProxyNodeId, Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort());
144+
}
145+
146+
147+
void TKafkaMetadataActor::AddTopic(const TString& topic, ui64 index) {
148+
Response->Topics[index] = TMetadataResponseData::TMetadataResponseTopic{};
149+
Response->Topics[index].Name = topic;
150+
151+
TActorId child;
152+
auto namesIter = PartitionActors.find(topic);
153+
if (namesIter.IsEnd()) {
154+
child = SendTopicRequest(topic);
155+
PartitionActors[topic] = child;
156+
} else {
157+
child = namesIter->second;
158+
}
159+
TopicIndexes[child].push_back(index);
160+
}
161+
162+
TActorId TKafkaMetadataActor::SendTopicRequest(const TString& topic) {
163+
KAFKA_LOG_D("Describe partitions locations for topic '" << topic << "' for user '" << GetUsernameOrAnonymous(Context) << "'");
144164

145165
TGetPartitionsLocationRequest locationRequest{};
146-
locationRequest.Topic = NormalizePath(Context->DatabasePath, topicRequest.Name.value());
166+
locationRequest.Topic = NormalizePath(Context->DatabasePath, topic);
147167
locationRequest.Token = GetUserSerializedToken(Context);
148168
locationRequest.Database = Context->DatabasePath;
149169

@@ -193,7 +213,7 @@ void TKafkaMetadataActor::AddTopicResponse(
193213

194214
topic.Partitions.emplace_back(std::move(responsePartition));
195215

196-
if (!WithProxy) {
216+
if (!WithProxy && !NeedAllNodes) {
197217
auto ins = AllClusterNodes.insert(part.NodeId);
198218
if (ins.second) {
199219
auto hostname = (*nodeIter)->Host;
@@ -265,16 +285,6 @@ void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) {
265285
return;
266286
}
267287

268-
if (NeedCurrentNode) {
269-
auto nodeIter = Nodes.find(SelfId().NodeId());
270-
if (nodeIter.IsEnd()) {
271-
// Node info was not found, request from IC nodes cache instead
272-
RequestICNodeCache();
273-
return;
274-
}
275-
AddBroker(nodeIter->first, nodeIter->second.Host, nodeIter->second.Port);
276-
NeedCurrentNode = false;
277-
}
278288
while (!PendingTopicResponses.empty()) {
279289
auto& [index, ev] = *PendingTopicResponses.begin();
280290
auto& topic = Response->Topics[index];
@@ -295,6 +305,11 @@ void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) {
295305
PendingTopicResponses.erase(PendingTopicResponses.begin());
296306
}
297307

308+
if (NeedAllNodes) {
309+
for (const auto& [id, nodeInfo] : Nodes)
310+
AddBroker(id, nodeInfo.Host, nodeInfo.Port);
311+
}
312+
298313
Respond();
299314
}
300315

ydb/core/kafka_proxy/actors/kafka_metadata_actor.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <ydb/services/persqueue_v1/actors/schema_actors.h>
88
#include <ydb/core/discovery/discovery.h>
99
#include <ydb/core/kafka_proxy/kafka_listener.h>
10+
#include <ydb/core/persqueue/events/internal.h>
1011

1112

1213
namespace NKafka {
@@ -33,12 +34,13 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
3334
ui32 Port;
3435
};
3536

36-
TActorId SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest);
37+
TActorId SendTopicRequest(const TString& topic);
3738
void HandleLocationResponse(TEvLocationResponse::TPtr ev, const NActors::TActorContext& ctx);
3839
void HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev,
3940
const NActors::TActorContext& ctx);
4041
void HandleDiscoveryData(NKikimr::TEvDiscovery::TEvDiscoveryData::TPtr& ev);
4142
void HandleDiscoveryError(NKikimr::TEvDiscovery::TEvError::TPtr& ev);
43+
void HandleListTopics(NKikimr::TEvPQ::TEvListAllTopicsResponse::TPtr& ev);
4244

4345
void AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response,
4446
const TVector<TNodeInfo*>& nodes);
@@ -47,17 +49,20 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
4749
void AddProxyNodeToBrokers();
4850
void AddBroker(ui64 nodeId, const TString& host, ui64 port);
4951
void RequestICNodeCache();
50-
void ProcessTopics();
52+
void ProcessTopicsFromRequest();
5153
void SendDiscoveryRequest();
5254
void ProcessDiscoveryData(NKikimr::TEvDiscovery::TEvDiscoveryData::TPtr& ev);
5355
TVector<TNodeInfo*> CheckTopicNodes(TEvLocationResponse* response);
5456

57+
void AddTopic(const TString& topic, ui64 index);
58+
5559
STATEFN(StateWork) {
5660
switch (ev->GetTypeRewrite()) {
5761
HFunc(TEvLocationResponse, HandleLocationResponse);
5862
HFunc(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, HandleNodesResponse);
5963
hFunc(NKikimr::TEvDiscovery::TEvDiscoveryData, HandleDiscoveryData);
6064
hFunc(NKikimr::TEvDiscovery::TEvError, HandleDiscoveryError);
65+
hFunc(NKikimr::TEvPQ::TEvListAllTopicsResponse, HandleListTopics);
6166
}
6267
}
6368

@@ -77,12 +82,14 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
7782
EKafkaErrors ErrorCode = EKafkaErrors::NONE_ERROR;
7883

7984
TActorId DiscoveryCacheActor;
80-
bool NeedCurrentNode = false;
85+
bool NeedAllNodes = false;
8186
bool HaveError = false;
8287
bool FallbackToIcDiscovery = false;
8388
TMap<ui64, TAutoPtr<TEvLocationResponse>> PendingTopicResponses;
8489

8590
THashMap<ui64, TNodeInfo> Nodes;
91+
THashMap<TString, TActorId> PartitionActors;
92+
8693
};
8794

8895
} // namespace NKafka

ydb/core/kafka_proxy/ut/metarequest_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ Y_UNIT_TEST_SUITE(TMetadataActorTests) {
9393

9494
event = GetEvent(server, edgeId, {});
9595
response = dynamic_cast<TMetadataResponseData*>(event->Response.get());
96-
UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 0);
96+
UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 2);
9797

9898
event = GetEvent(server, edgeId, {topicPath}, "proxy-host");
9999
response = dynamic_cast<TMetadataResponseData*>(event->Response.get());

ydb/core/persqueue/events/internal.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ struct TEvPQ {
186186
EvGetWriteInfoRequest,
187187
EvGetWriteInfoResponse,
188188
EvGetWriteInfoError,
189-
EvTxBatchComplete,
189+
EvTxBatchComplete,
190190
EvReadingPartitionStatusRequest,
191191
EvProcessChangeOwnerRequests,
192192
EvWakeupReleasePartition,
@@ -196,6 +196,7 @@ struct TEvPQ {
196196
EvDeletePartition,
197197
EvDeletePartitionDone,
198198
EvTransactionCompleted,
199+
EvListAllTopicsResponse,
199200
EvEnd
200201
};
201202

@@ -1177,6 +1178,16 @@ struct TEvPQ {
11771178

11781179
TMaybe<NPQ::TWriteId> WriteId;
11791180
};
1181+
1182+
struct TEvListAllTopicsResponse : TEventLocal<TEvListAllTopicsResponse, EvListAllTopicsResponse> {
1183+
explicit TEvListAllTopicsResponse() = default;
1184+
explicit TEvListAllTopicsResponse(Ydb::StatusIds status, const TString& error);
1185+
1186+
TVector<TString> Topics;
1187+
bool HaveMoreTopics = false;
1188+
Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::SUCCESS;
1189+
TString Error;
1190+
};
11801191
};
11811192

11821193
} //NKikimr

0 commit comments

Comments
 (0)