Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions ydb/library/yql/providers/generic/actors/ut/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
UNITTEST_FOR(ydb/library/yql/providers/generic/actors)

PEERDIR(
ydb/library/yql/sql/pg_dummy
ydb/library/yql/providers/generic/connector/libcpp/ut_helpers
ydb/library/actors/testlib
library/cpp/testing/unittest
)

SRCS(
yql_generic_lookup_actor_ut.cpp
)

YQL_LAST_ABI_VERSION()

END()
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
#include <ydb/library/yql/minikql/mkql_alloc.h>
#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/public/udf/udf_value.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>

#include <ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h>

#include <ydb/library/actors/testlib/test_runtime.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/test_creds.h>
#include <ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h>
#include <library/cpp/testing/unittest/registar.h>

#include <ydb/library/yql/utils/log/proto/logger_config.pb.h>
#include <ydb/library/yql/utils/log/log.h>

using namespace NYql::NConnector;
using namespace NYql::NConnector::NTest;
using namespace NYql;
using namespace NActors;

Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {

//Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread
class TCallLookupActor: public TActorBootstrapped<TCallLookupActor> {
public:
TCallLookupActor(
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
NYql::NDq::IDqAsyncLookupSource* lookupSource,
NKikimr::NMiniKQL::TUnboxedValueVector&& keysToLookUp)
: Alloc(alloc)
, LookupSource(lookupSource)
, KeysToLookUp(std::move(keysToLookUp))
{
}

void Bootstrap() {
LookupSource->AsyncLookup(std::move(KeysToLookUp));
auto guard = Guard(*Alloc);
KeysToLookUp.clear();
KeysToLookUp.shrink_to_fit();
}

private:
static constexpr char ActorName[] = "TEST";

private:
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NYql::NDq::IDqAsyncLookupSource* LookupSource;
NKikimr::NMiniKQL::TUnboxedValueVector KeysToLookUp;
};

Y_UNIT_TEST(Lookup) {
auto alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false);
NKikimr::NMiniKQL::TMemoryUsageInfo memUsage("TestMemUsage");
NKikimr::NMiniKQL::THolderFactory holderFactory(alloc->Ref(), memUsage);
NKikimr::NMiniKQL::TTypeEnvironment typeEnv(*alloc);
NKikimr::NMiniKQL::TTypeBuilder typeBuilder(typeEnv);

auto loggerConfig = NYql::NProto::TLoggingConfig();
loggerConfig.set_allcomponentslevel(::NYql::NProto::TLoggingConfig_ELevel::TLoggingConfig_ELevel_TRACE);
NYql::NLog::InitLogger(loggerConfig, false);

TTestActorRuntimeBase runtime;
runtime.Initialize();
auto edge = runtime.AllocateEdgeActor();

NYql::NConnector::NApi::TDataSourceInstance dsi;
dsi.Setkind(NYql::NConnector::NApi::EDataSourceKind::YDB);
dsi.mutable_endpoint()->Sethost("some_host");
dsi.mutable_endpoint()->Setport(2135);
dsi.Setdatabase("some_db");
dsi.Setuse_tls(true);
dsi.set_protocol(::NYql::NConnector::NApi::EProtocol::NATIVE);
auto token = dsi.mutable_credentials() -> mutable_token();
token->Settype("IAM");
token->Setvalue("TEST_TOKEN");

auto connectorMock = std::make_shared<NYql::NConnector::NTest::TConnectorClientMock>();

// clang-format off
// step 1: ListSplits
connectorMock->ExpectListSplits()
.Select()
.DataSourceInstance(dsi)
.What()
.Column("id", Ydb::Type::UINT64)
.NullableColumn("optional_id", Ydb::Type::UINT64)
.NullableColumn("string_value", Ydb::Type::STRING)
.Done()
.Table("lookup_test")
.Where()
.Filter()
.Disjunction()
.Operand()
.Conjunction()
.Operand().Equal().Column("id").Value<ui64>(0).Done().Done()
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(100).Done().Done()
.Done()
.Done()
.Operand()
.Conjunction()
.Operand().Equal().Column("id").Value<ui64>(1).Done().Done()
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(101).Done().Done()
.Done()
.Done()
.Operand()
.Conjunction()
.Operand().Equal().Column("id").Value<ui64>(2).Done().Done()
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(102).Done().Done()
.Done()
.Done()
.Done()
.Done()
.Done()
.Done()
.MaxSplitCount(1)
.Result()
.AddResponse(NewSuccess())
.Description("Actual split info is not important")
;

connectorMock->ExpectReadSplits()
.DataSourceInstance(dsi)
.Split()
.Description("Actual split info is not important")
.Done()
.Result()
.AddResponse(
MakeRecordBatch(
MakeArray<arrow::UInt64Builder, ui64>("id", {0, 1, 2}, arrow::uint64()),
MakeArray<arrow::UInt64Builder, ui64>("optional_id", {100, 101, 103}, arrow::uint64()), //the last value is intentially wrong
MakeArray<arrow::StringBuilder, std::string>("string_value", {"a", "b", "c"}, arrow::utf8())
),
NewSuccess()
)
;
// clang-format on

NYql::Generic::TLookupSource lookupSourceSettings;
*lookupSourceSettings.mutable_data_source_instance() = dsi;
lookupSourceSettings.Settable("lookup_test");
lookupSourceSettings.SetServiceAccountId("testsaid");
lookupSourceSettings.SetServiceAccountIdSignature("fake_signature");

google::protobuf::Any packedLookupSource;
Y_ABORT_UNLESS(packedLookupSource.PackFrom(lookupSourceSettings));

NKikimr::NMiniKQL::TStructTypeBuilder keyTypeBuilder{typeEnv};
keyTypeBuilder.Add("id", typeBuilder.NewDataType(NUdf::EDataSlot::Uint64, false));
keyTypeBuilder.Add("optional_id", typeBuilder.NewDataType(NUdf::EDataSlot::Uint64, true));
NKikimr::NMiniKQL::TStructTypeBuilder outputypeBuilder{typeEnv};
outputypeBuilder.Add("string_value", typeBuilder.NewDataType(NUdf::EDataSlot::String, true));

auto guard = Guard(*alloc.get());

auto [lookupSource, actor] = NYql::NDq::CreateGenericLookupActor(
connectorMock,
std::make_shared<NTestCreds::TSecuredServiceAccountCredentialsFactory>(),
edge,
alloc,
std::move(lookupSourceSettings),
keyTypeBuilder.Build(),
outputypeBuilder.Build(),
typeEnv,
holderFactory,
1'000'000);
runtime.Register(actor);

NKikimr::NMiniKQL::TUnboxedValueVector keys;
for (size_t i = 0; i != 3; ++i) {
NUdf::TUnboxedValue* keyItems;
auto key = holderFactory.CreateDirectArrayHolder(2, keyItems);
keyItems[0] = NUdf::TUnboxedValuePod(ui64(i));
keyItems[1] = NUdf::TUnboxedValuePod(ui64(100 + i));
keys.push_back(std::move(key));
}

guard.Release(); //let actors use alloc

auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(keys));
runtime.Register(callLookupActor);

auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
auto guard2 = Guard(*alloc.get());
NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult = std::move(ev->Get()->Data);

UNIT_ASSERT_EQUAL(3, lookupResult.size());
{
auto& [k, v] = lookupResult[0];
UNIT_ASSERT_EQUAL(0, k.GetElement(0).Get<ui64>());
UNIT_ASSERT_EQUAL(100, k.GetElement(1).Get<ui64>());
NUdf::TUnboxedValue val = v.GetElement(0);
UNIT_ASSERT(val.AsStringRef() == TStringBuf("a"));
}
{
auto& [k, v] = lookupResult[1];
UNIT_ASSERT_EQUAL(1, k.GetElement(0).Get<ui64>());
UNIT_ASSERT_EQUAL(101, k.GetElement(1).Get<ui64>());
NUdf::TUnboxedValue val = v.GetElement(0);
UNIT_ASSERT(val.AsStringRef() == TStringBuf("b"));
}
{
auto& [k, v] = lookupResult[2];
UNIT_ASSERT_EQUAL(2, k.GetElement(0).Get<ui64>());
UNIT_ASSERT_EQUAL(102, k.GetElement(1).Get<ui64>());
//this key was not found and reported as empty
UNIT_ASSERT(!v);
}
}

} //Y_UNIT_TEST_SUITE(GenericProviderLookupActor)
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/generic/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ PEERDIR(
YQL_LAST_ABI_VERSION()

END()

RECURSE_FOR_TESTS(ut)
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ namespace NYql::NDq {
return MaxKeysInRequest;
}
void AsyncLookup(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) override {
auto guard = Guard(*Alloc);
CreateRequest(keys);
}

Expand Down Expand Up @@ -148,9 +149,8 @@ namespace NYql::NDq {
Y_ABORT_UNLESS(response.splits_size() == 1);
auto& split = response.splits(0);
NConnector::NApi::TReadSplitsRequest readRequest;
*readRequest.mutable_data_source_instance() = LookupSource.data_source_instance();
*readRequest.mutable_data_source_instance() = GetDataSourceInstanceWithToken();
*readRequest.add_splits() = split;
readRequest.Setmode(NConnector::NApi::TReadSplitsRequest_EMode::TReadSplitsRequest_EMode_ORDERED);
readRequest.Setformat(NConnector::NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING);
Connector->ReadSplits(readRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector";
Expand Down Expand Up @@ -269,7 +269,6 @@ namespace NYql::NDq {
for (auto&& k : RequestedKeys) {
LookupResult.emplace_back(std::move(k), NUdf::TUnboxedValue{});
}
RequestedKeys.clear();
auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(LookupResult));
TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev));
LookupResult = {};
Expand Down Expand Up @@ -317,12 +316,17 @@ namespace NYql::NDq {
return result;
}

NConnector::NApi::TSelect CreateSelect(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) {
NConnector::NApi::TSelect select;
*select.mutable_data_source_instance() = LookupSource.data_source_instance();
NYql::NConnector::NApi::TDataSourceInstance GetDataSourceInstanceWithToken() const {
auto dsi = LookupSource.data_source_instance();
//Note: returned token may be stale and we have no way to check or recover here
//Consider to redesign ICredentialsProvider
TokenProvider->MaybeFillToken(*select.mutable_data_source_instance());
TokenProvider->MaybeFillToken(dsi);
return dsi;
}

NConnector::NApi::TSelect CreateSelect(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) {
NConnector::NApi::TSelect select;
*select.mutable_data_source_instance() = GetDataSourceInstanceWithToken();

for (ui32 i = 0; i != SelectResultType->GetMembersCount(); ++i) {
auto c = select.mutable_what()->add_items()->mutable_column();
Expand Down Expand Up @@ -378,7 +382,7 @@ namespace NYql::NDq {
std::pair<NYql::NDq::IDqAsyncLookupSource*, NActors::IActor*> CreateGenericLookupActor(
NConnector::IClient::TPtr connectorClient,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NActors::TActorId&& parentId,
NActors::TActorId parentId,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
NYql::Generic::TLookupSource&& lookupSource,
const NKikimr::NMiniKQL::TStructType* keyType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace NYql::NDq {
CreateGenericLookupActor(
NConnector::IClient::TPtr connectorClient,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NActors::TActorId&& parentId,
NActors::TActorId parentId,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
NYql::Generic::TLookupSource&& lookupSource,
const NKikimr::NMiniKQL::TStructType* keyType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,28 @@ namespace NYql::NConnector::NTest {

using namespace fmt::literals;

#define DEFINE_SIMPLE_TYPE_SETTER(T, primitiveTypeId, value_name) \
template <> \
void SetSimpleValue(const T& value, Ydb::TypedValue* proto) { \
proto->mutable_type()->set_type_id(::Ydb::Type::primitiveTypeId); \
proto->mutable_value()->Y_CAT(set_, value_name)(value); \
::Ydb::Type MakeYdbType(::Ydb::Type::PrimitiveTypeId primitiveType, bool optional) {
::Ydb::Type type;
if (optional) {
type.mutable_optional_type()->mutable_item()->Settype_id(primitiveType);
} else {
type.Settype_id(primitiveType);
}
return type;
}

#define DEFINE_SIMPLE_TYPE_SETTER(T, primitiveTypeId, value_name) \
template <> \
void SetSimpleValue(const T& value, Ydb::TypedValue* proto, bool optional) { \
*proto->mutable_type() = MakeYdbType(::Ydb::Type::primitiveTypeId, optional); \
proto->mutable_value()->Y_CAT(set_, value_name)(value); \
}

DEFINE_SIMPLE_TYPE_SETTER(bool, BOOL, bool_value);
DEFINE_SIMPLE_TYPE_SETTER(i32, INT32, int32_value);
DEFINE_SIMPLE_TYPE_SETTER(ui32, UINT32, uint32_value);
DEFINE_SIMPLE_TYPE_SETTER(i64, INT64, int64_value);
DEFINE_SIMPLE_TYPE_SETTER(ui64, UINT64, uint64_value);

void CreatePostgreSQLExternalDataSource(
const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr,
Expand Down
Loading