Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,20 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
return QueryProcessingStage::Enum::FetchColumns;
}

NamesAndTypesList IStorageCluster::getHivePartitionColumnsWithoutVirtuals() const
{
// Virtual columns can contain hive columns, so we remove these hive coulmns to avoid duplicates.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iirc, that wasn't supposed to be the case

// In non-cluster case these columns are filtered in DB::prepareReadingFromFormat function.
auto virtual_columns = getVirtualsList();
NamesAndTypesList hive_partition_filtered;
for (const auto & hive_name_and_type : hive_partition_columns_to_read_from_file_path)
{
if (!virtual_columns.contains(hive_name_and_type.name))
hive_partition_filtered.emplace_back(hive_name_and_type);
}
return hive_partition_filtered;
}

ContextPtr ReadFromCluster::updateSettings(const Settings & settings)
{
Settings new_settings{settings};
Expand Down
4 changes: 4 additions & 0 deletions src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ class IStorageCluster : public IStorage
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName());
}

NamesAndTypesList getHivePartitionColumnsWithoutVirtuals() const;

NamesAndTypesList hive_partition_columns_to_read_from_file_path;

private:
static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts = 0);

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
predicate,
filter,
getVirtualsList(),
hive_partition_columns_to_read_from_file_path,
getHivePartitionColumnsWithoutVirtuals(),
nullptr,
local_context->getFileProgressCallback(),
/*ignore_archive_globs=*/false,
Expand Down
1 change: 0 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ class StorageObjectStorageCluster : public IStorageCluster
const String engine_name;
StorageObjectStorageConfigurationPtr configuration;
const ObjectStoragePtr object_storage;
NamesAndTypesList hive_partition_columns_to_read_from_file_path;
bool cluster_name_in_settings;

/// non-clustered storage to fall back on pure realisation if needed
Expand Down
13 changes: 10 additions & 3 deletions src/Storages/StorageFileCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,23 @@ StorageFileCluster::StorageFileCluster(

auto & storage_columns = storage_metadata.columns;

const auto sample_path = paths.empty() ? "" : paths.front();

/// Not grabbing the file_columns because it is not necessary to do it here.
std::tie(hive_partition_columns_to_read_from_file_path, std::ignore) = HivePartitioningUtils::setupHivePartitioningForFileURLLikeStorage(
storage_columns,
paths.empty() ? "" : paths.front(),
sample_path,
columns_.empty(),
std::nullopt,
context);

storage_metadata.setConstraints(constraints_);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.columns, context));
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(
storage_metadata.columns,
context,
std::nullopt,
PartitionStrategyFactory::StrategyType::NONE,
sample_path));
setInMemoryMetadata(storage_metadata);
}

Expand Down Expand Up @@ -139,7 +146,7 @@ RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
std::nullopt,
predicate,
getVirtualsList(),
hive_partition_columns_to_read_from_file_path,
getHivePartitionColumnsWithoutVirtuals(),
context
);
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
Expand Down
1 change: 0 additions & 1 deletion src/Storages/StorageFileCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class StorageFileCluster : public IStorageCluster
Strings paths;
String filename;
String format_name;
NamesAndTypesList hive_partition_columns_to_read_from_file_path;
};

}
2 changes: 1 addition & 1 deletion src/Storages/StorageURLCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(
context->getSettingsRef()[Setting::glob_expansion_max_elements],
predicate,
getVirtualsList(),
hive_partition_columns_to_read_from_file_path,
getHivePartitionColumnsWithoutVirtuals(),
context
);
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
Expand Down
1 change: 0 additions & 1 deletion src/Storages/StorageURLCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class StorageURLCluster : public IStorageCluster

String uri;
String format_name;
NamesAndTypesList hive_partition_columns_to_read_from_file_path;
};


Expand Down
58 changes: 58 additions & 0 deletions tests/integration/test_file_cluster/test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import csv
import logging
import time
import uuid

import pytest

Expand Down Expand Up @@ -211,3 +212,60 @@ def test_format_detection(started_cluster):
"select * from fileCluster('my_cluster', 'file_for_format_detection*', auto, 's String, i UInt32', auto) ORDER BY (i, s)"
)
assert result == expected_result


def test_hive_partitioning_with_where_condition(started_cluster):
test_id = uuid.uuid4().hex[:8]
hive_glob = f"hive_file_cluster_{test_id}/date=*/data.csv"

for node_name in ("s0_0_0", "s0_0_1", "s0_1_0"):
node = started_cluster.instances[node_name]
for i in range(1, 5):
node.query(
f"""
INSERT INTO TABLE FUNCTION file(
'hive_file_cluster_{test_id}/date=2000-01-0{i}/data.csv', 'CSVWithNames', 'd UInt64')
SELECT number FROM numbers(10)
SETTINGS engine_file_truncate_on_insert=1
"""
)

node = started_cluster.instances["s0_0_0"]

result = node.query(
f"""
SELECT count() FROM file('{hive_glob}', 'CSVWithNames', 'd UInt64')
WHERE date='2000-01-02'
SETTINGS use_hive_partitioning=1
"""
)
assert result.strip() == "10"

result = node.query(
f"""
SELECT date, d FROM file('{hive_glob}', 'CSVWithNames', 'd UInt64')
WHERE date='2000-01-02'
LIMIT 1
SETTINGS use_hive_partitioning=1
"""
)
assert "2000-01-02" in result

result = node.query(
f"""
SELECT count() FROM fileCluster('my_cluster', '{hive_glob}', 'CSVWithNames', 'd UInt64')
WHERE date='2000-01-02'
SETTINGS use_hive_partitioning=1
"""
)
assert result.strip() == "10"

result = node.query(
f"""
SELECT date, d FROM fileCluster('my_cluster', '{hive_glob}', 'CSVWithNames', 'd UInt64')
WHERE date='2000-01-02'
LIMIT 1
SETTINGS use_hive_partitioning=1
"""
)
assert "2000-01-02" in result
35 changes: 35 additions & 0 deletions tests/integration/test_s3_cluster/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1490,3 +1490,38 @@ def test_object_storage_remote_initiator_without_cluster_function(started_cluste
assert users[1:] == ["s0_0_0\tdefault",
"s0_0_1\tfoo",
"s0_1_0\tfoo"]


def test_hive_partitioning_with_where_condition(started_cluster):
node = started_cluster.instances["s0_0_0"]
test_id = uuid.uuid4().hex[:8]

for i in range(1, 5):
node.query(
f"""
INSERT INTO FUNCTION s3('http://minio1:9001/root/hive/{test_id}/date=2000-01-0{i}/data.csv',
'minio','{minio_secret_key}','CSVWithNames','d UInt64')
SELECT number FROM numbers(10)
SETTINGS s3_truncate_on_insert=1
""")

# Direct query
result = node.query(
f"""
SELECT count() FROM s3('http://minio1:9001/root/hive/{test_id}/date=*/data.csv',
'minio','{minio_secret_key}','CSVWithNames','d UInt64')
WHERE date='2000-01-02'
SETTINGS use_hive_partitioning=1
"""
)
assert result.strip() == "10"

result = node.query(
f"""
SELECT count() FROM s3Cluster('cluster_simple', 'http://minio1:9001/root/hive/{test_id}/date=*/data.csv',
'minio','{minio_secret_key}','CSVWithNames','d UInt64')
WHERE date='2000-01-02'
SETTINGS use_hive_partitioning=1
"""
)
assert result.strip() == "10"
32 changes: 32 additions & 0 deletions tests/integration/test_storage_url/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,38 @@ def test_partition_by():
assert result.strip() == "1\t2\t3"


def test_hive_partitioning_with_where_condition():
test_id = uuid.uuid4().hex[:8]
base_url = f"http://nginx:80/hive_url_cluster_{test_id}"

node1.query(
f"""
INSERT INTO FUNCTION url(url_file, url='{base_url}/date=2000-01-01/data.csv', format='CSVWithNames', structure='d UInt64')
SELECT number FROM numbers(10)
"""
)

# 'ur' table function does not work with globs, so we have to test hive partitioning with a single file.
result = node1.query(
f"""
SELECT count() FROM url('{base_url}/date=2000-01-01/data.csv', 'CSVWithNames', 'd UInt64')
WHERE date='2000-01-01'
SETTINGS use_hive_partitioning=1
"""
)
assert result.strip() == "10"

result = node1.query(
f"""
SELECT count() FROM urlCluster(
'test_cluster_two_shards', '{base_url}/date=2000-01-01/data.csv', 'CSVWithNames', 'd UInt64')
WHERE date='2000-01-01'
SETTINGS use_hive_partitioning=1
"""
)
assert result.strip() == "10"


def test_url_cluster():
result = node1.query(
f"select * from urlCluster('test_cluster_two_shards', 'http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
Expand Down
Loading