Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 107 additions & 26 deletions python/cuvs/cuvs/tests/test_mg_cagra.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION.
# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#

Expand All @@ -15,15 +15,37 @@
from cuvs.tests.ann_utils import calc_recall, generate_data


# Check if multi-GPU functionality is available
def has_multiple_gpus():
"""Check if system has multiple GPUs available."""
MIN_ROWS_PER_SHARDED_GRAPH_DEGREE = 4


def get_gpu_count():
"""Return the number of visible CUDA devices."""
try:
import cupy as cp

return cp.cuda.runtime.getDeviceCount() > 1
return cp.cuda.runtime.getDeviceCount()
except Exception:
return False
return 0


# Check if multi-GPU functionality is available
def has_multiple_gpus():
"""Check if system has multiple GPUs available."""
return get_gpu_count() > 1


def _n_rows_for_distribution(
n_rows, distribution_mode, graph_degree, intermediate_graph_degree
):
"""Keep sharded CAGRA graphs large enough as GPU count increases."""
if distribution_mode != "sharded":
return n_rows

min_rows_per_shard = (
max(graph_degree, intermediate_graph_degree)
* MIN_ROWS_PER_SHARDED_GRAPH_DEGREE
)
return max(n_rows, max(1, get_gpu_count()) * min_rows_per_shard)


# Mark tests that require multiple GPUs
Expand Down Expand Up @@ -54,6 +76,10 @@ def run_mg_cagra_build_search_test(
Note: Multi-GPU CAGRA requires host memory arrays (NumPy), not device
arrays.
"""
n_rows = _n_rows_for_distribution(
n_rows, distribution_mode, graph_degree, intermediate_graph_degree
)

# Generate host memory arrays (NumPy)
dataset = generate_data((n_rows, n_cols), dtype)
if metric == "inner_product":
Expand All @@ -79,8 +105,7 @@ def run_mg_cagra_build_search_test(
assert index.trained

# Search parameters
if search_params is None:
search_params = {}
search_params = dict(search_params or {})
search_params_obj = mg_cagra.SearchParams(
search_mode=search_mode,
merge_mode=merge_mode,
Expand All @@ -102,6 +127,9 @@ def run_mg_cagra_build_search_test(
assert isinstance(neighbors, np.ndarray)
assert distances.shape == (n_queries, k)
assert neighbors.shape == (n_queries, k)
assert np.all(neighbors >= 0)
assert np.all(neighbors < n_rows)
assert np.all(np.isfinite(distances))

if not compare:
return distances, neighbors
Expand All @@ -122,7 +150,13 @@ def run_mg_cagra_build_search_test(
# Multi-GPU implementation may have lower recall due to data
# distribution across GPUs
# This is acceptable as long as the functionality works correctly
assert recall > 0.3, f"Recall too low: {recall:.3f}"
assert recall > 0.3, (
f"Recall too low: {recall:.3f} "
f"(n_rows={n_rows}, distribution_mode={distribution_mode}, "
f"graph_degree={graph_degree}, "
f"intermediate_graph_degree={intermediate_graph_degree}, "
f"num_gpus={get_gpu_count()})"
)

return distances, neighbors

Expand Down Expand Up @@ -202,18 +236,28 @@ def test_mg_cagra_distribution_modes(distribution_mode):


@requires_multiple_gpus
@pytest.mark.parametrize("search_mode", ["load_balancer", "round_robin"])
@pytest.mark.parametrize("merge_mode", ["merge_on_root_rank", "tree_merge"])
def test_mg_cagra_search_params(search_mode, merge_mode):
"""Test different multi-GPU search parameters."""
@pytest.mark.parametrize(
"distribution_mode,search_mode,merge_mode,n_rows_per_batch",
[
("replicated", "load_balancer", "tree_merge", 500),
("replicated", "round_robin", "tree_merge", 2000),
("sharded", "load_balancer", "merge_on_root_rank", 500),
("sharded", "load_balancer", "tree_merge", 500),
],
)
def test_mg_cagra_search_params(
distribution_mode, search_mode, merge_mode, n_rows_per_batch
):
"""Test the relevant replicated and sharded search parameters."""
run_mg_cagra_build_search_test(
n_rows=1500,
n_cols=8,
n_queries=15,
k=5,
distribution_mode=distribution_mode,
search_mode=search_mode,
merge_mode=merge_mode,
n_rows_per_batch=500,
n_rows_per_batch=n_rows_per_batch,
graph_degree=32,
intermediate_graph_degree=64,
)
Expand All @@ -239,7 +283,12 @@ def test_mg_cagra_metrics(metric):
@requires_multiple_gpus
def test_mg_cagra_serialize():
"""Test save/load functionality for multi-GPU CAGRA."""
n_rows, n_cols = 2000, 8
graph_degree = 32
intermediate_graph_degree = 64
n_rows = _n_rows_for_distribution(
2000, "sharded", graph_degree, intermediate_graph_degree
)
n_cols = 8
k = 5

# Generate data
Expand All @@ -250,7 +299,8 @@ def test_mg_cagra_serialize():

# Build original index
build_params = mg_cagra.IndexParams(
graph_degree=32, intermediate_graph_degree=64
graph_degree=graph_degree,
intermediate_graph_degree=intermediate_graph_degree,
)
original_index = mg_cagra.build(build_params, dataset, resources=resources)

Expand Down Expand Up @@ -342,6 +392,9 @@ def test_mg_cagra_distribute():

assert distances.shape == (20, k)
assert neighbors.shape == (20, k)
assert np.all(neighbors >= 0)
assert np.all(neighbors < n_rows)
assert np.all(np.isfinite(distances))

finally:
if os.path.exists(temp_filename):
Expand All @@ -355,15 +408,21 @@ def test_memory_location_validation():
except ImportError:
pytest.skip("CuPy not available for memory location tests")

n_rows, n_cols = 1500, 8
graph_degree = 32
intermediate_graph_degree = 64
n_rows = _n_rows_for_distribution(
1500, "sharded", graph_degree, intermediate_graph_degree
)
n_cols = 8

# Create host and device arrays
host_data = generate_data((n_rows, n_cols), np.float32)
device_data = cp.asarray(host_data)

resources = MultiGpuResources()
build_params = mg_cagra.IndexParams(
graph_degree=32, intermediate_graph_degree=64
graph_degree=graph_degree,
intermediate_graph_degree=intermediate_graph_degree,
)

# Test that device arrays are rejected for build
Expand All @@ -389,6 +448,9 @@ def test_memory_location_validation():
)
assert isinstance(distances, np.ndarray)
assert isinstance(neighbors, np.ndarray)
assert np.all(neighbors >= 0)
assert np.all(neighbors < n_rows)
assert np.all(np.isfinite(distances))


def test_parameter_validation():
Expand Down Expand Up @@ -447,7 +509,12 @@ def test_untrained_index_error():
@requires_multiple_gpus
def test_mg_cagra_with_prealloc_output():
"""Test multi-GPU CAGRA search with pre-allocated output arrays."""
n_rows, n_cols = 1500, 8
graph_degree = 32
intermediate_graph_degree = 64
n_rows = _n_rows_for_distribution(
1500, "sharded", graph_degree, intermediate_graph_degree
)
n_cols = 8
n_queries = 20
k = 5

Expand All @@ -459,7 +526,8 @@ def test_mg_cagra_with_prealloc_output():

# Build index
build_params = mg_cagra.IndexParams(
graph_degree=32, intermediate_graph_degree=64
graph_degree=graph_degree,
intermediate_graph_degree=intermediate_graph_degree,
)
index = mg_cagra.build(build_params, dataset, resources=resources)

Expand All @@ -484,6 +552,9 @@ def test_mg_cagra_with_prealloc_output():
assert ret_neighbors is neighbors
assert distances.shape == (n_queries, k)
assert neighbors.shape == (n_queries, k)
assert np.all(neighbors >= 0)
assert np.all(neighbors < n_rows)
assert np.all(np.isfinite(distances))


def test_index_repr():
Expand All @@ -500,7 +571,12 @@ def test_mg_cagra_simple():
pytest.skip("Multi-GPU tests require multiple GPUs")

# Use simple test case that should definitely work
n_rows, n_cols = 1000, 8
graph_degree = 16
intermediate_graph_degree = 32
n_rows = _n_rows_for_distribution(
1000, "sharded", graph_degree, intermediate_graph_degree
)
n_cols = 8
n_queries, k = 20, 5

# Generate data
Expand All @@ -512,8 +588,8 @@ def test_mg_cagra_simple():
# Use small graph for reliable testing
build_params = mg_cagra.IndexParams(
metric="sqeuclidean",
graph_degree=16,
intermediate_graph_degree=32,
graph_degree=graph_degree,
intermediate_graph_degree=intermediate_graph_degree,
)

# Build index
Expand Down Expand Up @@ -547,7 +623,12 @@ def test_mg_cagra_simple():
@requires_multiple_gpus
def test_mg_cagra_integration():
"""Integration test covering build, search, and serialization."""
n_rows, n_cols = 2000, 8
graph_degree = 32
intermediate_graph_degree = 64
n_rows = _n_rows_for_distribution(
2000, "sharded", graph_degree, intermediate_graph_degree
)
n_cols = 8
k = 5

# Generate initial dataset
Expand All @@ -560,8 +641,8 @@ def test_mg_cagra_integration():
build_params = mg_cagra.IndexParams(
distribution_mode="sharded",
metric="sqeuclidean",
graph_degree=32,
intermediate_graph_degree=64,
graph_degree=graph_degree,
intermediate_graph_degree=intermediate_graph_degree,
)
index = mg_cagra.build(build_params, dataset, resources=resources)

Expand Down
Loading
Loading