Add {_snowflake_id} wildcard support to object storage #789

Closed · wants to merge 4 commits into from

2 changes: 1 addition & 1 deletion programs/library-bridge/CMakeLists.txt
@@ -19,7 +19,7 @@ target_link_libraries(clickhouse-library-bridge PRIVATE
daemon
dbms
bridge
clickhouse_functions_extractkeyvaluepairs
clickhouse_functions
)

set_target_properties(clickhouse-library-bridge PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..)

2 changes: 1 addition & 1 deletion programs/odbc-bridge/CMakeLists.txt
@@ -22,7 +22,7 @@ target_link_libraries(clickhouse-odbc-bridge PRIVATE
dbms
bridge
clickhouse_parsers
clickhouse_functions_extractkeyvaluepairs
clickhouse_functions
ch_contrib::nanodbc
ch_contrib::unixodbc
)

3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
@@ -5951,6 +5951,9 @@ This only affects operations performed on the client side, in particular parsing
Normally this setting should be set in user profile (users.xml or queries like `ALTER USER`), not through the client (client command line arguments, `SET` query, or `SETTINGS` section of `SELECT` query). Through the client it can be changed to false, but can't be changed to true (because the server won't send the settings if user profile has `apply_settings_from_server = false`).

Note that initially (24.12) there was a server setting (`send_settings_to_client`), but later it was replaced with this client setting for better usability.
)", 0) \
DECLARE(Bool, object_storage_treat_key_related_wildcards_as_star, false, R"(
Comment from the PR author:

Three options here:

  1. make it the default behavior, not behind a setting
  2. make the setting on by default
  3. keep it off by default

Upon reading from object storage (e.g. S3, Azure), treat {_snowflake_id} and {_partition_id} as *. This allows symmetrical reads and writes using a single table.
)", 0) \
\
/* ####################################################### */ \

6 changes: 6 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -64,6 +64,12 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
/// Note: please check if the key already exists to prevent duplicate entries.

addSettingsChanges(settings_changes_history, "25.3",
{
// Altinity Antalya modifications atop of 25.3
{"object_storage_treat_key_related_wildcards_as_star", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.2.1.20000",
{
// Altinity Antalya modifications atop of 25.2

7 changes: 7 additions & 0 deletions src/Functions/generateSnowflakeID.cpp
@@ -147,6 +147,13 @@ struct Data

}

uint64_t generateSnowflakeID()
{
Data data;
SnowflakeId snowflake_id = data.reserveRange(getMachineId(), 1);
return fromSnowflakeId(snowflake_id);
}

class FunctionGenerateSnowflakeID : public IFunction
{
public:

10 changes: 10 additions & 0 deletions src/Functions/generateSnowflakeID.h
@@ -0,0 +1,10 @@
#pragma once

#include <cstdint>

namespace DB
{

uint64_t generateSnowflakeID();

}
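
This header exports `generateSnowflakeID()` for callers outside the SQL function. A rough sketch of how a write path could expand the `{_snowflake_id}` token with it (an illustrative assumption; the PR's actual call site is not shown in this diff):

#include <string>

#include <Functions/generateSnowflakeID.h>

/// Hypothetical helper, for illustration only: substitute the first {_snowflake_id}
/// occurrence in a path template with a freshly generated snowflake ID.
std::string expandSnowflakeIdWildcard(std::string path)
{
    static const std::string wildcard = "{_snowflake_id}";
    if (const auto pos = path.find(wildcard); pos != std::string::npos)
        path.replace(pos, wildcard.size(), std::to_string(DB::generateSnowflakeID()));
    return path;
}

/// expandSnowflakeIdWildcard("data/t_{_snowflake_id}.parquet") -> e.g. "data/t_7341953041234567168.parquet"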

8 changes: 8 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
@@ -50,10 +50,18 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
public:
using Configuration = StorageObjectStorage::Configuration;

DataLakeConfiguration() = default;

DataLakeConfiguration(const DataLakeConfiguration & other)
: BaseStorageConfiguration(other)
, current_metadata(other.current_metadata->clone()) {}

bool isDataLakeConfiguration() const override { return true; }

std::string getEngineName() const override { return DataLakeMetadata::name + BaseStorageConfiguration::getEngineName(); }

StorageObjectStorage::ConfigurationPtr clone() override { return std::make_shared<DataLakeConfiguration>(*this); }

void update(ObjectStoragePtr object_storage, ContextPtr local_context) override
{
BaseStorageConfiguration::update(object_storage, local_context);

13 changes: 13 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h
@@ -39,6 +39,19 @@ class DeltaLakeMetadata final : public IDataLakeMetadata

DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);

DeltaLakeMetadata(const DeltaLakeMetadata & other)
{
object_storage = other.object_storage;
data_files = other.data_files;
schema = other.schema;
partition_columns = other.partition_columns;
}

std::unique_ptr<IDataLakeMetadata> clone() override
{
return std::make_unique<DeltaLakeMetadata>(*this);
}

Strings getDataFiles() const override { return data_files; }

NamesAndTypesList getTableSchema() const override { return schema; }

@@ -37,10 +37,17 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata
ContextPtr context_,
bool read_schema_same_as_table_schema_);

DeltaLakeMetadataDeltaKernel(const DeltaLakeMetadataDeltaKernel & other) : log(other.log), table_snapshot(other.table_snapshot) {}

bool supportsUpdate() const override { return true; }

bool update(const ContextPtr & context) override;

std::unique_ptr<IDataLakeMetadata> clone() override
{
return std::make_unique<DeltaLakeMetadataDeltaKernel>(*this);
}

Strings getDataFiles() const override;

NamesAndTypesList getTableSchema() const override;

9 changes: 9 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/HudiMetadata.h
@@ -19,6 +19,10 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext

HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);

HudiMetadata(const HudiMetadata & other)
: WithContext(other.getContext()), object_storage(other.object_storage), configuration(other.configuration), data_files(other.data_files)
{}

Strings getDataFiles() const override;

NamesAndTypesList getTableSchema() const override { return {}; }
@@ -39,6 +43,11 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext
return std::make_unique<HudiMetadata>(object_storage, configuration, local_context);
}

std::unique_ptr<IDataLakeMetadata> clone() override
{
return std::make_unique<HudiMetadata>(*this);
}

protected:
ObjectIterator iterate(
const ActionsDAG * filter_dag,

2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
@@ -52,6 +52,8 @@ class IDataLakeMetadata : boost::noncopyable
virtual std::optional<size_t> totalRows() const { return {}; }
virtual std::optional<size_t> totalBytes() const { return {}; }

virtual std::unique_ptr<IDataLakeMetadata> clone() = 0;

protected:
ObjectIterator createKeysIterator(
Strings && data_files_,

22 changes: 22 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h
@@ -42,6 +42,28 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext
const Poco::JSON::Object::Ptr & metadata_object,
IcebergMetadataFilesCachePtr cache_ptr);

IcebergMetadata(const IcebergMetadata & other)
: WithContext(other.getContext())
, object_storage(other.object_storage)
, configuration(other.configuration)
, schema_processor(other.schema_processor.clone())
, log(other.log)
, manifest_cache(other.manifest_cache)
, manifest_file_by_data_file(other.manifest_file_by_data_file)
, last_metadata_version(other.last_metadata_version)
, last_metadata_object(other.last_metadata_object)
, format_version(other.format_version)
, relevant_snapshot_schema_id(other.relevant_snapshot_schema_id)
, relevant_snapshot(other.relevant_snapshot)
, relevant_snapshot_id(other.relevant_snapshot_id)
, table_location(other.table_location)
{}

std::unique_ptr<IDataLakeMetadata> clone() override
{
return std::make_unique<IcebergMetadata>(*this);
}

/// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files.
/// All subsequent calls when the same data snapshot is relevant will return saved list of files (because it cannot be changed
/// without changing metadata file). Drops on every snapshot update.

27 changes: 27 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h
@@ -76,6 +76,24 @@ class IcebergSchemaProcessor
using Node = ActionsDAG::Node;

public:
IcebergSchemaProcessor() = default;

IcebergSchemaProcessor clone()
{
std::lock_guard lock(mutex);

IcebergSchemaProcessor ret;

ret.iceberg_table_schemas_by_ids = iceberg_table_schemas_by_ids;
ret.clickhouse_table_schemas_by_ids = clickhouse_table_schemas_by_ids;
ret.transform_dags_by_ids = transform_dags_by_ids;
ret.clickhouse_types_by_source_ids = clickhouse_types_by_source_ids;
ret.clickhouse_ids_by_source_names = clickhouse_ids_by_source_names;
ret.current_schema_id = current_schema_id;

return ret;
}

void addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr);
std::shared_ptr<NamesAndTypesList> getClickhouseTableSchemaById(Int32 id);
std::shared_ptr<const ActionsDAG> getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id);
@@ -87,6 +105,15 @@
bool hasClickhouseTableSchemaById(Int32 id) const;

private:
IcebergSchemaProcessor(const IcebergSchemaProcessor & other)
: iceberg_table_schemas_by_ids(other.iceberg_table_schemas_by_ids),
clickhouse_table_schemas_by_ids(other.clickhouse_table_schemas_by_ids),
transform_dags_by_ids(other.transform_dags_by_ids),
clickhouse_types_by_source_ids(other.clickhouse_types_by_source_ids),
clickhouse_ids_by_source_names(other.clickhouse_ids_by_source_names),
current_schema_id(other.current_schema_id)
{}

std::unordered_map<Int32, Poco::JSON::Object::Ptr> iceberg_table_schemas_by_ids;
std::unordered_map<Int32, std::shared_ptr<NamesAndTypesList>> clickhouse_table_schemas_by_ids;
std::map<std::pair<Int32, Int32>, std::shared_ptr<ActionsDAG>> transform_dags_by_ids;

46 changes: 39 additions & 7 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
@@ -36,6 +36,7 @@ namespace Setting
extern const SettingsMaxThreads max_threads;
extern const SettingsBool optimize_count_from_files;
extern const SettingsBool use_hive_partitioning;
extern const SettingsBool object_storage_treat_key_related_wildcards_as_star;
}

namespace ErrorCodes
@@ -373,21 +374,35 @@ void StorageObjectStorage::read(
if (update_configuration_on_read)
configuration->update(object_storage, local_context);

if (partition_by && configuration->withPartitionWildcard())
auto config_clone = configuration->clone();

Review comment: We make a clone every time, but actually change it only in specific cases. Maybe keep a smart pointer to the original config here and clone only when required?
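
A rough copy-on-write sketch of that suggestion (illustrative only, not code from this PR): keep the original configuration and clone it only on the branch that actually rewrites the path.

StorageObjectStorage::ConfigurationPtr effective_configuration = configuration;

if (local_context->getSettingsRef()[Setting::object_storage_treat_key_related_wildcards_as_star]
    && (configuration->withPartitionWildcard() || configuration->withSnowflakeIdWildcard()))
{
    /// Clone only when the path is actually mutated.
    auto mutated = configuration->clone();
    mutated->setPath(getPathWithKeyRelatedWildcardsReplacedWithStar(mutated->getPath()));
    effective_configuration = mutated;
}

/// The NOT_IMPLEMENTED check for remaining wildcards would still run afterwards, and
/// `effective_configuration` would be passed to prepareReadingFromFormat() and
/// ReadFromObjectStorageStep in place of `config_clone`.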


if (config_clone->withPartitionWildcard() || config_clone->withSnowflakeIdWildcard())
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Reading from a partitioned {} storage is not implemented yet",
getName());
/*
* Replace `_partition_id` and `_snowflake_id` wildcards with `*` so that any files that match this pattern can be retrieved.
*/
if (local_context->getSettingsRef()[Setting::object_storage_treat_key_related_wildcards_as_star])
{
config_clone->setPath(getPathWithKeyRelatedWildcardsReplacedWithStar(config_clone->getPath()));
}

if (config_clone->withPartitionWildcard() || config_clone->withSnowflakeIdWildcard())
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Reading from a globbed path {} on storage {} is not implemented yet,"
"except when the only globs are `_snowflake_id` and/or `_partition_id` with `object_storage_treat_key_related_wildcards_as_star=1`",
config_clone->getPath(), getName());
}
}
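
`getPathWithKeyRelatedWildcardsReplacedWithStar` is not shown in this hunk; a minimal sketch of what such a helper could look like (an assumption for illustration, not the PR's actual implementation):

#include <string>
#include <vector>

/// Illustrative sketch only: replace every {_partition_id} and {_snowflake_id}
/// occurrence in the path with "*" so that globbed listing matches the written keys.
std::string getPathWithKeyRelatedWildcardsReplacedWithStar(std::string path)
{
    static const std::vector<std::string> wildcards{"{_partition_id}", "{_snowflake_id}"};
    for (const auto & wildcard : wildcards)
    {
        size_t pos = 0;
        while ((pos = path.find(wildcard, pos)) != std::string::npos)
        {
            path.replace(pos, wildcard.size(), "*");
            ++pos; /// continue scanning after the inserted '*'
        }
    }
    return path; /// "data/t_{_snowflake_id}.parquet" -> "data/t_*.parquet"
}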

const auto read_from_format_info = configuration->prepareReadingFromFormat(
const auto read_from_format_info = config_clone->prepareReadingFromFormat(
object_storage, column_names, storage_snapshot, supportsSubsetOfColumns(local_context), local_context);
const bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef()[Setting::optimize_count_from_files];

auto read_step = std::make_unique<ReadFromObjectStorageStep>(
object_storage,
configuration,
config_clone,
fmt::format("{}({})", getName(), getStorageID().getFullTableName()),
column_names,
getVirtualsList(),
@@ -421,7 +436,7 @@ SinkToStoragePtr StorageObjectStorage::write(
configuration->getPath());
}

if (configuration->withGlobsIgnorePartitionWildcard())
if (configuration->withGlobsIgnorePartitionWildcardAndSnowflakeIdWildcard())
{
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
"Path '{}' contains globs, so the table is in readonly mode",
@@ -650,6 +665,7 @@ StorageObjectStorage::Configuration::Configuration(const Configuration & other)
format = other.format;
compression_method = other.compression_method;
structure = other.structure;
storage_settings = other.storage_settings;
}

bool StorageObjectStorage::Configuration::withPartitionWildcard() const
@@ -659,13 +675,29 @@ bool StorageObjectStorage::Configuration::withPartitionWildcard() const
|| getNamespace().find(PARTITION_ID_WILDCARD) != String::npos;
}

bool StorageObjectStorage::Configuration::withSnowflakeIdWildcard() const
{
static const String PARTITION_ID_WILDCARD = "{_snowflake_id}";

Review comment: SNOWFLAKE_ID_WILDCARD (the local constant holds the snowflake-id pattern, so it should not reuse the PARTITION_ID_WILDCARD name).

return getPath().find(PARTITION_ID_WILDCARD) != String::npos
|| getNamespace().find(PARTITION_ID_WILDCARD) != String::npos;
}

bool StorageObjectStorage::Configuration::withGlobsIgnorePartitionWildcard() const
{
if (!withPartitionWildcard())
return withGlobs();
return PartitionedSink::replaceWildcards(getPath(), "").find_first_of("*?{") != std::string::npos;
}

bool StorageObjectStorage::Configuration::withGlobsIgnorePartitionWildcardAndSnowflakeIdWildcard() const
{
const auto path_without_partition_id_wildcard = PartitionedSink::replaceWildcards(getPath(), "");

const auto path_without_snowflake_id_wildcard = replaceSnowflakeIdWildcard(path_without_partition_id_wildcard, "");

return path_without_snowflake_id_wildcard.find_first_of("*?{") != std::string::npos;
}

bool StorageObjectStorage::Configuration::isPathWithGlobs() const
{
return getPath().find_first_of("*?{") != std::string::npos;

2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorage.h
@@ -206,8 +206,10 @@ class StorageObjectStorage::Configuration
ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure) = 0;

virtual bool withPartitionWildcard() const;
virtual bool withSnowflakeIdWildcard() const;
bool withGlobs() const { return isPathWithGlobs() || isNamespaceWithGlobs(); }
virtual bool withGlobsIgnorePartitionWildcard() const;
virtual bool withGlobsIgnorePartitionWildcardAndSnowflakeIdWildcard() const;
virtual bool isPathWithGlobs() const;
virtual bool isNamespaceWithGlobs() const;
virtual std::string getPathWithoutGlobs() const;