From e56efc5c65de8146d422ed470d3b4a7f9d8b4bd1 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Mon, 1 Jun 2026 19:25:57 +0200 Subject: [PATCH 1/2] Port: Iceberg support for external paths in tables (PR #90740) Port https://github.com/ClickHouse/ClickHouse/pull/90740 to antalya-26.3. Iceberg tables may now reference files (data files, manifests, manifest lists) located outside the table location, including on a different object storage backend. Metadata paths are treated as absolute URIs and resolved at read/delete time via new object-storage helpers (`SchemeAuthorityKey`, `resolveObjectStorageForPath`, `SecondaryStorages`), with the cluster-function protocol bumped to `DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH`. Adds the `s3_propagate_credentials_to_other_storages` setting to optionally copy base S3 credentials when creating secondary storages. Notes on porting to this branch: - Skipped the `ExpireSnapshotsExecute`, `RemoveOrphanFilesExecute` and `SnapshotFilesTraversal` files: this functionality does not exist in `antalya-26.3`. The `executeCommand` branch using them was dropped and the existing `expireSnapshots` implementation is kept. - Dropped the `S3UriStyle uri_style` `S3::URI` parameter (from an unrelated upstream change not in this branch); only `enable_url_encoding` is added. - Dropped the upstream-only `_path` virtual column `storage_id` field, which is not present in `VirtualsForFileLikeStorage` here. - Folded the metadata-path preference into the existing `getFileIdentifier` helper in the stable task distributor rather than the upstream inline call sites. - Updated `Mutations.cpp` (`expireSnapshots`) callers for the new `getManifestList` / `getManifestFileEntriesHandle` signatures. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/Core/ProtocolDefines.h | 3 +- src/Core/Settings.cpp | 3 + src/Core/SettingsChangesHistory.cpp | 1 + src/Databases/DataLake/DatabaseDataLake.cpp | 4 +- src/IO/S3/URI.cpp | 14 +- src/IO/S3/URI.h | 3 +- src/Interpreters/ClusterFunctionReadTask.cpp | 6 +- src/Interpreters/IcebergMetadataLog.cpp | 6 +- .../Common/AvroForIcebergDeserializer.cpp | 9 +- .../DataLakes/IDataLakeMetadata.h | 3 + .../DataLakes/Iceberg/Compaction.cpp | 100 +++- .../DataLakes/Iceberg/Compaction.h | 2 + .../Iceberg/IcebergDataObjectInfo.cpp | 54 +- .../DataLakes/Iceberg/IcebergDataObjectInfo.h | 36 +- .../DataLakes/Iceberg/IcebergIterator.cpp | 62 ++- .../DataLakes/Iceberg/IcebergIterator.h | 11 +- .../DataLakes/Iceberg/IcebergMetadata.cpp | 29 +- .../DataLakes/Iceberg/IcebergMetadata.h | 4 + .../DataLakes/Iceberg/IcebergPath.cpp | 6 + .../DataLakes/Iceberg/IcebergPath.h | 6 + .../DataLakes/Iceberg/IcebergWrites.cpp | 1 + .../Iceberg/ManifestFileIterator.cpp | 3 +- .../DataLakes/Iceberg/Mutations.cpp | 10 +- .../DataLakes/Iceberg/Mutations.h | 1 + .../Iceberg/PositionDeleteTransform.cpp | 19 +- .../Iceberg/PositionDeleteTransform.h | 22 +- .../Iceberg/StatelessMetadataFileGetter.cpp | 36 +- .../Iceberg/StatelessMetadataFileGetter.h | 10 +- .../ObjectStorage/DataLakes/Iceberg/Utils.cpp | 30 +- .../ObjectStorage/DataLakes/Iceberg/Utils.h | 15 +- .../StorageObjectStorageSource.cpp | 75 ++- .../StorageObjectStorageSource.h | 10 + ...rageObjectStorageStableTaskDistributor.cpp | 6 +- src/Storages/ObjectStorage/Utils.cpp | 524 ++++++++++++++++++ src/Storages/ObjectStorage/Utils.h | 50 ++ src/Storages/StorageURL.cpp | 6 +- .../__init__.py | 0 .../configs/config.d/cluster.xml | 20 + .../configs/config.d/named_collections.xml | 15 + .../configs/config.d/query_log.xml | 6 + .../configs/users.d/users.xml | 9 + .../test_storage_iceberg_multistorage/test.py | 426 ++++++++++++++ .../test_array_evolved_with_struct.py | 2 +- .../metadata/v1.metadata.json | 4 +- .../metadata/v1.metadata.json | 4 +- .../metadata/v1.metadata.json | 4 +- 46 files changed, 1538 insertions(+), 132 deletions(-) create mode 100644 tests/integration/test_storage_iceberg_multistorage/__init__.py create mode 100644 tests/integration/test_storage_iceberg_multistorage/configs/config.d/cluster.xml create mode 100644 tests/integration/test_storage_iceberg_multistorage/configs/config.d/named_collections.xml create mode 100644 tests/integration/test_storage_iceberg_multistorage/configs/config.d/query_log.xml create mode 100644 tests/integration/test_storage_iceberg_multistorage/configs/users.d/users.xml create mode 100644 tests/integration/test_storage_iceberg_multistorage/test.py diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index ae7cff5d0e38..0c670bcc3985 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -39,7 +39,8 @@ static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_META static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_EXCLUDED_ROWS = 5; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_FILE_STATS = 6; -static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_FILE_STATS; +static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH = 7; +static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH; static constexpr auto DATA_LAKE_TABLE_STATE_SNAPSHOT_PROTOCOL_VERSION = 1; diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 6cf27fbfabb8..b87a4b4a1eb2 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -591,6 +591,9 @@ Use multiple threads for azure multipart upload. )", 0) \ DECLARE(Bool, s3_throw_on_zero_files_match, false, R"( Throw an error, when ListObjects request cannot match any files +)", 0) \ + DECLARE(Bool, s3_propagate_credentials_to_other_storages, false, R"( +Credentials from the base storage are always propagated to secondary object storages when endpoints match. When this setting is enabled, credentials are also propagated when endpoints differ, including less secure connections (for example, from `https` to plain `http`). )", 0) \ DECLARE(Bool, hdfs_throw_on_zero_files_match, false, R"( Throw an error if matched zero files according to glob expansion rules. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 034f4a03b9e2..4261d0e5a0ee 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -46,6 +46,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() }); addSettingsChanges(settings_changes_history, "26.3", { + {"s3_propagate_credentials_to_other_storages", false, false, "New setting"}, {"http_max_fields", 1000000, 1000, "Reduce default to limit pre-authentication memory usage by HTTP connections."}, {"http_max_field_name_size", 131072, 4096, "Reduce default to limit pre-authentication memory usage by HTTP connections."}, {"http_max_request_header_size", 0, 10485760, "New setting to limit total HTTP request header size before authentication."}, diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index ad4adcdb8432..d21c75e28ff8 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -575,7 +575,9 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con LOG_DEBUG(log, "Has no credentials"); } } - else if (!lightweight && table_metadata.requiresCredentials() && std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end()) + else if (!lightweight && table_metadata.requiresCredentials() + && std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end() + && table_metadata.getStorageType() != DatabaseDataLakeStorageType::Local) { throw Exception( ErrorCodes::BAD_ARGUMENTS, diff --git a/src/IO/S3/URI.cpp b/src/IO/S3/URI.cpp index 5ab8e5cfd724..7a6bedfc2a76 100644 --- a/src/IO/S3/URI.cpp +++ b/src/IO/S3/URI.cpp @@ -17,10 +17,12 @@ namespace DB struct URIConverter { - static void modifyURI(Poco::URI & uri, std::unordered_map mapper) + static void modifyURI(Poco::URI & uri, std::unordered_map mapper, bool enable_url_encoding = true) { Macros macros({{"bucket", uri.getHost()}}); - uri = macros.expand(mapper[uri.getScheme()]).empty() ? uri : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery()); + uri = macros.expand(mapper[uri.getScheme()]).empty() + ? uri + : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery(), enable_url_encoding); } }; @@ -32,7 +34,7 @@ namespace ErrorCodes namespace S3 { -URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_presigned_query_parameters) +URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_presigned_query_parameters, bool enable_url_encoding) { /// Case when bucket name represented in domain name of S3 URL. /// E.g. (https://bucket-name.s3.region.amazonaws.com/key) @@ -54,9 +56,9 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_pre else uri_str = uri_; - uri = Poco::URI(uri_str); + uri = Poco::URI(uri_str, enable_url_encoding); /// Keep a copy of how Poco parsed the original string before any mapping - Poco::URI original_uri(uri_str); + Poco::URI original_uri(uri_str, enable_url_encoding); bool looks_like_presigned = false; for (const auto & [qk, qv] : original_uri.getQueryParameters()) { @@ -101,7 +103,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_pre } if (!mapper.empty()) - URIConverter::modifyURI(uri, mapper); + URIConverter::modifyURI(uri, mapper, enable_url_encoding); } storage_name = "S3"; diff --git a/src/IO/S3/URI.h b/src/IO/S3/URI.h index fd45baa39774..4524d4eac946 100644 --- a/src/IO/S3/URI.h +++ b/src/IO/S3/URI.h @@ -39,7 +39,8 @@ struct URI explicit URI( const std::string & uri_, bool allow_archive_path_syntax = false, - bool keep_presigned_query_parameters = true); + bool keep_presigned_query_parameters = true, + bool enable_url_encoding = true); void addRegionToURI(const std::string & region); static void validateBucket(const std::string & bucket, const Poco::URI & uri); diff --git a/src/Interpreters/ClusterFunctionReadTask.cpp b/src/Interpreters/ClusterFunctionReadTask.cpp index fdacbda9270b..3989a86c5b4d 100644 --- a/src/Interpreters/ClusterFunctionReadTask.cpp +++ b/src/Interpreters/ClusterFunctionReadTask.cpp @@ -35,10 +35,8 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o data_lake_metadata = object->data_lake_metadata.value(); #if USE_AVRO - if (std::dynamic_pointer_cast(object)) - { - iceberg_info = dynamic_cast(*object).info; - } + if (auto iceberg_object = std::dynamic_pointer_cast(object)) + iceberg_info = iceberg_object->info; #endif file_meta_info = object->relative_path_with_metadata.file_meta_info; diff --git a/src/Interpreters/IcebergMetadataLog.cpp b/src/Interpreters/IcebergMetadataLog.cpp index 2eece43962ed..ddeef5c5476b 100644 --- a/src/Interpreters/IcebergMetadataLog.cpp +++ b/src/Interpreters/IcebergMetadataLog.cpp @@ -102,12 +102,16 @@ void insertRowToLogTable( throw Exception(ErrorCodes::BAD_ARGUMENTS, "Iceberg metadata log table is not configured"); } + String normalized_table_path = table_path; + while (normalized_table_path.size() > 1 && normalized_table_path.back() == '/') + normalized_table_path.pop_back(); + iceberg_metadata_log->add( DB::IcebergMetadataLogElement{ .current_time = spec.tv_sec, .query_id = local_context->getCurrentQueryId(), .content_type = row_log_level, - .table_path = table_path, + .table_path = normalized_table_path, .file_path = file_path.serialize(), .metadata_content = get_row(), .row_in_file = row_in_file, diff --git a/src/Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.cpp b/src/Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.cpp index ec0814406048..2b585de49406 100644 --- a/src/Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.cpp @@ -158,8 +158,7 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE } } - - const auto file_path_key = IcebergPathFromMetadata::deserialize( + const auto file_path_from_metadata = IcebergPathFromMetadata::deserialize( getValueFromRowByName(row_index, c_data_file_file_path, TypeIndex::String).safeGet()); /// NOTE: This is weird, because in manifest file partition looks like this: /// { @@ -248,7 +247,7 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE case FileContentType::DATA: { return std::make_shared( FileContentType::DATA, - file_path_key, + file_path_from_metadata, row_index, status, sequence_number, @@ -295,7 +294,7 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE } return std::make_shared( FileContentType::POSITION_DELETE, - file_path_key, + file_path_from_metadata, row_index, status, sequence_number, @@ -326,7 +325,7 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE c_data_file_equality_ids); return std::make_shared( FileContentType::EQUALITY_DELETE, - file_path_key, + file_path_from_metadata, row_index, status, sequence_number, diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 92152ce75c2e..85c91ba5de19 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -122,6 +122,9 @@ class IDataLakeMetadata : boost::noncopyable virtual bool operator==(const IDataLakeMetadata & other) const = 0; + /// Returns the full table location URI (e.g. `s3a://bucket/prefix/table/`) + virtual std::string getTableLocation() const { return {}; } + /// Return iterator to `data files`. using FileProgressCallback = std::function; virtual ObjectIterator iterate( diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp index 4e0c2380b66d..e687811f4532 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -72,6 +73,9 @@ struct Plan std::unordered_map> manifest_list_to_manifest_files; std::unordered_map>> snapshot_id_to_data_files; std::unordered_map> path_to_data_file; + /// Raw paths of every file referenced by the snapshots being compacted, used at cleanup + /// time to also remove files that live outside the base object_storage. + std::unordered_set referenced_file_paths; FileNamesGenerator generator; Poco::JSON::Object::Ptr initial_metadata_object; @@ -113,6 +117,7 @@ Plan getPlan( const DataLakeStorageSettings & data_lake_settings, const PersistentTableComponents & persistent_table_components, ObjectStoragePtr object_storage, + SecondaryStorages & secondary_storages, const String & write_format, ContextPtr context, CompressionMethod compression_method) @@ -155,14 +160,16 @@ Plan getPlan( std::unordered_map> manifest_files; for (const auto & snapshot : snapshots_info) { - auto manifest_list = getManifestList(object_storage, persistent_table_components, context, snapshot.manifest_list_path, log); + plan.referenced_file_paths.insert(snapshot.manifest_list_path); + auto manifest_list = getManifestList(object_storage, persistent_table_components, context, snapshot.manifest_list_path, log, secondary_storages); for (const auto & manifest_file : manifest_list) { plan.manifest_list_to_manifest_files[snapshot.manifest_list_path].push_back(manifest_file.manifest_file_path); if (!plan.manifest_file_to_first_snapshot.contains(manifest_file.manifest_file_path)) plan.manifest_file_to_first_snapshot[manifest_file.manifest_file_path] = snapshot.snapshot_id; + plan.referenced_file_paths.insert(manifest_file.manifest_file_path); auto files_handle = getManifestFileEntriesHandle( - object_storage, persistent_table_components, context, log, manifest_file, static_cast(current_schema_id)); + object_storage, persistent_table_components, context, log, manifest_file, static_cast(current_schema_id), secondary_storages); if (!manifest_files.contains(manifest_file.manifest_file_path)) { @@ -171,28 +178,39 @@ Plan getPlan( } manifest_files[manifest_file.manifest_file_path]->manifest_lists_path.push_back(snapshot.manifest_list_path); for (const auto & pos_delete_file : files_handle.getFilesWithoutDeleted(FileContentType::POSITION_DELETE)) + { all_positional_delete_files.push_back(pos_delete_file); + plan.referenced_file_paths.insert(pos_delete_file->parsed_entry->file_path_key); + } for (const auto & data_file : files_handle.getFilesWithoutDeleted(FileContentType::DATA)) { + plan.referenced_file_paths.insert(data_file->parsed_entry->file_path_key); auto partition_index = plan.partition_encoder.encodePartition(data_file->parsed_entry->partition_key_value); if (plan.partitions.size() <= partition_index) plan.partitions.push_back({}); + const auto & raw_metadata_path = data_file->parsed_entry->file_path_key.serialize(); + auto [resolved_storage, resolved_key] = resolveObjectStorageForPath( + persistent_table_components.table_location, + raw_metadata_path, object_storage, secondary_storages, context, + persistent_table_components.path_resolver); + IcebergDataObjectInfoPtr data_object_info = std::make_shared( - data_file, persistent_table_components.path_resolver.resolve(data_file->parsed_entry->file_path_key), 0); + data_file, raw_metadata_path, 0, resolved_storage, resolved_key); std::shared_ptr data_file_ptr; - if (!plan.path_to_data_file.contains(manifest_file.manifest_file_path)) + auto path_identifier = Iceberg::IcebergPathFromMetadata::makeStorageIdentity(resolved_storage, resolved_key); + if (!plan.path_to_data_file.contains(path_identifier)) { data_file_ptr = std::make_shared(DataFilePlan{ .data_object_info = data_object_info, .manifest_list = manifest_files[manifest_file.manifest_file_path], .patched_path = plan.generator.generateDataFileName()}); - plan.path_to_data_file[manifest_file.manifest_file_path] = data_file_ptr; + plan.path_to_data_file[path_identifier] = data_file_ptr; } else { - data_file_ptr = plan.path_to_data_file[manifest_file.manifest_file_path]; + data_file_ptr = plan.path_to_data_file[path_identifier]; } plan.partitions[partition_index].push_back(data_file_ptr); plan.snapshot_id_to_data_files[snapshot.snapshot_id].push_back(plan.partitions[partition_index].back()); @@ -211,7 +229,7 @@ Plan getPlan( { if (data_file->data_object_info->info.sequence_number <= delete_file->sequence_number) data_file->data_object_info->addPositionDeleteObject( - delete_file, persistent_table_components.path_resolver.resolve(delete_file->parsed_entry->file_path_key)); + delete_file, delete_file->parsed_entry->file_path_key.serialize()); } } plan.history = std::move(snapshots_info); @@ -227,7 +245,8 @@ static void writeDataFiles( const std::optional & format_settings, ContextPtr context, const String & write_format, - CompressionMethod write_compression_method) + CompressionMethod write_compression_method, + std::shared_ptr secondary_storages) { for (auto & [_, data_file] : initial_plan.path_to_data_file) { @@ -238,10 +257,15 @@ static void writeDataFiles( format_settings, // todo make compaction using same FormatParserSharedResources std::make_shared(context->getSettingsRef(), 1), - context); + context, + path_resolver, + secondary_storages); - RelativePathWithMetadata relative_path(data_file->data_object_info->getPath()); - auto read_buffer = createReadBuffer(relative_path, object_storage, context, getLogger("IcebergCompaction")); + ObjectStoragePtr storage_to_use = data_file->data_object_info->getResolvedStorage(); + if (!storage_to_use) + storage_to_use = object_storage; + RelativePathWithMetadata object_info(data_file->data_object_info->getPath()); + auto read_buffer = createReadBuffer(object_info, storage_to_use, context, getLogger("IcebergCompaction")); const Settings & settings = context->getSettingsRef(); auto parser_shared_resources = std::make_shared( @@ -395,6 +419,7 @@ void writeMetadataFiles( { manifest_entry->patched_path = plan.generator.generateManifestEntryName(); manifest_file_renamings[manifest_entry->path] = manifest_entry->patched_path; + auto buffer_manifest_entry = object_storage->writeObject( StoredObject(path_resolver.resolve(manifest_entry->patched_path)), WriteMode::Rewrite, @@ -500,22 +525,47 @@ void writeMetadataFiles( } } -std::vector getOldFiles(ObjectStoragePtr object_storage, const String & table_path) +/// Files to delete after compaction: a base-storage directory listing under `metadata/` and +/// `data/` (covers historical metadata.json and any orphan files on the base storage), plus +/// any paths from the compacted snapshots that resolve to a secondary storage. +std::vector> getOldFiles( + ObjectStoragePtr object_storage, + SecondaryStorages & secondary_storages, + ContextPtr context, + const PersistentTableComponents & persistent_table_components, + const Plan & plan) { - auto metadata_files = listFiles(*object_storage, table_path, "metadata", ""); - auto data_files = listFiles(*object_storage, table_path, "data", ""); + std::vector> result; + + for (auto && file : listFiles(*object_storage, persistent_table_components.table_path, "metadata", "")) + result.emplace_back(object_storage, std::move(file)); + for (auto && file : listFiles(*object_storage, persistent_table_components.table_path, "data", "")) + result.emplace_back(object_storage, std::move(file)); + + for (const auto & raw_path : plan.referenced_file_paths) + { + auto [storage_to_use, key_in_storage] = resolveObjectStorageForPath( + persistent_table_components.table_location, + raw_path.serialize(), + object_storage, + secondary_storages, + context, + persistent_table_components.path_resolver); - for (auto && data_file : data_files) - metadata_files.push_back(data_file); + if (storage_to_use.get() != object_storage.get()) + result.emplace_back(std::move(storage_to_use), std::move(key_in_storage)); + } - return metadata_files; + return result; } -void clearOldFiles(ObjectStoragePtr object_storage, const std::vector & old_files) +void clearOldFiles(const std::vector> & old_files) { - for (const auto & metadata_file : old_files) + auto log = getLogger("IcebergCompaction"); + for (const auto & [storage, key] : old_files) { - object_storage->removeObjectIfExists(StoredObject(metadata_file)); + LOG_DEBUG(log, "Removing old file during compaction: storage={}, key={}", storage->getDescription(), key); + storage->removeObjectIfExists(StoredObject(key)); } } @@ -523,6 +573,7 @@ void compactIcebergTable( IcebergHistory snapshots_info, const PersistentTableComponents & persistent_table_components, ObjectStoragePtr object_storage_, + std::shared_ptr secondary_storages_, const DataLakeStorageSettings & data_lake_settings, const std::optional & format_settings_, SharedHeader sample_block_, @@ -534,12 +585,14 @@ void compactIcebergTable( data_lake_settings, persistent_table_components, object_storage_, + *secondary_storages_, write_format, context_, persistent_table_components.metadata_compression_method); if (plan.need_optimize) { - auto old_files = getOldFiles(object_storage_, persistent_table_components.table_path); + auto old_files = getOldFiles( + object_storage_, *secondary_storages_, context_, persistent_table_components, plan); writeDataFiles( plan, sample_block_, @@ -548,9 +601,10 @@ void compactIcebergTable( format_settings_, context_, write_format, - persistent_table_components.metadata_compression_method); + persistent_table_components.metadata_compression_method, + secondary_storages_); writeMetadataFiles(plan, persistent_table_components.path_resolver, object_storage_, context_, sample_block_, write_format, persistent_table_components.table_path); - clearOldFiles(object_storage_, old_files); + clearOldFiles(old_files); } } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h index 0916002f99f3..302bc3d30e69 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB::Iceberg @@ -15,6 +16,7 @@ void compactIcebergTable( IcebergHistory snapshots_info, const PersistentTableComponents & persistent_table_components, DB::ObjectStoragePtr object_storage_, + std::shared_ptr secondary_storages_, const DataLakeStorageSettings & data_lake_settings, const std::optional & format_settings_, DB::SharedHeader sample_block_, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp index d53caa5ad7f8..e429a376ac67 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp @@ -1,3 +1,4 @@ +#include #include "config.h" #include @@ -9,15 +10,18 @@ #include #include +#include #include #include #include +#include namespace DB::ErrorCodes { extern const int NOT_IMPLEMENTED; extern const int UNKNOWN_PROTOCOL; +extern const int PROTOCOL_VERSION_MISMATCH; } @@ -34,10 +38,11 @@ extern const SettingsBool use_roaring_bitmap_iceberg_positional_deletes; #if USE_AVRO IcebergDataObjectInfo::IcebergDataObjectInfo( - Iceberg::ProcessedManifestFileEntryPtr data_manifest_file_entry_, const String & resolved_storage_path_, Int32 schema_id_relevant_to_iterator_) - : ObjectInfo(RelativePathWithMetadata(resolved_storage_path_)) + Iceberg::ProcessedManifestFileEntryPtr data_manifest_file_entry_, const String & metadata_path_, Int32 schema_id_relevant_to_iterator_, ObjectStoragePtr resolved_storage_, const String & resolved_key_) + : ObjectInfo(RelativePathWithMetadata(resolved_key_.empty() ? metadata_path_ : resolved_key_)) , info{ data_manifest_file_entry_->parsed_entry->file_path_key, + metadata_path_, data_manifest_file_entry_->resolved_schema_id, schema_id_relevant_to_iterator_, data_manifest_file_entry_->sequence_number, @@ -46,7 +51,11 @@ IcebergDataObjectInfo::IcebergDataObjectInfo( /* equality_deletes_objects */ {}, data_manifest_file_entry_->parsed_entry->record_count, data_manifest_file_entry_->parsed_entry->file_size_in_bytes} + , resolved_storage(std::move(resolved_storage_)) { + /// resolved_storage and resolved_key must be provided together or neither must be provided + /// (default-constructed, meaning the path has not been resolved yet). + chassert(resolved_key_.empty() == (resolved_storage == nullptr)); } IcebergDataObjectInfo::IcebergDataObjectInfo(const RelativePathWithMetadata & path_) @@ -59,13 +68,15 @@ std::shared_ptr IcebergDataObjectInfo::getPositionDeleteTransf const SharedHeader & header, const std::optional & format_settings, FormatParserSharedResourcesPtr parser_shared_resources, - ContextPtr context_) + ContextPtr context_, + const Iceberg::IcebergPathResolver & path_resolver, + std::shared_ptr secondary_storages) { IcebergDataObjectInfoPtr self = shared_from_this(); if (!context_->getSettingsRef()[Setting::use_roaring_bitmap_iceberg_positional_deletes].value) - return std::make_shared(header, self, object_storage, format_settings, parser_shared_resources, context_); + return std::make_shared(header, self, object_storage, format_settings, parser_shared_resources, context_, path_resolver, secondary_storages); else - return std::make_shared(header, self, object_storage, format_settings, parser_shared_resources, context_); + return std::make_shared(header, self, object_storage, format_settings, parser_shared_resources, context_, path_resolver, secondary_storages); } void IcebergDataObjectInfo::addPositionDeleteObject(Iceberg::ProcessedManifestFileEntryPtr position_delete_object, const String & resolved_storage_path) @@ -95,7 +106,30 @@ void IcebergDataObjectInfo::addEqualityDeleteObject(const Iceberg::ProcessedMani void IcebergObjectSerializableInfo::serializeForClusterFunctionProtocol(WriteBuffer & out, size_t protocol_version) const { checkVersion(protocol_version); + + if (requires_external_storage && protocol_version < DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH) + { + throw Exception( + ErrorCodes::PROTOCOL_VERSION_MISMATCH, + "Iceberg data file '{}' is outside of the table location, " + "worker needs to have protocol version >= {}, but has {}. ", + data_object_file_metadata_path, + DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH, + protocol_version); + } + + auto path_for_protocol = [&](const String & path) -> String + { + if (protocol_version < DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH) + return SchemeAuthorityKey(path).key; + return path; + }; + writeStringBinary(data_object_file_path_key.serialize(), out); + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH) + { + writeStringBinary(data_object_file_metadata_path, out); + } writeVarInt(underlying_format_read_schema_id, out); writeVarInt(schema_id_relevant_to_iterator, out); writeVarInt(sequence_number, out); @@ -104,12 +138,12 @@ void IcebergObjectSerializableInfo::serializeForClusterFunctionProtocol(WriteBuf writeVarUInt(position_deletes_objects.size(), out); for (const auto & pos_delete_obj : position_deletes_objects) { - writeStringBinary(pos_delete_obj.file_path, out); + writeStringBinary(path_for_protocol(pos_delete_obj.file_path), out); writeStringBinary(pos_delete_obj.file_format, out); if (pos_delete_obj.reference_data_file_path.has_value()) { writeVarUInt(1, out); - writeStringBinary(pos_delete_obj.reference_data_file_path.value(), out); + writeStringBinary(path_for_protocol(pos_delete_obj.reference_data_file_path.value()), out); } else { @@ -121,7 +155,7 @@ void IcebergObjectSerializableInfo::serializeForClusterFunctionProtocol(WriteBuf writeVarUInt(equality_deletes_objects.size(), out); for (const auto & eq_delete_obj : equality_deletes_objects) { - writeStringBinary(eq_delete_obj.file_path, out); + writeStringBinary(path_for_protocol(eq_delete_obj.file_path), out); writeStringBinary(eq_delete_obj.file_format, out); writeVarInt(eq_delete_obj.schema_id, out); if (eq_delete_obj.equality_ids.has_value()) @@ -170,6 +204,10 @@ void IcebergObjectSerializableInfo::deserializeForClusterFunctionProtocol(ReadBu readStringBinary(raw_path, in); data_object_file_path_key = IcebergPathFromMetadata::deserialize(std::move(raw_path)); } + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH) + { + readStringBinary(data_object_file_metadata_path, in); + } readVarInt(underlying_format_read_schema_id, in); readVarInt(schema_id_relevant_to_iterator, in); readVarInt(sequence_number, in); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h index b7f7f7450abe..7152331c1e89 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h @@ -17,6 +17,10 @@ namespace DB::Iceberg struct IcebergObjectSerializableInfo { IcebergPathFromMetadata data_object_file_path_key; + /// Raw path string as written in the Iceberg manifest, preserved as-is (may be a full URI like + /// `s3://bucket/...` or a relative path). Used for the `_path` virtual column and as a stable + /// task identifier. Not a canonicalised storage key — see `IcebergPathResolver::resolve` for that. + String data_object_file_metadata_path; Int32 underlying_format_read_schema_id; Int32 schema_id_relevant_to_iterator; Int64 sequence_number; @@ -26,6 +30,9 @@ struct IcebergObjectSerializableInfo std::optional record_count; std::optional file_size_in_bytes; + /// Set to true by the coordinator when the file is outside of the table location + bool requires_external_storage = false; + void serializeForClusterFunctionProtocol(WriteBuffer & out, size_t protocol_version) const; void deserializeForClusterFunctionProtocol(ReadBuffer & in, size_t protocol_version); @@ -38,6 +45,7 @@ struct IcebergObjectSerializableInfo #if USE_AVRO #include +#include #include @@ -50,7 +58,14 @@ struct IcebergDataObjectInfo : public ObjectInfo, std::enable_shared_from_this & format_settings, FormatParserSharedResourcesPtr parser_shared_resources, - ContextPtr context_); + ContextPtr context_, + const Iceberg::IcebergPathResolver & path_resolver, + std::shared_ptr secondary_storages); std::optional getFileFormat() const override { return info.file_format; } void addPositionDeleteObject(Iceberg::ProcessedManifestFileEntryPtr position_delete_object, const String & resolved_storage_path); + std::optional getMetadataPath() const + { + if (info.data_object_file_metadata_path.empty()) + return std::nullopt; + return info.data_object_file_metadata_path; + } + + ObjectStoragePtr getResolvedStorage() const { return resolved_storage; } + + void setResolvedStorage(ObjectStoragePtr storage) { resolved_storage = std::move(storage); } + void addEqualityDeleteObject(const Iceberg::ProcessedManifestFileEntryPtr & equality_delete_object, const String & resolved_storage_path); Iceberg::IcebergObjectSerializableInfo info; + +private: + /// For files located in a different storage than the table's main storage + ObjectStoragePtr resolved_storage; }; using IcebergDataObjectInfoPtr = std::shared_ptr; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp index 21b02a6a0ba4..5bb4efe64665 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp @@ -42,6 +42,7 @@ #include #include #include +#include #include @@ -183,7 +184,8 @@ std::optional SingleThreadIcebergKeysIterator::ne local_context, log, manifest_list_entry.manifest_file_path, - manifest_list_entry.manifest_file_byte_size); + manifest_list_entry.manifest_file_byte_size, + *secondary_storages); current_manifest_file_iterator = Iceberg::ManifestFileIterator::create( manifest_file_cacheable_part.deserializer, @@ -211,7 +213,8 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator( const ActionsDAG * filter_dag_, Iceberg::TableStateSnapshotPtr table_snapshot_, Iceberg::IcebergDataSnapshotPtr data_snapshot_, - PersistentTableComponents persistent_components_) + PersistentTableComponents persistent_components_, + std::shared_ptr secondary_storages_) : object_storage(object_storage_) , filter_dag( [&]() -> std::shared_ptr @@ -234,6 +237,7 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator( , data_snapshot(data_snapshot_) , persistent_components(persistent_components_) , log(getLogger("IcebergIterator")) + , secondary_storages(secondary_storages_) , manifest_file_content_type(manifest_file_content_type_) { } @@ -245,10 +249,12 @@ IcebergIterator::IcebergIterator( IDataLakeMetadata::FileProgressCallback callback_, Iceberg::TableStateSnapshotPtr table_snapshot_, Iceberg::IcebergDataSnapshotPtr data_snapshot_, - PersistentTableComponents persistent_components_) + PersistentTableComponents persistent_components_, + std::shared_ptr secondary_storages_) : logger(getLogger("IcebergIterator")) , filter_dag(filter_dag_ ? std::make_shared(filter_dag_->clone()) : nullptr) , object_storage(std::move(object_storage_)) + , local_context(local_context_) , table_state_snapshot(table_snapshot_) , persistent_components(persistent_components_) , data_files_iterator( @@ -258,7 +264,8 @@ IcebergIterator::IcebergIterator( filter_dag.get(), table_snapshot_, data_snapshot_, - persistent_components_) + persistent_components_, + secondary_storages_) , deletes_iterator( object_storage, local_context_, @@ -266,11 +273,13 @@ IcebergIterator::IcebergIterator( filter_dag.get(), table_snapshot_, data_snapshot_, - persistent_components_) + persistent_components_, + secondary_storages_) , blocking_queue(100) , producer_task(std::nullopt) , callback(std::move(callback_)) , table_schema_id(table_snapshot_->schema_id) + , secondary_storages(secondary_storages_) { auto delete_file = deletes_iterator.next(); while (delete_file.has_value()) @@ -329,11 +338,17 @@ ObjectInfoPtr IcebergIterator::next(size_t) Iceberg::ProcessedManifestFileEntryPtr manifest_file_entry; if (blocking_queue.pop(manifest_file_entry)) { - IcebergDataObjectInfoPtr object_info - = std::make_shared( - manifest_file_entry, - persistent_components.path_resolver.resolve(manifest_file_entry->parsed_entry->file_path_key), - table_state_snapshot->schema_id); + const auto & raw_metadata_path = manifest_file_entry->parsed_entry->file_path_key.serialize(); + auto [storage_to_use, resolved_key] = resolveObjectStorageForPath( + persistent_components.table_location, raw_metadata_path, + object_storage, *secondary_storages, local_context, + persistent_components.path_resolver); + + IcebergDataObjectInfoPtr object_info = std::make_shared( + manifest_file_entry, raw_metadata_path, table_state_snapshot->schema_id, storage_to_use, resolved_key); + + object_info->info.requires_external_storage = (storage_to_use != object_storage); + for (const auto & position_delete : defineDeletesSpan(manifest_file_entry, position_deletes_files, /* is_equality_delete */ false, logger)) { @@ -369,7 +384,7 @@ ObjectInfoPtr IcebergIterator::next(size_t) lower.has_value() ? lower->serialize() : "[no lower bound]", upper.has_value() ? upper->serialize() : "[no upper bound]"); object_info->addPositionDeleteObject( - position_delete, persistent_components.path_resolver.resolve(position_delete->parsed_entry->file_path_key)); + position_delete, position_delete->parsed_entry->file_path_key.serialize()); } } @@ -386,7 +401,7 @@ ObjectInfoPtr IcebergIterator::next(size_t) defineDeletesSpan(manifest_file_entry, equality_deletes_files, /* is_equality_delete */ true, logger)) { object_info->addEqualityDeleteObject( - equality_delete, persistent_components.path_resolver.resolve(equality_delete->parsed_entry->file_path_key)); + equality_delete, equality_delete->parsed_entry->file_path_key.serialize()); } if (!object_info->info.equality_deletes_objects.empty()) @@ -404,6 +419,29 @@ ObjectInfoPtr IcebergIterator::next(size_t) manifest_file_entry->resolved_schema_id, /// file's schema id to interpret value_bounds bytes manifest_file_entry->parsed_entry->columns_infos, manifest_file_entry->parsed_entry->value_bounds)); + + if (!object_info->info.requires_external_storage) + { + auto resolves_to_external_storage = [&](const String & file_path) + { + auto [del_storage, del_key] = resolveObjectStorageForPath( + persistent_components.table_location, file_path, object_storage, *secondary_storages, local_context, + persistent_components.path_resolver); + return del_storage != object_storage; + }; + auto any_external = [&](const auto & delete_objects) + { + for (const auto & del : delete_objects) + if (resolves_to_external_storage(del.file_path)) + return true; + return false; + }; + + object_info->info.requires_external_storage = + any_external(object_info->info.position_deletes_objects) + || any_external(object_info->info.equality_deletes_objects); + } + ProfileEvents::increment(ProfileEvents::IcebergMetadataReturnedObjectInfos); return object_info; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h index 95e372ccfb5c..551f3163701a 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h @@ -26,6 +26,7 @@ #include #include #include +#include namespace DB { @@ -43,7 +44,8 @@ class SingleThreadIcebergKeysIterator const ActionsDAG * filter_dag_, TableStateSnapshotPtr table_snapshot_, IcebergDataSnapshotPtr data_snapshot_, - PersistentTableComponents persistent_components); + PersistentTableComponents persistent_components, + std::shared_ptr secondary_storages_); std::optional next(); @@ -56,6 +58,8 @@ class SingleThreadIcebergKeysIterator PersistentTableComponents persistent_components; LoggerPtr log; + std::shared_ptr secondary_storages; + size_t manifest_file_index = 0; Iceberg::ManifestIteratorPtr current_manifest_file_iterator; @@ -74,7 +78,8 @@ class IcebergIterator : public IObjectIterator IDataLakeMetadata::FileProgressCallback callback_, Iceberg::TableStateSnapshotPtr table_snapshot_, Iceberg::IcebergDataSnapshotPtr data_snapshot_, - Iceberg::PersistentTableComponents persistent_components_); + Iceberg::PersistentTableComponents persistent_components_, + std::shared_ptr secondary_storages_); ObjectInfoPtr next(size_t) override; @@ -85,6 +90,7 @@ class IcebergIterator : public IObjectIterator LoggerPtr logger; std::shared_ptr filter_dag; ObjectStoragePtr object_storage; + ContextPtr local_context; const Iceberg::TableStateSnapshotPtr table_state_snapshot; Iceberg::PersistentTableComponents persistent_components; Iceberg::SingleThreadIcebergKeysIterator data_files_iterator; @@ -97,6 +103,7 @@ class IcebergIterator : public IObjectIterator std::exception_ptr exception; std::mutex exception_mutex; Int32 table_schema_id; + std::shared_ptr secondary_storages; // Sometimes data or manifests can be located on another storage }; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 7e1dbdec3a43..95daff72ea11 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -266,6 +266,7 @@ IcebergMetadata::IcebergMetadata( IcebergMetadataFilesCachePtr cache_ptr) : log(getLogger("IcebergMetadata")) , object_storage(std::move(object_storage_)) + , secondary_storages(std::make_shared()) , persistent_components(initializePersistentTableComponents(configuration_, cache_ptr, context_)) , data_lake_settings(configuration_->getDataLakeSettings()) , write_format(configuration_->getFormat()) @@ -326,7 +327,8 @@ void IcebergMetadata::backgroundMetadataPrefetcherThread() auto manifest_file_ptr = Iceberg::getManifestFile( object_storage, persistent_components, ctx, log, entry.manifest_file_path, - entry.manifest_file_byte_size); + entry.manifest_file_byte_size, + *secondary_storages); } } @@ -459,7 +461,7 @@ IcebergDataSnapshotPtr IcebergMetadata::createIcebergDataSnapshotFromSnapshotJSO return std::make_shared( - getManifestList(object_storage, persistent_components, local_context, manifest_list_file_path, log), + getManifestList(object_storage, persistent_components, local_context, manifest_list_file_path, log, *secondary_storages), snapshot_id, schema_id, total_rows, @@ -492,6 +494,7 @@ bool IcebergMetadata::optimize( snapshots_info, persistent_components, object_storage, + secondary_storages, data_lake_settings, format_settings, sample_block, @@ -1079,7 +1082,7 @@ bool IcebergMetadata::isDataSortedBySortingKey(StorageMetadataPtr storage_metada for (const auto & manifest_list_entry : data_snapshot->manifest_list_entries) { auto files_handle = getManifestFileEntriesHandle( - object_storage, persistent_components, context, log, manifest_list_entry, table_state_snapshot->schema_id); + object_storage, persistent_components, context, log, manifest_list_entry, table_state_snapshot->schema_id, *secondary_storages); if (!files_handle.areAllDataFilesSortedBySortOrderID(sorting_key.sort_order_id.value())) return false; @@ -1110,7 +1113,7 @@ std::optional IcebergMetadata::totalRows(ContextPtr local_context) const for (const auto & manifest_list_entry : actual_data_snapshot->manifest_list_entries) { auto manifest_file_ptr = getManifestFileEntriesHandle( - object_storage, persistent_components, local_context, log, manifest_list_entry, actual_table_state_snapshot.schema_id); + object_storage, persistent_components, local_context, log, manifest_list_entry, actual_table_state_snapshot.schema_id, *secondary_storages); auto data_count = manifest_file_ptr.getRowsCountInAllFilesExcludingDeleted(FileContentType::DATA); auto position_deletes_count = manifest_file_ptr.getRowsCountInAllFilesExcludingDeleted(FileContentType::POSITION_DELETE); if (!data_count.has_value() || !position_deletes_count.has_value()) @@ -1139,7 +1142,7 @@ std::optional IcebergMetadata::totalBytes(ContextPtr local_context) cons for (const auto & manifest_list_entry : actual_data_snapshot->manifest_list_entries) { auto manifest_file_ptr = getManifestFileEntriesHandle( - object_storage, persistent_components, local_context, log, manifest_list_entry, actual_table_state_snapshot.schema_id); + object_storage, persistent_components, local_context, log, manifest_list_entry, actual_table_state_snapshot.schema_id, *secondary_storages); auto count = manifest_file_ptr.getBytesCountInAllDataFilesExcludingDeleted(); if (!count.has_value()) return {}; @@ -1208,7 +1211,8 @@ ObjectIterator IcebergMetadata::iterate( callback, iceberg_table_state, getRelevantDataSnapshotFromTableStateSnapshot(*iceberg_table_state, local_context), - persistent_components); + persistent_components, + secondary_storages); } NamesAndTypesList IcebergMetadata::getTableSchema(ContextPtr local_context) const @@ -1266,7 +1270,7 @@ void IcebergMetadata::addDeleteTransformers( LOG_DEBUG(log, "Constructing filter transform for position delete, there are {} delete objects", iceberg_object_info->info.position_deletes_objects.size()); builder.addSimpleTransform( [&](const SharedHeader & header) - { return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, parser_shared_resources, local_context); }); + { return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, parser_shared_resources, local_context, persistent_components.path_resolver, secondary_storages); }); } const auto & delete_files = iceberg_object_info->info.equality_deletes_objects; if (!delete_files.empty()) @@ -1277,9 +1281,14 @@ void IcebergMetadata::addDeleteTransformers( { /// get header of delete file Block delete_file_header; - RelativePathWithMetadata delete_file_object(delete_file.file_path); + + auto [delete_storage_to_use, resolved_delete_key] = resolveObjectStorageForPath( + persistent_components.table_location, delete_file.file_path, object_storage, *secondary_storages, local_context, + persistent_components.path_resolver); + + RelativePathWithMetadata delete_file_object(resolved_delete_key); { - auto schema_read_buffer = createReadBuffer(delete_file_object, object_storage, local_context, log); + auto schema_read_buffer = createReadBuffer(delete_file_object, delete_storage_to_use, local_context, log); auto schema_reader = FormatFactory::instance().getSchemaReader(delete_file.file_format, *schema_read_buffer, local_context); auto columns_with_names = schema_reader->readSchema(); ColumnsWithTypeAndName initial_header_data; @@ -1302,7 +1311,7 @@ void IcebergMetadata::addDeleteTransformers( } /// Then we read the content of the delete file. auto mutable_columns_for_set = block_for_set.cloneEmptyColumns(); - std::unique_ptr data_read_buffer = createReadBuffer(delete_file_object, object_storage, local_context, log); + std::unique_ptr data_read_buffer = createReadBuffer(delete_file_object, delete_storage_to_use, local_context, log); CompressionMethod compression_method = chooseCompressionMethod(delete_file.file_path, "auto"); auto delete_format = FormatFactory::instance().getInput( delete_file.file_format, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index 085e7586440a..2e28df93e0e6 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -31,6 +31,7 @@ #include #include #include +#include namespace DB { @@ -147,6 +148,8 @@ class IcebergMetadata : public IDataLakeMetadata CompressionMethod getCompressionMethod() const { return persistent_components.metadata_compression_method; } + std::string getTableLocation() const override { return persistent_components.table_location; } + bool optimize(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, const std::optional & format_settings) override; bool supportsDelete() const override { return true; } void mutate( @@ -229,6 +232,7 @@ class IcebergMetadata : public IDataLakeMetadata LoggerPtr log; const ObjectStoragePtr object_storage; + mutable std::shared_ptr secondary_storages; DB::Iceberg::PersistentTableComponents persistent_components; const DataLakeStorageSettings & data_lake_settings; const String write_format; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.cpp index 833d5f179917..71f55cd6add3 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.cpp @@ -1,6 +1,7 @@ #include #include +#include #include namespace DB::ErrorCodes @@ -11,6 +12,11 @@ extern const int BAD_ARGUMENTS; namespace DB::Iceberg { +IcebergPathFromMetadata IcebergPathFromMetadata::makeStorageIdentity(const ObjectStoragePtr & storage, const String & key) +{ + return IcebergPathFromMetadata(storage->getDescription() + '\0' + storage->getObjectsNamespace() + '\0' + key); +} + // This function is used to get the file path inside the directory which corresponds to Iceberg table from the full blob path which is written in manifest and metadata files. // For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro // Common path should end with "" or "/". diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h index 13072990005c..d535f3c672a5 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h @@ -3,6 +3,8 @@ #include #include +#include + namespace DB { class FileNamesGenerator; @@ -33,6 +35,10 @@ class IcebergPathFromMetadata /// Also needed to get the file which corresponds to a line in the Chunk when used for position-delete algorithms. static IcebergPathFromMetadata deserialize(String path_) { return IcebergPathFromMetadata(std::move(path_)); } + /// Identity of the physical object a path resolves to, as the triple (storage description, namespace, key). + /// Lets paths spelled differently (s3:// vs s3a:// vs https) but pointing at the same object compare equal. + static IcebergPathFromMetadata makeStorageIdentity(const ObjectStoragePtr & storage, const String & key); + /// Extract the raw path string for writing into Iceberg metadata files, /// serialization, cache keys, virtual column values, etc. const String & serialize() const { return raw_path; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index 85dd14befa65..026e19dfcd04 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFileIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFileIterator.cpp index 6fd4092e5d72..1ebdf7a52305 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFileIterator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFileIterator.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -316,8 +317,6 @@ ProcessedManifestFileEntryPtr ManifestFileIterator::processRow(size_t row_index) return nullptr; } - /// Compute inherited/resolved fields - Int64 resolved_snapshot_id; if (parsed_entry->parsed_snapshot_id.has_value()) { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp index b409e3f061f6..10d75fd4f3e8 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp @@ -990,6 +990,7 @@ static void collectRetainedFiles( std::set & retained_data_file_paths, std::set & retained_manifest_list_paths) { + SecondaryStorages secondary_storages; for (UInt32 i = 0; i < retained_snapshots->size(); ++i) { auto snapshot = retained_snapshots->getObject(i); @@ -999,14 +1000,14 @@ static void collectRetainedFiles( auto manifest_list_path = IcebergPathFromMetadata::deserialize(snapshot->getValue(Iceberg::f_manifest_list)); retained_manifest_list_paths.insert(manifest_list_path); - auto manifest_keys = getManifestList(object_storage, persistent_table_components, context, manifest_list_path, log); + auto manifest_keys = getManifestList(object_storage, persistent_table_components, context, manifest_list_path, log, secondary_storages); for (const auto & mf_key : manifest_keys) { retained_manifest_paths.insert(mf_key.manifest_file_path); auto entries_handle = getManifestFileEntriesHandle( object_storage, persistent_table_components, context, log, - mf_key, current_schema_id); + mf_key, current_schema_id, secondary_storages); collectAllFilePaths(entries_handle, retained_data_file_paths); } } @@ -1035,6 +1036,7 @@ static ExpiredFiles collectExpiredFiles( Int32 current_schema_id) { ExpiredFiles result; + SecondaryStorages secondary_storages; std::set seen_expired_manifest_list_paths; std::set seen_expired_manifest_paths; for (const auto & manifest_list_path : expired_manifest_list_paths) @@ -1048,7 +1050,7 @@ static ExpiredFiles collectExpiredFiles( ManifestFileCacheKeys manifest_keys; try { - manifest_keys = getManifestList(object_storage, persistent_table_components, context, manifest_list_path, log); + manifest_keys = getManifestList(object_storage, persistent_table_components, context, manifest_list_path, log, secondary_storages); } catch (...) { @@ -1068,7 +1070,7 @@ static ExpiredFiles collectExpiredFiles( { auto entries_handle = getManifestFileEntriesHandle( object_storage, persistent_table_components, context, log, - mf_key, current_schema_id); + mf_key, current_schema_id, secondary_storages); for (const auto & entry : entries_handle.getFilesWithoutDeleted(FileContentType::DATA)) if (!retained_data_file_paths.contains(entry->parsed_entry->file_path_key)) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h index a182bb9aa1b6..8ab46998170b 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h @@ -17,6 +17,7 @@ #include #include #include +#include namespace DB::Iceberg { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp index d60719dc3b4f..dc282ff5e156 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp @@ -68,11 +68,18 @@ void IcebergPositionDeleteTransform::initializeDeleteSources() for (const auto & position_deletes_object : iceberg_object_info->info.position_deletes_objects) { + if (position_deletes_object.reference_data_file_path.has_value() + && position_deletes_object.reference_data_file_path != iceberg_data_path) + { + continue; + } - auto object_path = position_deletes_object.file_path; - auto object_metadata = object_storage->getObjectMetadata(object_path, /*with_tags=*/ false); - auto object_info = RelativePathWithMetadata{object_path, object_metadata}; + auto [delete_storage_to_use, resolved_key] = resolveObjectStorageForPath( + path_resolver.getTableLocation(), position_deletes_object.file_path, object_storage, *secondary_storages, context, + path_resolver); + auto object_metadata = delete_storage_to_use->getObjectMetadata(resolved_key, /*with_tags=*/ false); + RelativePathWithMetadata object_info(resolved_key, object_metadata); String format = position_deletes_object.file_format; if (boost::to_lower_copy(format) != "parquet") @@ -80,7 +87,7 @@ void IcebergPositionDeleteTransform::initializeDeleteSources() Block initial_header; { - std::unique_ptr read_buf_schema = createReadBuffer(object_info, object_storage, context, log); + std::unique_ptr read_buf_schema = createReadBuffer(object_info, delete_storage_to_use, context, log); auto schema_reader = FormatFactory::instance().getSchemaReader(format, *read_buf_schema, context); auto columns_with_names = schema_reader->readSchema(); ColumnsWithTypeAndName initial_header_data; @@ -91,9 +98,9 @@ void IcebergPositionDeleteTransform::initializeDeleteSources() initial_header = Block(initial_header_data); } - CompressionMethod compression_method = chooseCompressionMethod(object_path, "auto"); + CompressionMethod compression_method = chooseCompressionMethod(resolved_key, "auto"); - delete_read_buffers.push_back(createReadBuffer(object_info, object_storage, context, log)); + delete_read_buffers.push_back(createReadBuffer(object_info, delete_storage_to_use, context, log)); auto syntax_result = TreeRewriter(context).analyze(where_ast, initial_header.getNamesAndTypesList()); ExpressionAnalyzer analyzer(where_ast, syntax_result, context); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h index 0062952ee7c6..53393e5ea08b 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h @@ -8,6 +8,7 @@ #include #include #include +#include namespace DB::Iceberg { @@ -28,7 +29,9 @@ class IcebergPositionDeleteTransform : public ISimpleTransform ObjectStoragePtr object_storage_, const std::optional & format_settings_, FormatParserSharedResourcesPtr parser_shared_resources_, - ContextPtr context_) + ContextPtr context_, + const IcebergPathResolver & path_resolver_, + std::shared_ptr secondary_storages_) : ISimpleTransform(header_, header_, false) , header(header_) , iceberg_object_info(iceberg_object_info_) @@ -36,6 +39,8 @@ class IcebergPositionDeleteTransform : public ISimpleTransform , format_settings(format_settings_) , context(context_) , parser_shared_resources(parser_shared_resources_) + , path_resolver(path_resolver_) + , secondary_storages(std::move(secondary_storages_)) { initializeDeleteSources(); } @@ -56,6 +61,9 @@ class IcebergPositionDeleteTransform : public ISimpleTransform ContextPtr context; FormatParserSharedResourcesPtr parser_shared_resources; + const IcebergPathResolver path_resolver; + std::shared_ptr secondary_storages; + /// We need to keep the read buffers alive since the delete_sources depends on them. std::vector> delete_read_buffers; std::vector> delete_sources; @@ -72,8 +80,10 @@ class IcebergBitmapPositionDeleteTransform : public IcebergPositionDeleteTransfo ObjectStoragePtr object_storage_, const std::optional & format_settings_, FormatParserSharedResourcesPtr parser_shared_resources_, - ContextPtr context_) - : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, parser_shared_resources_, context_) + ContextPtr context_, + const IcebergPathResolver & path_resolver_, + std::shared_ptr secondary_storages_) + : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, parser_shared_resources_, context_, path_resolver_, std::move(secondary_storages_)) { initialize(); } @@ -98,8 +108,10 @@ class IcebergStreamingPositionDeleteTransform : public IcebergPositionDeleteTran ObjectStoragePtr object_storage_, const std::optional & format_settings_, FormatParserSharedResourcesPtr parser_shared_resources_, - ContextPtr context_) - : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, parser_shared_resources_, context_) + ContextPtr context_, + const IcebergPathResolver & path_resolver_, + std::shared_ptr secondary_storages_) + : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, parser_shared_resources_, context_, path_resolver_, std::move(secondary_storages_)) { initialize(); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp index b44c3d86c9e9..483c4ef57dd0 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp @@ -70,7 +70,8 @@ Iceberg::ManifestFileCacheableInfo getManifestFile( ContextPtr local_context, LoggerPtr log, const IcebergPathFromMetadata & filename, - size_t bytes_size) + size_t bytes_size, + SecondaryStorages & secondary_storages) { auto log_level = local_context->getSettingsRef()[Setting::iceberg_metadata_log_level].value; @@ -79,15 +80,19 @@ Iceberg::ManifestFileCacheableInfo getManifestFile( auto create_fn = [&, use_iceberg_metadata_cache]() { - RelativePathWithMetadata manifest_object_info(persistent_table_components.path_resolver.resolve(filename)); + auto [storage_to_use, resolved_key_in_storage] = resolveObjectStorageForPath( + persistent_table_components.table_location, filename.serialize(), object_storage, secondary_storages, local_context, + persistent_table_components.path_resolver); + + RelativePathWithMetadata manifest_object_info(resolved_key_in_storage); auto read_settings = local_context->getReadSettings(); /// Do not utilize filesystem cache if more precise cache enabled if (use_iceberg_metadata_cache) read_settings.enable_filesystem_cache = false; - auto buffer = createReadBuffer(manifest_object_info, object_storage, local_context, log, read_settings); - auto manifest_file_deserializer = std::make_unique( + auto buffer = createReadBuffer(manifest_object_info, storage_to_use, local_context, log, read_settings); + auto manifest_file_deserializer = std::make_shared( std::move(buffer), filename, getFormatSettings(local_context)); return Iceberg::ManifestFileCacheableInfo{std::move(manifest_file_deserializer), bytes_size}; @@ -108,7 +113,8 @@ Iceberg::ManifestFileIterator::ManifestFileEntriesHandle getManifestFileEntriesH ContextPtr local_context, LoggerPtr log, const ManifestFileCacheKey & cache_key, - Int32 table_snapshot_schema_id) + Int32 table_snapshot_schema_id, + SecondaryStorages & secondary_storages) { auto cacheable_info = getManifestFile( object_storage, @@ -116,7 +122,8 @@ Iceberg::ManifestFileIterator::ManifestFileEntriesHandle getManifestFileEntriesH local_context, log, cache_key.manifest_file_path, - static_cast(cache_key.manifest_file_byte_size)); + cache_key.manifest_file_byte_size, + secondary_storages); auto iterator = Iceberg::ManifestFileIterator::create( cacheable_info.deserializer, @@ -142,7 +149,8 @@ ManifestFileCacheKeys getManifestList( const PersistentTableComponents & persistent_table_components, ContextPtr local_context, const IcebergPathFromMetadata & filename, - LoggerPtr log) + LoggerPtr log, + SecondaryStorages & secondary_storages) { IcebergMetadataLogLevel log_level = local_context->getSettingsRef()[Setting::iceberg_metadata_log_level].value; @@ -151,14 +159,18 @@ ManifestFileCacheKeys getManifestList( auto create_fn = [&, use_iceberg_metadata_cache]() { - RelativePathWithMetadata object_info(persistent_table_components.path_resolver.resolve(filename)); + auto [storage_to_use, key_in_storage] = resolveObjectStorageForPath( + persistent_table_components.table_location, filename.serialize(), object_storage, secondary_storages, local_context, + persistent_table_components.path_resolver); + + RelativePathWithMetadata object_info(key_in_storage); auto read_settings = local_context->getReadSettings(); /// Do not utilize filesystem cache if more precise cache enabled if (use_iceberg_metadata_cache) read_settings.enable_filesystem_cache = false; - auto manifest_list_buf = createReadBuffer(object_info, object_storage, local_context, log, read_settings); + auto manifest_list_buf = createReadBuffer(object_info, storage_to_use, local_context, log, read_settings); AvroForIcebergDeserializer manifest_list_deserializer(std::move(manifest_list_buf), filename, getFormatSettings(local_context)); ManifestFileCacheKeys manifest_file_cache_keys; @@ -168,7 +180,7 @@ ManifestFileCacheKeys getManifestList( local_context, dump_metadata, DB::IcebergMetadataLogLevel::ManifestListMetadata, - persistent_table_components.path_resolver.getTableRoot(), + persistent_table_components.table_path, filename, std::nullopt, std::nullopt); @@ -205,14 +217,14 @@ ManifestFileCacheKeys getManifestList( manifest_list_deserializer.getValueFromRowByName(i, f_content, TypeIndex::Int32).safeGet()); } manifest_file_cache_keys.emplace_back( - manifest_file_name, manifest_length, added_sequence_number, added_snapshot_id.safeGet(), content_type); + manifest_file_name, static_cast(manifest_length), added_sequence_number, added_snapshot_id.safeGet(), content_type); auto dump_row_metadata = [&]()->String { return manifest_list_deserializer.getContent(i); }; insertRowToLogTable( local_context, dump_row_metadata, DB::IcebergMetadataLogLevel::ManifestListEntry, - persistent_table_components.path_resolver.getTableRoot(), + persistent_table_components.table_path, filename, i, std::nullopt); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h index 4949de8d33e1..035a22ea3f3a 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h @@ -19,6 +19,7 @@ #include #include +#include namespace DB::Iceberg { @@ -29,7 +30,8 @@ Iceberg::ManifestFileCacheableInfo getManifestFile( ContextPtr local_context, LoggerPtr log, const IcebergPathFromMetadata & filename, - size_t bytes_size); + size_t bytes_size, + SecondaryStorages & secondary_storages); /// Creates a fully initialized ManifestFileIterator from a cache key. /// All entries are drained so that aggregate methods (e.g. getRowsCountInAllFilesExcludingDeleted) @@ -40,7 +42,8 @@ Iceberg::ManifestFileIterator::ManifestFileEntriesHandle getManifestFileEntriesH ContextPtr local_context, LoggerPtr log, const ManifestFileCacheKey & cache_key, - Int32 table_snapshot_schema_id); + Int32 table_snapshot_schema_id, + SecondaryStorages & secondary_storages); ManifestFileCacheKeys getManifestList( @@ -48,7 +51,8 @@ ManifestFileCacheKeys getManifestList( const PersistentTableComponents & persistent_table_components, ContextPtr local_context, const IcebergPathFromMetadata & filename, - LoggerPtr log); + LoggerPtr log, + SecondaryStorages & secondary_storages); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index ef377b6e1e6c..1e2afa3ee106 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -39,12 +39,13 @@ #include #include #include +#include #if USE_AVRO #include -#include #include +#include #include #include #include @@ -106,7 +107,6 @@ static constexpr auto MAX_TRANSACTION_RETRIES = 100; namespace DB::Iceberg { - using namespace DB; static CompressionMethod getCompressionMethodFromMetadataFile(const String & path) { @@ -1553,3 +1553,29 @@ void sortBlockByKeyDescription(Block & block, const KeyDescription & sort_descri } #endif + +namespace DB +{ + +ObjectStoragePtr getResolvedStorageFromObjectInfo([[maybe_unused]] const ObjectInfoPtr & object_info, const ObjectStoragePtr & default_storage) +{ +#if USE_AVRO + if (auto iceberg_info = std::dynamic_pointer_cast(object_info)) + { + if (auto resolved = iceberg_info->getResolvedStorage()) + return resolved; + } +#endif + return default_storage; +} + +std::optional getMetadataPathFromObjectInfo([[maybe_unused]] const ObjectInfoPtr & object_info) +{ +#if USE_AVRO + if (auto iceberg_info = std::dynamic_pointer_cast(object_info)) + return iceberg_info->getMetadataPath(); +#endif + return std::nullopt; +} + +} diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h index 8f00f9a1707f..2720e51547e0 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h @@ -1,6 +1,8 @@ #pragma once #include +#include +#include "config.h" #include #include #include @@ -13,9 +15,20 @@ #include #include +#include + +namespace DB +{ +struct ObjectInfo; +using ObjectInfoPtr = std::shared_ptr; + +/// These functions are always available; they return fallback values when USE_AVRO is not defined +ObjectStoragePtr getResolvedStorageFromObjectInfo([[maybe_unused]] const ObjectInfoPtr & object_info, const ObjectStoragePtr & default_storage); +std::optional getMetadataPathFromObjectInfo([[maybe_unused]] const ObjectInfoPtr & object_info); +} + #if USE_AVRO -#include #include #include #include diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 30650b9132f2..332c4bcd09cf 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -46,6 +46,7 @@ #include #include #include +#include #if ENABLE_DISTRIBUTED_CACHE #include #include @@ -203,11 +204,17 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator( { const bool expect_whole_archive = !local_context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes]; + /// Use the full table location URI (e.g. `s3a://bucket/prefix/table/`) when available + std::string table_location = configuration->getPathForRead().path; + if (auto * metadata = configuration->getExternalMetadata()) + table_location = metadata->getTableLocation(); + auto distributed_iterator = std::make_unique( local_context->getClusterFunctionReadTaskCallback(), local_context->getSettingsRef()[Setting::max_threads], /*is_archive_=*/is_archive && !expect_whole_archive, object_storage, + table_location, local_context); if (is_archive && expect_whole_archive) @@ -452,6 +459,8 @@ Chunk StorageObjectStorageSource::generate() path); } + std::string path_for_virtual_column = getMetadataPathFromObjectInfo(object_info).value_or(path); + const String * iceberg_metadata_file_path = nullptr; #if USE_AVRO if (const auto * iceberg_info = dynamic_cast(object_info.get())) @@ -462,7 +471,7 @@ Chunk StorageObjectStorageSource::generate() chunk, read_from_format_info.requested_virtual_columns, { - .path = path, + .path = path_for_virtual_column, .size = object_info->isArchive() ? object_info->fileSizeInArchive() : object_metadata->size_bytes, .filename = &filename, .last_modified = object_metadata->last_modified, @@ -722,16 +731,18 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade bool with_tags = read_from_format_info.requested_virtual_columns.contains("_tags"); const auto & path = object_info->isArchive() ? object_info->getPathToArchive() : object_info->getPath(); + ObjectStoragePtr storage_to_use = getResolvedStorageFromObjectInfo(object_info, object_storage); + if (query_settings.ignore_non_existent_file) { - auto metadata = object_storage->tryGetObjectMetadata(path, with_tags); + auto metadata = storage_to_use->tryGetObjectMetadata(path, with_tags); if (!metadata) return {}; object_info->setObjectMetadata(metadata.value()); } else - object_info->setObjectMetadata(object_storage->getObjectMetadata(path, with_tags)); + object_info->setObjectMetadata(storage_to_use->getObjectMetadata(path, with_tags)); } if (query_settings.skip_empty_files && object_info->getObjectMetadata()->size_bytes == 0 @@ -978,7 +989,11 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade else { compression_method = chooseCompressionMethod(object_info->getFileName(), configuration->getCompressionMethod()); - read_buf = createReadBuffer(object_info->relative_path_with_metadata, object_storage, context_, log); + read_buf = createReadBuffer( + object_info->relative_path_with_metadata, + getResolvedStorageFromObjectInfo(object_info, object_storage), + context_, + log); } Block initial_header = read_from_format_info.format_header; @@ -1639,11 +1654,13 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator( size_t max_threads_count, bool is_archive_, ObjectStoragePtr object_storage_, + const std::string & table_location_, ContextPtr context_) : WithContext(context_) , callback(callback_) , is_archive(is_archive_) , object_storage(object_storage_) + , table_location(table_location_) { if (!getContext()->isSwarmModeEnabled()) { @@ -1675,10 +1692,37 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator( { auto object = object_future.get(); if (object) + { + resolveIcebergObjectStorageIfNeeded(object); buffer.push_back(object); + } } } +void StorageObjectStorageSource::ReadTaskIterator::resolveIcebergObjectStorageIfNeeded([[maybe_unused]] const ObjectInfoPtr & object) +{ +#if USE_AVRO + /// For Iceberg objects, resolve the storage from the raw metadata path + auto iceberg_info = std::dynamic_pointer_cast(object); + if (!iceberg_info || iceberg_info->getResolvedStorage()) + return; + + auto metadata_path = iceberg_info->getMetadataPath(); + if (!metadata_path) + return; + + /// Only secondary-storage files need resolving here (an ObjectStorage can't be shipped over the + /// wire); base-storage files keep the coordinator's key. + if (auto resolved = tryResolveObjectStorageForPath( + table_location, *metadata_path, object_storage, secondary_storages, getContext()); + resolved && resolved->first != object_storage) + { + iceberg_info->setResolvedStorage(resolved->first); + iceberg_info->relative_path_with_metadata.relative_path = resolved->second; + } +#endif +} + ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator::next(size_t) { size_t current_index = index.fetch_add(1, std::memory_order_relaxed); @@ -1692,10 +1736,12 @@ ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator::next(size_t) return nullptr; } - auto task = callback(); - if (!task || task->isEmpty()) + auto raw = callback(); + if (!raw || raw->isEmpty()) return nullptr; - object_info = task->getObjectInfo(); + + object_info = raw->getObjectInfo(); + resolveIcebergObjectStorageIfNeeded(object_info); } else { @@ -1790,7 +1836,10 @@ StorageObjectStorageSource::ArchiveIterator::createArchiveReader(ObjectInfoPtr o /* path_to_archive */ object_info->getPath(), /* archive_read_function */ [=, this]() - { return createReadBuffer(object_info->relative_path_with_metadata, object_storage, getContext(), log); }, + { + auto storage = getResolvedStorageFromObjectInfo(object_info, object_storage); + return createReadBuffer(object_info->relative_path_with_metadata, storage, getContext(), log); + }, /* archive_size */ size); } @@ -1812,7 +1861,10 @@ ObjectInfoPtr StorageObjectStorageSource::ArchiveIterator::next(size_t processor } if (!archive_object->getObjectMetadata()) - archive_object->setObjectMetadata(object_storage->getObjectMetadata(archive_object->getPath(), /*with_tags=*/ false)); + { + ObjectStoragePtr storage_to_use = getResolvedStorageFromObjectInfo(archive_object, object_storage); + archive_object->setObjectMetadata(storage_to_use->getObjectMetadata(archive_object->getPath(), /*with_tags=*/ false)); + } archive_reader = createArchiveReader(archive_object); file_enumerator = archive_reader->firstFile(); @@ -1838,7 +1890,10 @@ ObjectInfoPtr StorageObjectStorageSource::ArchiveIterator::next(size_t processor return {}; if (!archive_object->getObjectMetadata()) - archive_object->setObjectMetadata(object_storage->getObjectMetadata(archive_object->getPath(), /*with_tags=*/ false)); + { + ObjectStoragePtr storage_to_use = getResolvedStorageFromObjectInfo(archive_object, object_storage); + archive_object->setObjectMetadata(storage_to_use->getObjectMetadata(archive_object->getPath(), /*with_tags=*/ false)); + } archive_reader = createArchiveReader(archive_object); if (!archive_reader->fileExists(path_in_archive)) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index 6326fa6ee89f..d30329acce01 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -170,6 +171,7 @@ class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator, pri size_t max_threads_count, bool is_archive_, ObjectStoragePtr object_storage_, + const std::string & table_location_, ContextPtr context_); ObjectInfoPtr next(size_t) override; @@ -179,11 +181,19 @@ class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator, pri private: ObjectInfoPtr createObjectInfoInArchive(const std::string & path_to_archive, const std::string & path_in_archive); + /// For Iceberg objects: resolve which storage the file lives in (possibly a secondary storage) + /// from the raw metadata path and record it on the object. No-op for non-Iceberg objects. + void resolveIcebergObjectStorageIfNeeded(const ObjectInfoPtr & object); + ClusterFunctionReadTaskCallback callback; ObjectInfos buffer; std::atomic_size_t index = 0; bool is_archive; ObjectStoragePtr object_storage; + std::string table_location; +#if USE_AVRO + SecondaryStorages secondary_storages; /// For Iceberg: cache of storages for external file locations +#endif /// path_to_archive -> archive reader. std::unordered_map> archive_readers; std::mutex archive_readers_mutex; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 06af26c0ba98..13dc50e97e91 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include #include #include @@ -342,7 +344,9 @@ String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPt } return file_identifier; } - return file_object->getIdentifier(); + /// Prefer the original Iceberg metadata path (possibly absolute / on another storage) when available, + /// so the same file is identified consistently across replicas. + return getMetadataPathFromObjectInfo(file_object).value_or(file_object->getIdentifier()); } } diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index 542329f76944..ab0315c46eb6 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -14,6 +14,26 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#if USE_AWS_S3 +#include +#endif +#if USE_AVRO +#include +#endif +#if USE_AZURE_BLOB_STORAGE +#include +#endif +#if USE_HDFS +#include +#endif + namespace DB { @@ -23,6 +43,138 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; extern const int LOGICAL_ERROR; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int PATH_ACCESS_DENIED; +} + +namespace +{ + +#if USE_AVRO +std::string normalizeScheme(const std::string & scheme) +{ + auto scheme_lowercase = Poco::toLower(scheme); + + if (scheme_lowercase == "s3a" || scheme_lowercase == "s3n" || scheme_lowercase == "gs" || scheme_lowercase == "gcs" || scheme_lowercase == "oss") + scheme_lowercase = "s3"; + else if (scheme_lowercase == "wasb" || scheme_lowercase == "wasbs" || scheme_lowercase == "abfss") + scheme_lowercase = "abfs"; + + return scheme_lowercase; +} + +std::string factoryTypeForScheme(const std::string & normalized_scheme) +{ + if (normalized_scheme == "s3") return "s3"; + if (normalized_scheme == "abfs") return "azure"; + if (normalized_scheme == "hdfs") return "hdfs"; + if (normalized_scheme == "file") return "local"; + return ""; +} + +#if USE_AWS_S3 +/// For s3:// URIs (generic), bucket needs to match. +/// For explicit http(s):// URIs, both bucket and endpoint must match. +bool s3URIMatches(const S3::URI & target_uri, const std::string & base_bucket, const std::string & base_endpoint, const std::string & target_scheme_normalized) +{ + bool bucket_matches = (target_uri.bucket == base_bucket); + bool endpoint_matches = (target_uri.endpoint == base_endpoint); + bool is_generic_s3_uri = (target_scheme_normalized == "s3"); + return bucket_matches && (endpoint_matches || is_generic_s3_uri); +} + +bool sameEndpoint(const std::string & a, const std::string & b) +{ + SchemeAuthorityKey pa(a); + SchemeAuthorityKey pb(b); + if (pa.authority.empty() || pb.authority.empty()) + return false; + return pa.scheme == pb.scheme && pa.authority == pb.authority; +} +#endif +std::pair getOrCreateStorageAndKey( + const std::string & cache_key, + const std::string & key_to_use, + const std::string & storage_type, + SecondaryStorages & secondary_storages, + const ContextPtr & context, + std::function configure_fn) +{ + std::lock_guard lock(secondary_storages.mutex); + if (auto it = secondary_storages.storages.find(cache_key); it != secondary_storages.storages.end()) + return {it->second, key_to_use}; + + Poco::AutoPtr cfg(new Poco::Util::MapConfiguration); + const std::string config_prefix = "object_storages." + cache_key; + + cfg->setString(config_prefix + ".object_storage_type", storage_type); + + configure_fn(*cfg, config_prefix); + + /// Create under lock to avoid duplicate creation and wasted work + ObjectStoragePtr storage = ObjectStorageFactory::instance().create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true); + + secondary_storages.storages.emplace(cache_key, storage); + return {storage, key_to_use}; +} + +bool isAbsolutePath(const std::string & path) +{ + if (!path.empty() && (path.front() == '/' || path.find("://") != std::string_view::npos)) + return true; + + return false; +} + +#endif // USE_AVRO + +} + +SchemeAuthorityKey::SchemeAuthorityKey(const std::string & uri) +{ + if (uri.empty()) + return; + + if (auto scheme_sep = uri.find("://"); scheme_sep != std::string_view::npos) + { + scheme = Poco::toLower(uri.substr(0, scheme_sep)); + auto rest = uri.substr(scheme_sep + 3); // skip :// + + // authority is up to next '/' + auto slash = rest.find('/'); + if (slash == std::string_view::npos) + { + /// Bad URI: missing path component after authority. + /// Exception will be thrown when looking up non-existing object in the storage, so we can just return here. + authority = std::string(rest); + key = "/"; + return; + } + authority = std::string(rest.substr(0, slash)); + /// For file:// URIs, the path is absolute, so we need to keep the leading '/' + /// e.g. file:///home/user/data -> scheme="file", authority="", key="/home/user/data" + if (scheme == "file") + key = std::string(rest.substr(slash)); + else + key = std::string(rest.substr(++slash)); + return; + } + + /// Check for scheme:/path (common for file: https://datatracker.ietf.org/doc/html/rfc8089#appendix-B) + if (auto colon = uri.find(':'); colon != std::string_view::npos && colon > 0) + { + auto after_colon = uri.substr(colon + 1); + + if (!after_colon.empty() && after_colon[0] == '/') + { + scheme = Poco::toLower(uri.substr(0, colon)); + authority = ""; // No authority + key = std::string(after_colon); + return; + } + } + + // Relative path (paths starting with '/' without a scheme are now handled by the caller) + key = std::string(uri); } std::optional checkAndGetNewFileOnInsertIfNeeded( @@ -256,5 +408,377 @@ extern const SettingsUInt64 max_download_buffer_size; extern const SettingsBool use_cache_for_count_from_files; extern const SettingsString filesystem_cache_name; extern const SettingsUInt64 filesystem_cache_boundary_alignment; +extern const SettingsBool s3_propagate_credentials_to_other_storages; +} + +#if USE_AVRO +/// Resolve an absolute metadata path directly to its (object storage, key) by parsing the URI. +/// The storage may be `base_storage` or a secondary one. Returns std::nullopt for paths that must +/// instead go through `path_resolver`: relative paths and bare local-fs absolute base paths. +std::optional> tryResolveObjectStorageForPath( + const std::string & table_location, + const std::string & path, + const DB::ObjectStoragePtr & base_storage, + SecondaryStorages & secondary_storages, + const DB::ContextPtr & context) +{ + if (!isAbsolutePath(path)) + return std::nullopt; // Relative path always belongs to base storage + + auto ensure_local_path_inside_user_files = [&](const std::string & local_path) + { + const auto target_path = std::filesystem::path(local_path).lexically_normal(); + const auto user_files_path = std::filesystem::path(context->getUserFilesPath()).lexically_normal(); + + if (user_files_path.empty() || !fileOrSymlinkPathStartsWith(target_path.string(), user_files_path.string())) + throw DB::Exception( + DB::ErrorCodes::PATH_ACCESS_DENIED, + "File URI '{}' is outside of allowed `user_files` path '{}'", + local_path, + user_files_path.string()); + }; + + SchemeAuthorityKey table_location_decomposed{table_location}; + SchemeAuthorityKey target_decomposed{path}; + + if (target_decomposed.scheme.empty() && target_decomposed.key.starts_with('/')) + { + if (base_storage->getType() == ObjectStorageType::Local) + ensure_local_path_inside_user_files(target_decomposed.key); + + return std::nullopt; + } + + const std::string base_scheme_normalized = normalizeScheme(table_location_decomposed.scheme); + const std::string target_scheme_normalized = normalizeScheme(target_decomposed.scheme); + + /// `file://` paths must stay inside `user_files`. + /// Without this check, metadata could drive reads from arbitrary local paths. + if (target_scheme_normalized == "file") + { + ensure_local_path_inside_user_files(target_decomposed.key); + } + + // For S3 URIs, use S3::URI to properly handle all kinds of URIs, e.g. https://s3.amazonaws.com/bucket/... == s3://bucket/... + #if USE_AWS_S3 + if (target_scheme_normalized == "s3" || target_scheme_normalized == "https" || target_scheme_normalized == "http") + { + std::string normalized_path = path; + if (target_decomposed.scheme == "s3a" || target_decomposed.scheme == "s3n" || target_decomposed.scheme == "oss") + { + normalized_path = "s3://" + target_decomposed.authority + "/" + target_decomposed.key; + } + else if (target_decomposed.scheme == "gcs") + { + normalized_path = "gs://" + target_decomposed.authority + "/" + target_decomposed.key; + } + /// Paths from metadata already have correct encoding; disable Poco::URI + /// percent-decoding so that keys like `col=12%3A00%3A00` are preserved as-is. + S3::URI s3_uri(normalized_path, /*allow_archive_path_syntax*/ false, + /*keep_presigned_query_parameters*/ true, + /*enable_url_encoding*/ false); + + std::string key_to_use = s3_uri.key; + + bool use_base_storage = false; + if (base_storage->getType() == ObjectStorageType::S3) + { + if (auto s3_storage = std::dynamic_pointer_cast(base_storage)) + { + const std::string base_bucket = s3_storage->getObjectsNamespace(); + const std::string base_endpoint = s3_storage->getDescription(); + + if (s3URIMatches(s3_uri, base_bucket, base_endpoint, target_scheme_normalized)) + use_base_storage = true; + } + } + + if (!use_base_storage && (base_scheme_normalized == "s3" || base_scheme_normalized == "https" || base_scheme_normalized == "http")) + { + std::string normalized_table_location = table_location; + if (table_location_decomposed.scheme == "s3a" || table_location_decomposed.scheme == "s3n" || table_location_decomposed.scheme == "oss") + { + normalized_table_location = "s3://" + table_location_decomposed.authority + "/" + table_location_decomposed.key; + } + else if (table_location_decomposed.scheme == "gcs") + { + normalized_table_location = "gs://" + table_location_decomposed.authority + "/" + table_location_decomposed.key; + } + S3::URI base_s3_uri(normalized_table_location, /*allow_archive_path_syntax*/ false, + /*keep_presigned_query_parameters*/ true, + /*enable_url_encoding*/ false); + + if (s3URIMatches(s3_uri, base_s3_uri.bucket, base_s3_uri.endpoint, target_scheme_normalized)) + use_base_storage = true; + } + + if (use_base_storage) + return std::make_pair(base_storage, key_to_use); + + /// Construct the endpoint for this storage, then build the cache key from it. + /// A generic `s3://bucket/...` inherits one from the base storage. + const bool endpoint_explicit = (target_decomposed.scheme == "http" || target_decomposed.scheme == "https"); + + std::string endpoint_to_use; + + if (endpoint_explicit) + { + endpoint_to_use = s3_uri.endpoint.empty() + ? ("https://" + s3_uri.bucket + ".s3.amazonaws.com") + : s3_uri.endpoint; + } + else + { + std::string base_endpoint; + if (base_storage->getType() == ObjectStorageType::S3) + base_endpoint = base_storage->getDescription(); + + if (!base_endpoint.empty()) + { + if (base_endpoint.find(".s3.") != std::string::npos && base_endpoint.find(".amazonaws.com") != std::string::npos) + { + /// AWS-style: https://oldbucket.s3.us-east-1.amazonaws.com -> https://newbucket.s3.us-east-1.amazonaws.com + size_t s3_pos = base_endpoint.find(".s3."); + size_t scheme_end = base_endpoint.find("://"); + if (scheme_end != std::string::npos) + { + std::string scheme = base_endpoint.substr(0, scheme_end + 3); + std::string suffix = base_endpoint.substr(s3_pos); + + /// Trim path after endpoint + size_t slash_pos = suffix.find('/', 1); + if (slash_pos != std::string::npos) + suffix = suffix.substr(0, slash_pos); + endpoint_to_use = scheme + s3_uri.bucket + suffix; + } + } + else + { + /// Path-style (e.g. minio): http://host:port/oldbucket -> http://host:port/newbucket + size_t scheme_end = base_endpoint.find("://"); + if (scheme_end != std::string::npos) + { + size_t path_start = base_endpoint.find('/', scheme_end + 3); + if (path_start != std::string::npos) + base_endpoint = base_endpoint.substr(0, path_start); + } + if (!base_endpoint.empty() && base_endpoint.back() == '/') + base_endpoint.pop_back(); + endpoint_to_use = base_endpoint + "/" + s3_uri.bucket; + } + } + + /// Fallback: base storage is not S3 + if (endpoint_to_use.empty()) + { + endpoint_to_use = s3_uri.endpoint.empty() + ? ("https://" + s3_uri.bucket + ".s3.amazonaws.com") + : s3_uri.endpoint; + } + } + + /// Include credential-propagation flag in the cache key: `configure_fn` runs only on miss, + /// so different per-query values of `s3_propagate_credentials_to_other_storages` must not share an entry. + const bool propagate_creds = context->getSettingsRef()[Setting::s3_propagate_credentials_to_other_storages]; + const std::string storage_cache_key = "s3://" + s3_uri.bucket + "@" + endpoint_to_use + + "#propagate=" + (propagate_creds ? "1" : "0"); + + return getOrCreateStorageAndKey( + storage_cache_key, + key_to_use, + "s3", + secondary_storages, + context, + [&](Poco::Util::MapConfiguration & cfg, const std::string & config_prefix) + { + cfg.setString(config_prefix + ".endpoint", endpoint_to_use); + + /// Copy credentials from base storage when the endpoint is the same or + /// `s3_propagate_credentials_to_other_storages` is enabled. + if (base_storage->getType() == ObjectStorageType::S3 + && (context->getSettingsRef()[Setting::s3_propagate_credentials_to_other_storages] + || sameEndpoint(base_storage->getDescription(), endpoint_to_use))) + { + if (auto s3_storage = std::dynamic_pointer_cast(base_storage)) + { + if (auto s3_client = s3_storage->tryGetS3StorageClient()) + { + const auto credentials = s3_client->getCredentials(); + const String & access_key_id = credentials.GetAWSAccessKeyId(); + const String & secret_access_key = credentials.GetAWSSecretKey(); + const String & session_token = credentials.GetSessionToken(); + const String & region = s3_client->getRegion(); + + if (!access_key_id.empty()) + cfg.setString(config_prefix + ".access_key_id", access_key_id); + if (!secret_access_key.empty()) + cfg.setString(config_prefix + ".secret_access_key", secret_access_key); + if (!session_token.empty()) + cfg.setString(config_prefix + ".session_token", session_token); + if (!region.empty()) + cfg.setString(config_prefix + ".region", region); + } + } + } + }); + } + #endif + + #if USE_HDFS + if (target_scheme_normalized == "hdfs") + { + bool use_base_storage = false; + + // Check if base_storage matches (only if it's HDFS) + if (base_storage->getType() == ObjectStorageType::HDFS) + { + if (auto hdfs_storage = std::dynamic_pointer_cast(base_storage)) + { + const std::string base_url = hdfs_storage->getDescription(); + // Extract endpoint from base URL (hdfs://namenode:port/path -> hdfs://namenode:port) + std::string base_endpoint; + if (auto pos = base_url.find('/', base_url.find("//") + 2); pos != std::string::npos) + base_endpoint = base_url.substr(0, pos); + else + base_endpoint = base_url; + + // For HDFS, compare endpoints (namenode addresses) + std::string target_endpoint = target_scheme_normalized + "://" + target_decomposed.authority; + + if (base_endpoint == target_endpoint) + use_base_storage = true; + + // Also check if table_location matches + if (!use_base_storage && base_scheme_normalized == "hdfs") + { + if (table_location_decomposed.authority == target_decomposed.authority) + use_base_storage = true; + } + } + } + + if (use_base_storage) + return std::make_pair(base_storage, target_decomposed.key); + } + #endif + + /// Fallback for schemes not handled above (e.g., abfs, file) + if (base_scheme_normalized == target_scheme_normalized && table_location_decomposed.authority == target_decomposed.authority) + return std::make_pair(base_storage, target_decomposed.key); + + const std::string type_for_factory = factoryTypeForScheme(target_scheme_normalized); + if (type_for_factory.empty()) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported storage scheme '{}' in path '{}'", target_scheme_normalized, path); + + /// For `file://` URIs the authority is always empty, so using just `"file://"` as the + /// cache key would cause every directory to share a single `LocalObjectStorage` instance + /// whose root (`key_prefix`) is set to the parent directory of the first file ever seen. + /// To avoid this, include the parent directory of the target file in the cache key so that + /// each directory gets its own storage instance with the correct root. + std::string file_dir_path; // only set for file:// URIs + std::string cache_key; + if (target_scheme_normalized == "file") + { + std::filesystem::path fs_path(target_decomposed.key); + file_dir_path = fs_path.parent_path().string(); + if (file_dir_path.empty() || file_dir_path == "/") + file_dir_path = "/"; + else if (file_dir_path.back() != '/') + file_dir_path += '/'; + cache_key = "file://" + file_dir_path; + } + else + { + cache_key = target_scheme_normalized + "://" + target_decomposed.authority; + } + + /// Handle storage types that need new storage creation + return getOrCreateStorageAndKey( + cache_key, + target_decomposed.key, + type_for_factory, + secondary_storages, + context, + [&](Poco::Util::MapConfiguration & cfg, const std::string & config_prefix) + { + if (target_scheme_normalized == "file") + { + cfg.setString(config_prefix + ".path", file_dir_path); + } + else if (target_scheme_normalized == "abfs") + { + std::string container_name; + std::string account_name; + const auto & authority = target_decomposed.authority; + + auto at_pos = authority.find('@'); + if (at_pos != std::string::npos) + { + container_name = authority.substr(0, at_pos); + account_name = authority.substr(at_pos + 1); + /// Remove .dfs.core.windows.net suffix if present + auto suffix_pos = account_name.find('.'); + if (suffix_pos != std::string::npos) + account_name = account_name.substr(0, suffix_pos); + } + else + container_name = authority; + + cfg.setString(config_prefix + ".container_name", container_name); + if (!account_name.empty()) + cfg.setString(config_prefix + ".account_name", account_name); + +#if USE_AZURE_BLOB_STORAGE + /// Copy credentials from base Azure storage if available + if (base_storage->getType() == ObjectStorageType::Azure) + { + if (auto azure_storage = std::dynamic_pointer_cast(base_storage)) + { + const auto & conn_params = azure_storage->getConnectionParameters(); + const auto & auth_method = azure_storage->getAzureBlobStorageAuthMethod(); + + if (std::holds_alternative(auth_method)) + { + cfg.setString(config_prefix + ".connection_string", + std::get(auth_method).toUnderType()); + } + else + { + const auto & endpoint = conn_params.endpoint; + if (!endpoint.storage_account_url.empty()) + cfg.setString(config_prefix + ".storage_account_url", endpoint.storage_account_url); + if (account_name.empty() && !endpoint.account_name.empty()) + cfg.setString(config_prefix + ".account_name", endpoint.account_name); + } + } + } +#endif + } + else if (target_scheme_normalized == "hdfs") + { + // HDFS endpoint must end with '/' + auto endpoint = target_scheme_normalized + "://" + target_decomposed.authority; + if (!endpoint.empty() && endpoint.back() != '/') + endpoint.push_back('/'); + cfg.setString(config_prefix + ".endpoint", endpoint); + } + }); +} + +std::pair resolveObjectStorageForPath( + const std::string & table_location, + const std::string & path, + const DB::ObjectStoragePtr & base_storage, + SecondaryStorages & secondary_storages, + const DB::ContextPtr & context, + const Iceberg::IcebergPathResolver & path_resolver) +{ + if (auto resolved = tryResolveObjectStorageForPath(table_location, path, base_storage, secondary_storages, context)) + return *resolved; + /// Relative paths only: map via path_resolver (table_location -> table_root translation). + return {base_storage, path_resolver.resolve(Iceberg::IcebergPathFromMetadata::deserialize(path))}; } + +#endif + } diff --git a/src/Storages/ObjectStorage/Utils.h b/src/Storages/ObjectStorage/Utils.h index 5cc48a5d581d..d0ecf59fe747 100644 --- a/src/Storages/ObjectStorage/Utils.h +++ b/src/Storages/ObjectStorage/Utils.h @@ -2,11 +2,38 @@ #include #include +#include +#include +#include + namespace DB { class IObjectStorage; +#if USE_AVRO +/// Thread-safe wrapper for secondary object storages map +/// (now only used for Iceberg) +struct SecondaryStorages +{ + mutable std::mutex mutex; + std::map storages; +}; +#endif + +// A URI split into components +// s3://bucket/a/b -> scheme="s3", authority="bucket", path="/a/b" +// file:///var/x -> scheme="file", authority="", path="/var/x" +// /abs/p -> scheme="", authority="", path="/abs/p" +struct SchemeAuthorityKey +{ + explicit SchemeAuthorityKey(const std::string & uri); + + std::string scheme; + std::string authority; + std::string key; +}; + std::optional checkAndGetNewFileOnInsertIfNeeded( const IObjectStorage & object_storage, const StorageObjectStorageConfiguration & configuration, @@ -62,5 +89,28 @@ struct ParseFromDiskResult ParseFromDiskResult parseFromDisk(ASTs args, bool with_structure, ContextPtr context, const fs::path & prefix); +#if USE_AVRO +namespace Iceberg { class IcebergPathResolver; } + +/// Resolve an absolute metadata path directly to its (object storage, key) by parsing the URI. +/// The storage may be `base_storage` or a secondary one. Returns std::nullopt for paths that must +/// instead go through `path_resolver`: relative paths and bare local-fs absolute base paths. +std::optional> tryResolveObjectStorageForPath( + const std::string & table_location, + const std::string & path, + const DB::ObjectStoragePtr & base_storage, + SecondaryStorages & secondary_storages, + const DB::ContextPtr & context); + +/// Resolve a metadata path to (object storage, key) for reading. Absolute paths resolve directly via +/// `tryResolveObjectStorageForPath`; relative paths are mapped via `path_resolver`. +std::pair resolveObjectStorageForPath( + const std::string & table_location, + const std::string & path, + const DB::ObjectStoragePtr & base_storage, + SecondaryStorages & secondary_storages, + const DB::ContextPtr & context, + const Iceberg::IcebergPathResolver & path_resolver); +#endif } diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 81148b1514cb..9f51fdb2efea 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -812,10 +812,10 @@ std::function IStorageURLBase::getReadPOSTDataCallback( namespace { - class ReadBufferIterator : public IReadBufferIterator, WithContext + class StorageURLReadBufferIterator : public IReadBufferIterator, WithContext { public: - ReadBufferIterator( + StorageURLReadBufferIterator( const std::vector & urls_to_check_, std::optional format_, const CompressionMethod & compression_method_, @@ -1045,7 +1045,7 @@ std::pair IStorageURLBase::getTableStructureAndForma else urls_to_check = {uri}; - ReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context); + StorageURLReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context); if (format) return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context), *format}; return detectFormatAndReadSchema(format_settings, read_buffer_iterator, context); diff --git a/tests/integration/test_storage_iceberg_multistorage/__init__.py b/tests/integration/test_storage_iceberg_multistorage/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_storage_iceberg_multistorage/configs/config.d/cluster.xml b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/cluster.xml new file mode 100644 index 000000000000..54c08b27abe8 --- /dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/cluster.xml @@ -0,0 +1,20 @@ + + + + + + node1 + 9000 + + + node2 + 9000 + + + node3 + 9000 + + + + + diff --git a/tests/integration/test_storage_iceberg_multistorage/configs/config.d/named_collections.xml b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/named_collections.xml new file mode 100644 index 000000000000..516e4ba63a3a --- /dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/named_collections.xml @@ -0,0 +1,15 @@ + + + + http://minio1:9001/root/ + minio + ClickHouse_Minio_P@ssw0rd + + + devstoreaccount1 + Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== + + + + + diff --git a/tests/integration/test_storage_iceberg_multistorage/configs/config.d/query_log.xml b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/query_log.xml new file mode 100644 index 000000000000..a63e91f41fbc --- /dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/query_log.xml @@ -0,0 +1,6 @@ + + + system + query_log
+
+
diff --git a/tests/integration/test_storage_iceberg_multistorage/configs/users.d/users.xml b/tests/integration/test_storage_iceberg_multistorage/configs/users.d/users.xml new file mode 100644 index 000000000000..4b6ba057ecb1 --- /dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/configs/users.d/users.xml @@ -0,0 +1,9 @@ + + + + + default + 1 + + + diff --git a/tests/integration/test_storage_iceberg_multistorage/test.py b/tests/integration/test_storage_iceberg_multistorage/test.py new file mode 100644 index 000000000000..5b477055b2ab --- /dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/test.py @@ -0,0 +1,426 @@ +import pytest +import pyspark +import os +import shutil +import tempfile +import json +import avro.datafile +import avro.io + +from helpers.cluster import ClickHouseCluster +from helpers.s3_tools import ( + LocalUploader, + S3Uploader, + AzureUploader, + LocalDownloader, + S3Downloader, + prepare_s3_bucket, +) +from helpers.iceberg_utils import ( + get_uuid_str, + default_upload_directory, + default_download_directory, +) + +def get_spark(): + builder = ( + pyspark.sql.SparkSession.builder.appName("test_storage_iceberg_multistorage") + .config( + "spark.sql.catalog.spark_catalog", + "org.apache.iceberg.spark.SparkSessionCatalog", + ) + .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", "/var/lib/clickhouse/user_files/iceberg_data") + .config( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", + ) + .master("local") + ) + return builder.getOrCreate() + + +@pytest.fixture(scope="package") +def started_cluster(): + try: + cluster = ClickHouseCluster(__file__, with_spark=True) + cluster.add_instance( + "node1", + main_configs=[ + "configs/config.d/query_log.xml", + "configs/config.d/cluster.xml", + "configs/config.d/named_collections.xml", + ], + user_configs=["configs/users.d/users.xml"], + with_minio=True, + with_azurite=True, + stay_alive=True, + ) + + cluster.start() + + prepare_s3_bucket(cluster) + + cluster.spark_session = get_spark() + + cluster.default_s3_uploader = S3Uploader(cluster.minio_client, cluster.minio_bucket) + cluster.default_s3_downloader = S3Downloader(cluster.minio_client, cluster.minio_bucket) + + cluster.azure_container_name = "mycontainer" + cluster.blob_service_client.create_container(cluster.azure_container_name) + cluster.default_azure_uploader = AzureUploader(cluster.blob_service_client, cluster.azure_container_name) + + cluster.default_local_uploader = LocalUploader(cluster.instances["node1"]) + cluster.default_local_downloader = LocalDownloader(cluster.instances["node1"]) + + # Create extra S3 buckets for test_four_different_locations + for i in range(1, 4): + bucket_name = f"{cluster.minio_bucket}-storage{i}" + if not cluster.minio_client.bucket_exists(bucket_name): + cluster.minio_client.make_bucket(bucket_name) + + yield cluster + + finally: + cluster.shutdown() + + +def modify_avro_file(avro_path: str, field_path: list, modifier_func) -> None: + """ + Modify a field in an AVRO file, preserving the rest of it as is. + + field_path: list of keys to navigate to the field + modifier_func: function that takes old value and returns new value + """ + with open(avro_path, 'rb') as f: + reader = avro.datafile.DataFileReader(f, avro.io.DatumReader()) + schema = reader.datum_reader.writers_schema + # Preserve all file metadata (partition-spec, format-version, etc.) + metadata = dict(reader.meta) + records = list(reader) + reader.close() + + for record in records: + obj = record + for key in field_path[:-1]: + if obj is None or key not in obj: + break + obj = obj[key] + else: + if obj and field_path[-1] in obj: + obj[field_path[-1]] = modifier_func(obj[field_path[-1]]) + + with open(avro_path, 'wb') as f: + writer = avro.datafile.DataFileWriter(f, avro.io.DatumWriter(), schema) + for key, value in metadata.items(): + if not key.startswith('avro.'): + writer.set_meta(key, value) + for record in records: + writer.append(record) + writer.close() + + +def get_absolute_path(storage_type: str, cluster, relative_path: str) -> str: + """Convert relative path to absolute path for given storage type.""" + relative_path = relative_path.lstrip("/") + + if storage_type == "s3": + return f"s3a://{cluster.minio_bucket}/{relative_path}" + elif storage_type.startswith("s3:"): # s3:bucket_name format + bucket = storage_type.split(":")[1] + return f"s3a://{bucket}/{relative_path}" + elif storage_type == "azure": + return f"abfs://{cluster.azure_container_name}@{cluster.azurite_account}/{relative_path}" + elif storage_type.startswith("azure:"): # azure:container_name format + container = storage_type.split(":")[1] + return f"abfs://{container}@{cluster.azurite_account}/{relative_path}" + elif storage_type == "local": + return f"file:///{relative_path}" + else: + raise ValueError(f"Unknown storage type: {storage_type}") + + +def get_uploader(storage_type: str, cluster): + if storage_type == "s3": + return cluster.default_s3_uploader + elif storage_type.startswith("s3:"): + bucket = storage_type.split(":")[1] + return S3Uploader(cluster.minio_client, bucket) + elif storage_type == "azure": + return cluster.default_azure_uploader + elif storage_type.startswith("azure:"): + container = storage_type.split(":")[1] + return AzureUploader(cluster.blob_service_client, container) + elif storage_type == "local": + return cluster.default_local_uploader + else: + raise ValueError(f"Unknown storage type: {storage_type}") + + +def get_table_function(metadata_storage: str): + if metadata_storage == "s3" or metadata_storage.startswith("s3:"): + return "icebergS3" + elif metadata_storage == "azure" or metadata_storage.startswith("azure:"): + return "icebergAzure" + elif metadata_storage == "local": + return "icebergLocal" + else: + raise ValueError(f"Unknown storage type: {metadata_storage}") + + +def get_query_args(metadata_storage: str, cluster, table_path: str): + """Get query arguments for the iceberg table function.""" + minio_url = f"http://{cluster.minio_host}:{cluster.minio_port}" + if metadata_storage == "s3": + return f"s3, filename='{table_path}/', format=Parquet, url='{minio_url}/{cluster.minio_bucket}/'" + elif metadata_storage.startswith("s3:"): + bucket = metadata_storage.split(":")[1] + return f"s3, filename='{table_path}/', format=Parquet, url='{minio_url}/{bucket}/'" + elif metadata_storage == "azure": + return f"azure, container='{cluster.azure_container_name}', storage_account_url='{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', blob_path='{table_path}/', format=Parquet" + elif metadata_storage.startswith("azure:"): + container = metadata_storage.split(":")[1] + return f"azure, container='{container}', storage_account_url='{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', blob_path='{table_path}/', format=Parquet" + elif metadata_storage == "local": + return f"local, path='/{table_path}', format=Parquet" + else: + raise ValueError(f"Unknown storage type: {metadata_storage}") + + +def find_files(directory: str, suffix: str) -> list: + """Find files ending with given suffix.""" + result = [] + for root, _, files in os.walk(directory): + for f in files: + if f.endswith(suffix): + result.append(os.path.join(root, f)) + return result + + +def path_modifier(old_path: str, new_storage: str, cluster, base_path: str): + """Create a new absolute path for a different storage location.""" + # Extract just the filename/relative portion + if "://" in old_path: + # Parse out the path part after protocol://bucket/ + parts = old_path.split("/") + # Find where the actual path starts (after bucket) + for i, part in enumerate(parts): + if base_path.split("/")[0] in part or "var" in part: + relative = "/".join(parts[i:]) + break + else: + relative = parts[-1] + else: + relative = old_path.lstrip("/") + + return get_absolute_path(new_storage, cluster, relative) + + +# ============================================================================= +# Tests +# ============================================================================= + +STORAGE_TYPES = ["s3", "azure", "local"] + +def _get_type_family(t): + if t.startswith("s3"): + return "s3" + elif t.startswith("azure"): + return "azure" + return t + +def _generate_valid_combinations(): + """ + Generate valid storage combinations. + Rule: all components must be same type family as metadata, OR local. + Local doesn't need credentials, so S3+local and Azure+local work. + But S3+Azure doesn't work (credentials aren't interchangeable). + """ + combinations = [] + for metadata in STORAGE_TYPES: + main_family = _get_type_family(metadata) + for manifest_list in STORAGE_TYPES: + if _get_type_family(manifest_list) not in (main_family, "local"): + continue + for manifest in STORAGE_TYPES: + if _get_type_family(manifest) not in (main_family, "local"): + continue + for data in STORAGE_TYPES: + if _get_type_family(data) not in (main_family, "local"): + continue + combinations.append((metadata, manifest_list, manifest, data)) + return combinations + +VALID_COMBINATIONS = _generate_valid_combinations() + +@pytest.mark.parametrize("metadata_storage,manifest_list_storage,manifest_storage,data_storage", VALID_COMBINATIONS) +def test_multi_storage_combinations(started_cluster, metadata_storage, manifest_list_storage, manifest_storage, data_storage): + """ + Test Iceberg table with all components in different storage locations. + """ + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + + TABLE_NAME = f"test_combo_{get_uuid_str()}" + + spark.sql(f"CREATE TABLE {TABLE_NAME} (id INT, value STRING) USING iceberg OPTIONS('format-version'='2')") + spark.sql(f"INSERT INTO {TABLE_NAME} VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')") + + # Upload to default S3 first + default_upload_directory(started_cluster, "s3", f"/iceberg_data/default/{TABLE_NAME}/", f"/iceberg_data/default/{TABLE_NAME}/") + + # Download all files + temp_dir = tempfile.mkdtemp() + host_path = os.path.join(temp_dir, TABLE_NAME) + os.makedirs(host_path, exist_ok=True) + + default_download_directory(started_cluster, "s3", f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}/", host_path) + + base_path = f"var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}" + metadata_dir = os.path.join(host_path, "metadata") + data_dir = os.path.join(host_path, "data") + + # Step 1: Modify manifest files to point to data_storage + manifest_files = [f for f in find_files(metadata_dir, ".avro") if not os.path.basename(f).startswith("snap-")] + for mf in manifest_files: + modify_avro_file(mf, ["data_file", "file_path"], + lambda p: path_modifier(p, data_storage, started_cluster, base_path)) + + # Step 2: Modify manifest-list files to point to manifest_storage + manifest_list_files = [f for f in find_files(metadata_dir, ".avro") if os.path.basename(f).startswith("snap-")] + for ml in manifest_list_files: + modify_avro_file(ml, ["manifest_path"], + lambda p: path_modifier(p, manifest_storage, started_cluster, base_path)) + + # Step 3: Modify metadata.json to point to manifest_list_storage + for mj in find_files(metadata_dir, ".metadata.json"): + with open(mj, 'r') as f: + data = json.load(f) + + data["location"] = get_absolute_path(metadata_storage, started_cluster, base_path) + + # Update snapshot manifest-list paths + if "snapshots" in data: + for snap in data["snapshots"]: + if "manifest-list" in snap: + snap["manifest-list"] = path_modifier(snap["manifest-list"], manifest_list_storage, started_cluster, base_path) + + with open(mj, 'w') as f: + json.dump(data, f, indent=2) + + # Step 4: Upload to respective storages + # Metadata files (*.metadata.json, version-hint.text) + meta_uploader = get_uploader(metadata_storage, started_cluster) + for f in find_files(metadata_dir, ".metadata.json") + find_files(metadata_dir, "version-hint.text"): + rel = os.path.relpath(f, host_path) + meta_uploader.upload_file(f, f"{base_path}/{rel}") + + # Manifest-list files + ml_uploader = get_uploader(manifest_list_storage, started_cluster) + for f in manifest_list_files: + rel = os.path.relpath(f, host_path) + ml_uploader.upload_file(f, f"{base_path}/{rel}") + + # Manifest files + m_uploader = get_uploader(manifest_storage, started_cluster) + for f in manifest_files: + rel = os.path.relpath(f, host_path) + m_uploader.upload_file(f, f"{base_path}/{rel}") + + # Data files + d_uploader = get_uploader(data_storage, started_cluster) + if os.path.exists(data_dir): + for f in find_files(data_dir, ".parquet"): + rel = os.path.relpath(f, host_path) + d_uploader.upload_file(f, f"{base_path}/{rel}") + + shutil.rmtree(temp_dir) + + func = get_table_function(metadata_storage) + args = get_query_args(metadata_storage, started_cluster, base_path) + + assert instance.query(f"SELECT * FROM {func}({args}) ORDER BY id") == "1\talpha\n2\tbeta\n3\tgamma\n" + + +# S3 is the primary use case for cross-bucket access. +# Azure cross-container: not supported (account_key not extractable from credential object). +def test_four_different_s3_buckets(started_cluster): + """S3: each component in a different bucket (metadata, manifest-list, manifest, data).""" + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + + TABLE_NAME = f"test_four_buckets_{get_uuid_str()}" + buckets = [ + started_cluster.minio_bucket, + f"{started_cluster.minio_bucket}-storage1", + f"{started_cluster.minio_bucket}-storage2", + f"{started_cluster.minio_bucket}-storage3", + ] + + metadata_storage = f"s3:{buckets[0]}" + manifest_list_storage = f"s3:{buckets[1]}" + manifest_storage = f"s3:{buckets[2]}" + data_storage = f"s3:{buckets[3]}" + + uploaders = {f"s3:{b}": S3Uploader(started_cluster.minio_client, b) for b in buckets} + + spark.sql(f"CREATE TABLE {TABLE_NAME} (id INT, name STRING, score INT) USING iceberg OPTIONS('format-version'='2')") + spark.sql(f"INSERT INTO {TABLE_NAME} VALUES (1, 'Alice', 100), (2, 'Bob', 85), (3, 'Carol', 92)") + + default_upload_directory(started_cluster, "s3", f"/iceberg_data/default/{TABLE_NAME}/", f"/iceberg_data/default/{TABLE_NAME}/") + + temp_dir = tempfile.mkdtemp() + host_path = os.path.join(temp_dir, TABLE_NAME) + os.makedirs(host_path, exist_ok=True) + + default_download_directory(started_cluster, "s3", f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}/", host_path) + + base_path = f"var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}" + metadata_dir = os.path.join(host_path, "metadata") + data_dir = os.path.join(host_path, "data") + + manifest_files = [f for f in find_files(metadata_dir, ".avro") if not os.path.basename(f).startswith("snap-")] + for mf in manifest_files: + modify_avro_file(mf, ["data_file", "file_path"], + lambda p: path_modifier(p, data_storage, started_cluster, base_path)) + + manifest_list_files = [f for f in find_files(metadata_dir, ".avro") if os.path.basename(f).startswith("snap-")] + for ml in manifest_list_files: + modify_avro_file(ml, ["manifest_path"], + lambda p: path_modifier(p, manifest_storage, started_cluster, base_path)) + + for mj in find_files(metadata_dir, ".metadata.json"): + with open(mj, 'r') as f: + data = json.load(f) + data["location"] = get_absolute_path(metadata_storage, started_cluster, base_path) + if "snapshots" in data: + for snap in data["snapshots"]: + if "manifest-list" in snap: + snap["manifest-list"] = path_modifier(snap["manifest-list"], manifest_list_storage, started_cluster, base_path) + with open(mj, 'w') as f: + json.dump(data, f, indent=2) + + for f in find_files(metadata_dir, ".metadata.json") + find_files(metadata_dir, "version-hint.text"): + rel = os.path.relpath(f, host_path) + uploaders[metadata_storage].upload_file(f, f"{base_path}/{rel}") + + for f in manifest_list_files: + rel = os.path.relpath(f, host_path) + uploaders[manifest_list_storage].upload_file(f, f"{base_path}/{rel}") + + for f in manifest_files: + rel = os.path.relpath(f, host_path) + uploaders[manifest_storage].upload_file(f, f"{base_path}/{rel}") + + if os.path.exists(data_dir): + for f in find_files(data_dir, ".parquet"): + rel = os.path.relpath(f, host_path) + uploaders[data_storage].upload_file(f, f"{base_path}/{rel}") + + shutil.rmtree(temp_dir) + + minio_url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}" + result = instance.query(f"SELECT * FROM icebergS3(s3, filename='{base_path}/', format=Parquet, url='{minio_url}/{buckets[0]}/') ORDER BY id") + + assert result == "1\tAlice\t100\n2\tBob\t85\n3\tCarol\t92\n" \ No newline at end of file diff --git a/tests/integration/test_storage_iceberg_schema_evolution/test_array_evolved_with_struct.py b/tests/integration/test_storage_iceberg_schema_evolution/test_array_evolved_with_struct.py index 5cb1c02a0c07..9a60da2b2301 100644 --- a/tests/integration/test_storage_iceberg_schema_evolution/test_array_evolved_with_struct.py +++ b/tests/integration/test_storage_iceberg_schema_evolution/test_array_evolved_with_struct.py @@ -55,7 +55,7 @@ def execute_spark_query(query: str): execute_spark_query( f""" - INSERT INTO {TABLE_NAME} VALUES (ARRAY(named_struct('name', 'Singapore', 'zip', 12345), named_struct('name', 'Moscow', 'zip', 54321)), ARRAY(1,2)); + INSERT INTO {TABLE_NAME} VALUES (ARRAY(named_struct('city', 'Singapore', 'zip', 12345), named_struct('city', 'Moscow', 'zip', 54321)), ARRAY(1,2)); """ ) diff --git a/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json b/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json index 8d367d20f041..a983881af8f0 100644 --- a/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json +++ b/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json @@ -1,7 +1,7 @@ { "format-version" : 2, "table-uuid" : "d4b695ca-ceeb-4537-8a2a-eee90dc6e313", - "location" : "s3a://test/field_ids_struct_test/metadata/field_ids_complex_test", + "location" : "s3a://test/field_ids_complex_test", "last-sequence-number" : 1, "last-updated-ms" : 1757661733693, "last-column-id" : 9, @@ -96,7 +96,7 @@ "total-position-deletes" : "0", "total-equality-deletes" : "0" }, - "manifest-list" : "s3a://test/field_ids_struct_test/metadata/field_ids_complex_test/metadata/snap-607752583403487091-1-140c8dff-1d83-4841-bc40-9aa85205b555.avro", + "manifest-list" : "s3a://test/field_ids_complex_test/metadata/snap-607752583403487091-1-140c8dff-1d83-4841-bc40-9aa85205b555.avro", "schema-id" : 0 } ], "statistics" : [ ], diff --git a/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json b/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json index 2d149abb44e7..d6c9079228ac 100644 --- a/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json +++ b/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json @@ -1,7 +1,7 @@ { "format-version" : 2, "table-uuid" : "149ecc15-7afc-4311-86b3-3a4c8d4ec08e", - "location" : "s3a://test/field_ids_struct_test/metadata/field_ids_struct_test", + "location" : "s3a://test/field_ids_struct_test", "last-sequence-number" : 1, "last-updated-ms" : 1753959190403, "last-column-id" : 6, @@ -84,7 +84,7 @@ "total-position-deletes" : "0", "total-equality-deletes" : "0" }, - "manifest-list" : "s3a://test/field_ids_struct_test/metadata/field_ids_struct_test/metadata/snap-2512638186869817292-1-ec467367-15a4-4610-8ea8-cf76797afb03.avro", + "manifest-list" : "s3a://test/field_ids_struct_test/metadata/snap-2512638186869817292-1-ec467367-15a4-4610-8ea8-cf76797afb03.avro", "schema-id" : 0 } ], "statistics" : [ ], diff --git a/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json b/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json index 32225eb618ad..1ddc3492cc82 100644 --- a/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json +++ b/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json @@ -1,7 +1,7 @@ { "format-version" : 2, "table-uuid" : "8f1f9ae2-18bb-421e-b640-ec2f85e67bce", - "location" : "s3a://test/field_ids_table_test/metadata/field_ids_table_test", + "location" : "s3a://test/field_ids_table_test", "last-sequence-number" : 1, "last-updated-ms" : 1752481476160, "last-column-id" : 1, @@ -56,7 +56,7 @@ "total-position-deletes" : "0", "total-equality-deletes" : "0" }, - "manifest-list" : "s3a://test/field_ids_table_test/metadata/field_ids_table_test/metadata/snap-2811410366534688344-1-3b002f99-b012-4041-9a97-db477fcc7115.avro", + "manifest-list" : "s3a://test/field_ids_table_test/metadata/snap-2811410366534688344-1-3b002f99-b012-4041-9a97-db477fcc7115.avro", "schema-id" : 0 } ], "statistics" : [ ], From 85dc463d15028c35f83da98767d32527363de1cf Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Mon, 1 Jun 2026 22:38:55 +0200 Subject: [PATCH 2/2] Iceberg: resolve in-table paths via path resolver for relocated tables Fixes `04034_iceberg_spark_style_location` (S3_ERROR 404 reading `warehouse/db/spark_table/metadata/snap-*.avro`). When an Iceberg table's metadata `location` differs from where the files actually live (e.g. a Spark-relocated table whose `location` is `s3a://spark-bucket/warehouse/db/spark_table` while the objects are in the configured base storage), the manifest-list / manifest / data paths in the metadata are spelled with that foreign prefix. `tryResolveObjectStorageForPath` matched such a path against `table_location` and returned the raw URI key on the base storage, so reads hit a non-existent key and failed with a 404. The raw key is only valid for paths whose bucket matches the base storage (handled by the earlier base-bucket branch). For a path that matches `table_location` but not the base bucket, only `IcebergPathResolver::resolve` can map it (strip `table_location`, prepend `table_root`), so defer to it by returning `std::nullopt`. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/Storages/ObjectStorage/Utils.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index ab0315c46eb6..7b3ddcec6e2a 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -509,7 +509,14 @@ std::optional> tryResolveObjectStor /*enable_url_encoding*/ false); if (s3URIMatches(s3_uri, base_s3_uri.bucket, base_s3_uri.endpoint, target_scheme_normalized)) - use_base_storage = true; + { + /// The path points inside the table location but is spelled with a different + /// scheme/bucket than the base storage (e.g. a Spark-relocated table whose + /// metadata `location` is `s3a://spark-bucket/...`). The real object lives in the + /// base storage at `table_root` + suffix, which only `IcebergPathResolver::resolve` + /// can compute, so defer to it instead of using the raw metadata key. + return std::nullopt; + } } if (use_base_storage)