@@ -189,12 +189,14 @@ namespace NKafka::NTests {
189
189
}
190
190
191
191
void CreateMetarequestActor (
192
- const TActorId& edge, const TString& topicPath , auto * runtime, const auto & kafkaConfig, const TActorId& fakeCacheId = {}
192
+ const TActorId& edge, const TVector< TString>& topics , auto * runtime, const auto & kafkaConfig, const TActorId& fakeCacheId = {}
193
193
) {
194
194
TMetadataRequestData::TPtr metaRequest = std::make_shared<TMetadataRequestData>();
195
- metaRequest->Topics .emplace_back ();
196
- auto & topic = metaRequest->Topics [0 ];
197
- topic.Name = topicPath;
195
+ for (const auto & topicPath : topics) {
196
+ metaRequest->Topics .emplace_back ();
197
+ auto & topic = metaRequest->Topics .back ();
198
+ topic.Name = topicPath;
199
+ }
198
200
199
201
auto context = std::make_shared<TContext>(kafkaConfig);
200
202
context->ConnectionId = edge;
@@ -215,14 +217,16 @@ namespace NKafka::NTests {
215
217
runtime->EnableScheduleForActor (actorId);
216
218
}
217
219
218
- void CheckKafkaMetaResponse (TTestActorRuntime* runtime, ui64 kafkaPort, bool error = false ) {
220
+ void CheckKafkaMetaResponse (TTestActorRuntime* runtime, ui64 kafkaPort, bool error = false , ui64 expectedCount = 1 ) {
219
221
TAutoPtr<IEventHandle> handle;
220
222
auto * ev = runtime->GrabEdgeEvent <TEvKafka::TEvResponse>(handle);
221
223
UNIT_ASSERT (ev);
222
224
auto response = dynamic_cast <TMetadataResponseData*>(ev->Response .get ());
223
- UNIT_ASSERT_VALUES_EQUAL (response->Topics .size (), 1 );
225
+ UNIT_ASSERT_VALUES_EQUAL (response->Topics .size (), expectedCount );
224
226
if (!error) {
225
- UNIT_ASSERT (response->Topics [0 ].ErrorCode == EKafkaErrors::NONE_ERROR);
227
+ for (const auto & topic : response->Topics ) {
228
+ UNIT_ASSERT (topic.ErrorCode == EKafkaErrors::NONE_ERROR);
229
+ }
226
230
} else {
227
231
UNIT_ASSERT (response->Topics [0 ].ErrorCode == EKafkaErrors::LISTENER_NOT_FOUND);
228
232
UNIT_ASSERT (ev->ErrorCode == EKafkaErrors::LISTENER_NOT_FOUND);
@@ -239,7 +243,7 @@ namespace NKafka::NTests {
239
243
auto * runtime = server.GetRuntime ();
240
244
auto edge = runtime->AllocateEdgeActor ();
241
245
242
- CreateMetarequestActor (edge, NKikimr::JoinPath ({" /Root/PQ/" , topicName}), runtime,
246
+ CreateMetarequestActor (edge, { NKikimr::JoinPath ({" /Root/PQ/" , topicName})} , runtime,
243
247
config);
244
248
245
249
CheckKafkaMetaResponse (runtime, kafkaPort);
@@ -262,7 +266,7 @@ namespace NKafka::NTests {
262
266
ep->set_node_id (9998 );
263
267
auto fakeCache = runtime->Register (new TFakeDiscoveryCache (leResult, false ));
264
268
runtime->EnableScheduleForActor (fakeCache);
265
- CreateMetarequestActor (edge, NKikimr::JoinPath ({" /Root/PQ/" , topicName}), runtime,
269
+ CreateMetarequestActor (edge, { NKikimr::JoinPath ({" /Root/PQ/" , topicName})} , runtime,
266
270
config, fakeCache);
267
271
268
272
CheckKafkaMetaResponse (runtime, kafkaPort);
@@ -277,7 +281,7 @@ namespace NKafka::NTests {
277
281
Ydb::Discovery::ListEndpointsResult leResult;
278
282
auto fakeCache = runtime->Register (new TFakeDiscoveryCache (leResult, true ));
279
283
runtime->EnableScheduleForActor (fakeCache);
280
- CreateMetarequestActor (edge, NKikimr::JoinPath ({" /Root/PQ/" , topicName}), runtime,
284
+ CreateMetarequestActor (edge, { NKikimr::JoinPath ({" /Root/PQ/" , topicName})} , runtime,
281
285
config, fakeCache);
282
286
283
287
CheckKafkaMetaResponse (runtime, kafkaPort, true );
@@ -296,10 +300,23 @@ namespace NKafka::NTests {
296
300
ep->set_node_id (runtime->GetNodeId (0 ));
297
301
auto fakeCache = runtime->Register (new TFakeDiscoveryCache (leResult, false ));
298
302
runtime->EnableScheduleForActor (fakeCache);
299
- CreateMetarequestActor (edge, NKikimr::JoinPath ({" /Root/PQ/" , topicName}), runtime,
303
+ CreateMetarequestActor (edge, { NKikimr::JoinPath ({" /Root/PQ/" , topicName})} , runtime,
300
304
config, fakeCache);
301
305
302
306
CheckKafkaMetaResponse (runtime, 12345 );
303
307
}
308
+
309
+
310
+ Y_UNIT_TEST (MetadataActorDoubleTopic) {
311
+ auto [server, kafkaPort, config, topicName] = SetupServer (" topic1" );
312
+
313
+ auto * runtime = server.GetRuntime ();
314
+ auto edge = runtime->AllocateEdgeActor ();
315
+
316
+ auto path = NKikimr::JoinPath ({" /Root/PQ/" , topicName});
317
+ CreateMetarequestActor (edge, {path, path}, runtime, config);
318
+
319
+ CheckKafkaMetaResponse (runtime, kafkaPort, false , 2 );
320
+ }
304
321
}
305
- }
322
+ }
0 commit comments