Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Core/ProtocolDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_META
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_EXCLUDED_ROWS = 5;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_FILE_STATS = 6;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_FILE_STATS;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH = 7;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH;

static constexpr auto DATA_LAKE_TABLE_STATE_SNAPSHOT_PROTOCOL_VERSION = 1;

Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,9 @@ Use multiple threads for azure multipart upload.
)", 0) \
DECLARE(Bool, s3_throw_on_zero_files_match, false, R"(
Throw an error, when ListObjects request cannot match any files
)", 0) \
DECLARE(Bool, s3_propagate_credentials_to_other_storages, false, R"(
Credentials from the base storage are always propagated to secondary object storages when endpoints match. When this setting is enabled, credentials are also propagated when endpoints differ, including less secure connections (for example, from `https` to plain `http`).
)", 0) \
DECLARE(Bool, hdfs_throw_on_zero_files_match, false, R"(
Throw an error if matched zero files according to glob expansion rules.
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
});
addSettingsChanges(settings_changes_history, "26.3",
{
{"s3_propagate_credentials_to_other_storages", false, false, "New setting"},
{"http_max_fields", 1000000, 1000, "Reduce default to limit pre-authentication memory usage by HTTP connections."},
{"http_max_field_name_size", 131072, 4096, "Reduce default to limit pre-authentication memory usage by HTTP connections."},
{"http_max_request_header_size", 0, 10485760, "New setting to limit total HTTP request header size before authentication."},
Expand Down
4 changes: 3 additions & 1 deletion src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,9 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
LOG_DEBUG(log, "Has no credentials");
}
}
else if (!lightweight && table_metadata.requiresCredentials() && std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end())
else if (!lightweight && table_metadata.requiresCredentials()
&& std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end()
&& table_metadata.getStorageType() != DatabaseDataLakeStorageType::Local)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
Expand Down
14 changes: 8 additions & 6 deletions src/IO/S3/URI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ namespace DB

struct URIConverter
{
static void modifyURI(Poco::URI & uri, std::unordered_map<std::string, std::string> mapper)
static void modifyURI(Poco::URI & uri, std::unordered_map<std::string, std::string> mapper, bool enable_url_encoding = true)
{
Macros macros({{"bucket", uri.getHost()}});
uri = macros.expand(mapper[uri.getScheme()]).empty() ? uri : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery());
uri = macros.expand(mapper[uri.getScheme()]).empty()
? uri
: Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery(), enable_url_encoding);
}
};

Expand All @@ -32,7 +34,7 @@ namespace ErrorCodes
namespace S3
{

URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_presigned_query_parameters)
URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_presigned_query_parameters, bool enable_url_encoding)
{
/// Case when bucket name represented in domain name of S3 URL.
/// E.g. (https://bucket-name.s3.region.amazonaws.com/key)
Expand All @@ -54,9 +56,9 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_pre
else
uri_str = uri_;

uri = Poco::URI(uri_str);
uri = Poco::URI(uri_str, enable_url_encoding);
/// Keep a copy of how Poco parsed the original string before any mapping
Poco::URI original_uri(uri_str);
Poco::URI original_uri(uri_str, enable_url_encoding);
bool looks_like_presigned = false;
for (const auto & [qk, qv] : original_uri.getQueryParameters())
{
Expand Down Expand Up @@ -101,7 +103,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_pre
}

if (!mapper.empty())
URIConverter::modifyURI(uri, mapper);
URIConverter::modifyURI(uri, mapper, enable_url_encoding);
}

storage_name = "S3";
Expand Down
3 changes: 2 additions & 1 deletion src/IO/S3/URI.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ struct URI
explicit URI(
const std::string & uri_,
bool allow_archive_path_syntax = false,
bool keep_presigned_query_parameters = true);
bool keep_presigned_query_parameters = true,
bool enable_url_encoding = true);
void addRegionToURI(const std::string & region);

static void validateBucket(const std::string & bucket, const Poco::URI & uri);
Expand Down
6 changes: 2 additions & 4 deletions src/Interpreters/ClusterFunctionReadTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o
data_lake_metadata = object->data_lake_metadata.value();

#if USE_AVRO
if (std::dynamic_pointer_cast<IcebergDataObjectInfo>(object))
{
iceberg_info = dynamic_cast<IcebergDataObjectInfo &>(*object).info;
}
if (auto iceberg_object = std::dynamic_pointer_cast<IcebergDataObjectInfo>(object))
iceberg_info = iceberg_object->info;
#endif

file_meta_info = object->relative_path_with_metadata.file_meta_info;
Expand Down
6 changes: 5 additions & 1 deletion src/Interpreters/IcebergMetadataLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,16 @@ void insertRowToLogTable(
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Iceberg metadata log table is not configured");
}

String normalized_table_path = table_path;
while (normalized_table_path.size() > 1 && normalized_table_path.back() == '/')
normalized_table_path.pop_back();

iceberg_metadata_log->add(
DB::IcebergMetadataLogElement{
.current_time = spec.tv_sec,
.query_id = local_context->getCurrentQueryId(),
.content_type = row_log_level,
.table_path = table_path,
.table_path = normalized_table_path,
.file_path = file_path.serialize(),
.metadata_content = get_row(),
.row_in_file = row_in_file,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
}
}


const auto file_path_key = IcebergPathFromMetadata::deserialize(
const auto file_path_from_metadata = IcebergPathFromMetadata::deserialize(
getValueFromRowByName(row_index, c_data_file_file_path, TypeIndex::String).safeGet<String>());
/// NOTE: This is weird, because in manifest file partition looks like this:
/// {
Expand Down Expand Up @@ -248,7 +247,7 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
case FileContentType::DATA: {
return std::make_shared<const ParsedManifestFileEntry>(
FileContentType::DATA,
file_path_key,
file_path_from_metadata,
row_index,
status,
sequence_number,
Expand Down Expand Up @@ -295,7 +294,7 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
}
return std::make_shared<const ParsedManifestFileEntry>(
FileContentType::POSITION_DELETE,
file_path_key,
file_path_from_metadata,
row_index,
status,
sequence_number,
Expand Down Expand Up @@ -326,7 +325,7 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
c_data_file_equality_ids);
return std::make_shared<const ParsedManifestFileEntry>(
FileContentType::EQUALITY_DELETE,
file_path_key,
file_path_from_metadata,
row_index,
status,
sequence_number,
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ class IDataLakeMetadata : boost::noncopyable

virtual bool operator==(const IDataLakeMetadata & other) const = 0;

/// Returns the full table location URI (e.g. `s3a://bucket/prefix/table/`)
virtual std::string getTableLocation() const { return {}; }

/// Return iterator to `data files`.
using FileProgressCallback = std::function<void(FileProgress)>;
virtual ObjectIterator iterate(
Expand Down
Loading
Loading