Skip to content

Commit f219f0a

Browse files
authored
Merge 3bfe766 into e573b9d
2 parents e573b9d + 3bfe766 commit f219f0a

File tree

12 files changed

+602
-68
lines changed

12 files changed

+602
-68
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
UNITTEST_FOR(ydb/library/yql/providers/yt/actors)
2+
3+
PEERDIR(
4+
ydb/library/yql/providers/yt/codec/codegen/no_llvm
5+
ydb/library/yql/providers/yt/comp_nodes/no_llvm
6+
ydb/library/yql/providers/yt/gateway/file
7+
ydb/library/yql/minikql/codegen/no_llvm
8+
ydb/library/actors/testlib
9+
ydb/library/yql/public/udf
10+
library/cpp/testing/unittest
11+
ydb/library/yql/sql/pg
12+
ydb/library/yql/public/udf/service/terminate_policy
13+
ydb/library/yql/parser/pg_wrapper/interface
14+
15+
)
16+
17+
SRCS(
18+
yql_yt_lookup_actor_ut.cpp
19+
)
20+
21+
YQL_LAST_ABI_VERSION()
22+
23+
END()
24+
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
2+
#include <ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.h>
3+
#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.h>
4+
5+
#include <ydb/library/yql/public/udf/udf_value.h>
6+
7+
#include <ydb/library/yql/minikql/mkql_alloc.h>
8+
#include <ydb/library/yql/minikql/mkql_node.h>
9+
#include <ydb/library/yql/minikql/mkql_node_builder.h>
10+
#include <ydb/library/yql/minikql/mkql_string_util.h>
11+
#include <ydb/library/yql/minikql/mkql_type_builder.h>
12+
#include <ydb/library/yql/utils/log/proto/logger_config.pb.h>
13+
#include <ydb/library/yql/utils/log/log.h>
14+
15+
#include <ydb/library/actors/testlib/test_runtime.h>
16+
#include <ydb/library/actors/core/actor_bootstrapped.h>
17+
18+
#include <util/system/tempfile.h>
19+
#include <library/cpp/testing/unittest/registar.h>
20+
#include <initializer_list>
21+
22+
using namespace NYql;
23+
using namespace NActors;
24+
25+
Y_UNIT_TEST_SUITE(YtLookupActor) {
26+
27+
NUdf::TUnboxedValue CreateStructValue(NKikimr::NMiniKQL::THolderFactory& holderFactory, std::initializer_list<TStringBuf> members) {
28+
NUdf::TUnboxedValue* items;
29+
NUdf::TUnboxedValue result = holderFactory.CreateDirectArrayHolder(members.size(), items);
30+
for (size_t i = 0; i != members.size(); ++i) {
31+
items[i] = NKikimr::NMiniKQL::MakeString(*(members.begin() + i));
32+
}
33+
return result;
34+
}
35+
36+
bool CheckStructValue(const NUdf::TUnboxedValue& v, std::initializer_list<TStringBuf> members) {
37+
for (size_t i = 0; i != members.size(); ++i) {
38+
NUdf::TUnboxedValue m = v.GetElement(i);
39+
if (m.AsStringRef() != *(members.begin() + i)) {
40+
return false;
41+
}
42+
}
43+
return true;
44+
}
45+
46+
//Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread
47+
class TCallLookupActor: public TActorBootstrapped<TCallLookupActor> {
48+
public:
49+
TCallLookupActor(
50+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
51+
NYql::NDq::IDqAsyncLookupSource* lookupSource,
52+
NKikimr::NMiniKQL::TUnboxedValueVector&& keysToLookUp)
53+
: Alloc(alloc)
54+
, LookupSource(lookupSource)
55+
, KeysToLookUp(std::move(keysToLookUp))
56+
{
57+
}
58+
59+
void Bootstrap() {
60+
LookupSource->AsyncLookup(std::move(KeysToLookUp));
61+
auto guard = Guard(*Alloc);
62+
KeysToLookUp.clear();
63+
KeysToLookUp.shrink_to_fit();
64+
}
65+
66+
private:
67+
static constexpr char ActorName[] = "TEST";
68+
69+
private:
70+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
71+
NYql::NDq::IDqAsyncLookupSource* LookupSource;
72+
NKikimr::NMiniKQL::TUnboxedValueVector KeysToLookUp;
73+
};
74+
75+
Y_UNIT_TEST(Lookup) {
76+
auto alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false);
77+
TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> functionRegistry = CreateFunctionRegistry(NKikimr::NMiniKQL::IBuiltinFunctionRegistry::TPtr());
78+
NKikimr::NMiniKQL::TMemoryUsageInfo memUsage("TestMemUsage");
79+
NKikimr::NMiniKQL::THolderFactory holderFactory(alloc->Ref(), memUsage);
80+
NKikimr::NMiniKQL::TTypeEnvironment typeEnv(*alloc);
81+
NKikimr::NMiniKQL::TTypeBuilder typeBuilder(typeEnv);
82+
83+
auto loggerConfig = NYql::NProto::TLoggingConfig();
84+
loggerConfig.set_allcomponentslevel(::NYql::NProto::TLoggingConfig_ELevel::TLoggingConfig_ELevel_TRACE);
85+
NYql::NLog::InitLogger(loggerConfig, false);
86+
87+
TTestActorRuntimeBase runtime;
88+
runtime.Initialize();
89+
auto edge = runtime.AllocateEdgeActor();
90+
91+
NYql::NYt::NSource::TLookupSource source;
92+
source.SetCluster("Plato");
93+
source.SetTable("Lookup");
94+
source.SetRowSpec(R"(
95+
{"_yql_row_spec"={
96+
"Type"=["StructType";[
97+
["hostname";["DataType";"String"]];
98+
["network";["DataType";"String"]];
99+
["fqdn";["DataType";"String"]];
100+
["ip4";["DataType";"String"]];
101+
["ip6";["DataType";"String"]]
102+
]];
103+
}}
104+
)");
105+
106+
NKikimr::NMiniKQL::TStructTypeBuilder keyTypeBuilder{typeEnv};
107+
keyTypeBuilder.Add("hostname", typeBuilder.NewDataType(NUdf::EDataSlot::String, false));
108+
keyTypeBuilder.Add("network", typeBuilder.NewDataType(NUdf::EDataSlot::String, false));
109+
NKikimr::NMiniKQL::TStructTypeBuilder payloadTypeBuilder{typeEnv};
110+
payloadTypeBuilder.Add("fqdn", typeBuilder.NewDataType(NUdf::EDataSlot::String, true));
111+
payloadTypeBuilder.Add("ip4", typeBuilder.NewDataType(NUdf::EDataSlot::String, true));
112+
113+
TTempFileHandle lookupTable("lookup.txt");
114+
TString lookupTableData = R"(
115+
{"hostname"="host1";"network"="vpc1";"fqdn"="host1.vpc1.net";"ip4"="192.168.1.1"; "ip6"="[xxxx:xxxx:xxxx:1111]"};
116+
{"hostname"="host2";"network"="vpc1";"fqdn"="host2.vpc1.net";"ip4"="192.168.1.2"; "ip6"="[xxxx:xxxx:xxxx:2222]"};
117+
{"hostname"="host1";"network"="vpc2";"fqdn"="host2.vpc2.net";"ip4"="192.168.2.1"; "ip6"="[xxxx:xxxx:xxxx:3333]"};
118+
{"hostname"="very very long hostname to for test 1";"network"="vpc1";"fqdn"="very very long fqdn for test 1";"ip4"="192.168.100.1"; "ip6"="[xxxx:xxxx:XXXX:1111]"};
119+
{"hostname"="very very long hostname to for test 2";"network"="vpc2";"fqdn"="very very long fqdn for test 2";"ip4"="192.168.100.2"; "ip6"="[xxxx:xxxx:XXXX:2222]"};
120+
)";
121+
lookupTable.Write(lookupTableData.data(), lookupTableData.size());
122+
const THashMap<TString, TString> mapping = {
123+
{"yt.Plato.Lookup", lookupTable.Name()}
124+
};
125+
auto ytServices = NFile::TYtFileServices::Make(
126+
nullptr,
127+
mapping
128+
);
129+
auto guard = Guard(*alloc.get());
130+
auto [lookupSource, actor] = NYql::NDq::CreateYtLookupActor(
131+
ytServices,
132+
edge,
133+
alloc,
134+
*functionRegistry,
135+
std::move(source),
136+
keyTypeBuilder.Build(),
137+
payloadTypeBuilder.Build(),
138+
typeEnv,
139+
holderFactory,
140+
1'000'000);
141+
runtime.Register(actor);
142+
143+
NKikimr::NMiniKQL::TUnboxedValueVector keys {\
144+
CreateStructValue(holderFactory, {"host1", "vpc1"}),
145+
CreateStructValue(holderFactory, {"host2", "vpc1"}),
146+
CreateStructValue(holderFactory, {"host2", "vpc2"}), //NOT_FOUND expected
147+
CreateStructValue(holderFactory, {"very very long hostname to for test 2", "vpc2"}),
148+
};
149+
150+
guard.Release(); //let actors use alloc
151+
152+
auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(keys));
153+
runtime.Register(callLookupActor);
154+
155+
auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
156+
auto guard2 = Guard(*alloc.get());
157+
NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult = std::move(ev->Get()->Data);
158+
UNIT_ASSERT_EQUAL(4, lookupResult.size());
159+
{
160+
auto& [k, v] = lookupResult[0];
161+
UNIT_ASSERT(CheckStructValue(k, {"host1", "vpc1"}));
162+
UNIT_ASSERT(CheckStructValue(v, {"host1.vpc1.net", "192.168.1.1"}));
163+
}
164+
{
165+
auto& [k, v] = lookupResult[1];
166+
UNIT_ASSERT(CheckStructValue(k, {"host2", "vpc1"}));
167+
UNIT_ASSERT(CheckStructValue(v, {"host2.vpc1.net", "192.168.1.2"}));
168+
}
169+
{
170+
auto& [k, v] = lookupResult[2];
171+
UNIT_ASSERT(CheckStructValue(k, {"host2", "vpc2"}));
172+
UNIT_ASSERT(!v);
173+
}
174+
{
175+
auto& [k, v] = lookupResult[3];
176+
UNIT_ASSERT(CheckStructValue(k, {"very very long hostname to for test 2", "vpc2"}));
177+
UNIT_ASSERT(CheckStructValue(v, {"very very long fqdn for test 2", "192.168.100.2"}));
178+
}
179+
180+
}
181+
182+
} //Y_UNIT_TEST_SUITE(GenericProviderLookupActor)
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
yql_yt_lookup_actor.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/library/yql/minikql/computation
9+
ydb/library/yql/providers/yt/proto
10+
ydb/library/yql/dq/actors/compute
11+
ydb/library/yql/minikql/computation
12+
ydb/library/yql/public/types
13+
)
14+
15+
YQL_LAST_ABI_VERSION()
16+
17+
END()
18+
19+
RECURSE_FOR_TESTS(ut)

0 commit comments

Comments
 (0)