diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 962b123234bb..46bcdc223186 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -650,6 +650,20 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( return QueryProcessingStage::Enum::FetchColumns; } +NamesAndTypesList IStorageCluster::getHivePartitionColumnsWithoutVirtuals() const +{ + // Virtual columns can contain hive columns, so we remove these hive coulmns to avoid duplicates. + // In non-cluster case these columns are filtered in DB::prepareReadingFromFormat function. + auto virtual_columns = getVirtualsList(); + NamesAndTypesList hive_partition_filtered; + for (const auto & hive_name_and_type : hive_partition_columns_to_read_from_file_path) + { + if (!virtual_columns.contains(hive_name_and_type.name)) + hive_partition_filtered.emplace_back(hive_name_and_type); + } + return hive_partition_filtered; +} + ContextPtr ReadFromCluster::updateSettings(const Settings & settings) { Settings new_settings{settings}; diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 2c0c6d3c6029..4eb64b9b830b 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -105,6 +105,10 @@ class IStorageCluster : public IStorage throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName()); } + NamesAndTypesList getHivePartitionColumnsWithoutVirtuals() const; + + NamesAndTypesList hive_partition_columns_to_read_from_file_path; + private: static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts = 0); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index e5131d06ae2e..39ad348c7f8b 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -622,7 +622,7 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten predicate, filter, getVirtualsList(), - hive_partition_columns_to_read_from_file_path, + getHivePartitionColumnsWithoutVirtuals(), nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/false, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 52c7d5951855..bcaf293ad4e7 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -214,7 +214,6 @@ class StorageObjectStorageCluster : public IStorageCluster const String engine_name; StorageObjectStorageConfigurationPtr configuration; const ObjectStoragePtr object_storage; - NamesAndTypesList hive_partition_columns_to_read_from_file_path; bool cluster_name_in_settings; /// non-clustered storage to fall back on pure realisation if needed diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index 08485417dc50..1aefb32a9883 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -68,16 +68,23 @@ StorageFileCluster::StorageFileCluster( auto & storage_columns = storage_metadata.columns; + const auto sample_path = paths.empty() ? "" : paths.front(); + /// Not grabbing the file_columns because it is not necessary to do it here. std::tie(hive_partition_columns_to_read_from_file_path, std::ignore) = HivePartitioningUtils::setupHivePartitioningForFileURLLikeStorage( storage_columns, - paths.empty() ? "" : paths.front(), + sample_path, columns_.empty(), std::nullopt, context); storage_metadata.setConstraints(constraints_); - setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.columns, context)); + setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage( + storage_metadata.columns, + context, + std::nullopt, + PartitionStrategyFactory::StrategyType::NONE, + sample_path)); setInMemoryMetadata(storage_metadata); } @@ -139,7 +146,7 @@ RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension( std::nullopt, predicate, getVirtualsList(), - hive_partition_columns_to_read_from_file_path, + getHivePartitionColumnsWithoutVirtuals(), context ); return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)}; diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h index eb2a70f60b89..dc4c49573623 100644 --- a/src/Storages/StorageFileCluster.h +++ b/src/Storages/StorageFileCluster.h @@ -45,7 +45,6 @@ class StorageFileCluster : public IStorageCluster Strings paths; String filename; String format_name; - NamesAndTypesList hive_partition_columns_to_read_from_file_path; }; } diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 0d3723f4e1fc..26e80d0f8981 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -167,7 +167,7 @@ RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension( context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), - hive_partition_columns_to_read_from_file_path, + getHivePartitionColumnsWithoutVirtuals(), context ); return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)}; diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h index e8ea21ffd306..e644608e20b1 100644 --- a/src/Storages/StorageURLCluster.h +++ b/src/Storages/StorageURLCluster.h @@ -47,7 +47,6 @@ class StorageURLCluster : public IStorageCluster String uri; String format_name; - NamesAndTypesList hive_partition_columns_to_read_from_file_path; }; diff --git a/tests/integration/test_file_cluster/test.py b/tests/integration/test_file_cluster/test.py index 04ed4c51cee3..4e97bb0beb43 100644 --- a/tests/integration/test_file_cluster/test.py +++ b/tests/integration/test_file_cluster/test.py @@ -1,6 +1,7 @@ import csv import logging import time +import uuid import pytest @@ -211,3 +212,60 @@ def test_format_detection(started_cluster): "select * from fileCluster('my_cluster', 'file_for_format_detection*', auto, 's String, i UInt32', auto) ORDER BY (i, s)" ) assert result == expected_result + + +def test_hive_partitioning_with_where_condition(started_cluster): + test_id = uuid.uuid4().hex[:8] + hive_glob = f"hive_file_cluster_{test_id}/date=*/data.csv" + + for node_name in ("s0_0_0", "s0_0_1", "s0_1_0"): + node = started_cluster.instances[node_name] + for i in range(1, 5): + node.query( + f""" + INSERT INTO TABLE FUNCTION file( + 'hive_file_cluster_{test_id}/date=2000-01-0{i}/data.csv', 'CSVWithNames', 'd UInt64') + SELECT number FROM numbers(10) + SETTINGS engine_file_truncate_on_insert=1 + """ + ) + + node = started_cluster.instances["s0_0_0"] + + result = node.query( + f""" + SELECT count() FROM file('{hive_glob}', 'CSVWithNames', 'd UInt64') + WHERE date='2000-01-02' + SETTINGS use_hive_partitioning=1 + """ + ) + assert result.strip() == "10" + + result = node.query( + f""" + SELECT date, d FROM file('{hive_glob}', 'CSVWithNames', 'd UInt64') + WHERE date='2000-01-02' + LIMIT 1 + SETTINGS use_hive_partitioning=1 + """ + ) + assert "2000-01-02" in result + + result = node.query( + f""" + SELECT count() FROM fileCluster('my_cluster', '{hive_glob}', 'CSVWithNames', 'd UInt64') + WHERE date='2000-01-02' + SETTINGS use_hive_partitioning=1 + """ + ) + assert result.strip() == "10" + + result = node.query( + f""" + SELECT date, d FROM fileCluster('my_cluster', '{hive_glob}', 'CSVWithNames', 'd UInt64') + WHERE date='2000-01-02' + LIMIT 1 + SETTINGS use_hive_partitioning=1 + """ + ) + assert "2000-01-02" in result diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index b990c0709f08..c53f215a8520 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1490,3 +1490,38 @@ def test_object_storage_remote_initiator_without_cluster_function(started_cluste assert users[1:] == ["s0_0_0\tdefault", "s0_0_1\tfoo", "s0_1_0\tfoo"] + + +def test_hive_partitioning_with_where_condition(started_cluster): + node = started_cluster.instances["s0_0_0"] + test_id = uuid.uuid4().hex[:8] + + for i in range(1, 5): + node.query( + f""" + INSERT INTO FUNCTION s3('http://minio1:9001/root/hive/{test_id}/date=2000-01-0{i}/data.csv', + 'minio','{minio_secret_key}','CSVWithNames','d UInt64') + SELECT number FROM numbers(10) + SETTINGS s3_truncate_on_insert=1 + """) + + # Direct query + result = node.query( + f""" + SELECT count() FROM s3('http://minio1:9001/root/hive/{test_id}/date=*/data.csv', + 'minio','{minio_secret_key}','CSVWithNames','d UInt64') + WHERE date='2000-01-02' + SETTINGS use_hive_partitioning=1 + """ + ) + assert result.strip() == "10" + + result = node.query( + f""" + SELECT count() FROM s3Cluster('cluster_simple', 'http://minio1:9001/root/hive/{test_id}/date=*/data.csv', + 'minio','{minio_secret_key}','CSVWithNames','d UInt64') + WHERE date='2000-01-02' + SETTINGS use_hive_partitioning=1 + """ + ) + assert result.strip() == "10" diff --git a/tests/integration/test_storage_url/test.py b/tests/integration/test_storage_url/test.py index 8c7bc908f932..d93d28f30702 100644 --- a/tests/integration/test_storage_url/test.py +++ b/tests/integration/test_storage_url/test.py @@ -42,6 +42,38 @@ def test_partition_by(): assert result.strip() == "1\t2\t3" +def test_hive_partitioning_with_where_condition(): + test_id = uuid.uuid4().hex[:8] + base_url = f"http://nginx:80/hive_url_cluster_{test_id}" + + node1.query( + f""" + INSERT INTO FUNCTION url(url_file, url='{base_url}/date=2000-01-01/data.csv', format='CSVWithNames', structure='d UInt64') + SELECT number FROM numbers(10) + """ + ) + + # 'ur' table function does not work with globs, so we have to test hive partitioning with a single file. + result = node1.query( + f""" + SELECT count() FROM url('{base_url}/date=2000-01-01/data.csv', 'CSVWithNames', 'd UInt64') + WHERE date='2000-01-01' + SETTINGS use_hive_partitioning=1 + """ + ) + assert result.strip() == "10" + + result = node1.query( + f""" + SELECT count() FROM urlCluster( + 'test_cluster_two_shards', '{base_url}/date=2000-01-01/data.csv', 'CSVWithNames', 'd UInt64') + WHERE date='2000-01-01' + SETTINGS use_hive_partitioning=1 + """ + ) + assert result.strip() == "10" + + def test_url_cluster(): result = node1.query( f"select * from urlCluster('test_cluster_two_shards', 'http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"