Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion ydb/core/kqp/gateway/kqp_metadata_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,15 +462,25 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache

TTableMetadataResult EnrichExternalTable(const TTableMetadataResult& externalTable, const TTableMetadataResult& externalDataSource) {
TTableMetadataResult result;

if (!externalTable.Success()) {
result.AddIssues(externalTable.Issues());
return result;
}

if (!externalDataSource.Success()) {
result.AddIssues(externalDataSource.Issues());
return result;
}

if (externalTable.Metadata->ExternalSource.Type != externalDataSource.Metadata->ExternalSource.Type) {
result.AddIssue(YqlIssue({}, TIssuesIds::KIKIMR_INTERNAL_ERROR, TStringBuilder()
<< "Internal error. External table type mismatch, expected: " << externalTable.Metadata->ExternalSource.Type
<< ", but underlying external data source has type: " << externalDataSource.Metadata->ExternalSource.Type
));
return result;
}

result.SetSuccess();
result.Metadata = externalTable.Metadata;
auto tableMeta = result.Metadata;
Expand Down Expand Up @@ -885,7 +895,8 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
// In the case of reading from an external data source,
// we have a construction of the form: `/Root/external_data_source`.`/path_in_external_system` WITH (...)
// In this syntax, information about path_in_external_system is already known and we only need information about external_data_source.
// To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard
// To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard.
// In case of external data source `cluster` = "/Root/external_data_source" and `id` = "/path_in_external_system"
const bool resolveEntityInsideDataSource = (cluster != Cluster);
TMaybe<TString> externalPath;
TPath entityName = id;
Expand Down
49 changes: 49 additions & 0 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8278,6 +8278,19 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
const auto& externalDataSource = externalDataSourceDesc->ResultSet.at(0);
UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetLocation(), "other-bucket");
}

{
const auto result = queryClient.ExecuteQuery(fmt::format(R"(
CREATE OR REPLACE EXTERNAL DATA SOURCE `{external_source}` WITH (
SOURCE_TYPE="YT",
LOCATION="other-bucket",
AUTH_METHOD="NONE"
);)",
"external_source"_a = externalDataSourceName
), NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToOneLineString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Changing external data source type is not allowed");
}
}

Y_UNIT_TEST(CreateExternalDataSourceWithSa) {
Expand Down Expand Up @@ -8610,6 +8623,19 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
const auto& externalTable = externalTableDesc->ResultSet.at(0);
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetLocation(), "/other/location/");
}

{
const auto result = queryClient.ExecuteQuery(fmt::format(R"(
CREATE OR REPLACE EXTERNAL DATA SOURCE `{external_source}` WITH (
SOURCE_TYPE="YT",
LOCATION="other-bucket",
AUTH_METHOD="NONE"
);)",
"external_source"_a = externalDataSourceName
), NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToOneLineString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Changing external data source type is not allowed");
}
}

Y_UNIT_TEST(DisableCreateExternalTable) {
Expand Down Expand Up @@ -8915,6 +8941,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
NKikimrConfig::TAppConfig config;
config.MutableQueryServiceConfig()->AddAvailableExternalDataSources("ObjectStorage");
config.MutableQueryServiceConfig()->MutableS3()->SetGeneratorPathsLimit(50000);
config.MutableFeatureFlags()->SetEnableReplaceIfExistsForExternalEntities(true);
TKikimrRunner kikimr{ NKqp::TKikimrSettings(config) };

kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
Expand Down Expand Up @@ -8955,6 +8982,28 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "Other entities depend on this data source, please remove them at the beginning: /Root/ExternalTable", result.GetIssues().ToString());
}

auto queryClient = kikimr.GetQueryClient();
{
const auto result = queryClient.ExecuteQuery(fmt::format(R"(
CREATE OR REPLACE EXTERNAL DATA SOURCE `{external_source}` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="other-bucket",
AUTH_METHOD="NONE"
);)",
"external_source"_a = externalDataSourceName
), NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToOneLineString());
}

{
const auto result = session.ExecuteSchemeQuery(fmt::format(
"DROP EXTERNAL DATA SOURCE `{external_source}`",
"external_source"_a = externalDataSourceName
)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToOneLineString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Other entities depend on this data source, please remove them at the beginning: /Root/ExternalTable");
}
}

Y_UNIT_TEST(DropNonExistingExternalDataSource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@

#include <utility>

namespace {
namespace NKikimr::NSchemeShard {

using namespace NKikimr;
using namespace NSchemeShard;
namespace {

class TPropose: public TSubOperationState {
private:
Expand Down Expand Up @@ -194,10 +193,9 @@ class TAlterExternalDataSource : public TSubOperation {
THolder<TProposeResponse> Propose(const TString& owner,
TOperationContext& context) override {
Y_UNUSED(owner);
const auto ssId = context.SS->SelfTabletId();
const auto ssId = context.SS->SelfTabletId();
const TString& parentPathStr = Transaction.GetWorkingDir();
const auto& externalDataSourceDescription =
Transaction.GetCreateExternalDataSource();
const auto& externalDataSourceDescription = Transaction.GetCreateExternalDataSource();
const TString& name = externalDataSourceDescription.GetName();

LOG_N("TAlterExternalDataSource Propose"
Expand All @@ -215,22 +213,22 @@ class TAlterExternalDataSource : public TSubOperation {
}

const TPath parentPath = TPath::Resolve(parentPathStr, context.SS);
RETURN_RESULT_UNLESS(NExternalDataSource::IsParentPathValid(
result, parentPath, Transaction, /* isCreate */ false));
RETURN_RESULT_UNLESS(NExternalDataSource::IsParentPathValid(result, parentPath, Transaction, /* isCreate */ false));

const TPath dstPath = parentPath.Child(name);

RETURN_RESULT_UNLESS(IsDestinationPathValid(result, dstPath));
RETURN_RESULT_UNLESS(IsApplyIfChecksPassed(result, context));
RETURN_RESULT_UNLESS(IsDescriptionValid(result, externalDataSourceDescription, context.SS->ExternalSourceFactory));

const auto oldExternalDataSourceInfo =
context.SS->ExternalDataSources.Value(dstPath->PathId, nullptr);
const auto oldExternalDataSourceInfo = context.SS->ExternalDataSources.Value(dstPath->PathId, nullptr);
Y_ABORT_UNLESS(oldExternalDataSourceInfo);
const TExternalDataSourceInfo::TPtr externalDataSourceInfo =
NExternalDataSource::CreateExternalDataSource(externalDataSourceDescription,
oldExternalDataSourceInfo->AlterVersion + 1);
const TExternalDataSourceInfo::TPtr externalDataSourceInfo = NExternalDataSource::CreateExternalDataSource(
externalDataSourceDescription,
oldExternalDataSourceInfo->AlterVersion + 1
);
Y_ABORT_UNLESS(externalDataSourceInfo);
externalDataSourceInfo->ExternalTableReferences = oldExternalDataSourceInfo->ExternalTableReferences;

{
bool isTieredStorage = false;
Expand All @@ -250,9 +248,13 @@ class TAlterExternalDataSource : public TSubOperation {
}
}

if (oldExternalDataSourceInfo->SourceType != externalDataSourceInfo->SourceType) {
result->SetError(NKikimrScheme::StatusSchemeError, "Changing external data source type is not allowed");
return result;
}

AddPathInSchemeShard(result, dstPath);
const TPathElement::TPtr externalDataSource =
ReplaceExternalDataSourcePathElement(dstPath);
const TPathElement::TPtr externalDataSource = ReplaceExternalDataSourcePathElement(dstPath);
CreateTransaction(context, externalDataSource->PathId);

NIceDb::TNiceDb db(context.GetDB());
Expand All @@ -261,8 +263,7 @@ class TAlterExternalDataSource : public TSubOperation {

AdvanceTransactionStateToPropose(context, db);

PersistExternalDataSource(
context, db, externalDataSource, externalDataSourceInfo);
PersistExternalDataSource(context, db, externalDataSource, externalDataSourceInfo);

IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId,
dstPath,
Expand All @@ -286,9 +287,7 @@ class TAlterExternalDataSource : public TSubOperation {
}
};

} // namespace

namespace NKikimr::NSchemeShard {
} // anonymous namespace

ISubOperation::TPtr CreateAlterExternalDataSource(TOperationId id, const TTxTransaction& tx) {
return MakeSubOperation<TAlterExternalDataSource>(id, tx);
Expand All @@ -299,4 +298,4 @@ ISubOperation::TPtr CreateAlterExternalDataSource(TOperationId id, TTxState::ETx
return MakeSubOperation<TAlterExternalDataSource>(id, state);
}

}
} // namespace NKikimr::NSchemeShard
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,15 @@ bool Validate(const NKikimrSchemeOp::TExternalDataSourceDescription& desc,
}
}

TExternalDataSourceInfo::TPtr CreateExternalDataSource(
const NKikimrSchemeOp::TExternalDataSourceDescription& desc, ui64 alterVersion) {
TExternalDataSourceInfo::TPtr externalDataSoureInfo = new TExternalDataSourceInfo;
externalDataSoureInfo->SourceType = desc.GetSourceType();
externalDataSoureInfo->Location = desc.GetLocation();
externalDataSoureInfo->Installation = desc.GetInstallation();
externalDataSoureInfo->AlterVersion = alterVersion;
externalDataSoureInfo->Auth.CopyFrom(desc.GetAuth());
externalDataSoureInfo->Properties.CopyFrom(desc.GetProperties());
return externalDataSoureInfo;
TExternalDataSourceInfo::TPtr CreateExternalDataSource(const NKikimrSchemeOp::TExternalDataSourceDescription& desc, ui64 alterVersion) {
auto externalDataSourceInfo = MakeIntrusive<TExternalDataSourceInfo>();
externalDataSourceInfo->SourceType = desc.GetSourceType();
externalDataSourceInfo->Location = desc.GetLocation();
externalDataSourceInfo->Installation = desc.GetInstallation();
externalDataSourceInfo->AlterVersion = alterVersion;
externalDataSourceInfo->Auth = desc.GetAuth();
externalDataSourceInfo->Properties = desc.GetProperties();
return externalDataSourceInfo;
}

} // namespace NKikimr::NSchemeShard::NExternalDataSource
Loading