Skip to content

Commit d8611fb

Browse files
committed
another cleaned attempt
1 parent 89e8c2a commit d8611fb

File tree

5 files changed

+99
-47
lines changed

5 files changed

+99
-47
lines changed

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 46 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -842,7 +842,7 @@ std::shared_ptr<NamesAndTypesList> IcebergMetadata::getInitialSchemaByPath(Conte
842842
}
843843

844844
SharedLockGuard lock(mutex);
845-
auto version_if_outdated = getSchemaVersionByFileIfOutdated(data_path);
845+
auto version_if_outdated = getSchemaVersionByFileIfOutdated(Iceberg::makeAbsolutePath(table_location, data_path));
846846
return version_if_outdated.has_value() ? schema_processor.getClickhouseTableSchemaById(version_if_outdated.value()) : nullptr;
847847
}
848848

@@ -856,7 +856,7 @@ std::shared_ptr<const ActionsDAG> IcebergMetadata::getSchemaTransformer(ContextP
856856
}
857857

858858
SharedLockGuard lock(mutex);
859-
auto version_if_outdated = getSchemaVersionByFileIfOutdated(data_path);
859+
auto version_if_outdated = getSchemaVersionByFileIfOutdated(Iceberg::makeAbsolutePath(table_location, data_path));
860860
return version_if_outdated.has_value()
861861
? schema_processor.getSchemaTransformationDagByIds(version_if_outdated.value(), relevant_snapshot_schema_id)
862862
: nullptr;
@@ -921,38 +921,38 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context,
921921
if (configuration_ptr == nullptr)
922922
throw Exception(ErrorCodes::LOGICAL_ERROR, "Configuration is expired");
923923

924+
const String full_filename = Iceberg::makeAbsolutePath(table_location, filename);
925+
924926
auto create_fn = [&]()
925927
{
926928
auto base_table_uri_parsed = Iceberg::parseUri(table_location);
927-
auto file_uri_parsed = Iceberg::parseUri(filename);
929+
auto file_uri_parsed = Iceberg::parseUri(full_filename);
928930

929931
ObjectStoragePtr storage_to_use = object_storage;
930-
String key_or_path = filename;
932+
String key = filename;
931933

932934
if (!file_uri_parsed.scheme.empty() && !base_table_uri_parsed.scheme.empty() && file_uri_parsed.scheme == base_table_uri_parsed.scheme)
933935
{
934-
if (file_uri_parsed.authority == base_table_uri_parsed.authority)
935-
{
936-
// Same namespace as table_location -> use primary storage and strip leading '/'
937-
key_or_path = file_uri_parsed.path;
938-
if (!key_or_path.empty() && key_or_path.front() == '/')
939-
key_or_path.erase(0, 1);
940-
}
941-
else if (!file_uri_parsed.scheme.empty() && file_uri_parsed.scheme == base_table_uri_parsed.scheme && !file_uri_parsed.authority.empty())
936+
key = file_uri_parsed.path;
937+
if (!key.empty() && key.front() == '/')
938+
key.erase(0, 1);
939+
940+
// Same storage type, but different namespace (location): reuse an already known secondary storage if there is one, otherwise clone the primary storage for that namespace.
941+
if (file_uri_parsed.authority != base_table_uri_parsed.authority)
942942
{
943-
// Same scheme, different authority (e.g. another S3 bucket) -> clone storage for that authority
944-
std::unique_ptr<IObjectStorage> cloned = object_storage->cloneObjectStorage(
945-
file_uri_parsed.authority, local_context->getConfigRef(), configuration_ptr->getTypeName() + ".", local_context);
946-
storage_to_use = ObjectStoragePtr(cloned.release());
947-
948-
key_or_path = file_uri_parsed.path;
949-
if (!key_or_path.empty() && key_or_path.front() == '/')
950-
key_or_path.erase(0, 1);
943+
if (secondary_storages.contains(file_uri_parsed.authority))
944+
storage_to_use = secondary_storages[file_uri_parsed.authority];
945+
else
946+
{
947+
std::unique_ptr<IObjectStorage> cloned = object_storage->cloneObjectStorage(
948+
file_uri_parsed.authority, local_context->getConfigRef(), configuration_ptr->getTypeName() + ".", local_context);
949+
storage_to_use = ObjectStoragePtr(cloned.release());
950+
}
951951
}
952952
}
953953
// TODO: what if storage type is different?
954954

955-
StorageObjectStorage::ObjectInfo object_info(key_or_path, std::nullopt, filename);
955+
StorageObjectStorage::ObjectInfo object_info(key, std::nullopt, full_filename);
956956

957957
auto read_settings = local_context->getReadSettings();
958958
/// Do not utilize filesystem cache if more precise cache enabled
@@ -966,11 +966,11 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context,
966966

967967
for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i)
968968
{
969-
const std::string manifest_file_name = manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet<std::string>();
969+
const std::string manifest_file_path = Iceberg::makeAbsolutePath(table_location, manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet<std::string>());
970970
Int64 added_sequence_number = 0;
971971
if (format_version > 1)
972972
added_sequence_number = manifest_list_deserializer.getValueFromRowByName(i, f_sequence_number, TypeIndex::Int64).safeGet<Int64>();
973-
manifest_file_cache_keys.emplace_back(manifest_file_name, added_sequence_number);
973+
manifest_file_cache_keys.emplace_back(manifest_file_path, added_sequence_number);
974974
}
975975
/// We only return the list of {file name, seq number} for cache.
976976
/// Because ManifestList holds a list of ManifestFilePtr which consume much memory space.
@@ -980,7 +980,7 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context,
980980

981981
ManifestFileCacheKeys manifest_file_cache_keys;
982982
if (manifest_cache)
983-
manifest_file_cache_keys = manifest_cache->getOrSetManifestFileCacheKeys(IcebergMetadataFilesCache::getKey(configuration_ptr, filename), create_fn);
983+
manifest_file_cache_keys = manifest_cache->getOrSetManifestFileCacheKeys(IcebergMetadataFilesCache::getKey(configuration_ptr, full_filename), create_fn);
984984
else
985985
manifest_file_cache_keys = create_fn();
986986
return manifest_file_cache_keys;
@@ -1080,46 +1080,45 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const
10801080
{
10811081
auto configuration_ptr = configuration.lock();
10821082

1083+
const String full_filename = Iceberg::makeAbsolutePath(table_location, filename);
1084+
10831085
auto create_fn = [&]()
10841086
{
10851087
auto base_table_uri_parsed = Iceberg::parseUri(table_location);
1086-
auto file_uri_parsed = Iceberg::parseUri(filename);
1088+
auto file_uri_parsed = Iceberg::parseUri(full_filename);
10871089

10881090
ObjectStoragePtr storage_to_use = object_storage;
1089-
String key_or_path = filename;
1091+
String key = filename;
10901092

10911093
if (!file_uri_parsed.scheme.empty() && !base_table_uri_parsed.scheme.empty() && file_uri_parsed.scheme == base_table_uri_parsed.scheme)
10921094
{
1093-
if (file_uri_parsed.authority == base_table_uri_parsed.authority)
1094-
{
1095-
// Same namespace as table_location -> use primary storage and strip leading '/'
1096-
key_or_path = file_uri_parsed.path;
1097-
if (!key_or_path.empty() && key_or_path.front() == '/')
1098-
key_or_path.erase(0, 1);
1099-
}
1100-
else if (!file_uri_parsed.scheme.empty() && file_uri_parsed.scheme == base_table_uri_parsed.scheme && !file_uri_parsed.authority.empty())
1095+
key = file_uri_parsed.path;
1096+
if (!key.empty() && key.front() == '/')
1097+
key.erase(0, 1);
1098+
1099+
if (file_uri_parsed.authority != base_table_uri_parsed.authority)
11011100
{
1102-
// Same scheme, different authority (e.g. another S3 bucket) -> clone storage for that authority
1103-
std::unique_ptr<IObjectStorage> cloned = object_storage->cloneObjectStorage(
1104-
file_uri_parsed.authority, local_context->getConfigRef(), configuration_ptr->getTypeName() + ".", local_context);
1105-
storage_to_use = ObjectStoragePtr(cloned.release());
1106-
1107-
key_or_path = file_uri_parsed.path;
1108-
if (!key_or_path.empty() && key_or_path.front() == '/')
1109-
key_or_path.erase(0, 1);
1101+
if (secondary_storages.contains(file_uri_parsed.authority))
1102+
storage_to_use = secondary_storages[file_uri_parsed.authority];
1103+
else
1104+
{
1105+
std::unique_ptr<IObjectStorage> cloned = object_storage->cloneObjectStorage(
1106+
file_uri_parsed.authority, local_context->getConfigRef(), configuration_ptr->getTypeName() + ".", local_context);
1107+
storage_to_use = ObjectStoragePtr(cloned.release());
1108+
}
11101109
}
11111110
}
11121111

1113-
ObjectInfo manifest_object_info(key_or_path, std::nullopt, filename);
1112+
ObjectInfo manifest_object_info(key, std::nullopt, full_filename);
11141113

11151114
auto read_settings = local_context->getReadSettings();
11161115
/// Do not utilize filesystem cache if more precise cache enabled
11171116
if (manifest_cache)
11181117
read_settings.enable_filesystem_cache = false;
11191118

11201119
auto buffer = StorageObjectStorageSource::createReadBuffer(manifest_object_info, storage_to_use, local_context, log, read_settings);
1121-
AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), filename, getFormatSettings(local_context));
1122-
auto [schema_id, schema_object] = parseTableSchemaFromManifestFile(manifest_file_deserializer, filename);
1120+
AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), full_filename, getFormatSettings(local_context));
1121+
auto [schema_id, schema_object] = parseTableSchemaFromManifestFile(manifest_file_deserializer, full_filename);
11231122
schema_processor.addIcebergTableSchema(schema_object);
11241123
return std::make_shared<ManifestFileContent>(
11251124
manifest_file_deserializer,
@@ -1128,12 +1127,13 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const
11281127
schema_object,
11291128
schema_processor,
11301129
inherited_sequence_number,
1130+
table_location,
11311131
local_context);
11321132
};
11331133

11341134
if (manifest_cache)
11351135
{
1136-
auto manifest_file = manifest_cache->getOrSetManifestFile(IcebergMetadataFilesCache::getKey(configuration_ptr, filename), create_fn);
1136+
auto manifest_file = manifest_cache->getOrSetManifestFile(IcebergMetadataFilesCache::getKey(configuration_ptr, full_filename), create_fn);
11371137
schema_processor.addIcebergTableSchema(manifest_file->getSchemaObject());
11381138
return manifest_file;
11391139
}

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,10 @@ ManifestFileContent::ManifestFileContent(
126126
Poco::JSON::Object::Ptr schema_object_,
127127
const IcebergSchemaProcessor & schema_processor,
128128
Int64 inherited_sequence_number,
129+
std::string table_location_,
129130
DB::ContextPtr context)
130131
{
132+
this->table_location = std::move(table_location_);
131133
this->schema_id = schema_id_;
132134
this->schema_object = schema_object_;
133135

@@ -280,7 +282,7 @@ ManifestFileContent::ManifestFileContent(
280282
columns_infos[column_id].hyperrectangle.emplace(*left, true, *right, true);
281283
}
282284

283-
FileEntry file = FileEntry{DataFileEntry{file_path}};
285+
FileEntry file = FileEntry{DataFileEntry{Iceberg::makeAbsolutePath(table_location, file_path)}};
284286

285287
Int64 added_sequence_number = 0;
286288
if (format_version_ > 1)

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ class ManifestFileContent
9292
Poco::JSON::Object::Ptr schema_object_,
9393
const DB::IcebergSchemaProcessor & schema_processor,
9494
Int64 inherited_sequence_number,
95+
std::string table_location_,
9596
DB::ContextPtr context);
9697

9798
const std::vector<ManifestFileEntry> & getFiles() const;
@@ -121,6 +122,8 @@ class ManifestFileContent
121122

122123
std::set<Int32> column_ids_which_have_bounds;
123124

125+
std::string table_location;
126+
124127
};
125128

126129
using ManifestFilePtr = std::shared_ptr<const ManifestFileContent>;

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,23 @@ std::string extractStorageType(const std::string & path)
7373
return "";
7474
}
7575

76+
bool isRelativePath(const std::string & path)
77+
{
78+
if (path.empty())
79+
return true;
80+
81+
// Non-relative if it has a scheme (e.g., s3://, https://, abfs://, file://, etc.)
82+
if (!extractStorageType(path).empty())
83+
return false;
84+
85+
// Non-relative if it starts with '/' (absolute POSIX path)
86+
if (!path.empty() && path.front() == '/')
87+
return false;
88+
89+
return true;
90+
}
91+
92+
7693
UriParts parseUri(const std::string & uri)
7794
{
7895
UriParts parts;
@@ -123,6 +140,32 @@ UriParts parseUri(const std::string & uri)
123140
return parts;
124141
}
125142

143+
std::string makeAbsolutePath(const std::string & table_location, const std::string & path)
144+
{
145+
if (!isRelativePath(path))
146+
return path;
147+
148+
auto base = parseUri(table_location);
149+
150+
std::string base_dir = base.path.empty() ? std::string("/") : base.path;
151+
if (!base_dir.empty() && base_dir.back() != '/')
152+
base_dir.push_back('/');
153+
154+
std::string rel = path;
155+
if (!rel.empty() && rel.front() == '/')
156+
rel.erase(0, 1);
157+
158+
std::string abs_path = base_dir + rel;
159+
if (abs_path.empty() || abs_path.front() != '/')
160+
abs_path.insert(abs_path.begin(), '/');
161+
162+
if (!base.scheme.empty())
163+
return base.scheme + "://" + base.authority + abs_path;
164+
165+
return std::string("file://") + abs_path;
166+
}
167+
168+
126169

127170

128171
}

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
namespace Iceberg
1111
{
1212

13+
bool isRelativePath(const std::string & path);
14+
1315
// Returns normalized storage type from a URI/path:
1416
// - With storage specification: lowercased and normalized (e.g. s3:// -> "s3")
1517
// - Absolute POSIX path (starts with '/'): "file"
@@ -30,6 +32,8 @@ struct UriParts
3032

3133
UriParts parseUri(const std::string & uri);
3234

35+
std::string makeAbsolutePath(const std::string & table_location, const std::string & path);
36+
3337
}
3438

3539
#endif

0 commit comments

Comments
 (0)