Skip to content

Forbid writing/reading directly to external data source #1204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
counters->TxProxyMon = new NTxProxy::TTxProxyMon(AppData(ctx)->Counters);
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader =
std::make_shared<TKqpTableMetadataLoader>(
TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader),
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, QueryServiceConfig);
Gateway->SetToken(QueryId.Cluster, UserToken);
Expand Down
7 changes: 1 addition & 6 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -806,12 +806,7 @@ class TKikimrIcGateway : public IKqpGateway {
return InvalidCluster<TTableMetadataResult>(cluster);
}

settings.WithExternalDatasources_ = !CheckCluster(cluster);
// In the case of reading from an external data source,
// we have a construction of the form: `/Root/external_data_source`.`/path_in_external_system` WITH (...)
// In this syntax, information about path_in_external_system is already known and we only need information about external_data_source.
// To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard
return MetadataLoader->LoadTableMetadata(settings.WithExternalDatasources_ ? GetDefaultCluster() : cluster, settings.WithExternalDatasources_ ? cluster : table, settings, Database, UserToken);
return MetadataLoader->LoadTableMetadata(cluster, table, settings, Database, UserToken);
} catch (yexception& e) {
return MakeFuture(ResultFromException<TTableMetadataResult>(e));
}
Expand Down
60 changes: 43 additions & 17 deletions ydb/core/kqp/gateway/kqp_metadata_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ TTableMetadataResult GetExternalTableMetadataResult(const NSchemeCache::TSchemeC
tableMeta->PathId = NYql::TKikimrPathId(description.GetPathId().GetOwnerId(), description.GetPathId().GetLocalId());
tableMeta->SchemaVersion = description.GetVersion();
tableMeta->Kind = NYql::EKikimrTableKind::External;
tableMeta->TableType = NYql::ETableType::ExternalTable;

tableMeta->Attributes = entry.Attributes;

Expand All @@ -253,7 +254,7 @@ TTableMetadataResult GetExternalTableMetadataResult(const NSchemeCache::TSchemeC
}

TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry,
const TString& cluster, const TString& tableName) {
const TString& cluster, const TString& mainCluster, const TString& tableName) {
const auto& description = entry.ExternalDataSourceInfo->Description;
TTableMetadataResult result;
result.SetSuccess();
Expand All @@ -263,6 +264,11 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc
tableMeta->PathId = NYql::TKikimrPathId(description.GetPathId().GetOwnerId(), description.GetPathId().GetLocalId());
tableMeta->SchemaVersion = description.GetVersion();
tableMeta->Kind = NYql::EKikimrTableKind::External;
if (cluster == mainCluster) { // resolved external data source itself
tableMeta->TableType = NYql::ETableType::Unknown;
} else {
tableMeta->TableType = NYql::ETableType::Table; // wanted to resolve table in external data source
}

tableMeta->Attributes = entry.Attributes;

Expand Down Expand Up @@ -300,7 +306,7 @@ TTableMetadataResult GetViewMetadataResult(
}

TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry,
const TString& cluster, const TString& tableName, std::optional<TString> queryName = std::nullopt) {
const TString& cluster, const TString& mainCluster, const TString& tableName, std::optional<TString> queryName = std::nullopt) {
using TResult = NYql::IKikimrGateway::TTableMetadataResult;
using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus;
using EKind = NSchemeCache::TSchemeCacheNavigate::EKind;
Expand Down Expand Up @@ -339,7 +345,7 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache
result = GetExternalTableMetadataResult(entry, cluster, tableName);
break;
case EKind::KindExternalDataSource:
result = GetExternalDataSourceMetadataResult(entry, cluster, tableName);
result = GetExternalDataSourceMetadataResult(entry, cluster, mainCluster, tableName);
break;
case EKind::KindView:
result = GetViewMetadataResult(entry, cluster, tableName);
Expand Down Expand Up @@ -697,16 +703,30 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus;
using EKind = NSchemeCache::TSchemeCacheNavigate::EKind;

const auto externalEntryItem = CreateNavigateExternalEntry(id, settings.WithExternalDatasources_);
Y_ABORT_UNLESS(!settings.WithExternalDatasources_ || externalEntryItem, "External data source must be resolved using path only");
auto resNavigate = settings.WithExternalDatasources_ ? *externalEntryItem : CreateNavigateEntry(id,
// In the case of reading from an external data source,
// we have a construction of the form: `/Root/external_data_source`.`/path_in_external_system` WITH (...)
// In this syntax, path_in_external_system is already known, so we only need information about external_data_source.
// Such a request is recognized by its cluster differing from the loader's own Cluster; the cluster name is then the path of external_data_source, which we resolve (instead of the table id) to get its description from scheme shard.
const bool resolveEntityInsideDataSource = (cluster != Cluster);
TPath entityName = id;
if constexpr (std::is_same_v<TPath, TString>) {
if (resolveEntityInsideDataSource) {
entityName = cluster;
}
} else {
Y_ENSURE(!resolveEntityInsideDataSource);
}

const auto externalEntryItem = CreateNavigateExternalEntry(entityName, resolveEntityInsideDataSource);
Y_ABORT_UNLESS(!resolveEntityInsideDataSource || externalEntryItem, "External data source must be resolved using path only");
auto resNavigate = resolveEntityInsideDataSource ? *externalEntryItem : CreateNavigateEntry(entityName,
settings, TempTablesState);
const auto entry = resNavigate.Entry;
const auto queryName = resNavigate.QueryName;
const auto externalEntry = settings.WithExternalDatasources_ ? std::optional<NavigateEntryResult>{} : externalEntryItem;
const ui64 expectedSchemaVersion = GetExpectedVersion(id);
const auto externalEntry = resolveEntityInsideDataSource ? std::optional<NavigateEntryResult>{} : externalEntryItem;
const ui64 expectedSchemaVersion = GetExpectedVersion(entityName);

LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load table metadata from cache by path, request" << GetDebugString(id));
LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load table metadata from cache by path, request" << GetDebugString(entityName));

auto navigate = MakeHolder<TNavigate>();
navigate->ResultSet.emplace_back(entry);
Expand All @@ -728,7 +748,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
ActorSystem,
schemeCacheId,
ev.Release(),
[userToken, database, cluster, table, settings, expectedSchemaVersion, this, queryName]
[userToken, database, cluster, mainCluster = Cluster, table, settings, expectedSchemaVersion, this, queryName]
(TPromise<TResult> promise, TResponse&& response) mutable
{
try {
Expand All @@ -739,7 +759,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
auto& entry = InferEntry(navigate.ResultSet);

if (entry.Status != EStatus::Ok) {
promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table));
promise.SetValue(GetLoadTableMetadataResult(entry, cluster, mainCluster, table));
return;
}

Expand All @@ -759,9 +779,15 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
}
}

const bool resolveEntityInsideDataSource = (cluster != Cluster);
// resolveEntityInsideDataSource => entry.Kind == EKind::KindExternalDataSource
if (resolveEntityInsideDataSource && entry.Kind != EKind::KindExternalDataSource) {
throw yexception() << "\"" << CombinePath(entry.Path.begin(), entry.Path.end()) << "\" is expected to be external data source";
}

switch (entry.Kind) {
case EKind::KindExternalDataSource: {
auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, table);
auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table);
if (!externalDataSourceMetadata.Success()) {
promise.SetValue(externalDataSourceMetadata);
return;
Expand All @@ -772,12 +798,12 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
UpdateExternalDataSourceSecretsValue(externalDataSourceMetadata, result.GetValue());
promise.SetValue(externalDataSourceMetadata);
});
break;
}
break;
case EKind::KindExternalTable: {
YQL_ENSURE(entry.ExternalTableInfo, "expected external table info");
const auto& dataSourcePath = entry.ExternalTableInfo->Description.GetDataSourcePath();
auto externalTableMetadata = GetLoadTableMetadataResult(entry, cluster, table);
auto externalTableMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table);
if (!externalTableMetadata.Success()) {
promise.SetValue(externalTableMetadata);
return;
Expand All @@ -789,8 +815,8 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
auto externalDataSourceMetadata = result.GetValue();
promise.SetValue(EnrichExternalTable(externalTableMetadata, externalDataSourceMetadata));
});
break;
}
break;
case EKind::KindIndex: {
Y_ENSURE(entry.ListNodeEntry, "expected children list");
Y_ENSURE(entry.ListNodeEntry->Children.size() == 1, "expected one child");
Expand All @@ -805,10 +831,10 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
{
promise.SetValue(result.GetValue());
});
break;
}
break;
default: {
promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table, queryName));
promise.SetValue(GetLoadTableMetadataResult(entry, cluster, mainCluster, table, queryName));
}
}
}
Expand Down
13 changes: 8 additions & 5 deletions ydb/core/kqp/gateway/kqp_metadata_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@ namespace NKikimr::NKqp {
class TKqpTableMetadataLoader : public NYql::IKikimrGateway::IKqpTableMetadataLoader {
public:

explicit TKqpTableMetadataLoader(TActorSystem* actorSystem,
NYql::TKikimrConfiguration::TPtr config,
bool needCollectSchemeData = false,
explicit TKqpTableMetadataLoader(const TString& cluster,
TActorSystem* actorSystem,
NYql::TKikimrConfiguration::TPtr config,
bool needCollectSchemeData = false,
TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
TDuration maximalSecretsSnapshotWaitTime = TDuration::Seconds(20))
: NeedCollectSchemeData(needCollectSchemeData)
: Cluster(cluster)
, NeedCollectSchemeData(needCollectSchemeData)
, ActorSystem(actorSystem)
, Config(config)
, TempTablesState(std::move(tempTablesState))
, MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
{};
{}

NThreading::TFuture<NYql::IKikimrGateway::TTableMetadataResult> LoadTableMetadata(
const TString& cluster, const TString& table, const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, const TString& database,
Expand Down Expand Up @@ -56,6 +58,7 @@ class TKqpTableMetadataLoader : public NYql::IKikimrGateway::IKqpTableMetadataLo

void OnLoadedTableMetadata(NYql::IKikimrGateway::TTableMetadataResult& loadTableMetadataResult);

const TString Cluster;
TVector<NKikimrKqp::TKqpTableMetadataProto> CollectedSchemeData;
TMutex Lock;
bool NeedCollectSchemeData;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_datasink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,11 @@ class TKikimrDataSink : public TDataProviderBase
return true;
}

if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource && tableDesc.Metadata->TableType == NYql::ETableType::Unknown) {
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Attempt to write to external data source \"" << key.GetTablePath() << "\" without table. Please specify table to write to"));
return false;
}

if (tableDesc.Metadata->ExternalSource.SourceType != ESourceType::ExternalDataSource && tableDesc.Metadata->ExternalSource.SourceType != ESourceType::ExternalTable) {
YQL_CVLOG(NLog::ELevel::ERROR, NLog::EComponent::ProviderKikimr) << "Skip RewriteIO for external entity: unknown entity type: " << (int)tableDesc.Metadata->ExternalSource.SourceType;
return true;
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase {
}
}
break;
default:
default:
break;
}
*result = value;
Expand Down Expand Up @@ -679,6 +679,11 @@ class TKikimrDataSource : public TDataProviderBase {
auto& tableDesc = SessionCtx->Tables().GetTable(cluster, tablePath);
if (key.GetKeyType() == TKikimrKey::Type::Table) {
if (tableDesc.Metadata->Kind == EKikimrTableKind::External) {
if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource && tableDesc.Metadata->TableType == NYql::ETableType::Unknown) {
ctx.AddError(TIssue(node->Pos(ctx),
TStringBuilder() << "Attempt to read from external data source \"" << tablePath << "\" without table. Please specify table to read from"));
return nullptr;
}
if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource) {
const auto& source = ExternalSourceFactory->GetOrCreate(tableDesc.Metadata->ExternalSource.Type);
ctx.Step.Repeat(TExprStep::DiscoveryIO)
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
auto tableTypeItem = table.Metadata->TableType;
if (tableTypeItem == ETableType::ExternalTable && !SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources()) {
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()),
TStringBuilder() << "External table are disabled. Please contact your system administrator to enable it"));
TStringBuilder() << "External tables are disabled. Please contact your system administrator to enable it"));
return SyncError();
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ TIntrusivePtr<IKqpGateway> GetIcGateway(Tests::TServer& server) {
counters->Counters = new TKqpCounters(server.GetRuntime()->GetAppData(0).Counters);
counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters);

std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr), false);
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(TestCluster, server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr), false);
return CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(),
server.GetRuntime()->GetNodeId(0), counters, server.GetSettings().AppConfig->GetQueryServiceConfig());
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_worker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
QueryState->RequestEv.reset(ev->Release().Release());

std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(
TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
Settings.Cluster, TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
Gateway = CreateKikimrIcGateway(Settings.Cluster, QueryState->RequestEv->GetType(), Settings.Database, std::move(loader),
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters, QueryServiceConfig);

Expand Down
Loading