Skip to content

Commit d770236

Browse files
authored
Merge 33f69f5 into cb9a972
2 parents cb9a972 + 33f69f5 commit d770236

File tree

10 files changed

+132
-155
lines changed

10 files changed

+132
-155
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -202,40 +202,48 @@ struct IDqComputeActorAsyncOutput {
202202
};
203203

204204
struct IDqAsyncLookupSource {
205+
using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper<true, true, false>;
206+
using TUnboxedValueMap = THashMap<
207+
NUdf::TUnboxedValue,
208+
NUdf::TUnboxedValue,
209+
NKikimr::NMiniKQL::TValueHasher,
210+
NKikimr::NMiniKQL::TValueEqual,
211+
NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>>
212+
>;
205213
struct TEvLookupRequest: NActors::TEventLocal<TEvLookupRequest, TDqComputeEvents::EvLookupRequest> {
206-
TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, NKikimr::NMiniKQL::TUnboxedValueVector&& keys)
214+
TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& request)
207215
: Alloc(alloc)
208-
, Keys(std::move(keys))
216+
, Request(std::move(request))
209217
{
210218
}
211219
~TEvLookupRequest() {
212220
auto guard = Guard(*Alloc);
213-
Keys = NKikimr::NMiniKQL::TUnboxedValueVector{};
221+
Request = TUnboxedValueMap{0, {{}, false, nullptr}, {{}, false, nullptr}};
214222
}
215-
216223
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
217-
NKikimr::NMiniKQL::TUnboxedValueVector Keys;
224+
TUnboxedValueMap Request;
218225
};
226+
219227
struct TEvLookupResult: NActors::TEventLocal<TEvLookupResult, TDqComputeEvents::EvLookupResult> {
220-
TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, NKikimr::NMiniKQL::TKeyPayloadPairVector&& data)
228+
TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& result)
221229
: Alloc(alloc)
222-
, Data(std::move(data))
230+
, Result(std::move(result))
223231
{
224232
}
225233
~TEvLookupResult() {
226234
auto guard = Guard(*Alloc.get());
227-
Data = NKikimr::NMiniKQL::TKeyPayloadPairVector{};
235+
Result = TUnboxedValueMap{0, {{}, false, nullptr}, {{}, false, nullptr}};
228236
}
229237

230238
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
231-
NKikimr::NMiniKQL::TKeyPayloadPairVector Data;
239+
TUnboxedValueMap Result;
232240
};
233241

234242
virtual size_t GetMaxSupportedKeysInRequest() const = 0;
235243
//Initiate lookup for requested keys
236244
//Only one request at a time is allowed. Request must contain no more than GetMaxSupportedKeysInRequest() keys
237-
//Upon completion, results are sent in a TEvLookupResult to the preconfigured actor
238-
virtual void AsyncLookup(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) = 0;
245+
//Upon completion, results are sent in TEvLookupResult event to the preconfigured actor
246+
virtual void AsyncLookup(TUnboxedValueMap&& request) = 0;
239247
protected:
240248
~IDqAsyncLookupSource() {}
241249
};
@@ -266,6 +274,7 @@ struct IDqAsyncIoFactory : public TThrRefBase {
266274

267275
struct TLookupSourceArguments {
268276
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
277+
std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> KeyTypeHelper;
269278
NActors::TActorId ParentId;
270279
google::protobuf::Any LookupSource; //provider specific data source
271280
const NKikimr::NMiniKQL::TStructType* KeyType;

ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class TInputTransformStreamLookupBase
5151
, LookupJoinColumns(std::move(lookupJoinColumns))
5252
, InputRowType(inputRowType)
5353
, LookupKeyType(lookupKeyType)
54-
, KeyTypeHelper(lookupKeyType)
54+
, KeyTypeHelper(std::make_shared<IDqAsyncLookupSource::TKeyTypeHelper>(lookupKeyType))
5555
, LookupPayloadType(lookupPayloadType)
5656
, OutputRowType(outputRowType)
5757
, OutputRowColumnOrder(outputRowColumnOrder)
@@ -67,6 +67,7 @@ class TInputTransformStreamLookupBase
6767
Become(&TInputTransformStreamLookupBase::StateFunc);
6868
NDq::IDqAsyncIoFactory::TLookupSourceArguments lookupSourceArgs {
6969
.Alloc = Alloc,
70+
.KeyTypeHelper = KeyTypeHelper,
7071
.ParentId = SelfId(),
7172
.LookupSource = Settings.GetRightSource().GetLookupSource(),
7273
.KeyType = LookupKeyType,
@@ -90,30 +91,15 @@ class TInputTransformStreamLookupBase
9091

9192
void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) {
9293
auto guard = BindAllocator();
93-
THashMap<
94-
NUdf::TUnboxedValue,
95-
NUdf::TUnboxedValue,
96-
NKikimr::NMiniKQL::TValueHasher,
97-
NKikimr::NMiniKQL::TValueEqual,
98-
NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>>
99-
> map(
100-
ev->Get()->Data.size(),
101-
KeyTypeHelper.GetValueHash(),
102-
KeyTypeHelper.GetValueEqual()
103-
);
104-
for (auto& r: ev->Get()->Data) {
105-
Y_ABORT_UNLESS(r.first.IsBoxed());
106-
Y_ABORT_UNLESS(!r.second || r.second.IsBoxed());
107-
map.emplace(std::move(r));
108-
}
94+
const auto lookupResult = std::move(ev->Get()->Result);
10995
while (!AwaitingQueue.empty()) {
11096
const auto wideInputRow = AwaitingQueue.Head();
11197
NUdf::TUnboxedValue* keyItems;
11298
NUdf::TUnboxedValue lookupKey = HolderFactory.CreateDirectArrayHolder(InputJoinColumns.size(), keyItems);
11399
for (size_t i = 0; i != InputJoinColumns.size(); ++i) {
114100
keyItems[i] = wideInputRow[InputJoinColumns[i]];
115101
}
116-
auto lookupPayload = map.FindPtr(lookupKey);
102+
auto lookupPayload = lookupResult.FindPtr(lookupKey);
117103

118104
NUdf::TUnboxedValue* outputRowItems;
119105
NUdf::TUnboxedValue outputRow = HolderFactory.CreateDirectArrayHolder(OutputRowColumnOrder.size(), outputRowItems);
@@ -163,7 +149,7 @@ class TInputTransformStreamLookupBase
163149
auto guard = BindAllocator();
164150
//All resources, held by this class, that have been created with mkql allocator, must be deallocated here
165151
InputFlow.Clear();
166-
KeyTypeHelper = TKeyTypeHelper{};
152+
KeyTypeHelper.reset();
167153
NMiniKQL::TUnboxedValueBatch{}.swap(AwaitingQueue);
168154
NMiniKQL::TUnboxedValueBatch{}.swap(ReadyQueue);
169155
}
@@ -180,21 +166,21 @@ class TInputTransformStreamLookupBase
180166
NUdf::TUnboxedValue* inputRowItems;
181167
NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems);
182168
const auto maxKeysInRequest = LookupSource.first->GetMaxSupportedKeysInRequest();
183-
NKikimr::NMiniKQL::TUnboxedValueVector keysForLookup;
169+
IDqAsyncLookupSource::TUnboxedValueMap keysForLookup{maxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual()};
184170
while (
185171
((InputFlowFetchStatus = FetchWideInputValue(inputRowItems)) == NUdf::EFetchStatus::Ok) &&
186172
(keysForLookup.size() < maxKeysInRequest)
187173
) {
188174
NUdf::TUnboxedValue* keyItems;
189-
auto key = HolderFactory.CreateDirectArrayHolder(InputJoinColumns.size(), keyItems);
175+
NUdf::TUnboxedValue key = HolderFactory.CreateDirectArrayHolder(InputJoinColumns.size(), keyItems);
190176
for (size_t i = 0; i != InputJoinColumns.size(); ++i) {
191177
keyItems[i] = inputRowItems[InputJoinColumns[i]];
192178
}
193-
keysForLookup.push_back(key);
179+
keysForLookup.emplace(std::move(key), NUdf::TUnboxedValuePod{});
194180
AwaitingQueue.PushRow(inputRowItems, InputRowType->GetElementsCount());
195181
}
196182
if (!keysForLookup.empty()) {
197-
LookupSource.first->AsyncLookup(keysForLookup);
183+
LookupSource.first->AsyncLookup(std::move(keysForLookup));
198184
WaitingForLookupResults = true;
199185
}
200186
}
@@ -241,8 +227,7 @@ class TInputTransformStreamLookupBase
241227
const TVector<size_t> LookupJoinColumns;
242228
const NMiniKQL::TMultiType* const InputRowType;
243229
const NMiniKQL::TStructType* const LookupKeyType; //key column types in LookupTable
244-
using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper<true, true, false>;
245-
TKeyTypeHelper KeyTypeHelper;
230+
std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> KeyTypeHelper;
246231
const NMiniKQL::TStructType* const LookupPayloadType; //other column types in LookupTable
247232
const NMiniKQL::TMultiType* const OutputRowType;
248233
const TOutputRowColumnOrder OutputRowColumnOrder;

ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,31 @@ using namespace NActors;
2121

2222
Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
2323

24+
NYql::NUdf::TUnboxedValue CreateStructValue(NKikimr::NMiniKQL::THolderFactory& holderFactory, std::initializer_list<ui64> members) {
25+
NYql::NUdf::TUnboxedValue* items;
26+
NYql::NUdf::TUnboxedValue result = holderFactory.CreateDirectArrayHolder(members.size(), items);
27+
for (size_t i = 0; i != members.size(); ++i) {
28+
items[i] = NYql::NUdf::TUnboxedValuePod{*(members.begin() + i)};
29+
}
30+
return result;
31+
}
32+
2433
//Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread
2534
class TCallLookupActor: public TActorBootstrapped<TCallLookupActor> {
2635
public:
2736
TCallLookupActor(
2837
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
2938
NYql::NDq::IDqAsyncLookupSource* lookupSource,
30-
NKikimr::NMiniKQL::TUnboxedValueVector&& keysToLookUp)
39+
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap&& request)
3140
: Alloc(alloc)
3241
, LookupSource(lookupSource)
33-
, KeysToLookUp(std::move(keysToLookUp))
42+
, Request(std::move(request))
3443
{
3544
}
3645

3746
void Bootstrap() {
38-
LookupSource->AsyncLookup(std::move(KeysToLookUp));
47+
LookupSource->AsyncLookup(std::move(Request));
3948
auto guard = Guard(*Alloc);
40-
KeysToLookUp.clear();
41-
KeysToLookUp.shrink_to_fit();
4249
}
4350

4451
private:
@@ -47,7 +54,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
4754
private:
4855
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
4956
NYql::NDq::IDqAsyncLookupSource* LookupSource;
50-
NKikimr::NMiniKQL::TUnboxedValueVector KeysToLookUp;
57+
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap Request;
5158
};
5259

5360
Y_UNIT_TEST(Lookup) {
@@ -153,12 +160,14 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
153160
outputypeBuilder.Add("string_value", typeBuilder.NewDataType(NYql::NUdf::EDataSlot::String, true));
154161

155162
auto guard = Guard(*alloc.get());
163+
auto keyTypeHelper = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TKeyTypeHelper>(keyTypeBuilder.Build());
156164

157165
auto [lookupSource, actor] = NYql::NDq::CreateGenericLookupActor(
158166
connectorMock,
159167
std::make_shared<NYql::NTestCreds::TSecuredServiceAccountCredentialsFactory>(),
160168
edge,
161169
alloc,
170+
keyTypeHelper,
162171
std::move(lookupSourceSettings),
163172
keyTypeBuilder.Build(),
164173
outputypeBuilder.Build(),
@@ -167,46 +176,36 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
167176
1'000'000);
168177
runtime.Register(actor);
169178

170-
NKikimr::NMiniKQL::TUnboxedValueVector keys;
179+
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap request;
171180
for (size_t i = 0; i != 3; ++i) {
172181
NYql::NUdf::TUnboxedValue* keyItems;
173182
auto key = holderFactory.CreateDirectArrayHolder(2, keyItems);
174183
keyItems[0] = NYql::NUdf::TUnboxedValuePod(ui64(i));
175184
keyItems[1] = NYql::NUdf::TUnboxedValuePod(ui64(100 + i));
176-
keys.push_back(std::move(key));
185+
request.emplace(std::move(key), NUdf::TUnboxedValue{});
177186
}
178187

179188
guard.Release(); //let actors use alloc
180189

181-
auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(keys));
190+
auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(request));
182191
runtime.Register(callLookupActor);
183192

184193
auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
185194
auto guard2 = Guard(*alloc.get());
186-
NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult = std::move(ev->Get()->Data);
195+
auto lookupResult = std::move(ev->Get()->Result);
187196

188197
UNIT_ASSERT_EQUAL(3, lookupResult.size());
189-
{
190-
auto& [k, v] = lookupResult[0];
191-
UNIT_ASSERT_EQUAL(0, k.GetElement(0).Get<ui64>());
192-
UNIT_ASSERT_EQUAL(100, k.GetElement(1).Get<ui64>());
198+
if (const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {0, 100}))) {
193199
NYql::NUdf::TUnboxedValue val = v.GetElement(0);
194200
UNIT_ASSERT(val.AsStringRef() == TStringBuf("a"));
195-
}
196-
{
197-
auto& [k, v] = lookupResult[1];
198-
UNIT_ASSERT_EQUAL(1, k.GetElement(0).Get<ui64>());
199-
UNIT_ASSERT_EQUAL(101, k.GetElement(1).Get<ui64>());
201+
} else UNIT_ASSERT(false);
202+
if (const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {1, 101}))) {
200203
NYql::NUdf::TUnboxedValue val = v.GetElement(0);
201204
UNIT_ASSERT(val.AsStringRef() == TStringBuf("b"));
202-
}
203-
{
204-
auto& [k, v] = lookupResult[2];
205-
UNIT_ASSERT_EQUAL(2, k.GetElement(0).Get<ui64>());
206-
UNIT_ASSERT_EQUAL(102, k.GetElement(1).Get<ui64>());
207-
//this key was not found and reported as empty
208-
UNIT_ASSERT(!v);
209-
}
205+
} else UNIT_ASSERT(false);
206+
if (const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {2, 102}))) {
207+
UNIT_ASSERT(*v);
208+
} else UNIT_ASSERT(false);
210209
}
211210

212211
} //Y_UNIT_TEST_SUITE(GenericProviderLookupActor)

0 commit comments

Comments
 (0)