Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 8 additions & 23 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,41 +213,26 @@ struct IDqAsyncLookupSource {
NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>>
>;
struct TEvLookupRequest: NActors::TEventLocal<TEvLookupRequest, TDqComputeEvents::EvLookupRequest> {
TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& request)
: Alloc(alloc)
, Request(std::move(request))
TEvLookupRequest(std::weak_ptr<TUnboxedValueMap> request)
: Request(std::move(request))
{
}
~TEvLookupRequest() {
auto guard = Guard(*Alloc);
TKeyTypeHelper empty;
Request = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
}
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
TUnboxedValueMap Request;
std::weak_ptr<TUnboxedValueMap> Request;
};

struct TEvLookupResult: NActors::TEventLocal<TEvLookupResult, TDqComputeEvents::EvLookupResult> {
TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& result)
: Alloc(alloc)
, Result(std::move(result))
TEvLookupResult(std::weak_ptr<TUnboxedValueMap> result)
: Result(std::move(result))
{
}
~TEvLookupResult() {
auto guard = Guard(*Alloc.get());
TKeyTypeHelper empty;
Result = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
}

std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
TUnboxedValueMap Result;
std::weak_ptr<TUnboxedValueMap> Result;
};

virtual size_t GetMaxSupportedKeysInRequest() const = 0;
//Initiate lookup for requested keys
//Only one request at a time is allowed. Request must contain no more than GetMaxSupportedKeysInRequest() keys
//Upon completion, results are sent in TEvLookupResult event to the preconfigured actor
virtual void AsyncLookup(TUnboxedValueMap&& request) = 0;
//Upon completion, TEvLookupResult event is sent to the preconfigured actor
virtual void AsyncLookup(std::weak_ptr<TUnboxedValueMap> request) = 0;
protected:
~IDqAsyncLookupSource() {}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class TInputTransformStreamLookupBase
, LruCache(std::make_unique<NKikimr::NMiniKQL::TUnboxedKeyValueLruCacheWithTtl>(cacheLimit, lookupKeyType))
, CacheTtl(cacheTtl)
, ReadyQueue(OutputRowType)
, WaitingForLookupResults(false)
{
Y_ABORT_UNLESS(Alloc);
for (size_t i = 0; i != LookupInputIndexes.size(); ++i) {
Expand All @@ -91,8 +90,9 @@ class TInputTransformStreamLookupBase
.MaxKeysInRequest = 1000 // TODO configure me
};
auto guard = Guard(*Alloc);
LookupSource = Factory->CreateDqLookupSource(Settings.GetRightSource().GetProviderName(), std::move(lookupSourceArgs));
RegisterWithSameMailbox(LookupSource.second);
auto [lookupSource, lookupSourceActor] = Factory->CreateDqLookupSource(Settings.GetRightSource().GetProviderName(), std::move(lookupSourceArgs));
MaxKeysInRequest = lookupSource->GetMaxSupportedKeysInRequest();
LookupSourceId = RegisterWithSameMailbox(lookupSourceActor);
}
protected:
virtual NUdf::EFetchStatus FetchWideInputValue(NUdf::TUnboxedValue* inputRowItems) = 0;
Expand Down Expand Up @@ -134,19 +134,20 @@ class TInputTransformStreamLookupBase
void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) {
auto guard = BindAllocator();
const auto now = std::chrono::steady_clock::now();
auto lookupResult = std::move(ev->Get()->Result);
auto lookupResult = ev->Get()->Result.lock();
Y_ABORT_UNLESS(lookupResult == KeysForLookup);
for (; !AwaitingQueue.empty(); AwaitingQueue.pop_front()) {
auto& [lookupKey, inputOther] = AwaitingQueue.front();
auto lookupPayload = lookupResult.FindPtr(lookupKey);
auto lookupPayload = lookupResult->FindPtr(lookupKey);
if (lookupPayload == nullptr) {
continue;
}
AddReadyQueue(lookupKey, inputOther, lookupPayload);
}
for (auto&& [k, v]: lookupResult) {
for (auto&& [k, v]: *lookupResult) {
LruCache->Update(NUdf::TUnboxedValue(const_cast<NUdf::TUnboxedValue&&>(k)), std::move(v), now + CacheTtl);
}
WaitingForLookupResults = false;
KeysForLookup.reset();
Send(ComputeActorId, new TEvNewAsyncInputDataArrived{InputIndex});
}

Expand All @@ -165,9 +166,10 @@ class TInputTransformStreamLookupBase
}

void PassAway() final {
Send(LookupSource.second->SelfId(), new NActors::TEvents::TEvPoison{});
Send(LookupSourceId, new NActors::TEvents::TEvPoison{});
auto guard = BindAllocator();
//All resources, held by this class, that have been created with mkql allocator, must be deallocated here
KeysForLookup.reset();
InputFlow.Clear();
KeyTypeHelper.reset();
decltype(AwaitingQueue){}.swap(AwaitingQueue);
Expand All @@ -188,15 +190,14 @@ class TInputTransformStreamLookupBase

DrainReadyQueue(batch);

if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && !WaitingForLookupResults) {
if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && !KeysForLookup) {
NUdf::TUnboxedValue* inputRowItems;
NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems);
const auto now = std::chrono::steady_clock::now();
const auto maxKeysInRequest = LookupSource.first->GetMaxSupportedKeysInRequest();
IDqAsyncLookupSource::TUnboxedValueMap keysForLookup{maxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual()};
KeysForLookup = std::make_shared<IDqAsyncLookupSource::TUnboxedValueMap>(MaxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual());
LruCache->Prune(now);
while (
(keysForLookup.size() < maxKeysInRequest) &&
(KeysForLookup->size() < MaxKeysInRequest) &&
((InputFlowFetchStatus = FetchWideInputValue(inputRowItems)) == NUdf::EFetchStatus::Ok)) {
NUdf::TUnboxedValue* keyItems;
NUdf::TUnboxedValue key = HolderFactory.CreateDirectArrayHolder(LookupInputIndexes.size(), keyItems);
Expand All @@ -212,12 +213,13 @@ class TInputTransformStreamLookupBase
AddReadyQueue(key, other, &*lookupPayload);
} else {
AwaitingQueue.emplace_back(key, std::move(other));
keysForLookup.emplace(std::move(key), NUdf::TUnboxedValue{});
KeysForLookup->emplace(std::move(key), NUdf::TUnboxedValue{});
}
}
if (!keysForLookup.empty()) {
LookupSource.first->AsyncLookup(std::move(keysForLookup));
WaitingForLookupResults = true;
if (!KeysForLookup->empty()) {
Send(LookupSourceId, new IDqAsyncLookupSource::TEvLookupRequest(KeysForLookup));
} else {
KeysForLookup.reset();
}
DrainReadyQueue(batch);
}
Expand Down Expand Up @@ -259,7 +261,8 @@ class TInputTransformStreamLookupBase
IDqAsyncIoFactory::TPtr Factory;
NDqProto::TDqInputTransformLookupSettings Settings;
protected:
std::pair<IDqAsyncLookupSource*, NActors::IActor*> LookupSource;
NActors::TActorId LookupSourceId;
size_t MaxKeysInRequest;
const TVector<size_t> LookupInputIndexes;
const TVector<size_t> OtherInputIndexes;
const NMiniKQL::TMultiType* const InputRowType;
Expand All @@ -276,8 +279,8 @@ class TInputTransformStreamLookupBase
using TAwaitingQueue = std::deque<TInputKeyOtherPair, NKikimr::NMiniKQL::TMKQLAllocator<TInputKeyOtherPair>>; //input row split in two parts: key columns and other columns
TAwaitingQueue AwaitingQueue;
NKikimr::NMiniKQL::TUnboxedValueBatch ReadyQueue;
std::atomic<bool> WaitingForLookupResults;
NYql::NDq::TDqAsyncStats IngressStats;
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> KeysForLookup;
};

class TInputTransformStreamLookupWide: public TInputTransformStreamLookupBase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,35 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
public:
TCallLookupActor(
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
NYql::NDq::IDqAsyncLookupSource* lookupSource,
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap&& request)
const NActors::TActorId& lookupActor,
std::shared_ptr<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap> request)
: Alloc(alloc)
, LookupSource(lookupSource)
, Request(std::move(request))
, LookupActor(lookupActor)
, Request(request)
{
}

void Bootstrap() {
LookupSource->AsyncLookup(std::move(Request));
auto ev = new NYql::NDq::IDqAsyncLookupSource::TEvLookupRequest(Request);
TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(LookupActor, SelfId(), ev));
}

void PassAway() override {
auto guard = Guard(*Alloc);
Request.reset();
}

~TCallLookupActor() {
PassAway();
}

private:
static constexpr char ActorName[] = "TEST";

private:
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NYql::NDq::IDqAsyncLookupSource* LookupSource;
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap Request;
const NActors::TActorId LookupActor;
std::shared_ptr<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap> Request;
};

Y_UNIT_TEST(Lookup) {
Expand Down Expand Up @@ -173,41 +183,42 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
typeEnv,
holderFactory,
1'000'000);
runtime.Register(actor);
auto lookupActor = runtime.Register(actor);

NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap request(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual());
auto request = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap>(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual());
for (size_t i = 0; i != 3; ++i) {
NYql::NUdf::TUnboxedValue* keyItems;
auto key = holderFactory.CreateDirectArrayHolder(2, keyItems);
keyItems[0] = NYql::NUdf::TUnboxedValuePod(ui64(i));
keyItems[1] = NYql::NUdf::TUnboxedValuePod(ui64(100 + i));
request.emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
request->emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
}

guard.Release(); // let actors use alloc

auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(request));
auto callLookupActor = new TCallLookupActor(alloc, lookupActor, request);
runtime.Register(callLookupActor);

auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
auto guard2 = Guard(*alloc.get());
auto lookupResult = std::move(ev->Get()->Result);
auto lookupResult = ev->Get()->Result.lock();
UNIT_ASSERT(lookupResult);

UNIT_ASSERT_EQUAL(3, lookupResult.size());
UNIT_ASSERT_EQUAL(3, lookupResult->size());
{
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {0, 100}));
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {0, 100}));
UNIT_ASSERT(v);
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
UNIT_ASSERT(val.AsStringRef() == TStringBuf("a"));
}
{
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {1, 101}));
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {1, 101}));
UNIT_ASSERT(v);
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
UNIT_ASSERT(val.AsStringRef() == TStringBuf("b"));
}
{
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {2, 102}));
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {2, 102}));
UNIT_ASSERT(v);
UNIT_ASSERT(!*v);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,20 @@ namespace NYql::NDq {
, HolderFactory(holderFactory)
, ColumnDestinations(CreateColumnDestination())
, MaxKeysInRequest(maxKeysInRequest)
, Request(
0,
KeyTypeHelper->GetValueHash(),
KeyTypeHelper->GetValueEqual())
{
}

~TGenericLookupActor() {
Free();
}

private:
void Free() {
auto guard = Guard(*Alloc);
Request.reset();
KeyTypeHelper.reset();
TKeyTypeHelper empty;
Request = IDqAsyncLookupSource::TUnboxedValueMap(0, empty.GetValueHash(), empty.GetValueEqual());
}
public:

void Bootstrap() {
auto dsi = LookupSource.data_source_instance();
Expand All @@ -116,13 +117,18 @@ namespace NYql::NDq {
size_t GetMaxSupportedKeysInRequest() const override {
return MaxKeysInRequest;
}
void AsyncLookup(IDqAsyncLookupSource::TUnboxedValueMap&& request) override {
void AsyncLookup(std::weak_ptr<IDqAsyncLookupSource::TUnboxedValueMap> request) override {
auto guard = Guard(*Alloc);
CreateRequest(std::move(request));
CreateRequest(request.lock());
}
void PassAway() override {
Free();
TBase::PassAway();
}

private: // events
STRICT_STFUNC(StateFunc,
hFunc(TEvLookupRequest, Handle);
hFunc(TEvListSplitsIterator, Handle);
hFunc(TEvListSplitsPart, Handle);
hFunc(TEvReadSplitsIterator, Handle);
Expand Down Expand Up @@ -196,11 +202,18 @@ namespace NYql::NDq {
PassAway();
}

void Handle(TEvLookupRequest::TPtr ev) {
auto guard = Guard(*Alloc);
CreateRequest(ev->Get()->Request.lock());
}

private:
void CreateRequest(IDqAsyncLookupSource::TUnboxedValueMap&& request) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request.size() << " keys";
Y_ABORT_IF(InProgress);
Y_ABORT_IF(request.size() == 0 || request.size() > MaxKeysInRequest);
void CreateRequest(std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> request) {
if (!request) {
return;
}
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys";
Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest);

Request = std::move(request);
NConnector::NApi::TListSplitsRequest splitRequest;
Expand Down Expand Up @@ -272,20 +285,20 @@ namespace NYql::NDq {
for (size_t j = 0; j != columns.size(); ++j) {
(ColumnDestinations[j].first == EColumnDestination::Key ? keyItems : outputItems)[ColumnDestinations[j].second] = columns[j][i];
}
if (auto* v = Request.FindPtr(key)) {
if (auto* v = Request->FindPtr(key)) {
*v = std::move(output); // duplicates will be overwritten
}
}
}

void FinalizeRequest() {
YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request.size() << " keys";
YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request->size() << " keys";
auto guard = Guard(*Alloc);
auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(Request));
auto ev = new IDqAsyncLookupSource::TEvLookupResult(Request);
Request.reset();
TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev));
LookupResult = {};
ReadSplitsIterator = {};
InProgress = false;
}

static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NConnector::NApi::TError& error) {
Expand Down Expand Up @@ -351,7 +364,7 @@ namespace NYql::NDq {
select.mutable_from()->Settable(LookupSource.table());

NConnector::NApi::TPredicate_TDisjunction disjunction;
for (const auto& [k, _] : Request) {
for (const auto& [k, _] : *Request) {
NConnector::NApi::TPredicate_TConjunction conjunction;
for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) {
NConnector::NApi::TPredicate_TComparison eq;
Expand Down Expand Up @@ -381,8 +394,7 @@ namespace NYql::NDq {
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
const std::vector<std::pair<EColumnDestination, size_t>> ColumnDestinations;
const size_t MaxKeysInRequest;
std::atomic_bool InProgress;
IDqAsyncLookupSource::TUnboxedValueMap Request;
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
};
Expand Down
Loading