Skip to content

Commit 639bcb2

Browse files
authored
Merge 65a239f into d2299da
2 parents d2299da + 65a239f commit 639bcb2

File tree

6 files changed

+130
-97
lines changed

6 files changed

+130
-97
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -213,41 +213,26 @@ struct IDqAsyncLookupSource {
213213
NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>>
214214
>;
215215
struct TEvLookupRequest: NActors::TEventLocal<TEvLookupRequest, TDqComputeEvents::EvLookupRequest> {
216-
TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& request)
217-
: Alloc(alloc)
218-
, Request(std::move(request))
216+
TEvLookupRequest(std::weak_ptr<TUnboxedValueMap> request)
217+
: Request(request)
219218
{
220219
}
221-
~TEvLookupRequest() {
222-
auto guard = Guard(*Alloc);
223-
TKeyTypeHelper empty;
224-
Request = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
225-
}
226-
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
227-
TUnboxedValueMap Request;
220+
std::weak_ptr<TUnboxedValueMap> Request;
228221
};
229222

230223
struct TEvLookupResult: NActors::TEventLocal<TEvLookupResult, TDqComputeEvents::EvLookupResult> {
231-
TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& result)
232-
: Alloc(alloc)
233-
, Result(std::move(result))
224+
TEvLookupResult(std::weak_ptr<TUnboxedValueMap> result)
225+
: Result(result)
234226
{
235227
}
236-
~TEvLookupResult() {
237-
auto guard = Guard(*Alloc.get());
238-
TKeyTypeHelper empty;
239-
Result = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
240-
}
241-
242-
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
243-
TUnboxedValueMap Result;
228+
std::weak_ptr<TUnboxedValueMap> Result;
244229
};
245230

246231
virtual size_t GetMaxSupportedKeysInRequest() const = 0;
247232
//Initiate lookup for requested keys
248233
//Only one request at a time is allowed. Request must contain no more than GetMaxSupportedKeysInRequest() keys
249-
//Upon completion, results are sent in TEvLookupResult event to the preconfigured actor
250-
virtual void AsyncLookup(TUnboxedValueMap&& request) = 0;
234+
//Upon completion, TEvLookupResult event is sent to the preconfigured actor
235+
virtual void AsyncLookup(std::weak_ptr<TUnboxedValueMap> request) = 0;
251236
protected:
252237
~IDqAsyncLookupSource() {}
253238
};

ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ class TInputTransformStreamLookupBase
6565
, LruCache(std::make_unique<NKikimr::NMiniKQL::TUnboxedKeyValueLruCacheWithTtl>(cacheLimit, lookupKeyType))
6666
, CacheTtl(cacheTtl)
6767
, ReadyQueue(OutputRowType)
68-
, WaitingForLookupResults(false)
6968
{
7069
Y_ABORT_UNLESS(Alloc);
7170
for (size_t i = 0; i != LookupInputIndexes.size(); ++i) {
@@ -91,8 +90,9 @@ class TInputTransformStreamLookupBase
9190
.MaxKeysInRequest = 1000 // TODO configure me
9291
};
9392
auto guard = Guard(*Alloc);
94-
LookupSource = Factory->CreateDqLookupSource(Settings.GetRightSource().GetProviderName(), std::move(lookupSourceArgs));
95-
RegisterWithSameMailbox(LookupSource.second);
93+
auto LookupSource = Factory->CreateDqLookupSource(Settings.GetRightSource().GetProviderName(), std::move(lookupSourceArgs));
94+
MaxKeysInRequest = LookupSource.first->GetMaxSupportedKeysInRequest();
95+
LookupSourceId = RegisterWithSameMailbox(LookupSource.second);
9696
}
9797
protected:
9898
virtual NUdf::EFetchStatus FetchWideInputValue(NUdf::TUnboxedValue* inputRowItems) = 0;
@@ -134,19 +134,20 @@ class TInputTransformStreamLookupBase
134134
void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) {
135135
auto guard = BindAllocator();
136136
const auto now = std::chrono::steady_clock::now();
137-
auto lookupResult = std::move(ev->Get()->Result);
137+
auto lookupResult = ev->Get()->Result.lock();
138+
Y_ABORT_UNLESS(lookupResult == KeysForLookup);
138139
for (; !AwaitingQueue.empty(); AwaitingQueue.pop_front()) {
139140
auto& [lookupKey, inputOther] = AwaitingQueue.front();
140-
auto lookupPayload = lookupResult.FindPtr(lookupKey);
141+
auto lookupPayload = lookupResult->FindPtr(lookupKey);
141142
if (lookupPayload == nullptr) {
142143
continue;
143144
}
144145
AddReadyQueue(lookupKey, inputOther, lookupPayload);
145146
}
146-
for (auto&& [k, v]: lookupResult) {
147+
for (auto&& [k, v]: *lookupResult) {
147148
LruCache->Update(NUdf::TUnboxedValue(const_cast<NUdf::TUnboxedValue&&>(k)), std::move(v), now + CacheTtl);
148149
}
149-
WaitingForLookupResults = false;
150+
KeysForLookup.reset();
150151
Send(ComputeActorId, new TEvNewAsyncInputDataArrived{InputIndex});
151152
}
152153

@@ -165,9 +166,10 @@ class TInputTransformStreamLookupBase
165166
}
166167

167168
void PassAway() final {
168-
Send(LookupSource.second->SelfId(), new NActors::TEvents::TEvPoison{});
169+
Send(LookupSourceId, new NActors::TEvents::TEvPoison{});
169170
auto guard = BindAllocator();
170171
//All resources, held by this class, that have been created with mkql allocator, must be deallocated here
172+
KeysForLookup.reset();
171173
InputFlow.Clear();
172174
KeyTypeHelper.reset();
173175
decltype(AwaitingQueue){}.swap(AwaitingQueue);
@@ -188,15 +190,14 @@ class TInputTransformStreamLookupBase
188190

189191
DrainReadyQueue(batch);
190192

191-
if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && !WaitingForLookupResults) {
193+
if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && !KeysForLookup) {
192194
NUdf::TUnboxedValue* inputRowItems;
193195
NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems);
194196
const auto now = std::chrono::steady_clock::now();
195-
const auto maxKeysInRequest = LookupSource.first->GetMaxSupportedKeysInRequest();
196-
IDqAsyncLookupSource::TUnboxedValueMap keysForLookup{maxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual()};
197+
KeysForLookup = std::make_shared<IDqAsyncLookupSource::TUnboxedValueMap>(MaxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual());
197198
LruCache->Prune(now);
198199
while (
199-
(keysForLookup.size() < maxKeysInRequest) &&
200+
(KeysForLookup->size() < MaxKeysInRequest) &&
200201
((InputFlowFetchStatus = FetchWideInputValue(inputRowItems)) == NUdf::EFetchStatus::Ok)) {
201202
NUdf::TUnboxedValue* keyItems;
202203
NUdf::TUnboxedValue key = HolderFactory.CreateDirectArrayHolder(LookupInputIndexes.size(), keyItems);
@@ -212,12 +213,13 @@ class TInputTransformStreamLookupBase
212213
AddReadyQueue(key, other, &*lookupPayload);
213214
} else {
214215
AwaitingQueue.emplace_back(key, std::move(other));
215-
keysForLookup.emplace(std::move(key), NUdf::TUnboxedValue{});
216+
KeysForLookup->emplace(std::move(key), NUdf::TUnboxedValue{});
216217
}
217218
}
218-
if (!keysForLookup.empty()) {
219-
LookupSource.first->AsyncLookup(std::move(keysForLookup));
220-
WaitingForLookupResults = true;
219+
if (!KeysForLookup->empty()) {
220+
Send(LookupSourceId, new IDqAsyncLookupSource::TEvLookupRequest(KeysForLookup));
221+
} else {
222+
KeysForLookup.reset();
221223
}
222224
DrainReadyQueue(batch);
223225
}
@@ -259,7 +261,8 @@ class TInputTransformStreamLookupBase
259261
IDqAsyncIoFactory::TPtr Factory;
260262
NDqProto::TDqInputTransformLookupSettings Settings;
261263
protected:
262-
std::pair<IDqAsyncLookupSource*, NActors::IActor*> LookupSource;
264+
NActors::TActorId LookupSourceId;
265+
size_t MaxKeysInRequest;
263266
const TVector<size_t> LookupInputIndexes;
264267
const TVector<size_t> OtherInputIndexes;
265268
const NMiniKQL::TMultiType* const InputRowType;
@@ -276,8 +279,8 @@ class TInputTransformStreamLookupBase
276279
using TAwaitingQueue = std::deque<TInputKeyOtherPair, NKikimr::NMiniKQL::TMKQLAllocator<TInputKeyOtherPair>>; //input row split in two parts: key columns and other columns
277280
TAwaitingQueue AwaitingQueue;
278281
NKikimr::NMiniKQL::TUnboxedValueBatch ReadyQueue;
279-
std::atomic<bool> WaitingForLookupResults;
280282
NYql::NDq::TDqAsyncStats IngressStats;
283+
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> KeysForLookup;
281284
};
282285

283286
class TInputTransformStreamLookupWide: public TInputTransformStreamLookupBase {

ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,24 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
3636
TCallLookupActor(
3737
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
3838
NYql::NDq::IDqAsyncLookupSource* lookupSource,
39-
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap&& request)
39+
std::shared_ptr<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap> request)
4040
: Alloc(alloc)
4141
, LookupSource(lookupSource)
42-
, Request(std::move(request))
42+
, Request(request)
4343
{
4444
}
4545

4646
void Bootstrap() {
47-
LookupSource->AsyncLookup(std::move(Request));
47+
LookupSource->AsyncLookup(Request);
48+
}
49+
50+
void PassAway() override {
51+
auto guard = Guard(*Alloc);
52+
Request.reset();
53+
}
54+
55+
~TCallLookupActor() {
56+
PassAway();
4857
}
4958

5059
private:
@@ -53,7 +62,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
5362
private:
5463
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
5564
NYql::NDq::IDqAsyncLookupSource* LookupSource;
56-
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap Request;
65+
std::shared_ptr<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap> Request;
5766
};
5867

5968
Y_UNIT_TEST(Lookup) {
@@ -175,39 +184,40 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
175184
1'000'000);
176185
runtime.Register(actor);
177186

178-
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap request(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual());
187+
auto request = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap>(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual());
179188
for (size_t i = 0; i != 3; ++i) {
180189
NYql::NUdf::TUnboxedValue* keyItems;
181190
auto key = holderFactory.CreateDirectArrayHolder(2, keyItems);
182191
keyItems[0] = NYql::NUdf::TUnboxedValuePod(ui64(i));
183192
keyItems[1] = NYql::NUdf::TUnboxedValuePod(ui64(100 + i));
184-
request.emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
193+
request->emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
185194
}
186195

187196
guard.Release(); // let actors use alloc
188197

189-
auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(request));
198+
auto callLookupActor = new TCallLookupActor(alloc, lookupSource, request);
190199
runtime.Register(callLookupActor);
191200

192201
auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
193202
auto guard2 = Guard(*alloc.get());
194-
auto lookupResult = std::move(ev->Get()->Result);
203+
auto lookupResult = std::move(ev->Get()->Result.lock());
204+
UNIT_ASSERT(lookupResult);
195205

196-
UNIT_ASSERT_EQUAL(3, lookupResult.size());
206+
UNIT_ASSERT_EQUAL(3, lookupResult->size());
197207
{
198-
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {0, 100}));
208+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {0, 100}));
199209
UNIT_ASSERT(v);
200210
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
201211
UNIT_ASSERT(val.AsStringRef() == TStringBuf("a"));
202212
}
203213
{
204-
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {1, 101}));
214+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {1, 101}));
205215
UNIT_ASSERT(v);
206216
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
207217
UNIT_ASSERT(val.AsStringRef() == TStringBuf("b"));
208218
}
209219
{
210-
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {2, 102}));
220+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {2, 102}));
211221
UNIT_ASSERT(v);
212222
UNIT_ASSERT(!*v);
213223
}

ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -84,19 +84,20 @@ namespace NYql::NDq {
8484
, HolderFactory(holderFactory)
8585
, ColumnDestinations(CreateColumnDestination())
8686
, MaxKeysInRequest(maxKeysInRequest)
87-
, Request(
88-
0,
89-
KeyTypeHelper->GetValueHash(),
90-
KeyTypeHelper->GetValueEqual())
9187
{
9288
}
9389

9490
~TGenericLookupActor() {
91+
Free();
92+
}
93+
94+
private:
95+
void Free() {
9596
auto guard = Guard(*Alloc);
97+
Request.reset();
9698
KeyTypeHelper.reset();
97-
TKeyTypeHelper empty;
98-
Request = IDqAsyncLookupSource::TUnboxedValueMap(0, empty.GetValueHash(), empty.GetValueEqual());
9999
}
100+
public:
100101

101102
void Bootstrap() {
102103
auto dsi = LookupSource.data_source_instance();
@@ -116,13 +117,18 @@ namespace NYql::NDq {
116117
size_t GetMaxSupportedKeysInRequest() const override {
117118
return MaxKeysInRequest;
118119
}
119-
void AsyncLookup(IDqAsyncLookupSource::TUnboxedValueMap&& request) override {
120+
void AsyncLookup(std::weak_ptr<IDqAsyncLookupSource::TUnboxedValueMap> request) override {
120121
auto guard = Guard(*Alloc);
121-
CreateRequest(std::move(request));
122+
CreateRequest(request.lock());
123+
}
124+
void PassAway() override {
125+
Free();
126+
TBase::PassAway();
122127
}
123128

124129
private: // events
125130
STRICT_STFUNC(StateFunc,
131+
hFunc(TEvLookupRequest, Handle);
126132
hFunc(TEvListSplitsIterator, Handle);
127133
hFunc(TEvListSplitsPart, Handle);
128134
hFunc(TEvReadSplitsIterator, Handle);
@@ -196,11 +202,18 @@ namespace NYql::NDq {
196202
PassAway();
197203
}
198204

205+
void Handle(TEvLookupRequest::TPtr ev) {
206+
auto guard = Guard(*Alloc);
207+
CreateRequest(ev->Get()->Request.lock());
208+
}
209+
199210
private:
200-
void CreateRequest(IDqAsyncLookupSource::TUnboxedValueMap&& request) {
201-
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request.size() << " keys";
202-
Y_ABORT_IF(InProgress);
203-
Y_ABORT_IF(request.size() == 0 || request.size() > MaxKeysInRequest);
211+
void CreateRequest(std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> request) {
212+
if (!request) {
213+
return;
214+
}
215+
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys";
216+
Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest);
204217

205218
Request = std::move(request);
206219
NConnector::NApi::TListSplitsRequest splitRequest;
@@ -272,20 +285,20 @@ namespace NYql::NDq {
272285
for (size_t j = 0; j != columns.size(); ++j) {
273286
(ColumnDestinations[j].first == EColumnDestination::Key ? keyItems : outputItems)[ColumnDestinations[j].second] = columns[j][i];
274287
}
275-
if (auto* v = Request.FindPtr(key)) {
288+
if (auto* v = Request->FindPtr(key)) {
276289
*v = std::move(output); // duplicates will be overwritten
277290
}
278291
}
279292
}
280293

281294
void FinalizeRequest() {
282-
YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request.size() << " keys";
295+
YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request->size() << " keys";
283296
auto guard = Guard(*Alloc);
284-
auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(Request));
297+
auto ev = new IDqAsyncLookupSource::TEvLookupResult(Request);
298+
Request.reset();
285299
TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev));
286300
LookupResult = {};
287301
ReadSplitsIterator = {};
288-
InProgress = false;
289302
}
290303

291304
static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NConnector::NApi::TError& error) {
@@ -351,7 +364,7 @@ namespace NYql::NDq {
351364
select.mutable_from()->Settable(LookupSource.table());
352365

353366
NConnector::NApi::TPredicate_TDisjunction disjunction;
354-
for (const auto& [k, _] : Request) {
367+
for (const auto& [k, _] : *Request) {
355368
NConnector::NApi::TPredicate_TConjunction conjunction;
356369
for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) {
357370
NConnector::NApi::TPredicate_TComparison eq;
@@ -381,8 +394,7 @@ namespace NYql::NDq {
381394
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
382395
const std::vector<std::pair<EColumnDestination, size_t>> ColumnDestinations;
383396
const size_t MaxKeysInRequest;
384-
std::atomic_bool InProgress;
385-
IDqAsyncLookupSource::TUnboxedValueMap Request;
397+
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
386398
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
387399
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
388400
};

0 commit comments

Comments
 (0)