Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,40 +202,50 @@ struct IDqComputeActorAsyncOutput {
};

struct IDqAsyncLookupSource {
using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper<true, true, false>;
using TUnboxedValueMap = THashMap<
NUdf::TUnboxedValue,
NUdf::TUnboxedValue,
NKikimr::NMiniKQL::TValueHasher,
NKikimr::NMiniKQL::TValueEqual,
NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>>
>;
struct TEvLookupRequest: NActors::TEventLocal<TEvLookupRequest, TDqComputeEvents::EvLookupRequest> {
TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, NKikimr::NMiniKQL::TUnboxedValueVector&& keys)
TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& request)
: Alloc(alloc)
, Keys(std::move(keys))
, Request(std::move(request))
{
}
~TEvLookupRequest() {
auto guard = Guard(*Alloc);
Keys = NKikimr::NMiniKQL::TUnboxedValueVector{};
TKeyTypeHelper empty;
Request = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
}

std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NKikimr::NMiniKQL::TUnboxedValueVector Keys;
TUnboxedValueMap Request;
};

struct TEvLookupResult: NActors::TEventLocal<TEvLookupResult, TDqComputeEvents::EvLookupResult> {
TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, NKikimr::NMiniKQL::TKeyPayloadPairVector&& data)
TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& result)
: Alloc(alloc)
, Data(std::move(data))
, Result(std::move(result))
{
}
~TEvLookupResult() {
auto guard = Guard(*Alloc.get());
Data = NKikimr::NMiniKQL::TKeyPayloadPairVector{};
TKeyTypeHelper empty;
Result = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
}

std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NKikimr::NMiniKQL::TKeyPayloadPairVector Data;
TUnboxedValueMap Result;
};

virtual size_t GetMaxSupportedKeysInRequest() const = 0;
//Initiate lookup for requested keys
//Only one request at a time is allowed. Request must contain no more than GetMaxSupportedKeysInRequest() keys
//Upon completion, results are sent in a TEvLookupResult to the preconfigured actor
virtual void AsyncLookup(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) = 0;
//Upon completion, results are sent in TEvLookupResult event to the preconfigured actor
virtual void AsyncLookup(TUnboxedValueMap&& request) = 0;
protected:
~IDqAsyncLookupSource() {}
};
Expand Down Expand Up @@ -266,6 +276,7 @@ struct IDqAsyncIoFactory : public TThrRefBase {

struct TLookupSourceArguments {
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> KeyTypeHelper;
NActors::TActorId ParentId;
google::protobuf::Any LookupSource; //provider specific data source
const NKikimr::NMiniKQL::TStructType* KeyType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class TInputTransformStreamLookupBase
, LookupJoinColumns(std::move(lookupJoinColumns))
, InputRowType(inputRowType)
, LookupKeyType(lookupKeyType)
, KeyTypeHelper(lookupKeyType)
, KeyTypeHelper(std::make_shared<IDqAsyncLookupSource::TKeyTypeHelper>(lookupKeyType))
, LookupPayloadType(lookupPayloadType)
, OutputRowType(outputRowType)
, OutputRowColumnOrder(outputRowColumnOrder)
Expand All @@ -67,6 +67,7 @@ class TInputTransformStreamLookupBase
Become(&TInputTransformStreamLookupBase::StateFunc);
NDq::IDqAsyncIoFactory::TLookupSourceArguments lookupSourceArgs {
.Alloc = Alloc,
.KeyTypeHelper = KeyTypeHelper,
.ParentId = SelfId(),
.LookupSource = Settings.GetRightSource().GetLookupSource(),
.KeyType = LookupKeyType,
Expand All @@ -90,30 +91,15 @@ class TInputTransformStreamLookupBase

void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) {
auto guard = BindAllocator();
THashMap<
NUdf::TUnboxedValue,
NUdf::TUnboxedValue,
NKikimr::NMiniKQL::TValueHasher,
NKikimr::NMiniKQL::TValueEqual,
NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>>
> map(
ev->Get()->Data.size(),
KeyTypeHelper.GetValueHash(),
KeyTypeHelper.GetValueEqual()
);
for (auto& r: ev->Get()->Data) {
Y_ABORT_UNLESS(r.first.IsBoxed());
Y_ABORT_UNLESS(!r.second || r.second.IsBoxed());
map.emplace(std::move(r));
}
const auto lookupResult = std::move(ev->Get()->Result);
while (!AwaitingQueue.empty()) {
const auto wideInputRow = AwaitingQueue.Head();
NUdf::TUnboxedValue* keyItems;
NUdf::TUnboxedValue lookupKey = HolderFactory.CreateDirectArrayHolder(InputJoinColumns.size(), keyItems);
for (size_t i = 0; i != InputJoinColumns.size(); ++i) {
keyItems[i] = wideInputRow[InputJoinColumns[i]];
}
auto lookupPayload = map.FindPtr(lookupKey);
auto lookupPayload = lookupResult.FindPtr(lookupKey);

NUdf::TUnboxedValue* outputRowItems;
NUdf::TUnboxedValue outputRow = HolderFactory.CreateDirectArrayHolder(OutputRowColumnOrder.size(), outputRowItems);
Expand Down Expand Up @@ -163,7 +149,7 @@ class TInputTransformStreamLookupBase
auto guard = BindAllocator();
//All resources, held by this class, that have been created with mkql allocator, must be deallocated here
InputFlow.Clear();
KeyTypeHelper = TKeyTypeHelper{};
KeyTypeHelper.reset();
NMiniKQL::TUnboxedValueBatch{}.swap(AwaitingQueue);
NMiniKQL::TUnboxedValueBatch{}.swap(ReadyQueue);
}
Expand All @@ -180,21 +166,21 @@ class TInputTransformStreamLookupBase
NUdf::TUnboxedValue* inputRowItems;
NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems);
const auto maxKeysInRequest = LookupSource.first->GetMaxSupportedKeysInRequest();
NKikimr::NMiniKQL::TUnboxedValueVector keysForLookup;
IDqAsyncLookupSource::TUnboxedValueMap keysForLookup{maxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual()};
while (
((InputFlowFetchStatus = FetchWideInputValue(inputRowItems)) == NUdf::EFetchStatus::Ok) &&
(keysForLookup.size() < maxKeysInRequest)
) {
NUdf::TUnboxedValue* keyItems;
auto key = HolderFactory.CreateDirectArrayHolder(InputJoinColumns.size(), keyItems);
NUdf::TUnboxedValue key = HolderFactory.CreateDirectArrayHolder(InputJoinColumns.size(), keyItems);
for (size_t i = 0; i != InputJoinColumns.size(); ++i) {
keyItems[i] = inputRowItems[InputJoinColumns[i]];
}
keysForLookup.push_back(key);
keysForLookup.emplace(std::move(key), NUdf::TUnboxedValue{});
AwaitingQueue.PushRow(inputRowItems, InputRowType->GetElementsCount());
}
if (!keysForLookup.empty()) {
LookupSource.first->AsyncLookup(keysForLookup);
LookupSource.first->AsyncLookup(std::move(keysForLookup));
WaitingForLookupResults = true;
}
}
Expand Down Expand Up @@ -241,8 +227,7 @@ class TInputTransformStreamLookupBase
const TVector<size_t> LookupJoinColumns;
const NMiniKQL::TMultiType* const InputRowType;
const NMiniKQL::TStructType* const LookupKeyType; //key column types in LookupTable
using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper<true, true, false>;
TKeyTypeHelper KeyTypeHelper;
std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> KeyTypeHelper;
const NMiniKQL::TStructType* const LookupPayloadType; //other column types in LookupTable
const NMiniKQL::TMultiType* const OutputRowType;
const TOutputRowColumnOrder OutputRowColumnOrder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,30 @@ using namespace NActors;

Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {

NYql::NUdf::TUnboxedValue CreateStructValue(NKikimr::NMiniKQL::THolderFactory& holderFactory, std::initializer_list<ui64> members) {
NYql::NUdf::TUnboxedValue* items;
NYql::NUdf::TUnboxedValue result = holderFactory.CreateDirectArrayHolder(members.size(), items);
for (size_t i = 0; i != members.size(); ++i) {
items[i] = NYql::NUdf::TUnboxedValuePod{*(members.begin() + i)};
}
return result;
}

//Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread
class TCallLookupActor: public TActorBootstrapped<TCallLookupActor> {
public:
TCallLookupActor(
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
NYql::NDq::IDqAsyncLookupSource* lookupSource,
NKikimr::NMiniKQL::TUnboxedValueVector&& keysToLookUp)
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap&& request)
: Alloc(alloc)
, LookupSource(lookupSource)
, KeysToLookUp(std::move(keysToLookUp))
, Request(std::move(request))
{
}

void Bootstrap() {
LookupSource->AsyncLookup(std::move(KeysToLookUp));
auto guard = Guard(*Alloc);
KeysToLookUp.clear();
KeysToLookUp.shrink_to_fit();
LookupSource->AsyncLookup(std::move(Request));
}

private:
Expand All @@ -47,7 +53,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
private:
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NYql::NDq::IDqAsyncLookupSource* LookupSource;
NKikimr::NMiniKQL::TUnboxedValueVector KeysToLookUp;
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap Request;
};

Y_UNIT_TEST(Lookup) {
Expand Down Expand Up @@ -94,8 +100,8 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
.Disjunction()
.Operand()
.Conjunction()
.Operand().Equal().Column("id").Value<ui64>(0).Done().Done()
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(100).Done().Done()
.Operand().Equal().Column("id").Value<ui64>(2).Done().Done()
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(102).Done().Done()
.Done()
.Done()
.Operand()
Expand All @@ -106,8 +112,8 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
.Done()
.Operand()
.Conjunction()
.Operand().Equal().Column("id").Value<ui64>(2).Done().Done()
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(102).Done().Done()
.Operand().Equal().Column("id").Value<ui64>(0).Done().Done()
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(100).Done().Done()
.Done()
.Done()
.Done()
Expand Down Expand Up @@ -153,12 +159,14 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
outputypeBuilder.Add("string_value", typeBuilder.NewDataType(NYql::NUdf::EDataSlot::String, true));

auto guard = Guard(*alloc.get());
auto keyTypeHelper = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TKeyTypeHelper>(keyTypeBuilder.Build());

auto [lookupSource, actor] = NYql::NDq::CreateGenericLookupActor(
connectorMock,
std::make_shared<NYql::NTestCreds::TSecuredServiceAccountCredentialsFactory>(),
edge,
alloc,
keyTypeHelper,
std::move(lookupSourceSettings),
keyTypeBuilder.Build(),
outputypeBuilder.Build(),
Expand All @@ -167,45 +175,41 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
1'000'000);
runtime.Register(actor);

NKikimr::NMiniKQL::TUnboxedValueVector keys;
NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap request(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual());
for (size_t i = 0; i != 3; ++i) {
NYql::NUdf::TUnboxedValue* keyItems;
auto key = holderFactory.CreateDirectArrayHolder(2, keyItems);
keyItems[0] = NYql::NUdf::TUnboxedValuePod(ui64(i));
keyItems[1] = NYql::NUdf::TUnboxedValuePod(ui64(100 + i));
keys.push_back(std::move(key));
request.emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
}

guard.Release(); //let actors use alloc

auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(keys));
auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(request));
runtime.Register(callLookupActor);

auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
auto guard2 = Guard(*alloc.get());
NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult = std::move(ev->Get()->Data);
auto lookupResult = std::move(ev->Get()->Result);

UNIT_ASSERT_EQUAL(3, lookupResult.size());
{
auto& [k, v] = lookupResult[0];
UNIT_ASSERT_EQUAL(0, k.GetElement(0).Get<ui64>());
UNIT_ASSERT_EQUAL(100, k.GetElement(1).Get<ui64>());
NYql::NUdf::TUnboxedValue val = v.GetElement(0);
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {0, 100}));
UNIT_ASSERT(v);
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
UNIT_ASSERT(val.AsStringRef() == TStringBuf("a"));
}
{
auto& [k, v] = lookupResult[1];
UNIT_ASSERT_EQUAL(1, k.GetElement(0).Get<ui64>());
UNIT_ASSERT_EQUAL(101, k.GetElement(1).Get<ui64>());
NYql::NUdf::TUnboxedValue val = v.GetElement(0);
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {1, 101}));
UNIT_ASSERT(v);
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
UNIT_ASSERT(val.AsStringRef() == TStringBuf("b"));
}
{
auto& [k, v] = lookupResult[2];
UNIT_ASSERT_EQUAL(2, k.GetElement(0).Get<ui64>());
UNIT_ASSERT_EQUAL(102, k.GetElement(1).Get<ui64>());
//this key was not found and reported as empty
UNIT_ASSERT(!v);
const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {2, 102}));
UNIT_ASSERT(v);
UNIT_ASSERT(!*v);
}
}

Expand Down
Loading