Skip to content

Commit 82687fa

Browse files
authored
YQ-2068 use HashMap for lookup request and result (#5594)
1 parent d23ebbe commit 82687fa

File tree

10 files changed

+140
-149
lines changed

10 files changed

+140
-149
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -202,40 +202,50 @@ struct IDqComputeActorAsyncOutput {
202202
};
203203

204204
struct IDqAsyncLookupSource {
205+
using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper<true, true, false>;
206+
using TUnboxedValueMap = THashMap<
207+
NUdf::TUnboxedValue,
208+
NUdf::TUnboxedValue,
209+
NKikimr::NMiniKQL::TValueHasher,
210+
NKikimr::NMiniKQL::TValueEqual,
211+
NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>>
212+
>;
205213
struct TEvLookupRequest: NActors::TEventLocal<TEvLookupRequest, TDqComputeEvents::EvLookupRequest> {
206-
TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, NKikimr::NMiniKQL::TUnboxedValueVector&& keys)
214+
TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& request)
207215
: Alloc(alloc)
208-
, Keys(std::move(keys))
216+
, Request(std::move(request))
209217
{
210218
}
211219
~TEvLookupRequest() {
212220
auto guard = Guard(*Alloc);
213-
Keys = NKikimr::NMiniKQL::TUnboxedValueVector{};
221+
TKeyTypeHelper empty;
222+
Request = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
214223
}
215-
216224
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
217-
NKikimr::NMiniKQL::TUnboxedValueVector Keys;
225+
TUnboxedValueMap Request;
218226
};
227+
219228
struct TEvLookupResult: NActors::TEventLocal<TEvLookupResult, TDqComputeEvents::EvLookupResult> {
220-
TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, NKikimr::NMiniKQL::TKeyPayloadPairVector&& data)
229+
TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& result)
221230
: Alloc(alloc)
222-
, Data(std::move(data))
231+
, Result(std::move(result))
223232
{
224233
}
225234
~TEvLookupResult() {
226235
auto guard = Guard(*Alloc.get());
227-
Data = NKikimr::NMiniKQL::TKeyPayloadPairVector{};
236+
TKeyTypeHelper empty;
237+
Result = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
228238
}
229239

230240
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
231-
NKikimr::NMiniKQL::TKeyPayloadPairVector Data;
241+
TUnboxedValueMap Result;
232242
};
233243

234244
virtual size_t GetMaxSupportedKeysInRequest() const = 0;
235245
//Initiate lookup for requested keys
236246
//Only one request at a time is allowed. Request must contain no more than GetMaxSupportedKeysInRequest() keys
237-
//Upon completion, results are sent in a TEvLookupResult to the preconfigured actor
238-
virtual void AsyncLookup(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) = 0;
247+
//Upon completion, results are sent in TEvLookupResult event to the preconfigured actor
248+
virtual void AsyncLookup(TUnboxedValueMap&& request) = 0;
239249
protected:
240250
~IDqAsyncLookupSource() {}
241251
};
@@ -266,6 +276,7 @@ struct IDqAsyncIoFactory : public TThrRefBase {
266276

267277
struct TLookupSourceArguments {
268278
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
279+
std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> KeyTypeHelper;
269280
NActors::TActorId ParentId;
270281
google::protobuf::Any LookupSource; //provider specific data source
271282
const NKikimr::NMiniKQL::TStructType* KeyType;

ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class TInputTransformStreamLookupBase
5151
, LookupJoinColumns(std::move(lookupJoinColumns))
5252
, InputRowType(inputRowType)
5353
, LookupKeyType(lookupKeyType)
54-
, KeyTypeHelper(lookupKeyType)
54+
, KeyTypeHelper(std::make_shared<IDqAsyncLookupSource::TKeyTypeHelper>(lookupKeyType))
5555
, LookupPayloadType(lookupPayloadType)
5656
, OutputRowType(outputRowType)
5757
, OutputRowColumnOrder(outputRowColumnOrder)
@@ -67,6 +67,7 @@ class TInputTransformStreamLookupBase
6767
Become(&TInputTransformStreamLookupBase::StateFunc);
6868
NDq::IDqAsyncIoFactory::TLookupSourceArguments lookupSourceArgs {
6969
.Alloc = Alloc,
70+
.KeyTypeHelper = KeyTypeHelper,
7071
.ParentId = SelfId(),
7172
.LookupSource = Settings.GetRightSource().GetLookupSource(),
7273
.KeyType = LookupKeyType,
@@ -90,30 +91,15 @@ class TInputTransformStreamLookupBase
9091

9192
void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) {
9293
auto guard = BindAllocator();
93-
THashMap<
94-
NUdf::TUnboxedValue,
95-
NUdf::TUnboxedValue,
96-
NKikimr::NMiniKQL::TValueHasher,
97-
NKikimr::NMiniKQL::TValueEqual,
98-
NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>>
99-
> map(
100-
ev->Get()->Data.size(),
101-
KeyTypeHelper.GetValueHash(),
102-
KeyTypeHelper.GetValueEqual()
103-
);
104-
for (auto& r: ev->Get()->Data) {
105-
Y_ABORT_UNLESS(r.first.IsBoxed());
106-
Y_ABORT_UNLESS(!r.second || r.second.IsBoxed());
107-
map.emplace(std::move(r));
108-
}
94+
const auto lookupResult = std::move(ev->Get()->Result);
10995
while (!AwaitingQueue.empty()) {
11096
const auto wideInputRow = AwaitingQueue.Head();
11197
NUdf::TUnboxedValue* keyItems;
11298
NUdf::TUnboxedValue lookupKey = HolderFactory.CreateDirectArrayHolder(InputJoinColumns.size(), keyItems);
11399
for (size_t i = 0; i != InputJoinColumns.size(); ++i) {
114100
keyItems[i] = wideInputRow[InputJoinColumns[i]];
115101
}
116-
auto lookupPayload = map.FindPtr(lookupKey);
102+
auto lookupPayload = lookupResult.FindPtr(lookupKey);
117103

118104
NUdf::TUnboxedValue* outputRowItems;
119105
NUdf::TUnboxedValue outputRow = HolderFactory.CreateDirectArrayHolder(OutputRowColumnOrder.size(), outputRowItems);
@@ -163,7 +149,7 @@ class TInputTransformStreamLookupBase
163149
auto guard = BindAllocator();
164150
//All resources, held by this class, that have been created with mkql allocator, must be deallocated here
165151
InputFlow.Clear();
166-
KeyTypeHelper = TKeyTypeHelper{};
152+
KeyTypeHelper.reset();
167153
NMiniKQL::TUnboxedValueBatch{}.swap(AwaitingQueue);
168154
NMiniKQL::TUnboxedValueBatch{}.swap(ReadyQueue);
169155
}
@@ -180,21 +166,21 @@ class TInputTransformStreamLookupBase
180166
NUdf::TUnboxedValue* inputRowItems;
181167
NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems);
182168
const auto maxKeysInRequest = LookupSource.first->GetMaxSupportedKeysInRequest();
183-
NKikimr::NMiniKQL::TUnboxedValueVector keysForLookup;
169+
IDqAsyncLookupSource::TUnboxedValueMap keysForLookup{maxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual()};
184170
while (
185171
((InputFlowFetchStatus = FetchWideInputValue(inputRowItems)) == NUdf::EFetchStatus::Ok) &&
186172
(keysForLookup.size() < maxKeysInRequest)
187173
) {
188174
NUdf::TUnboxedValue* keyItems;
189-
auto key = HolderFactory.CreateDirectArrayHolder(InputJoinColumns.size(), keyItems);
175+
NUdf::TUnboxedValue key = HolderFactory.CreateDirectArrayHolder(InputJoinColumns.size(), keyItems);
190176
for (size_t i = 0; i != InputJoinColumns.size(); ++i) {
191177
keyItems[i] = inputRowItems[InputJoinColumns[i]];
192178
}
193-
keysForLookup.push_back(key);
179+
keysForLookup.emplace(std::move(key), NUdf::TUnboxedValue{});
194180
AwaitingQueue.PushRow(inputRowItems, InputRowType->GetElementsCount());
195181
}
196182
if (!keysForLookup.empty()) {
197-
LookupSource.first->AsyncLookup(keysForLookup);
183+
LookupSource.first->AsyncLookup(std::move(keysForLookup));
198184
WaitingForLookupResults = true;
199185
}
200186
}
@@ -241,8 +227,7 @@ class TInputTransformStreamLookupBase
241227
const TVector<size_t> LookupJoinColumns;
242228
const NMiniKQL::TMultiType* const InputRowType;
243229
const NMiniKQL::TStructType* const LookupKeyType; //key column types in LookupTable
244-
using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper<true, true, false>;
245-
TKeyTypeHelper KeyTypeHelper;
230+
std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> KeyTypeHelper;
246231
const NMiniKQL::TStructType* const LookupPayloadType; //other column types in LookupTable
247232
const NMiniKQL::TMultiType* const OutputRowType;
248233
const TOutputRowColumnOrder OutputRowColumnOrder;

ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,30 @@ using namespace NActors;
2121

2222
Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
2323

24+
NYql::NUdf::TUnboxedValue CreateStructValue(NKikimr::NMiniKQL::THolderFactory& holderFactory, std::initializer_list<ui64> members) {
25+
NYql::NUdf::TUnboxedValue* items;
26+
NYql::NUdf::TUnboxedValue result = holderFactory.CreateDirectArrayHolder(members.size(), items);
27+
for (size_t i = 0; i != members.size(); ++i) {
28+
items[i] = NYql::NUdf::TUnboxedValuePod{*(members.begin() + i)};
29+
}
30+
return result;
31+
}
32+
2433
//Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread
2534
class TCallLookupActor: public TActorBootstrapped<TCallLookupActor> {
2635
public:
2736
TCallLookupActor(
2837
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
2938
NYql::NDq::IDqAsyncLookupSource* lookupSource,
30-
NKikimr::NMiniKQL::TUnboxedValueVector&& keysToLookUp)
39+
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap&& request)
3140
: Alloc(alloc)
3241
, LookupSource(lookupSource)
33-
, KeysToLookUp(std::move(keysToLookUp))
42+
, Request(std::move(request))
3443
{
3544
}
3645

3746
void Bootstrap() {
38-
LookupSource->AsyncLookup(std::move(KeysToLookUp));
39-
auto guard = Guard(*Alloc);
40-
KeysToLookUp.clear();
41-
KeysToLookUp.shrink_to_fit();
47+
LookupSource->AsyncLookup(std::move(Request));
4248
}
4349

4450
private:
@@ -47,7 +53,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
4753
private:
4854
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
4955
NYql::NDq::IDqAsyncLookupSource* LookupSource;
50-
NKikimr::NMiniKQL::TUnboxedValueVector KeysToLookUp;
56+
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap Request;
5157
};
5258

5359
Y_UNIT_TEST(Lookup) {
@@ -94,8 +100,8 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
94100
.Disjunction()
95101
.Operand()
96102
.Conjunction()
97-
.Operand().Equal().Column("id").Value<ui64>(0).Done().Done()
98-
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(100).Done().Done()
103+
.Operand().Equal().Column("id").Value<ui64>(2).Done().Done()
104+
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(102).Done().Done()
99105
.Done()
100106
.Done()
101107
.Operand()
@@ -106,8 +112,8 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
106112
.Done()
107113
.Operand()
108114
.Conjunction()
109-
.Operand().Equal().Column("id").Value<ui64>(2).Done().Done()
110-
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(102).Done().Done()
115+
.Operand().Equal().Column("id").Value<ui64>(0).Done().Done()
116+
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(100).Done().Done()
111117
.Done()
112118
.Done()
113119
.Done()
@@ -153,12 +159,14 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
153159
outputypeBuilder.Add("string_value", typeBuilder.NewDataType(NYql::NUdf::EDataSlot::String, true));
154160

155161
auto guard = Guard(*alloc.get());
162+
auto keyTypeHelper = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TKeyTypeHelper>(keyTypeBuilder.Build());
156163

157164
auto [lookupSource, actor] = NYql::NDq::CreateGenericLookupActor(
158165
connectorMock,
159166
std::make_shared<NYql::NTestCreds::TSecuredServiceAccountCredentialsFactory>(),
160167
edge,
161168
alloc,
169+
keyTypeHelper,
162170
std::move(lookupSourceSettings),
163171
keyTypeBuilder.Build(),
164172
outputypeBuilder.Build(),
@@ -167,45 +175,41 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
167175
1'000'000);
168176
runtime.Register(actor);
169177

170-
NKikimr::NMiniKQL::TUnboxedValueVector keys;
178+
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap request(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual());
171179
for (size_t i = 0; i != 3; ++i) {
172180
NYql::NUdf::TUnboxedValue* keyItems;
173181
auto key = holderFactory.CreateDirectArrayHolder(2, keyItems);
174182
keyItems[0] = NYql::NUdf::TUnboxedValuePod(ui64(i));
175183
keyItems[1] = NYql::NUdf::TUnboxedValuePod(ui64(100 + i));
176-
keys.push_back(std::move(key));
184+
request.emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
177185
}
178186

179187
guard.Release(); //let actors use alloc
180188

181-
auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(keys));
189+
auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(request));
182190
runtime.Register(callLookupActor);
183191

184192
auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
185193
auto guard2 = Guard(*alloc.get());
186-
NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult = std::move(ev->Get()->Data);
194+
auto lookupResult = std::move(ev->Get()->Result);
187195

188196
UNIT_ASSERT_EQUAL(3, lookupResult.size());
189197
{
190-
auto& [k, v] = lookupResult[0];
191-
UNIT_ASSERT_EQUAL(0, k.GetElement(0).Get<ui64>());
192-
UNIT_ASSERT_EQUAL(100, k.GetElement(1).Get<ui64>());
193-
NYql::NUdf::TUnboxedValue val = v.GetElement(0);
198+
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {0, 100}));
199+
UNIT_ASSERT(v);
200+
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
194201
UNIT_ASSERT(val.AsStringRef() == TStringBuf("a"));
195202
}
196203
{
197-
auto& [k, v] = lookupResult[1];
198-
UNIT_ASSERT_EQUAL(1, k.GetElement(0).Get<ui64>());
199-
UNIT_ASSERT_EQUAL(101, k.GetElement(1).Get<ui64>());
200-
NYql::NUdf::TUnboxedValue val = v.GetElement(0);
204+
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {1, 101}));
205+
UNIT_ASSERT(v);
206+
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
201207
UNIT_ASSERT(val.AsStringRef() == TStringBuf("b"));
202208
}
203209
{
204-
auto& [k, v] = lookupResult[2];
205-
UNIT_ASSERT_EQUAL(2, k.GetElement(0).Get<ui64>());
206-
UNIT_ASSERT_EQUAL(102, k.GetElement(1).Get<ui64>());
207-
//this key was not found and reported as empty
208-
UNIT_ASSERT(!v);
210+
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {2, 102}));
211+
UNIT_ASSERT(v);
212+
UNIT_ASSERT(!*v);
209213
}
210214
}
211215

0 commit comments

Comments
 (0)