@@ -226,4 +226,189 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
226
226
}
227
227
}
228
228
229
+ Y_UNIT_TEST (LookupWithErrors) {
230
+ auto alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__, NKikimr::TAlignedPagePoolCounters (), true , false );
231
+ NKikimr::NMiniKQL::TMemoryUsageInfo memUsage (" TestMemUsage" );
232
+ NKikimr::NMiniKQL::THolderFactory holderFactory (alloc->Ref (), memUsage);
233
+ NKikimr::NMiniKQL::TTypeEnvironment typeEnv (*alloc);
234
+ NKikimr::NMiniKQL::TTypeBuilder typeBuilder (typeEnv);
235
+
236
+ auto loggerConfig = NYql::NProto::TLoggingConfig ();
237
+ loggerConfig.set_allcomponentslevel (::NYql::NProto::TLoggingConfig_ELevel::TLoggingConfig_ELevel_TRACE);
238
+ NYql::NLog::InitLogger (loggerConfig, false );
239
+
240
+ TTestActorRuntimeBase runtime (1 , 1 , true );
241
+ runtime.Initialize ();
242
+ auto edge = runtime.AllocateEdgeActor ();
243
+
244
+ NYql::TGenericDataSourceInstance dsi;
245
+ dsi.Setkind (NYql::EGenericDataSourceKind::YDB);
246
+ dsi.mutable_endpoint ()->Sethost (" some_host" );
247
+ dsi.mutable_endpoint ()->Setport (2135 );
248
+ dsi.Setdatabase (" some_db" );
249
+ dsi.Setuse_tls (true );
250
+ dsi.set_protocol (::NYql::EGenericProtocol::NATIVE);
251
+ auto token = dsi.mutable_credentials ()->mutable_token ();
252
+ token->Settype (" IAM" );
253
+ token->Setvalue (" TEST_TOKEN" );
254
+
255
+ auto connectorMock = std::make_shared<NYql::NConnector::NTest::TConnectorClientMock>();
256
+
257
+ // clang-format off
258
+ // step 1: ListSplits
259
+ {
260
+ ::testing::InSequence seq;
261
+ for (grpc::StatusCode readStatus : { grpc::StatusCode::UNAVAILABLE, grpc::StatusCode::OK }) {
262
+ for (grpc::StatusCode listStatus : { grpc::StatusCode::UNAVAILABLE, readStatus == grpc::StatusCode::OK ? grpc::StatusCode::DEADLINE_EXCEEDED : grpc::StatusCode::UNAVAILABLE, grpc::StatusCode::OK }) {
263
+ auto listBuilder = connectorMock->ExpectListSplits ();
264
+ listBuilder
265
+ .Select ()
266
+ .DataSourceInstance (dsi)
267
+ .What ()
268
+ .Column (" id" , Ydb::Type::UINT64)
269
+ .NullableColumn (" optional_id" , Ydb::Type::UINT64)
270
+ .NullableColumn (" string_value" , Ydb::Type::STRING)
271
+ .Done ()
272
+ .Table (" lookup_test" )
273
+ .Where ()
274
+ .Filter ()
275
+ .Disjunction ()
276
+ .Operand ()
277
+ .Conjunction ()
278
+ .Operand ().Equal ().Column (" id" ).Value <ui64>(2 ).Done ().Done ()
279
+ .Operand ().Equal ().Column (" optional_id" ).OptionalValue <ui64>(102 ).Done ().Done ()
280
+ .Done ()
281
+ .Done ()
282
+ .Operand ()
283
+ .Conjunction ()
284
+ .Operand ().Equal ().Column (" id" ).Value <ui64>(1 ).Done ().Done ()
285
+ .Operand ().Equal ().Column (" optional_id" ).OptionalValue <ui64>(101 ).Done ().Done ()
286
+ .Done ()
287
+ .Done ()
288
+ .Operand ()
289
+ .Conjunction ()
290
+ .Operand ().Equal ().Column (" id" ).Value <ui64>(0 ).Done ().Done ()
291
+ .Operand ().Equal ().Column (" optional_id" ).OptionalValue <ui64>(100 ).Done ().Done ()
292
+ .Done ()
293
+ .Done ()
294
+ .Done ()
295
+ .Done ()
296
+ .Done ()
297
+ .Done ()
298
+ .MaxSplitCount (1 )
299
+ ;
300
+ if (listStatus != grpc::StatusCode::OK) {
301
+ listBuilder
302
+ .Status (NYdbGrpc::TGrpcStatus (listStatus, " Mocked Error" ))
303
+ ;
304
+ continue ;
305
+ }
306
+ listBuilder
307
+ .Result ()
308
+ .AddResponse (NewSuccess ())
309
+ .Description (" Actual split info is not important" )
310
+ ;
311
+ }
312
+
313
+ auto readBuilder = connectorMock->ExpectReadSplits ();
314
+ readBuilder
315
+ .DataSourceInstance (dsi)
316
+ .Filtering (NYql::NConnector::NApi::TReadSplitsRequest::FILTERING_MANDATORY)
317
+ .Split ()
318
+ .Description (" Actual split info is not important" )
319
+ .Done ()
320
+ ;
321
+ if (readStatus != grpc::StatusCode::OK) {
322
+ readBuilder
323
+ .Status (NYdbGrpc::TGrpcStatus (readStatus, " Mocked Error" ))
324
+ ;
325
+ continue ;
326
+ }
327
+ readBuilder
328
+ .Result ()
329
+ .AddResponse (
330
+ MakeRecordBatch (
331
+ MakeArray<arrow::UInt64Builder, uint64_t >(" id" , {0 , 1 , 2 }, arrow::uint64 ()),
332
+ MakeArray<arrow::UInt64Builder, uint64_t >(" optional_id" , {100 , 101 , 103 }, arrow::uint64 ()), // the last value is intentially wrong
333
+ MakeArray<arrow::StringBuilder, std::string>(" string_value" , {" a" , " b" , " c" }, arrow::utf8 ())
334
+ ),
335
+ NewSuccess ()
336
+ )
337
+ ;
338
+ }
339
+ }
340
+ // clang-format on
341
+
342
+ NYql::Generic::TLookupSource lookupSourceSettings;
343
+ *lookupSourceSettings.mutable_data_source_instance () = dsi;
344
+ lookupSourceSettings.Settable (" lookup_test" );
345
+ lookupSourceSettings.SetServiceAccountId (" testsaid" );
346
+ lookupSourceSettings.SetServiceAccountIdSignature (" fake_signature" );
347
+
348
+ google::protobuf::Any packedLookupSource;
349
+ Y_ABORT_UNLESS (packedLookupSource.PackFrom (lookupSourceSettings));
350
+
351
+ NKikimr::NMiniKQL::TStructTypeBuilder keyTypeBuilder{typeEnv};
352
+ keyTypeBuilder.Add (" id" , typeBuilder.NewDataType (NYql::NUdf::EDataSlot::Uint64, false ));
353
+ keyTypeBuilder.Add (" optional_id" , typeBuilder.NewDataType (NYql::NUdf::EDataSlot::Uint64, true ));
354
+ NKikimr::NMiniKQL::TStructTypeBuilder outputypeBuilder{typeEnv};
355
+ outputypeBuilder.Add (" string_value" , typeBuilder.NewDataType (NYql::NUdf::EDataSlot::String, true ));
356
+
357
+ auto guard = Guard (*alloc.get ());
358
+ auto keyTypeHelper = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TKeyTypeHelper>(keyTypeBuilder.Build ());
359
+
360
+ auto [lookupSource, actor] = NYql::NDq::CreateGenericLookupActor (
361
+ connectorMock,
362
+ std::make_shared<NYql::NTestCreds::TSecuredServiceAccountCredentialsFactory>(),
363
+ edge,
364
+ nullptr ,
365
+ alloc,
366
+ keyTypeHelper,
367
+ std::move (lookupSourceSettings),
368
+ keyTypeBuilder.Build (),
369
+ outputypeBuilder.Build (),
370
+ typeEnv,
371
+ holderFactory,
372
+ 1'000'000 );
373
+ auto lookupActor = runtime.Register (actor);
374
+
375
+ auto request = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap>(3 , keyTypeHelper->GetValueHash (), keyTypeHelper->GetValueEqual ());
376
+ for (size_t i = 0 ; i != 3 ; ++i) {
377
+ NYql::NUdf::TUnboxedValue* keyItems;
378
+ auto key = holderFactory.CreateDirectArrayHolder (2 , keyItems);
379
+ keyItems[0 ] = NYql::NUdf::TUnboxedValuePod (ui64 (i));
380
+ keyItems[1 ] = NYql::NUdf::TUnboxedValuePod (ui64 (100 + i));
381
+ request->emplace (std::move (key), NYql::NUdf::TUnboxedValue{});
382
+ }
383
+
384
+ guard.Release (); // let actors use alloc
385
+
386
+ auto callLookupActor = new TCallLookupActor (alloc, lookupActor, request);
387
+ runtime.Register (callLookupActor);
388
+
389
+ auto ev = runtime.GrabEdgeEventRethrow <NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
390
+ auto guard2 = Guard (*alloc.get ());
391
+ auto lookupResult = ev->Get ()->Result .lock ();
392
+ UNIT_ASSERT (lookupResult);
393
+
394
+ UNIT_ASSERT_EQUAL (3 , lookupResult->size ());
395
+ {
396
+ const auto * v = lookupResult->FindPtr (CreateStructValue (holderFactory, {0 , 100 }));
397
+ UNIT_ASSERT (v);
398
+ NYql::NUdf::TUnboxedValue val = v->GetElement (0 );
399
+ UNIT_ASSERT (val.AsStringRef () == TStringBuf (" a" ));
400
+ }
401
+ {
402
+ const auto * v = lookupResult->FindPtr (CreateStructValue (holderFactory, {1 , 101 }));
403
+ UNIT_ASSERT (v);
404
+ NYql::NUdf::TUnboxedValue val = v->GetElement (0 );
405
+ UNIT_ASSERT (val.AsStringRef () == TStringBuf (" b" ));
406
+ }
407
+ {
408
+ const auto * v = lookupResult->FindPtr (CreateStructValue (holderFactory, {2 , 102 }));
409
+ UNIT_ASSERT (v);
410
+ UNIT_ASSERT (!*v);
411
+ }
412
+ }
413
+
229
414
} // Y_UNIT_TEST_SUITE(GenericProviderLookupActor)
0 commit comments