1+ #include < ydb/library/yql/minikql/mkql_alloc.h>
2+ #include < ydb/library/yql/minikql/mkql_node.h>
3+ #include < ydb/library/yql/minikql/mkql_node_builder.h>
4+ #include < ydb/library/yql/public/udf/udf_value.h>
5+ #include < ydb/library/yql/minikql/mkql_type_builder.h>
6+
7+ #include < ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h>
8+
9+ #include < ydb/library/actors/testlib/test_runtime.h>
10+ #include < ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h>
11+ #include < ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/test_creds.h>
12+ #include < ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h>
13+ #include < library/cpp/testing/unittest/registar.h>
14+
15+ #include < ydb/library/yql/utils/log/proto/logger_config.pb.h>
16+ #include < ydb/library/yql/utils/log/log.h>
17+
18+ using namespace NYql ::NConnector;
19+ using namespace NYql ::NConnector::NTest;
20+ using namespace NYql ;
21+ using namespace NActors ;
22+
23+ Y_UNIT_TEST_SUITE (GenericProviderLookupActor) {
24+
25+ // Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread
26+ class TCallLookupActor : public TActorBootstrapped <TCallLookupActor> {
27+ public:
28+ TCallLookupActor (
29+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
30+ NYql::NDq::IDqAsyncLookupSource* lookupSource,
31+ NKikimr::NMiniKQL::TUnboxedValueVector&& keysToLookUp)
32+ : Alloc(alloc)
33+ , LookupSource(lookupSource)
34+ , KeysToLookUp(std::move(keysToLookUp))
35+ {
36+ }
37+
38+ void Bootstrap () {
39+ LookupSource->AsyncLookup (std::move (KeysToLookUp));
40+ auto guard = Guard (*Alloc);
41+ KeysToLookUp.clear ();
42+ KeysToLookUp.shrink_to_fit ();
43+ }
44+
45+ private:
46+ static constexpr char ActorName[] = " TEST" ;
47+
48+ private:
49+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
50+ NYql::NDq::IDqAsyncLookupSource* LookupSource;
51+ NKikimr::NMiniKQL::TUnboxedValueVector KeysToLookUp;
52+ };
53+
54+ Y_UNIT_TEST (Lookup) {
55+ auto alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__, NKikimr::TAlignedPagePoolCounters (), true , false );
56+ NKikimr::NMiniKQL::TMemoryUsageInfo memUsage (" TestMemUsage" );
57+ NKikimr::NMiniKQL::THolderFactory holderFactory (alloc->Ref (), memUsage);
58+ NKikimr::NMiniKQL::TTypeEnvironment typeEnv (*alloc);
59+ NKikimr::NMiniKQL::TTypeBuilder typeBuilder (typeEnv);
60+
61+ auto loggerConfig = NYql::NProto::TLoggingConfig ();
62+ loggerConfig.set_allcomponentslevel (::NYql::NProto::TLoggingConfig_ELevel::TLoggingConfig_ELevel_TRACE);
63+ NYql::NLog::InitLogger (loggerConfig, false );
64+
65+ TTestActorRuntimeBase runtime;
66+ runtime.Initialize ();
67+ auto edge = runtime.AllocateEdgeActor ();
68+
69+ NYql::NConnector::NApi::TDataSourceInstance dsi;
70+ dsi.Setkind (NYql::NConnector::NApi::EDataSourceKind::YDB);
71+ dsi.mutable_endpoint ()->Sethost (" some_host" );
72+ dsi.mutable_endpoint ()->Setport (2135 );
73+ dsi.Setdatabase (" some_db" );
74+ dsi.Setuse_tls (true );
75+ dsi.set_protocol (::NYql::NConnector::NApi::EProtocol::NATIVE);
76+ auto token = dsi.mutable_credentials () -> mutable_token ();
77+ token->Settype (" IAM" );
78+ token->Setvalue (" TEST_TOKEN" );
79+
80+ auto connectorMock = std::make_shared<NYql::NConnector::NTest::TConnectorClientMock>();
81+
82+ // clang-format off
83+ // step 1: ListSplits
84+ connectorMock->ExpectListSplits ()
85+ .Select ()
86+ .DataSourceInstance (dsi)
87+ .What ()
88+ .Column (" id" , Ydb::Type::UINT64)
89+ .NullableColumn (" optional_id" , Ydb::Type::UINT64)
90+ .NullableColumn (" string_value" , Ydb::Type::STRING)
91+ .Done ()
92+ .Table (" lookup_test" )
93+ .Where ()
94+ .Filter ()
95+ .Disjunction ()
96+ .Operand ()
97+ .Conjunction ()
98+ .Operand ().Equal ().Column (" id" ).Value <ui64>(0 ).Done ().Done ()
99+ .Operand ().Equal ().Column (" optional_id" ).OptionalValue <ui64>(100 ).Done ().Done ()
100+ .Done ()
101+ .Done ()
102+ .Operand ()
103+ .Conjunction ()
104+ .Operand ().Equal ().Column (" id" ).Value <ui64>(1 ).Done ().Done ()
105+ .Operand ().Equal ().Column (" optional_id" ).OptionalValue <ui64>(101 ).Done ().Done ()
106+ .Done ()
107+ .Done ()
108+ .Operand ()
109+ .Conjunction ()
110+ .Operand ().Equal ().Column (" id" ).Value <ui64>(2 ).Done ().Done ()
111+ .Operand ().Equal ().Column (" optional_id" ).OptionalValue <ui64>(102 ).Done ().Done ()
112+ .Done ()
113+ .Done ()
114+ .Done ()
115+ .Done ()
116+ .Done ()
117+ .Done ()
118+ .MaxSplitCount (1 )
119+ .Result ()
120+ .AddResponse (NewSuccess ())
121+ .Description (" Actual split info is not important" )
122+ ;
123+
124+ connectorMock->ExpectReadSplits ()
125+ .DataSourceInstance (dsi)
126+ .Split ()
127+ .Description (" Actual split info is not important" )
128+ .Done ()
129+ .Result ()
130+ .AddResponse (
131+ MakeRecordBatch (
132+ MakeArray<arrow::UInt64Builder, ui64>(" id" , {0 , 1 , 2 }, arrow::uint64 ()),
133+ MakeArray<arrow::UInt64Builder, ui64>(" optional_id" , {100 , 101 , 103 }, arrow::uint64 ()), // the last value is intentially wrong
134+ MakeArray<arrow::StringBuilder, std::string>(" string_value" , {" a" , " b" , " c" }, arrow::utf8 ())
135+ ),
136+ NewSuccess ()
137+ )
138+ ;
139+ // clang-format on
140+
141+ NYql::Generic::TLookupSource lookupSourceSettings;
142+ *lookupSourceSettings.mutable_data_source_instance () = dsi;
143+ lookupSourceSettings.Settable (" lookup_test" );
144+ lookupSourceSettings.SetServiceAccountId (" testsaid" );
145+ lookupSourceSettings.SetServiceAccountIdSignature (" fake_signature" );
146+
147+ google::protobuf::Any packedLookupSource;
148+ Y_ABORT_UNLESS (packedLookupSource.PackFrom (lookupSourceSettings));
149+
150+ NKikimr::NMiniKQL::TStructTypeBuilder keyTypeBuilder{typeEnv};
151+ keyTypeBuilder.Add (" id" , typeBuilder.NewDataType (NUdf::EDataSlot::Uint64, false ));
152+ keyTypeBuilder.Add (" optional_id" , typeBuilder.NewDataType (NUdf::EDataSlot::Uint64, true ));
153+ NKikimr::NMiniKQL::TStructTypeBuilder outputypeBuilder{typeEnv};
154+ outputypeBuilder.Add (" string_value" , typeBuilder.NewDataType (NUdf::EDataSlot::String, true ));
155+
156+ auto guard = Guard (*alloc.get ());
157+
158+ auto [lookupSource, actor] = NYql::NDq::CreateGenericLookupActor (
159+ connectorMock,
160+ std::make_shared<NTestCreds::TSecuredServiceAccountCredentialsFactory>(),
161+ edge,
162+ alloc,
163+ std::move (lookupSourceSettings),
164+ keyTypeBuilder.Build (),
165+ outputypeBuilder.Build (),
166+ typeEnv,
167+ holderFactory,
168+ 1'000'000 );
169+ runtime.Register (actor);
170+
171+ NKikimr::NMiniKQL::TUnboxedValueVector keys;
172+ for (size_t i = 0 ; i != 3 ; ++i) {
173+ NUdf::TUnboxedValue* keyItems;
174+ auto key = holderFactory.CreateDirectArrayHolder (2 , keyItems);
175+ keyItems[0 ] = NUdf::TUnboxedValuePod (ui64 (i));
176+ keyItems[1 ] = NUdf::TUnboxedValuePod (ui64 (100 + i));
177+ keys.push_back (std::move (key));
178+ }
179+
180+ guard.Release (); // let actors use alloc
181+
182+ auto callLookupActor = new TCallLookupActor (alloc, lookupSource, std::move (keys));
183+ runtime.Register (callLookupActor);
184+
185+ auto ev = runtime.GrabEdgeEventRethrow <NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
186+ auto guard2 = Guard (*alloc.get ());
187+ NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult = std::move (ev->Get ()->Data );
188+
189+ UNIT_ASSERT_EQUAL (3 , lookupResult.size ());
190+ {
191+ auto & [k, v] = lookupResult[0 ];
192+ UNIT_ASSERT_EQUAL (0 , k.GetElement (0 ).Get <ui64>());
193+ UNIT_ASSERT_EQUAL (100 , k.GetElement (1 ).Get <ui64>());
194+ NUdf::TUnboxedValue val = v.GetElement (0 );
195+ UNIT_ASSERT (val.AsStringRef () == TStringBuf (" a" ));
196+ }
197+ {
198+ auto & [k, v] = lookupResult[1 ];
199+ UNIT_ASSERT_EQUAL (1 , k.GetElement (0 ).Get <ui64>());
200+ UNIT_ASSERT_EQUAL (101 , k.GetElement (1 ).Get <ui64>());
201+ NUdf::TUnboxedValue val = v.GetElement (0 );
202+ UNIT_ASSERT (val.AsStringRef () == TStringBuf (" b" ));
203+ }
204+ {
205+ auto & [k, v] = lookupResult[2 ];
206+ UNIT_ASSERT_EQUAL (2 , k.GetElement (0 ).Get <ui64>());
207+ UNIT_ASSERT_EQUAL (102 , k.GetElement (1 ).Get <ui64>());
208+ // this key was not found and reported as empty
209+ UNIT_ASSERT (!v);
210+ }
211+ }
212+
213+ } // Y_UNIT_TEST_SUITE(GenericProviderLookupActor)
0 commit comments