57 changes: 42 additions & 15 deletions docs/en/engines/database-engines/datalake.md
@@ -46,21 +46,22 @@ catalog_type,

The following settings are supported:

| Setting | Description |
|-------------------------|-----------------------------------------------------------------------------------------------|
| `catalog_type` | Type of catalog: `glue`, `unity` (Delta), `rest` (Iceberg), `hive`, `onelake` (Iceberg) |
| `warehouse` | The warehouse/database name to use in the catalog. |
| `catalog_credential` | Authentication credential for the catalog (e.g., API key or token) |
| `auth_header` | Custom HTTP header for authentication with the catalog service |
| `auth_scope` | OAuth2 scope for authentication (if using OAuth) |
| `storage_endpoint` | Endpoint URL for the underlying storage |
| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication |
| `vended_credentials` | Boolean indicating whether to use vended credentials from the catalog (supports AWS S3 and Azure ADLS Gen2) |
| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) |
| `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) |
| `region` | AWS region for the service (e.g., `us-east-1`) |
| `dlf_access_key_id` | Access key ID for DLF access |
| `dlf_access_key_secret` | Access key secret for DLF access                                                               |
| `namespaces` | Comma-separated list of namespaces, implemented for catalog types: `rest`, `glue` and `unity` |

## Examples {#examples}

@@ -83,4 +84,30 @@ SETTINGS
onelake_client_secret = client_secret;
SHOW TABLES IN database_name;
SELECT count() FROM database_name.table_name;
```

## Namespace filter {#namespace}

By default, ClickHouse reads tables from all namespaces available in the catalog. You can limit this behavior using the `namespaces` database setting. The value should be a comma‑separated list of namespaces that are allowed to be read.

Supported catalog types are `rest`, `glue` and `unity`.

For example, if the catalog contains three namespaces (`dev`, `stage`, and `prod`) and you want to read data only from `dev` and `stage`, set:
```
namespaces='dev,stage'
```

### Nested namespaces {#namespace-nested}

The Iceberg (`rest`) catalog supports nested namespaces. The `namespaces` filter accepts the following patterns:

- `namespace` - includes tables from the specified namespace, but not from its nested namespaces.
- `namespace.nested` - includes tables from the nested namespace, but not from the parent.
- `namespace.*` - includes tables from all nested namespaces, but not from the parent.

If you need to include both a namespace and its nested namespaces, specify both explicitly. For example:
```
namespaces='namespace,namespace.*'
```

The default value is `*`, which means all namespaces are included.
1 change: 1 addition & 0 deletions src/Common/ErrorCodes.cpp
@@ -645,6 +645,7 @@
M(763, SESSION_REFUSED) \
M(764, DEDUPLICATION_IS_NOT_POSSIBLE) \
M(765, UNKNOWN_MASKING_POLICY) \
M(766, CATALOG_NAMESPACE_DISABLED) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \
24 changes: 14 additions & 10 deletions src/Databases/DataLake/DatabaseDataLake.cpp
@@ -63,6 +63,7 @@ namespace DatabaseDataLakeSetting
extern const DatabaseDataLakeSettingsString onelake_client_secret;
extern const DatabaseDataLakeSettingsString dlf_access_key_id;
extern const DatabaseDataLakeSettingsString dlf_access_key_secret;
extern const DatabaseDataLakeSettingsString namespaces;
}

namespace Setting
@@ -142,6 +143,7 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
.aws_access_key_id = settings[DatabaseDataLakeSetting::aws_access_key_id].value,
.aws_secret_access_key = settings[DatabaseDataLakeSetting::aws_secret_access_key].value,
.region = settings[DatabaseDataLakeSetting::region].value,
.namespaces = settings[DatabaseDataLakeSetting::namespaces].value
};

switch (settings[DatabaseDataLakeSetting::catalog_type].value)
@@ -156,6 +158,7 @@
settings[DatabaseDataLakeSetting::auth_header],
settings[DatabaseDataLakeSetting::oauth_server_uri].value,
settings[DatabaseDataLakeSetting::oauth_server_use_request_body].value,
settings[DatabaseDataLakeSetting::namespaces].value,
Context::getGlobalContextInstance());
break;
}
@@ -179,6 +182,7 @@
settings[DatabaseDataLakeSetting::warehouse].value,
url,
settings[DatabaseDataLakeSetting::catalog_credential].value,
settings[DatabaseDataLakeSetting::namespaces].value,
Context::getGlobalContextInstance());
break;
}
@@ -274,24 +278,24 @@ std::shared_ptr<StorageObjectStorageConfiguration> DatabaseDataLake::getConfigur
#if USE_AWS_S3
case DB::DatabaseDataLakeStorageType::S3:
{
return std::make_shared<StorageS3IcebergConfiguration>(storage_settings);
return std::make_shared<StorageS3IcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
#if USE_AZURE_BLOB_STORAGE
case DB::DatabaseDataLakeStorageType::Azure:
{
return std::make_shared<StorageAzureIcebergConfiguration>(storage_settings);
return std::make_shared<StorageAzureIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
#if USE_HDFS
case DB::DatabaseDataLakeStorageType::HDFS:
{
return std::make_shared<StorageHDFSIcebergConfiguration>(storage_settings);
return std::make_shared<StorageHDFSIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
case DB::DatabaseDataLakeStorageType::Local:
{
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings);
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
/// Fake storage in case when catalog store not only
/// primary-type tables (DeltaLake or Iceberg), but for
@@ -303,7 +307,7 @@ std::shared_ptr<StorageObjectStorageConfiguration> DatabaseDataLake::getConfigur
/// dependencies and the most lightweight
case DB::DatabaseDataLakeStorageType::Other:
{
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings);
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#if !USE_AWS_S3 || !USE_AZURE_BLOB_STORAGE || !USE_HDFS
default:
@@ -320,7 +324,7 @@ std::shared_ptr<StorageObjectStorageConfiguration> DatabaseDataLake::getConfigur
#if USE_AWS_S3
case DB::DatabaseDataLakeStorageType::S3:
{
return std::make_shared<StorageS3DeltaLakeConfiguration>(storage_settings);
return std::make_shared<StorageS3DeltaLakeConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
#if USE_AZURE_BLOB_STORAGE
@@ -331,7 +335,7 @@ std::shared_ptr<StorageObjectStorageConfiguration> DatabaseDataLake::getConfigur
#endif
case DB::DatabaseDataLakeStorageType::Local:
{
return std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings);
return std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
/// Fake storage in case when catalog store not only
/// primary-type tables (DeltaLake or Iceberg), but for
@@ -343,7 +347,7 @@ std::shared_ptr<StorageObjectStorageConfiguration> DatabaseDataLake::getConfigur
/// dependencies and the most lightweight
case DB::DatabaseDataLakeStorageType::Other:
{
return std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings);
return std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@@ -358,12 +362,12 @@ std::shared_ptr<StorageObjectStorageConfiguration> DatabaseDataLake::getConfigur
#if USE_AWS_S3
case DB::DatabaseDataLakeStorageType::S3:
{
return std::make_shared<StorageS3IcebergConfiguration>(storage_settings);
return std::make_shared<StorageS3IcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
case DB::DatabaseDataLakeStorageType::Other:
{
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings);
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS,
1 change: 1 addition & 0 deletions src/Databases/DataLake/DatabaseDataLakeSettings.cpp
@@ -34,6 +34,7 @@ namespace ErrorCodes
DECLARE(String, onelake_client_secret, "", "Client secret from azure", 0) \
DECLARE(String, dlf_access_key_id, "", "Access id of DLF token for Paimon REST Catalog", 0) \
DECLARE(String, dlf_access_key_secret, "", "Access secret of DLF token for Paimon REST Catalog", 0) \
DECLARE(String, namespaces, "*", "Comma-separated list of allowed namespaces", 0) \

#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) \
36 changes: 26 additions & 10 deletions src/Databases/DataLake/GlueCatalog.cpp
@@ -56,6 +56,7 @@ namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int DATALAKE_DATABASE_ERROR;
extern const int CATALOG_NAMESPACE_DISABLED;
}

namespace DB::Setting
@@ -80,14 +81,6 @@ namespace DB::StorageObjectStorageSetting
extern const StorageObjectStorageSettingsString iceberg_metadata_file_path;
}

namespace DB::DatabaseDataLakeSetting
{
extern const DatabaseDataLakeSettingsString storage_endpoint;
extern const DatabaseDataLakeSettingsString aws_access_key_id;
extern const DatabaseDataLakeSettingsString aws_secret_access_key;
extern const DatabaseDataLakeSettingsString region;
}

namespace CurrentMetrics
{
extern const Metric MarkCacheBytes;
@@ -175,6 +168,7 @@ GlueCatalog::GlueCatalog(
glue_client = std::make_unique<Aws::Glue::GlueClient>(credentials_provider, endpoint_provider, client_configuration);
}

boost::split(allowed_namespaces, settings.namespaces, boost::is_any_of(", "), boost::token_compress_on);
}

GlueCatalog::~GlueCatalog() = default;
@@ -200,8 +194,9 @@ DataLake::ICatalog::Namespaces GlueCatalog::getDatabases(const std::string & pre
for (const auto & db : dbs)
{
const auto & db_name = db.GetName();
if (!db_name.starts_with(prefix))
if (!isNamespaceAllowed(db_name) || !db_name.starts_with(prefix))
continue;

result.push_back(db_name);
if (limit != 0 && result.size() >= limit)
break;
@@ -281,6 +276,9 @@ DB::Names GlueCatalog::getTables() const

bool GlueCatalog::existsTable(const std::string & database_name, const std::string & table_name) const
{
if (!isNamespaceAllowed(database_name))
throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, "Namespace {} is filtered by `namespaces` database parameter", database_name);

Aws::Glue::Model::GetTableRequest request;
request.SetDatabaseName(database_name);
request.SetName(table_name);
@@ -294,6 +292,9 @@ bool GlueCatalog::tryGetTableMetadata(
const std::string & table_name,
TableMetadata & result) const
{
if (!isNamespaceAllowed(database_name))
throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, "Namespace {} is filtered by `namespaces` database parameter", database_name);

Aws::Glue::Model::GetTableRequest request;
request.SetDatabaseName(database_name);
request.SetName(table_name);
@@ -512,7 +513,7 @@ GlueCatalog::ObjectStorageWithPath GlueCatalog::createObjectStorageForEarlyTable

auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
storage_settings->loadFromSettingsChanges(settings.allChanged());
auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings);
auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings, settings.namespaces);
DB::StorageObjectStorageConfiguration::initialize(*configuration, args, getContext(), false);

auto object_storage = configuration->createObjectStorage(getContext(), true);
@@ -580,6 +581,11 @@ void GlueCatalog::createNamespaceIfNotExists(const String & namespace_name) cons

void GlueCatalog::createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr /*metadata_content*/) const
{
if (!isNamespaceAllowed(namespace_name))
throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED,
"Failed to create table {}, namespace {} is filtered by `namespaces` database parameter",
table_name, namespace_name);

createNamespaceIfNotExists(namespace_name);

Aws::Glue::Model::CreateTableRequest request;
@@ -652,6 +658,11 @@ bool GlueCatalog::updateMetadata(const String & namespace_name, const String & t

void GlueCatalog::dropTable(const String & namespace_name, const String & table_name) const
{
if (!isNamespaceAllowed(namespace_name))
throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED,
"Failed to drop table {}, namespace {} is filtered by `namespaces` database parameter",
table_name, namespace_name);

Aws::Glue::Model::DeleteTableRequest request;
request.SetDatabaseName(namespace_name);
request.SetName(table_name);
@@ -665,6 +676,11 @@ void GlueCatalog::dropTable(const String & namespace_name, const String & table_
response.GetError().GetMessage());
}

bool GlueCatalog::isNamespaceAllowed(const std::string & namespace_) const
{
return allowed_namespaces.contains("*") || allowed_namespaces.contains(namespace_);
}

}

#endif
3 changes: 3 additions & 0 deletions src/Databases/DataLake/GlueCatalog.h
@@ -73,6 +73,9 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
std::string region;
CatalogSettings settings;
DB::ASTPtr table_engine_definition;
std::unordered_set<std::string> allowed_namespaces;

bool isNamespaceAllowed(const std::string & namespace_) const;

DataLake::ICatalog::Namespaces getDatabases(const std::string & prefix, size_t limit = 0) const;
DB::Names getTablesForDatabase(const std::string & db_name, size_t limit = 0) const;
1 change: 1 addition & 0 deletions src/Databases/DataLake/ICatalog.h
@@ -120,6 +120,7 @@ struct CatalogSettings
String aws_access_key_id;
String aws_secret_access_key;
String region;
String namespaces;

DB::SettingsChanges allChanged() const;
};