Skip to content

Commit 9dffef6

Browse files
authored
streamlookup, generic lookup, yt lookup: remove TUVMap usage from events (#9758)
1 parent e0fcd02 commit 9dffef6

File tree

6 files changed

+136
-102
lines changed

6 files changed

+136
-102
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -213,41 +213,26 @@ struct IDqAsyncLookupSource {
213213
NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>>
214214
>;
215215
struct TEvLookupRequest: NActors::TEventLocal<TEvLookupRequest, TDqComputeEvents::EvLookupRequest> {
216-
TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& request)
217-
: Alloc(alloc)
218-
, Request(std::move(request))
216+
TEvLookupRequest(std::weak_ptr<TUnboxedValueMap> request)
217+
: Request(std::move(request))
219218
{
220219
}
221-
~TEvLookupRequest() {
222-
auto guard = Guard(*Alloc);
223-
TKeyTypeHelper empty;
224-
Request = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
225-
}
226-
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
227-
TUnboxedValueMap Request;
220+
std::weak_ptr<TUnboxedValueMap> Request;
228221
};
229222

230223
struct TEvLookupResult: NActors::TEventLocal<TEvLookupResult, TDqComputeEvents::EvLookupResult> {
231-
TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& result)
232-
: Alloc(alloc)
233-
, Result(std::move(result))
224+
TEvLookupResult(std::weak_ptr<TUnboxedValueMap> result)
225+
: Result(std::move(result))
234226
{
235227
}
236-
~TEvLookupResult() {
237-
auto guard = Guard(*Alloc.get());
238-
TKeyTypeHelper empty;
239-
Result = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
240-
}
241-
242-
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
243-
TUnboxedValueMap Result;
228+
std::weak_ptr<TUnboxedValueMap> Result;
244229
};
245230

246231
virtual size_t GetMaxSupportedKeysInRequest() const = 0;
247232
//Initiate lookup for requested keys
248233
//Only one request at a time is allowed. Request must contain no more than GetMaxSupportedKeysInRequest() keys
249-
//Upon completion, results are sent in TEvLookupResult event to the preconfigured actor
250-
virtual void AsyncLookup(TUnboxedValueMap&& request) = 0;
234+
//Upon completion, TEvLookupResult event is sent to the preconfigured actor
235+
virtual void AsyncLookup(std::weak_ptr<TUnboxedValueMap> request) = 0;
251236
protected:
252237
~IDqAsyncLookupSource() {}
253238
};

ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ class TInputTransformStreamLookupBase
6565
, LruCache(std::make_unique<NKikimr::NMiniKQL::TUnboxedKeyValueLruCacheWithTtl>(cacheLimit, lookupKeyType))
6666
, CacheTtl(cacheTtl)
6767
, ReadyQueue(OutputRowType)
68-
, WaitingForLookupResults(false)
6968
{
7069
Y_ABORT_UNLESS(Alloc);
7170
for (size_t i = 0; i != LookupInputIndexes.size(); ++i) {
@@ -91,8 +90,9 @@ class TInputTransformStreamLookupBase
9190
.MaxKeysInRequest = 1000 // TODO configure me
9291
};
9392
auto guard = Guard(*Alloc);
94-
LookupSource = Factory->CreateDqLookupSource(Settings.GetRightSource().GetProviderName(), std::move(lookupSourceArgs));
95-
RegisterWithSameMailbox(LookupSource.second);
93+
auto [lookupSource, lookupSourceActor] = Factory->CreateDqLookupSource(Settings.GetRightSource().GetProviderName(), std::move(lookupSourceArgs));
94+
MaxKeysInRequest = lookupSource->GetMaxSupportedKeysInRequest();
95+
LookupSourceId = RegisterWithSameMailbox(lookupSourceActor);
9696
}
9797
protected:
9898
virtual NUdf::EFetchStatus FetchWideInputValue(NUdf::TUnboxedValue* inputRowItems) = 0;
@@ -134,19 +134,20 @@ class TInputTransformStreamLookupBase
134134
void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) {
135135
auto guard = BindAllocator();
136136
const auto now = std::chrono::steady_clock::now();
137-
auto lookupResult = std::move(ev->Get()->Result);
137+
auto lookupResult = ev->Get()->Result.lock();
138+
Y_ABORT_UNLESS(lookupResult == KeysForLookup);
138139
for (; !AwaitingQueue.empty(); AwaitingQueue.pop_front()) {
139140
auto& [lookupKey, inputOther] = AwaitingQueue.front();
140-
auto lookupPayload = lookupResult.FindPtr(lookupKey);
141+
auto lookupPayload = lookupResult->FindPtr(lookupKey);
141142
if (lookupPayload == nullptr) {
142143
continue;
143144
}
144145
AddReadyQueue(lookupKey, inputOther, lookupPayload);
145146
}
146-
for (auto&& [k, v]: lookupResult) {
147+
for (auto&& [k, v]: *lookupResult) {
147148
LruCache->Update(NUdf::TUnboxedValue(const_cast<NUdf::TUnboxedValue&&>(k)), std::move(v), now + CacheTtl);
148149
}
149-
WaitingForLookupResults = false;
150+
KeysForLookup.reset();
150151
Send(ComputeActorId, new TEvNewAsyncInputDataArrived{InputIndex});
151152
}
152153

@@ -165,9 +166,10 @@ class TInputTransformStreamLookupBase
165166
}
166167

167168
void PassAway() final {
168-
Send(LookupSource.second->SelfId(), new NActors::TEvents::TEvPoison{});
169+
Send(LookupSourceId, new NActors::TEvents::TEvPoison{});
169170
auto guard = BindAllocator();
170171
//All resources, held by this class, that have been created with mkql allocator, must be deallocated here
172+
KeysForLookup.reset();
171173
InputFlow.Clear();
172174
KeyTypeHelper.reset();
173175
decltype(AwaitingQueue){}.swap(AwaitingQueue);
@@ -188,15 +190,14 @@ class TInputTransformStreamLookupBase
188190

189191
DrainReadyQueue(batch);
190192

191-
if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && !WaitingForLookupResults) {
193+
if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && !KeysForLookup) {
192194
NUdf::TUnboxedValue* inputRowItems;
193195
NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems);
194196
const auto now = std::chrono::steady_clock::now();
195-
const auto maxKeysInRequest = LookupSource.first->GetMaxSupportedKeysInRequest();
196-
IDqAsyncLookupSource::TUnboxedValueMap keysForLookup{maxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual()};
197+
KeysForLookup = std::make_shared<IDqAsyncLookupSource::TUnboxedValueMap>(MaxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual());
197198
LruCache->Prune(now);
198199
while (
199-
(keysForLookup.size() < maxKeysInRequest) &&
200+
(KeysForLookup->size() < MaxKeysInRequest) &&
200201
((InputFlowFetchStatus = FetchWideInputValue(inputRowItems)) == NUdf::EFetchStatus::Ok)) {
201202
NUdf::TUnboxedValue* keyItems;
202203
NUdf::TUnboxedValue key = HolderFactory.CreateDirectArrayHolder(LookupInputIndexes.size(), keyItems);
@@ -212,12 +213,13 @@ class TInputTransformStreamLookupBase
212213
AddReadyQueue(key, other, &*lookupPayload);
213214
} else {
214215
AwaitingQueue.emplace_back(key, std::move(other));
215-
keysForLookup.emplace(std::move(key), NUdf::TUnboxedValue{});
216+
KeysForLookup->emplace(std::move(key), NUdf::TUnboxedValue{});
216217
}
217218
}
218-
if (!keysForLookup.empty()) {
219-
LookupSource.first->AsyncLookup(std::move(keysForLookup));
220-
WaitingForLookupResults = true;
219+
if (!KeysForLookup->empty()) {
220+
Send(LookupSourceId, new IDqAsyncLookupSource::TEvLookupRequest(KeysForLookup));
221+
} else {
222+
KeysForLookup.reset();
221223
}
222224
DrainReadyQueue(batch);
223225
}
@@ -259,7 +261,8 @@ class TInputTransformStreamLookupBase
259261
IDqAsyncIoFactory::TPtr Factory;
260262
NDqProto::TDqInputTransformLookupSettings Settings;
261263
protected:
262-
std::pair<IDqAsyncLookupSource*, NActors::IActor*> LookupSource;
264+
NActors::TActorId LookupSourceId;
265+
size_t MaxKeysInRequest;
263266
const TVector<size_t> LookupInputIndexes;
264267
const TVector<size_t> OtherInputIndexes;
265268
const NMiniKQL::TMultiType* const InputRowType;
@@ -276,8 +279,8 @@ class TInputTransformStreamLookupBase
276279
using TAwaitingQueue = std::deque<TInputKeyOtherPair, NKikimr::NMiniKQL::TMKQLAllocator<TInputKeyOtherPair>>; //input row split in two parts: key columns and other columns
277280
TAwaitingQueue AwaitingQueue;
278281
NKikimr::NMiniKQL::TUnboxedValueBatch ReadyQueue;
279-
std::atomic<bool> WaitingForLookupResults;
280282
NYql::NDq::TDqAsyncStats IngressStats;
283+
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> KeysForLookup;
281284
};
282285

283286
class TInputTransformStreamLookupWide: public TInputTransformStreamLookupBase {

ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,35 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
3535
public:
3636
TCallLookupActor(
3737
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
38-
NYql::NDq::IDqAsyncLookupSource* lookupSource,
39-
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap&& request)
38+
const NActors::TActorId& lookupActor,
39+
std::shared_ptr<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap> request)
4040
: Alloc(alloc)
41-
, LookupSource(lookupSource)
42-
, Request(std::move(request))
41+
, LookupActor(lookupActor)
42+
, Request(request)
4343
{
4444
}
4545

4646
void Bootstrap() {
47-
LookupSource->AsyncLookup(std::move(Request));
47+
auto ev = new NYql::NDq::IDqAsyncLookupSource::TEvLookupRequest(Request);
48+
TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(LookupActor, SelfId(), ev));
49+
}
50+
51+
void PassAway() override {
52+
auto guard = Guard(*Alloc);
53+
Request.reset();
54+
}
55+
56+
~TCallLookupActor() {
57+
PassAway();
4858
}
4959

5060
private:
5161
static constexpr char ActorName[] = "TEST";
5262

5363
private:
5464
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
55-
NYql::NDq::IDqAsyncLookupSource* LookupSource;
56-
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap Request;
65+
const NActors::TActorId LookupActor;
66+
std::shared_ptr<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap> Request;
5767
};
5868

5969
Y_UNIT_TEST(Lookup) {
@@ -173,41 +183,42 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
173183
typeEnv,
174184
holderFactory,
175185
1'000'000);
176-
runtime.Register(actor);
186+
auto lookupActor = runtime.Register(actor);
177187

178-
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap request(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual());
188+
auto request = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap>(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual());
179189
for (size_t i = 0; i != 3; ++i) {
180190
NYql::NUdf::TUnboxedValue* keyItems;
181191
auto key = holderFactory.CreateDirectArrayHolder(2, keyItems);
182192
keyItems[0] = NYql::NUdf::TUnboxedValuePod(ui64(i));
183193
keyItems[1] = NYql::NUdf::TUnboxedValuePod(ui64(100 + i));
184-
request.emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
194+
request->emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
185195
}
186196

187197
guard.Release(); // let actors use alloc
188198

189-
auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(request));
199+
auto callLookupActor = new TCallLookupActor(alloc, lookupActor, request);
190200
runtime.Register(callLookupActor);
191201

192202
auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
193203
auto guard2 = Guard(*alloc.get());
194-
auto lookupResult = std::move(ev->Get()->Result);
204+
auto lookupResult = ev->Get()->Result.lock();
205+
UNIT_ASSERT(lookupResult);
195206

196-
UNIT_ASSERT_EQUAL(3, lookupResult.size());
207+
UNIT_ASSERT_EQUAL(3, lookupResult->size());
197208
{
198-
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {0, 100}));
209+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {0, 100}));
199210
UNIT_ASSERT(v);
200211
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
201212
UNIT_ASSERT(val.AsStringRef() == TStringBuf("a"));
202213
}
203214
{
204-
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {1, 101}));
215+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {1, 101}));
205216
UNIT_ASSERT(v);
206217
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
207218
UNIT_ASSERT(val.AsStringRef() == TStringBuf("b"));
208219
}
209220
{
210-
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {2, 102}));
221+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {2, 102}));
211222
UNIT_ASSERT(v);
212223
UNIT_ASSERT(!*v);
213224
}

ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -84,19 +84,20 @@ namespace NYql::NDq {
8484
, HolderFactory(holderFactory)
8585
, ColumnDestinations(CreateColumnDestination())
8686
, MaxKeysInRequest(maxKeysInRequest)
87-
, Request(
88-
0,
89-
KeyTypeHelper->GetValueHash(),
90-
KeyTypeHelper->GetValueEqual())
9187
{
9288
}
9389

9490
~TGenericLookupActor() {
91+
Free();
92+
}
93+
94+
private:
95+
void Free() {
9596
auto guard = Guard(*Alloc);
97+
Request.reset();
9698
KeyTypeHelper.reset();
97-
TKeyTypeHelper empty;
98-
Request = IDqAsyncLookupSource::TUnboxedValueMap(0, empty.GetValueHash(), empty.GetValueEqual());
9999
}
100+
public:
100101

101102
void Bootstrap() {
102103
auto dsi = LookupSource.data_source_instance();
@@ -116,13 +117,18 @@ namespace NYql::NDq {
116117
size_t GetMaxSupportedKeysInRequest() const override {
117118
return MaxKeysInRequest;
118119
}
119-
void AsyncLookup(IDqAsyncLookupSource::TUnboxedValueMap&& request) override {
120+
void AsyncLookup(std::weak_ptr<IDqAsyncLookupSource::TUnboxedValueMap> request) override {
120121
auto guard = Guard(*Alloc);
121-
CreateRequest(std::move(request));
122+
CreateRequest(request.lock());
123+
}
124+
void PassAway() override {
125+
Free();
126+
TBase::PassAway();
122127
}
123128

124129
private: // events
125130
STRICT_STFUNC(StateFunc,
131+
hFunc(TEvLookupRequest, Handle);
126132
hFunc(TEvListSplitsIterator, Handle);
127133
hFunc(TEvListSplitsPart, Handle);
128134
hFunc(TEvReadSplitsIterator, Handle);
@@ -196,11 +202,18 @@ namespace NYql::NDq {
196202
PassAway();
197203
}
198204

205+
void Handle(TEvLookupRequest::TPtr ev) {
206+
auto guard = Guard(*Alloc);
207+
CreateRequest(ev->Get()->Request.lock());
208+
}
209+
199210
private:
200-
void CreateRequest(IDqAsyncLookupSource::TUnboxedValueMap&& request) {
201-
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request.size() << " keys";
202-
Y_ABORT_IF(InProgress);
203-
Y_ABORT_IF(request.size() == 0 || request.size() > MaxKeysInRequest);
211+
void CreateRequest(std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> request) {
212+
if (!request) {
213+
return;
214+
}
215+
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys";
216+
Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest);
204217

205218
Request = std::move(request);
206219
NConnector::NApi::TListSplitsRequest splitRequest;
@@ -272,20 +285,20 @@ namespace NYql::NDq {
272285
for (size_t j = 0; j != columns.size(); ++j) {
273286
(ColumnDestinations[j].first == EColumnDestination::Key ? keyItems : outputItems)[ColumnDestinations[j].second] = columns[j][i];
274287
}
275-
if (auto* v = Request.FindPtr(key)) {
288+
if (auto* v = Request->FindPtr(key)) {
276289
*v = std::move(output); // duplicates will be overwritten
277290
}
278291
}
279292
}
280293

281294
void FinalizeRequest() {
282-
YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request.size() << " keys";
295+
YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request->size() << " keys";
283296
auto guard = Guard(*Alloc);
284-
auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(Request));
297+
auto ev = new IDqAsyncLookupSource::TEvLookupResult(Request);
298+
Request.reset();
285299
TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev));
286300
LookupResult = {};
287301
ReadSplitsIterator = {};
288-
InProgress = false;
289302
}
290303

291304
static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NConnector::NApi::TError& error) {
@@ -351,7 +364,7 @@ namespace NYql::NDq {
351364
select.mutable_from()->Settable(LookupSource.table());
352365

353366
NConnector::NApi::TPredicate_TDisjunction disjunction;
354-
for (const auto& [k, _] : Request) {
367+
for (const auto& [k, _] : *Request) {
355368
NConnector::NApi::TPredicate_TConjunction conjunction;
356369
for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) {
357370
NConnector::NApi::TPredicate_TComparison eq;
@@ -381,8 +394,7 @@ namespace NYql::NDq {
381394
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
382395
const std::vector<std::pair<EColumnDestination, size_t>> ColumnDestinations;
383396
const size_t MaxKeysInRequest;
384-
std::atomic_bool InProgress;
385-
IDqAsyncLookupSource::TUnboxedValueMap Request;
397+
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
386398
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
387399
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
388400
};

0 commit comments

Comments
 (0)