Skip to content

Commit bc032ee

Browse files
committed
streamlookup, generic lookup, yt lookup: remove TUVMap usage from events (ydb-platform#9758)
(cherry picked from commit 9dffef6)
1 parent e4cc2aa commit bc032ee

File tree

6 files changed

+136
-102
lines changed

6 files changed

+136
-102
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h

Lines changed: 8 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -212,41 +212,26 @@ struct IDqAsyncLookupSource {
212212
NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>>
213213
>;
214214
struct TEvLookupRequest: NActors::TEventLocal<TEvLookupRequest, TDqComputeEvents::EvLookupRequest> {
215-
TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& request)
216-
: Alloc(alloc)
217-
, Request(std::move(request))
215+
TEvLookupRequest(std::weak_ptr<TUnboxedValueMap> request)
216+
: Request(std::move(request))
218217
{
219218
}
220-
~TEvLookupRequest() {
221-
auto guard = Guard(*Alloc);
222-
TKeyTypeHelper empty;
223-
Request = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
224-
}
225-
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
226-
TUnboxedValueMap Request;
219+
std::weak_ptr<TUnboxedValueMap> Request;
227220
};
228221

229222
struct TEvLookupResult: NActors::TEventLocal<TEvLookupResult, TDqComputeEvents::EvLookupResult> {
230-
TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& result)
231-
: Alloc(alloc)
232-
, Result(std::move(result))
223+
TEvLookupResult(std::weak_ptr<TUnboxedValueMap> result)
224+
: Result(std::move(result))
233225
{
234226
}
235-
~TEvLookupResult() {
236-
auto guard = Guard(*Alloc.get());
237-
TKeyTypeHelper empty;
238-
Result = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
239-
}
240-
241-
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
242-
TUnboxedValueMap Result;
227+
std::weak_ptr<TUnboxedValueMap> Result;
243228
};
244229

245230
virtual size_t GetMaxSupportedKeysInRequest() const = 0;
246231
//Initiate lookup for requested keys
247232
//Only one request at a time is allowed. Request must contain no more than GetMaxSupportedKeysInRequest() keys
248-
//Upon completion, results are sent in TEvLookupResult event to the preconfigured actor
249-
virtual void AsyncLookup(TUnboxedValueMap&& request) = 0;
233+
//Upon completion, TEvLookupResult event is sent to the preconfigured actor
234+
virtual void AsyncLookup(std::weak_ptr<TUnboxedValueMap> request) = 0;
250235
protected:
251236
~IDqAsyncLookupSource() {}
252237
};

ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ class TInputTransformStreamLookupBase
6565
, LruCache(std::make_unique<NKikimr::NMiniKQL::TUnboxedKeyValueLruCacheWithTtl>(cacheLimit, lookupKeyType))
6666
, CacheTtl(cacheTtl)
6767
, ReadyQueue(OutputRowType)
68-
, WaitingForLookupResults(false)
6968
{
7069
Y_ABORT_UNLESS(Alloc);
7170
for (size_t i = 0; i != LookupInputIndexes.size(); ++i) {
@@ -91,8 +90,9 @@ class TInputTransformStreamLookupBase
9190
.MaxKeysInRequest = 1000 // TODO configure me
9291
};
9392
auto guard = Guard(*Alloc);
94-
LookupSource = Factory->CreateDqLookupSource(Settings.GetRightSource().GetProviderName(), std::move(lookupSourceArgs));
95-
RegisterWithSameMailbox(LookupSource.second);
93+
auto [lookupSource, lookupSourceActor] = Factory->CreateDqLookupSource(Settings.GetRightSource().GetProviderName(), std::move(lookupSourceArgs));
94+
MaxKeysInRequest = lookupSource->GetMaxSupportedKeysInRequest();
95+
LookupSourceId = RegisterWithSameMailbox(lookupSourceActor);
9696
}
9797
protected:
9898
virtual NUdf::EFetchStatus FetchWideInputValue(NUdf::TUnboxedValue* inputRowItems) = 0;
@@ -134,19 +134,20 @@ class TInputTransformStreamLookupBase
134134
void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) {
135135
auto guard = BindAllocator();
136136
const auto now = std::chrono::steady_clock::now();
137-
auto lookupResult = std::move(ev->Get()->Result);
137+
auto lookupResult = ev->Get()->Result.lock();
138+
Y_ABORT_UNLESS(lookupResult == KeysForLookup);
138139
for (; !AwaitingQueue.empty(); AwaitingQueue.pop_front()) {
139140
auto& [lookupKey, inputOther] = AwaitingQueue.front();
140-
auto lookupPayload = lookupResult.FindPtr(lookupKey);
141+
auto lookupPayload = lookupResult->FindPtr(lookupKey);
141142
if (lookupPayload == nullptr) {
142143
continue;
143144
}
144145
AddReadyQueue(lookupKey, inputOther, lookupPayload);
145146
}
146-
for (auto&& [k, v]: lookupResult) {
147+
for (auto&& [k, v]: *lookupResult) {
147148
LruCache->Update(NUdf::TUnboxedValue(const_cast<NUdf::TUnboxedValue&&>(k)), std::move(v), now + CacheTtl);
148149
}
149-
WaitingForLookupResults = false;
150+
KeysForLookup.reset();
150151
Send(ComputeActorId, new TEvNewAsyncInputDataArrived{InputIndex});
151152
}
152153

@@ -165,9 +166,10 @@ class TInputTransformStreamLookupBase
165166
}
166167

167168
void PassAway() final {
168-
Send(LookupSource.second->SelfId(), new NActors::TEvents::TEvPoison{});
169+
Send(LookupSourceId, new NActors::TEvents::TEvPoison{});
169170
auto guard = BindAllocator();
170171
//All resources, held by this class, that have been created with mkql allocator, must be deallocated here
172+
KeysForLookup.reset();
171173
InputFlow.Clear();
172174
KeyTypeHelper.reset();
173175
decltype(AwaitingQueue){}.swap(AwaitingQueue);
@@ -188,15 +190,14 @@ class TInputTransformStreamLookupBase
188190

189191
DrainReadyQueue(batch);
190192

191-
if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && !WaitingForLookupResults) {
193+
if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && !KeysForLookup) {
192194
NUdf::TUnboxedValue* inputRowItems;
193195
NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems);
194196
const auto now = std::chrono::steady_clock::now();
195-
const auto maxKeysInRequest = LookupSource.first->GetMaxSupportedKeysInRequest();
196-
IDqAsyncLookupSource::TUnboxedValueMap keysForLookup{maxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual()};
197+
KeysForLookup = std::make_shared<IDqAsyncLookupSource::TUnboxedValueMap>(MaxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual());
197198
LruCache->Prune(now);
198199
while (
199-
(keysForLookup.size() < maxKeysInRequest) &&
200+
(KeysForLookup->size() < MaxKeysInRequest) &&
200201
((InputFlowFetchStatus = FetchWideInputValue(inputRowItems)) == NUdf::EFetchStatus::Ok)) {
201202
NUdf::TUnboxedValue* keyItems;
202203
NUdf::TUnboxedValue key = HolderFactory.CreateDirectArrayHolder(LookupInputIndexes.size(), keyItems);
@@ -212,12 +213,13 @@ class TInputTransformStreamLookupBase
212213
AddReadyQueue(key, other, &*lookupPayload);
213214
} else {
214215
AwaitingQueue.emplace_back(key, std::move(other));
215-
keysForLookup.emplace(std::move(key), NUdf::TUnboxedValue{});
216+
KeysForLookup->emplace(std::move(key), NUdf::TUnboxedValue{});
216217
}
217218
}
218-
if (!keysForLookup.empty()) {
219-
LookupSource.first->AsyncLookup(std::move(keysForLookup));
220-
WaitingForLookupResults = true;
219+
if (!KeysForLookup->empty()) {
220+
Send(LookupSourceId, new IDqAsyncLookupSource::TEvLookupRequest(KeysForLookup));
221+
} else {
222+
KeysForLookup.reset();
221223
}
222224
DrainReadyQueue(batch);
223225
}
@@ -259,7 +261,8 @@ class TInputTransformStreamLookupBase
259261
IDqAsyncIoFactory::TPtr Factory;
260262
NDqProto::TDqInputTransformLookupSettings Settings;
261263
protected:
262-
std::pair<IDqAsyncLookupSource*, NActors::IActor*> LookupSource;
264+
NActors::TActorId LookupSourceId;
265+
size_t MaxKeysInRequest;
263266
const TVector<size_t> LookupInputIndexes;
264267
const TVector<size_t> OtherInputIndexes;
265268
const NMiniKQL::TMultiType* const InputRowType;
@@ -276,8 +279,8 @@ class TInputTransformStreamLookupBase
276279
using TAwaitingQueue = std::deque<TInputKeyOtherPair, NKikimr::NMiniKQL::TMKQLAllocator<TInputKeyOtherPair>>; //input row split in two parts: key columns and other columns
277280
TAwaitingQueue AwaitingQueue;
278281
NKikimr::NMiniKQL::TUnboxedValueBatch ReadyQueue;
279-
std::atomic<bool> WaitingForLookupResults;
280282
NYql::NDq::TDqAsyncStats IngressStats;
283+
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> KeysForLookup;
281284
};
282285

283286
class TInputTransformStreamLookupWide: public TInputTransformStreamLookupBase {

ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,35 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
3535
public:
3636
TCallLookupActor(
3737
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
38-
NYql::NDq::IDqAsyncLookupSource* lookupSource,
39-
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap&& request)
38+
const NActors::TActorId& lookupActor,
39+
std::shared_ptr<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap> request)
4040
: Alloc(alloc)
41-
, LookupSource(lookupSource)
42-
, Request(std::move(request))
41+
, LookupActor(lookupActor)
42+
, Request(request)
4343
{
4444
}
4545

4646
void Bootstrap() {
47-
LookupSource->AsyncLookup(std::move(Request));
47+
auto ev = new NYql::NDq::IDqAsyncLookupSource::TEvLookupRequest(Request);
48+
TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(LookupActor, SelfId(), ev));
49+
}
50+
51+
void PassAway() override {
52+
auto guard = Guard(*Alloc);
53+
Request.reset();
54+
}
55+
56+
~TCallLookupActor() {
57+
PassAway();
4858
}
4959

5060
private:
5161
static constexpr char ActorName[] = "TEST";
5262

5363
private:
5464
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
55-
NYql::NDq::IDqAsyncLookupSource* LookupSource;
56-
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap Request;
65+
const NActors::TActorId LookupActor;
66+
std::shared_ptr<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap> Request;
5767
};
5868

5969
Y_UNIT_TEST(Lookup) {
@@ -174,41 +184,42 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
174184
typeEnv,
175185
holderFactory,
176186
1'000'000);
177-
runtime.Register(actor);
187+
auto lookupActor = runtime.Register(actor);
178188

179-
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap request(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual());
189+
auto request = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap>(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual());
180190
for (size_t i = 0; i != 3; ++i) {
181191
NYql::NUdf::TUnboxedValue* keyItems;
182192
auto key = holderFactory.CreateDirectArrayHolder(2, keyItems);
183193
keyItems[0] = NYql::NUdf::TUnboxedValuePod(ui64(i));
184194
keyItems[1] = NYql::NUdf::TUnboxedValuePod(ui64(100 + i));
185-
request.emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
195+
request->emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
186196
}
187197

188198
guard.Release(); // let actors use alloc
189199

190-
auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(request));
200+
auto callLookupActor = new TCallLookupActor(alloc, lookupActor, request);
191201
runtime.Register(callLookupActor);
192202

193203
auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
194204
auto guard2 = Guard(*alloc.get());
195-
auto lookupResult = std::move(ev->Get()->Result);
205+
auto lookupResult = ev->Get()->Result.lock();
206+
UNIT_ASSERT(lookupResult);
196207

197-
UNIT_ASSERT_EQUAL(3, lookupResult.size());
208+
UNIT_ASSERT_EQUAL(3, lookupResult->size());
198209
{
199-
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {0, 100}));
210+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {0, 100}));
200211
UNIT_ASSERT(v);
201212
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
202213
UNIT_ASSERT(val.AsStringRef() == TStringBuf("a"));
203214
}
204215
{
205-
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {1, 101}));
216+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {1, 101}));
206217
UNIT_ASSERT(v);
207218
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
208219
UNIT_ASSERT(val.AsStringRef() == TStringBuf("b"));
209220
}
210221
{
211-
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {2, 102}));
222+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {2, 102}));
212223
UNIT_ASSERT(v);
213224
UNIT_ASSERT(!*v);
214225
}

ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -84,19 +84,20 @@ namespace NYql::NDq {
8484
, HolderFactory(holderFactory)
8585
, ColumnDestinations(CreateColumnDestination())
8686
, MaxKeysInRequest(maxKeysInRequest)
87-
, Request(
88-
0,
89-
KeyTypeHelper->GetValueHash(),
90-
KeyTypeHelper->GetValueEqual())
9187
{
9288
}
9389

9490
~TGenericLookupActor() {
91+
Free();
92+
}
93+
94+
private:
95+
void Free() {
9596
auto guard = Guard(*Alloc);
97+
Request.reset();
9698
KeyTypeHelper.reset();
97-
TKeyTypeHelper empty;
98-
Request = IDqAsyncLookupSource::TUnboxedValueMap(0, empty.GetValueHash(), empty.GetValueEqual());
9999
}
100+
public:
100101

101102
void Bootstrap() {
102103
auto dsi = LookupSource.data_source_instance();
@@ -116,13 +117,18 @@ namespace NYql::NDq {
116117
size_t GetMaxSupportedKeysInRequest() const override {
117118
return MaxKeysInRequest;
118119
}
119-
void AsyncLookup(IDqAsyncLookupSource::TUnboxedValueMap&& request) override {
120+
void AsyncLookup(std::weak_ptr<IDqAsyncLookupSource::TUnboxedValueMap> request) override {
120121
auto guard = Guard(*Alloc);
121-
CreateRequest(std::move(request));
122+
CreateRequest(request.lock());
123+
}
124+
void PassAway() override {
125+
Free();
126+
TBase::PassAway();
122127
}
123128

124129
private: // events
125130
STRICT_STFUNC(StateFunc,
131+
hFunc(TEvLookupRequest, Handle);
126132
hFunc(TEvListSplitsIterator, Handle);
127133
hFunc(TEvListSplitsPart, Handle);
128134
hFunc(TEvReadSplitsIterator, Handle);
@@ -197,11 +203,18 @@ namespace NYql::NDq {
197203
PassAway();
198204
}
199205

206+
void Handle(TEvLookupRequest::TPtr ev) {
207+
auto guard = Guard(*Alloc);
208+
CreateRequest(ev->Get()->Request.lock());
209+
}
210+
200211
private:
201-
void CreateRequest(IDqAsyncLookupSource::TUnboxedValueMap&& request) {
202-
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request.size() << " keys";
203-
Y_ABORT_IF(InProgress);
204-
Y_ABORT_IF(request.size() == 0 || request.size() > MaxKeysInRequest);
212+
void CreateRequest(std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> request) {
213+
if (!request) {
214+
return;
215+
}
216+
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys";
217+
Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest);
205218

206219
Request = std::move(request);
207220
NConnector::NApi::TListSplitsRequest splitRequest;
@@ -273,20 +286,20 @@ namespace NYql::NDq {
273286
for (size_t j = 0; j != columns.size(); ++j) {
274287
(ColumnDestinations[j].first == EColumnDestination::Key ? keyItems : outputItems)[ColumnDestinations[j].second] = columns[j][i];
275288
}
276-
if (auto* v = Request.FindPtr(key)) {
289+
if (auto* v = Request->FindPtr(key)) {
277290
*v = std::move(output); // duplicates will be overwritten
278291
}
279292
}
280293
}
281294

282295
void FinalizeRequest() {
283-
YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request.size() << " keys";
296+
YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request->size() << " keys";
284297
auto guard = Guard(*Alloc);
285-
auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(Request));
298+
auto ev = new IDqAsyncLookupSource::TEvLookupResult(Request);
299+
Request.reset();
286300
TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev));
287301
LookupResult = {};
288302
ReadSplitsIterator = {};
289-
InProgress = false;
290303
}
291304

292305
static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NConnector::NApi::TError& error) {
@@ -352,7 +365,7 @@ namespace NYql::NDq {
352365
select.mutable_from()->Settable(LookupSource.table());
353366

354367
NConnector::NApi::TPredicate_TDisjunction disjunction;
355-
for (const auto& [k, _] : Request) {
368+
for (const auto& [k, _] : *Request) {
356369
NConnector::NApi::TPredicate_TConjunction conjunction;
357370
for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) {
358371
NConnector::NApi::TPredicate_TComparison eq;
@@ -382,8 +395,7 @@ namespace NYql::NDq {
382395
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
383396
const std::vector<std::pair<EColumnDestination, size_t>> ColumnDestinations;
384397
const size_t MaxKeysInRequest;
385-
std::atomic_bool InProgress;
386-
IDqAsyncLookupSource::TUnboxedValueMap Request;
398+
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
387399
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
388400
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
389401
};

0 commit comments

Comments
 (0)