Skip to content

YQ Connector: YQ-2715: add ydb data source #753

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
{
ToString(NYql::EDatabaseType::PostgreSQL),
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"MDB_BASIC", "BASIC"}, {"database_name", "protocol", "mdb_cluster_id", "use_tls", "schema"}, hostnamePatternsRegEx)
},
{
ToString(NYql::EDatabaseType::Ydb),
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC", "SERVICE_ACCOUNT"}, {"database_name", "use_tls", "database_id"}, hostnamePatternsRegEx)
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace NKikimr::NKqp {
enum class EProviderType {
PostgreSQL,
ClickHouse,
Ydb,
};

NApi::TDataSourceInstance MakeDataSourceInstance(EProviderType providerType) {
Expand All @@ -39,6 +40,8 @@ namespace NKikimr::NKqp {
return TConnectorClientMock::TPostgreSQLDataSourceInstanceBuilder<>().GetResult();
case EProviderType::ClickHouse:
return TConnectorClientMock::TClickHouseDataSourceInstanceBuilder<>().GetResult();
case EProviderType::Ydb:
return TConnectorClientMock::TYdbDataSourceInstanceBuilder<>().GetResult();
}
}

Expand All @@ -48,6 +51,8 @@ namespace NKikimr::NKqp {
return CreatePostgreSQLExternalDataSource(kikimr);
case EProviderType::ClickHouse:
return CreateClickHouseExternalDataSource(kikimr);
case EProviderType::Ydb:
return CreateYdbExternalDataSource(kikimr);
}
}

Expand Down Expand Up @@ -165,6 +170,10 @@ namespace NKikimr::NKqp {
TestSelectAllFields(EProviderType::ClickHouse);
}

Y_UNIT_TEST(YdbManaged) {
TestSelectAllFields(EProviderType::Ydb);
}

void TestSelectConstant(EProviderType providerType) {
// prepare mock
auto clientMock = std::make_shared<TConnectorClientMock>();
Expand Down Expand Up @@ -257,6 +266,10 @@ namespace NKikimr::NKqp {
TestSelectConstant(EProviderType::ClickHouse);
}

Y_UNIT_TEST(YdbManagedSelectConstant) {
TestSelectConstant(EProviderType::Ydb);
}

void TestSelectCount(EProviderType providerType) {
// prepare mock
auto clientMock = std::make_shared<TConnectorClientMock>();
Expand Down Expand Up @@ -345,6 +358,10 @@ namespace NKikimr::NKqp {
TestSelectCount(EProviderType::ClickHouse);
}

Y_UNIT_TEST(YdbSelectCount) {
TestSelectCount(EProviderType::Ydb);
}

void TestFilterPushdown(EProviderType providerType) {
// prepare mock
auto clientMock = std::make_shared<TConnectorClientMock>();
Expand Down Expand Up @@ -450,5 +467,9 @@ namespace NKikimr::NKqp {
Y_UNIT_TEST(ClickHouseFilterPushdown) {
TestFilterPushdown(EProviderType::ClickHouse);
}

Y_UNIT_TEST(YdbFilterPushdown) {
TestFilterPushdown(EProviderType::Ydb);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ inline EDatabaseType DatabaseTypeFromDataSourceKind(NConnector::NApi::EDataSourc
return EDatabaseType::PostgreSQL;
case NConnector::NApi::EDataSourceKind::CLICKHOUSE:
return EDatabaseType::ClickHouse;
case NConnector::NApi::EDataSourceKind::YDB:
return EDatabaseType::Ydb;
default:
ythrow yexception() << "Unknown data source kind: " << NConnector::NApi::EDataSourceKind_Name(dataSourceKind);
}
Expand All @@ -33,6 +35,8 @@ inline NConnector::NApi::EDataSourceKind DatabaseTypeToDataSourceKind(EDatabaseT
return NConnector::NApi::EDataSourceKind::POSTGRESQL;
case EDatabaseType::ClickHouse:
return NConnector::NApi::EDataSourceKind::CLICKHOUSE;
case EDatabaseType::Ydb:
return NConnector::NApi::EDataSourceKind::YDB;
default:
ythrow yexception() << "Unknown database type: " << ToString(databaseType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace NYql::NDq {
args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, args.HolderFactory);
};

for (auto& sourceName : {"ClickHouseGeneric", "PostgreSqlGeneric"}) {
for (auto& sourceName : {"ClickHouseGeneric", "PostgreSqlGeneric", "YdbGeneric"}) {
factory.RegisterSource<Generic::TSource>(sourceName, genericFactory);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,42 @@ namespace NYql::NConnector::NTest {
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

void CreateYdbExternalDataSource(
const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr,
const TString& dataSourceName,
const TString& login,
const TString& password,
const TString& endpoint,
bool useTls,
const TString& databaseName)
{
auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(
R"(
CREATE OBJECT {data_source_name}_password (TYPE SECRET) WITH (value={password});

CREATE EXTERNAL DATA SOURCE {data_source_name} WITH (
SOURCE_TYPE="{source_type}",
LOCATION="{endpoint}",
AUTH_METHOD="BASIC",
LOGIN="{login}",
DATABASE_NAME="{database}",
PASSWORD_SECRET_NAME="{data_source_name}_password",
USE_TLS="{use_tls}"
);
)",
"data_source_name"_a = dataSourceName,
"login"_a = login,
"password"_a = password,
"use_tls"_a = useTls ? "TRUE" : "FALSE",
"source_type"_a = ToString(NYql::EDatabaseType::Ydb),
"endpoint"_a = endpoint,
"database"_a = databaseName);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

std::shared_ptr<arrow::RecordBatch> MakeEmptyRecordBatch(size_t rowsCount) {
return arrow::RecordBatch::Make(
std::make_shared<arrow::Schema>(arrow::FieldVector()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ namespace NYql::NConnector::NTest {
return TClickHouseDataSourceInstanceBuilder<TBuilder>( \
this->Result_->mutable_data_source_instance(), \
static_cast<TBuilder*>(this)); \
} \
TYdbDataSourceInstanceBuilder<TBuilder> YdbDataSourceInstance() { \
return TYdbDataSourceInstanceBuilder<TBuilder>( \
this->Result_->mutable_data_source_instance(), \
static_cast<TBuilder*>(this)); \
}

MATCHER_P(ProtobufRequestMatcher, expected, "request does not match") {
Expand Down Expand Up @@ -200,6 +205,15 @@ namespace NYql::NConnector::NTest {
const TString& serviceAccountIdSignature = DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE,
const TString& databaseName = DEFAULT_DATABASE);

void CreateYdbExternalDataSource(
const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr,
const TString& dataSourceName = DEFAULT_DATA_SOURCE_NAME,
const TString& login = DEFAULT_LOGIN,
const TString& password = DEFAULT_PASSWORD,
const TString& endpoint = DEFAULT_YDB_ENDPOINT,
bool useTls = DEFAULT_USE_TLS,
const TString& databaseName = DEFAULT_DATABASE);

class TConnectorClientMock: public NYql::NConnector::IClient {
public:
MOCK_METHOD(TResult<NApi::TDescribeTableResponse>, DescribeTableImpl, (const NApi::TDescribeTableRequest& request));
Expand Down Expand Up @@ -277,6 +291,25 @@ namespace NYql::NConnector::NTest {
}
};

template <class TParent = void /* no parent by default */>
struct TYdbDataSourceInstanceBuilder: public TBaseDataSourceInstanceBuilder<TYdbDataSourceInstanceBuilder<TParent>, TParent> {
using TBase = TBaseDataSourceInstanceBuilder<TYdbDataSourceInstanceBuilder<TParent>, TParent>;

explicit TYdbDataSourceInstanceBuilder(NApi::TDataSourceInstance* result = nullptr, TParent* parent = nullptr)
: TBase(result, parent)
{
FillWithDefaults();
}

void FillWithDefaults() {
TBase::FillWithDefaults();
this->Host(DEFAULT_YDB_HOST);
this->Port(DEFAULT_YDB_PORT);
this->Kind(NApi::EDataSourceKind::YDB);
this->Protocol(DEFAULT_YDB_PROTOCOL);
}
};

template <class TParent = void /* no parent by default */>
struct TDescribeTableResultBuilder: public TResponseBuilder<TParent, NApi::TDescribeTableResponse> {
using TBuilder = TDescribeTableResultBuilder<TParent>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ namespace NYql::NConnector::NTest {
extern const TString DEFAULT_CH_CLUSTER_ID = "ch-managed";
extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID = "sa";
extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE = "sa_signature";

extern const TString DEFAULT_YDB_HOST = "localhost";
extern const TString DEFAULT_YDB_DATABASE = "local";
extern const TString DEFAULT_YDB_ENDPOINT = TStringBuilder() << DEFAULT_YDB_HOST << ':' << DEFAULT_YDB_PORT;
} // namespace NYql::NConnector::NTest
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,10 @@ namespace NYql::NConnector::NTest {
constexpr NApi::EProtocol DEFAULT_CH_PROTOCOL = NApi::EProtocol::HTTP;
extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID;
extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE;

extern const TString DEFAULT_YDB_DATABASE;
extern const TString DEFAULT_YDB_HOST;
constexpr int DEFAULT_YDB_PORT = 2136;
extern const TString DEFAULT_YDB_ENDPOINT;
constexpr NApi::EProtocol DEFAULT_YDB_PROTOCOL = NApi::EProtocol::NATIVE;
} // namespace NYql::NConnector::NTest
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ namespace NYql {
NYql::TGenericClusterConfig& clusterConfig) {
using namespace NConnector::NApi;

if (clusterConfig.GetKind() == EDataSourceKind::YDB) {
clusterConfig.SetProtocol(EProtocol::NATIVE);
return;
}

auto it = properties.find("protocol");
if (it == properties.cend()) {
ythrow yexception() << "missing 'PROTOCOL' value";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ namespace NYql {
case NYql::NConnector::NApi::POSTGRESQL:
sourceType = "PostgreSqlGeneric";
break;
case NYql::NConnector::NApi::YDB:
sourceType = "YdbGeneric";
break;
default:
ythrow yexception() << "Data source kind is unknown or not specified";
break;
Expand Down Expand Up @@ -193,6 +196,9 @@ namespace NYql {
case NConnector::NApi::POSTGRESQL:
properties["SourceType"] = "PostgreSql";
break;
case NConnector::NApi::YDB:
properties["SourceType"] = "Ydb";
break;
case NConnector::NApi::DATA_SOURCE_KIND_UNSPECIFIED:
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ namespace NYql {
switch (dataSourceKind) {
case NYql::NConnector::NApi::CLICKHOUSE:
break;
case NYql::NConnector::NApi::YDB:
break;
case NYql::NConnector::NApi::POSTGRESQL: {
// for backward compability set schema "public" by default
// TODO: simplify during https://st.yandex-team.ru/YQ-2494
Expand Down Expand Up @@ -324,7 +326,7 @@ namespace NYql {
dbNameTarget = "postgres";
break;
default:
ythrow yexception() << "Unexpected data source kind: '"
ythrow yexception() << "You must provide database name explicitly for data source kind: '"
<< NYql::NConnector::NApi::EDataSourceKind_Name(dataSourceKind) << "'";
}
} // else take database name from table path
Expand Down