Skip to content

[YQ-1997] Refactor create external data source / external table schemeshard operations #1119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#include "schemeshard__operation_common_external_data_source.h"

#include <utility>

namespace NKikimr::NSchemeShard::NExternalDataSource {

constexpr uint32_t MAX_FIELD_SIZE = 1000;
constexpr uint32_t MAX_PROTOBUF_SIZE = 2 * 1024 * 1024; // 2 MiB

bool ValidateLocationAndInstallation(const TString& location,
const TString& installation,
TString& errStr) {
if (location.Size() > MAX_FIELD_SIZE) {
errStr =
Sprintf("Maximum length of location must be less or equal equal to %u but got %lu",
MAX_FIELD_SIZE,
location.Size());
return false;
}
if (installation.Size() > MAX_FIELD_SIZE) {
errStr = Sprintf(
"Maximum length of installation must be less or equal equal to %u but got %lu",
MAX_FIELD_SIZE,
installation.Size());
return false;
}
return true;
}

bool CheckAuth(const TString& authMethod,
const TVector<TString>& availableAuthMethods,
TString& errStr) {
if (Find(availableAuthMethods, authMethod) == availableAuthMethods.end()) {
errStr = TStringBuilder{} << authMethod << " isn't supported for this source type";
return false;
}

return true;
}

bool ValidateProperties(const NKikimrSchemeOp::TExternalDataSourceProperties& properties,
TString& errStr) {
if (properties.ByteSizeLong() > MAX_PROTOBUF_SIZE) {
errStr =
Sprintf("Maximum size of properties must be less or equal equal to %u but got %lu",
MAX_PROTOBUF_SIZE,
properties.ByteSizeLong());
return false;
}
return true;
}

bool ValidateAuth(const NKikimrSchemeOp::TAuth& auth,
const NExternalSource::IExternalSource::TPtr& source,
TString& errStr) {
if (auth.ByteSizeLong() > MAX_PROTOBUF_SIZE) {
errStr = Sprintf(
"Maximum size of authorization information must be less or equal equal to %u but got %lu",
MAX_PROTOBUF_SIZE,
auth.ByteSizeLong());
return false;
}
const auto availableAuthMethods = source->GetAuthMethods();
switch (auth.identity_case()) {
case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: {
errStr = "Authorization method isn't specified";
return false;
}
case NKikimrSchemeOp::TAuth::kServiceAccount:
return CheckAuth("SERVICE_ACCOUNT", availableAuthMethods, errStr);
case NKikimrSchemeOp::TAuth::kMdbBasic:
return CheckAuth("MDB_BASIC", availableAuthMethods, errStr);
case NKikimrSchemeOp::TAuth::kBasic:
return CheckAuth("BASIC", availableAuthMethods, errStr);
case NKikimrSchemeOp::TAuth::kAws:
return CheckAuth("AWS", availableAuthMethods, errStr);
case NKikimrSchemeOp::TAuth::kNone:
return CheckAuth("NONE", availableAuthMethods, errStr);
}
return false;
}

bool Validate(const NKikimrSchemeOp::TExternalDataSourceDescription& desc,
const NExternalSource::IExternalSourceFactory::TPtr& factory,
TString& errStr) {
try {
const auto source = factory->GetOrCreate(desc.GetSourceType());
source->ValidateExternalDataSource(desc.SerializeAsString());
return ValidateLocationAndInstallation(desc.GetLocation(),
desc.GetInstallation(),
errStr) &&
ValidateAuth(desc.GetAuth(), source, errStr) &&
ValidateProperties(desc.GetProperties(), errStr);
} catch (...) {
errStr = CurrentExceptionMessage();
return false;
}
}

TExternalDataSourceInfo::TPtr CreateExternalDataSource(
const NKikimrSchemeOp::TExternalDataSourceDescription& desc, ui64 alterVersion) {
TExternalDataSourceInfo::TPtr externalDataSoureInfo = new TExternalDataSourceInfo;
externalDataSoureInfo->SourceType = desc.GetSourceType();
externalDataSoureInfo->Location = desc.GetLocation();
externalDataSoureInfo->Installation = desc.GetInstallation();
externalDataSoureInfo->AlterVersion = alterVersion;
externalDataSoureInfo->Auth.CopyFrom(desc.GetAuth());
externalDataSoureInfo->Properties.CopyFrom(desc.GetProperties());
return externalDataSoureInfo;
}

} // namespace NKikimr::NSchemeShard::NExternalDataSource
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once

#include "schemeshard__operation_part.h"
#include "schemeshard_impl.h"

#include <ydb/core/tablet_flat/test/libs/table/test_iter.h>

#include <utility>

#define LOG_I(stream) LOG_INFO_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
#define RETURN_RESULT_UNLESS(x) if (!(x)) return result;

namespace NKikimr::NSchemeShard::NExternalDataSource {

inline TPath::TChecker IsParentPathValid(const TPath& parentPath) {
auto checks = parentPath.Check();
checks.NotUnderDomainUpgrade()
.IsAtLocalSchemeShard()
.IsResolved()
.NotDeleted()
.NotUnderDeleting()
.IsCommonSensePath()
.IsLikeDirectory();

return std::move(checks);
}

inline bool IsParentPathValid(const THolder<TProposeResponse>& result,
const TPath& parentPath) {
const auto checks = IsParentPathValid(parentPath);

if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
}

return static_cast<bool>(checks);
}

bool Validate(const NKikimrSchemeOp::TExternalDataSourceDescription& desc,
const NExternalSource::IExternalSourceFactory::TPtr& factory,
TString& errStr);

TExternalDataSourceInfo::TPtr CreateExternalDataSource(
const NKikimrSchemeOp::TExternalDataSourceDescription& desc, ui64 alterVersion);

} // namespace NKikimr::NSchemeShard::NExternalDataSource
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
#include "schemeshard__operation_common_external_table.h"

#include <utility>

namespace NKikimr::NSchemeShard::NExternalTable {

constexpr uint32_t MAX_FIELD_SIZE = 1000;
constexpr uint32_t MAX_PROTOBUF_SIZE = 2 * 1024 * 1024; // 2 MiB

bool ValidateSourceType(const TString& sourceType, TString& errStr) {
// Only object storage supported today
if (sourceType != "ObjectStorage") {
errStr = "Only ObjectStorage source type supported but got " + sourceType;
return false;
}
return true;
}

bool ValidateLocation(const TString& location, TString& errStr) {
if (!location) {
errStr = "Location must not be empty";
return false;
}
if (location.Size() > MAX_FIELD_SIZE) {
errStr = Sprintf("Maximum length of location must be less or equal equal to %u but got %lu", MAX_FIELD_SIZE, location.Size());
return false;
}
return true;
}

bool ValidateContent(const TString& content, TString& errStr) {
if (content.Size() > MAX_PROTOBUF_SIZE) {
errStr = Sprintf("Maximum size of content must be less or equal equal to %u but got %lu", MAX_PROTOBUF_SIZE, content.Size());
return false;
}
return true;
}

bool ValidateDataSourcePath(const TString& dataSourcePath, TString& errStr) {
if (!dataSourcePath) {
errStr = "Data source path must not be empty";
return false;
}
return true;
}

bool Validate(const TString& sourceType, const NKikimrSchemeOp::TExternalTableDescription& desc, TString& errStr) {
return ValidateSourceType(sourceType, errStr)
&& ValidateLocation(desc.GetLocation(), errStr)
&& ValidateContent(desc.GetContent(), errStr)
&& ValidateDataSourcePath(desc.GetDataSourcePath(), errStr);
}

Ydb::Type CreateYdbType(const NScheme::TTypeInfo& typeInfo, bool notNull) {
Ydb::Type ydbType;
if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) {
auto* typeDesc = typeInfo.GetTypeDesc();
auto* pg = ydbType.mutable_pg_type();
pg->set_type_name(NPg::PgTypeNameFromTypeDesc(typeDesc));
pg->set_oid(NPg::PgTypeIdFromTypeDesc(typeDesc));
} else {
auto& item = notNull
? ydbType
: *ydbType.mutable_optional_type()->mutable_item();
item.set_type_id(static_cast<Ydb::Type::PrimitiveTypeId>(typeInfo.GetTypeId()));
}
return ydbType;
}

std::pair<TExternalTableInfo::TPtr, TMaybe<TString>> CreateExternalTable(
const TString& sourceType,
const NKikimrSchemeOp::TExternalTableDescription& desc,
const NExternalSource::IExternalSourceFactory::TPtr& factory,
ui64 alterVersion) {
TString errStr;

if (!desc.ColumnsSize()) {
errStr = "The schema must have at least one column";
return std::make_pair(nullptr, errStr);
}

TExternalTableInfo::TPtr externalTableInfo = new TExternalTableInfo;
const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;

if (desc.GetSourceType() != "General") {
errStr = "Only general data source has been supported as request";
return std::make_pair(nullptr, errStr);
}

externalTableInfo->DataSourcePath = desc.GetDataSourcePath();
externalTableInfo->Location = desc.GetLocation();
externalTableInfo->AlterVersion = alterVersion;
externalTableInfo->SourceType = sourceType;

NKikimrExternalSources::TSchema schema;
uint64_t nextColumnId = 1;
for (const auto& col : desc.GetColumns()) {
TString colName = col.GetName();

if (!colName) {
errStr = "Columns cannot have an empty name";
return std::make_pair(nullptr, errStr);
}

if (col.HasTypeId()) {
errStr = TStringBuilder() << "Cannot set TypeId for column '" << colName << "', use Type";
return std::make_pair(nullptr, errStr);
}

if (!col.HasType()) {
errStr = TStringBuilder() << "Missing Type for column '" << colName << "'";
return std::make_pair(nullptr, errStr);
}

auto typeName = NMiniKQL::AdaptLegacyYqlType(col.GetType());
const NScheme::IType* type = typeRegistry->GetType(typeName);

NScheme::TTypeInfo typeInfo;
if (type) {
// Only allow YQL types
if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) {
errStr = Sprintf("Type '%s' specified for column '%s' is no longer supported", col.GetType().data(), colName.data());
return std::make_pair(nullptr, errStr);
}
typeInfo = NScheme::TTypeInfo(type->GetTypeId());
} else {
auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName);
if (!typeDesc) {
errStr = Sprintf("Type '%s' specified for column '%s' is not supported by storage", col.GetType().data(), colName.data());
return std::make_pair(nullptr, errStr);
}
typeInfo = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc);
}

ui32 colId = col.HasId() ? col.GetId() : nextColumnId;
if (externalTableInfo->Columns.contains(colId)) {
errStr = Sprintf("Duplicate column id: %" PRIu32, colId);
return std::make_pair(nullptr, errStr);
}

nextColumnId = colId + 1 > nextColumnId ? colId + 1 : nextColumnId;

TTableInfo::TColumn& column = externalTableInfo->Columns[colId];
column = TTableInfo::TColumn(colName, colId, typeInfo, "", col.GetNotNull()); // TODO: do we need typeMod here?

auto& schemaColumn= *schema.add_column();
schemaColumn.set_name(colName);
*schemaColumn.mutable_type() = CreateYdbType(typeInfo, col.GetNotNull());
}

try {
NKikimrExternalSources::TGeneral general;
general.ParseFromStringOrThrow(desc.GetContent());
const auto source = factory->GetOrCreate(sourceType);
if (!source->HasExternalTable()) {
errStr = TStringBuilder{} << "External table isn't supported for " << sourceType;
return std::make_pair(nullptr, errStr);
}
externalTableInfo->Content = source->Pack(schema, general);
} catch (...) {
errStr = CurrentExceptionMessage();
return std::make_pair(nullptr, errStr);
}

return std::make_pair(externalTableInfo, Nothing());
}


} // namespace NKikimr::NSchemeShard::NExternalDataSource
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once

#include "schemeshard__operation_part.h"
#include "schemeshard_impl.h"

#include <ydb/core/tablet_flat/test/libs/table/test_iter.h>

#include <utility>

#define LOG_I(stream) LOG_INFO_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
#define RETURN_RESULT_UNLESS(x) if (!(x)) return result;

namespace NKikimr::NSchemeShard::NExternalTable {

inline TPath::TChecker IsParentPathValid(const TPath& parentPath) {
return parentPath.Check()
.NotUnderDomainUpgrade()
.IsAtLocalSchemeShard()
.IsResolved()
.NotDeleted()
.NotUnderDeleting()
.IsCommonSensePath()
.IsLikeDirectory();
}

inline bool IsParentPathValid(const THolder<TProposeResponse>& result,
const TPath& parentPath) {
const auto checks = IsParentPathValid(parentPath);

if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
}

return static_cast<bool>(checks);
}

bool Validate(const TString& sourceType,
const NKikimrSchemeOp::TExternalTableDescription& desc,
TString& errStr);

std::pair<TExternalTableInfo::TPtr, TMaybe<TString>> CreateExternalTable(
const TString& sourceType,
const NKikimrSchemeOp::TExternalTableDescription& desc,
const NExternalSource::IExternalSourceFactory::TPtr& factory,
ui64 alterVersion);


} // namespace NKikimr::NSchemeShard::NExternalDataSource
Loading