diff --git a/python/cuvs/cuvs/tests/test_mg_cagra.py b/python/cuvs/cuvs/tests/test_mg_cagra.py index 903c16ea24..133779a82c 100644 --- a/python/cuvs/cuvs/tests/test_mg_cagra.py +++ b/python/cuvs/cuvs/tests/test_mg_cagra.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # @@ -15,15 +15,37 @@ from cuvs.tests.ann_utils import calc_recall, generate_data -# Check if multi-GPU functionality is available -def has_multiple_gpus(): - """Check if system has multiple GPUs available.""" +MIN_ROWS_PER_SHARDED_GRAPH_DEGREE = 4 + + +def get_gpu_count(): + """Return the number of visible CUDA devices.""" try: import cupy as cp - return cp.cuda.runtime.getDeviceCount() > 1 + return cp.cuda.runtime.getDeviceCount() except Exception: - return False + return 0 + + +# Check if multi-GPU functionality is available +def has_multiple_gpus(): + """Check if system has multiple GPUs available.""" + return get_gpu_count() > 1 + + +def _n_rows_for_distribution( + n_rows, distribution_mode, graph_degree, intermediate_graph_degree +): + """Keep sharded CAGRA graphs large enough as GPU count increases.""" + if distribution_mode != "sharded": + return n_rows + + min_rows_per_shard = ( + max(graph_degree, intermediate_graph_degree) + * MIN_ROWS_PER_SHARDED_GRAPH_DEGREE + ) + return max(n_rows, max(1, get_gpu_count()) * min_rows_per_shard) # Mark tests that require multiple GPUs @@ -54,6 +76,10 @@ def run_mg_cagra_build_search_test( Note: Multi-GPU CAGRA requires host memory arrays (NumPy), not device arrays. """ + n_rows = _n_rows_for_distribution( + n_rows, distribution_mode, graph_degree, intermediate_graph_degree + ) + # Generate host memory arrays (NumPy) dataset = generate_data((n_rows, n_cols), dtype) if metric == "inner_product": @@ -79,8 +105,7 @@ def run_mg_cagra_build_search_test( assert index.trained # Search parameters - if search_params is None: - search_params = {} + search_params = dict(search_params or {}) search_params_obj = mg_cagra.SearchParams( search_mode=search_mode, merge_mode=merge_mode, @@ -102,6 +127,9 @@ def run_mg_cagra_build_search_test( assert isinstance(neighbors, np.ndarray) assert distances.shape == (n_queries, k) assert neighbors.shape == (n_queries, k) + assert np.all(neighbors >= 0) + assert np.all(neighbors < n_rows) + assert np.all(np.isfinite(distances)) if not compare: return distances, neighbors @@ -122,7 +150,13 @@ def run_mg_cagra_build_search_test( # Multi-GPU implementation may have lower recall due to data # distribution across GPUs # This is acceptable as long as the functionality works correctly - assert recall > 0.3, f"Recall too low: {recall:.3f}" + assert recall > 0.3, ( + f"Recall too low: {recall:.3f} " + f"(n_rows={n_rows}, distribution_mode={distribution_mode}, " + f"graph_degree={graph_degree}, " + f"intermediate_graph_degree={intermediate_graph_degree}, " + f"num_gpus={get_gpu_count()})" + ) return distances, neighbors @@ -202,18 +236,28 @@ def test_mg_cagra_distribution_modes(distribution_mode): @requires_multiple_gpus -@pytest.mark.parametrize("search_mode", ["load_balancer", "round_robin"]) -@pytest.mark.parametrize("merge_mode", ["merge_on_root_rank", "tree_merge"]) -def test_mg_cagra_search_params(search_mode, merge_mode): - """Test different multi-GPU search parameters.""" +@pytest.mark.parametrize( + "distribution_mode,search_mode,merge_mode,n_rows_per_batch", + [ + ("replicated", "load_balancer", "tree_merge", 500), + ("replicated", "round_robin", "tree_merge", 2000), + ("sharded", "load_balancer", "merge_on_root_rank", 500), + ("sharded", "load_balancer", "tree_merge", 500), + ], +) +def test_mg_cagra_search_params( + distribution_mode, search_mode, merge_mode, n_rows_per_batch +): + """Test the relevant replicated and sharded search parameters.""" run_mg_cagra_build_search_test( n_rows=1500, n_cols=8, n_queries=15, k=5, + distribution_mode=distribution_mode, search_mode=search_mode, merge_mode=merge_mode, - n_rows_per_batch=500, + n_rows_per_batch=n_rows_per_batch, graph_degree=32, intermediate_graph_degree=64, ) @@ -239,7 +283,12 @@ def test_mg_cagra_metrics(metric): @requires_multiple_gpus def test_mg_cagra_serialize(): """Test save/load functionality for multi-GPU CAGRA.""" - n_rows, n_cols = 2000, 8 + graph_degree = 32 + intermediate_graph_degree = 64 + n_rows = _n_rows_for_distribution( + 2000, "sharded", graph_degree, intermediate_graph_degree + ) + n_cols = 8 k = 5 # Generate data @@ -250,7 +299,8 @@ def test_mg_cagra_serialize(): # Build original index build_params = mg_cagra.IndexParams( - graph_degree=32, intermediate_graph_degree=64 + graph_degree=graph_degree, + intermediate_graph_degree=intermediate_graph_degree, ) original_index = mg_cagra.build(build_params, dataset, resources=resources) @@ -342,6 +392,9 @@ def test_mg_cagra_distribute(): assert distances.shape == (20, k) assert neighbors.shape == (20, k) + assert np.all(neighbors >= 0) + assert np.all(neighbors < n_rows) + assert np.all(np.isfinite(distances)) finally: if os.path.exists(temp_filename): @@ -355,7 +408,12 @@ def test_memory_location_validation(): except ImportError: pytest.skip("CuPy not available for memory location tests") - n_rows, n_cols = 1500, 8 + graph_degree = 32 + intermediate_graph_degree = 64 + n_rows = _n_rows_for_distribution( + 1500, "sharded", graph_degree, intermediate_graph_degree + ) + n_cols = 8 # Create host and device arrays host_data = generate_data((n_rows, n_cols), np.float32) @@ -363,7 +421,8 @@ def test_memory_location_validation(): resources = MultiGpuResources() build_params = mg_cagra.IndexParams( - graph_degree=32, intermediate_graph_degree=64 + graph_degree=graph_degree, + intermediate_graph_degree=intermediate_graph_degree, ) # Test that device arrays are rejected for build @@ -389,6 +448,9 @@ def test_memory_location_validation(): ) assert isinstance(distances, np.ndarray) assert isinstance(neighbors, np.ndarray) + assert np.all(neighbors >= 0) + assert np.all(neighbors < n_rows) + assert np.all(np.isfinite(distances)) def test_parameter_validation(): @@ -447,7 +509,12 @@ def test_untrained_index_error(): @requires_multiple_gpus def test_mg_cagra_with_prealloc_output(): """Test multi-GPU CAGRA search with pre-allocated output arrays.""" - n_rows, n_cols = 1500, 8 + graph_degree = 32 + intermediate_graph_degree = 64 + n_rows = _n_rows_for_distribution( + 1500, "sharded", graph_degree, intermediate_graph_degree + ) + n_cols = 8 n_queries = 20 k = 5 @@ -459,7 +526,8 @@ def test_mg_cagra_with_prealloc_output(): # Build index build_params = mg_cagra.IndexParams( - graph_degree=32, intermediate_graph_degree=64 + graph_degree=graph_degree, + intermediate_graph_degree=intermediate_graph_degree, ) index = mg_cagra.build(build_params, dataset, resources=resources) @@ -484,6 +552,9 @@ def test_mg_cagra_with_prealloc_output(): assert ret_neighbors is neighbors assert distances.shape == (n_queries, k) assert neighbors.shape == (n_queries, k) + assert np.all(neighbors >= 0) + assert np.all(neighbors < n_rows) + assert np.all(np.isfinite(distances)) def test_index_repr(): @@ -500,7 +571,12 @@ def test_mg_cagra_simple(): pytest.skip("Multi-GPU tests require multiple GPUs") # Use simple test case that should definitely work - n_rows, n_cols = 1000, 8 + graph_degree = 16 + intermediate_graph_degree = 32 + n_rows = _n_rows_for_distribution( + 1000, "sharded", graph_degree, intermediate_graph_degree + ) + n_cols = 8 n_queries, k = 20, 5 # Generate data @@ -512,8 +588,8 @@ def test_mg_cagra_simple(): # Use small graph for reliable testing build_params = mg_cagra.IndexParams( metric="sqeuclidean", - graph_degree=16, - intermediate_graph_degree=32, + graph_degree=graph_degree, + intermediate_graph_degree=intermediate_graph_degree, ) # Build index @@ -547,7 +623,12 @@ def test_mg_cagra_simple(): @requires_multiple_gpus def test_mg_cagra_integration(): """Integration test covering build, search, and serialization.""" - n_rows, n_cols = 2000, 8 + graph_degree = 32 + intermediate_graph_degree = 64 + n_rows = _n_rows_for_distribution( + 2000, "sharded", graph_degree, intermediate_graph_degree + ) + n_cols = 8 k = 5 # Generate initial dataset @@ -560,8 +641,8 @@ def test_mg_cagra_integration(): build_params = mg_cagra.IndexParams( distribution_mode="sharded", metric="sqeuclidean", - graph_degree=32, - intermediate_graph_degree=64, + graph_degree=graph_degree, + intermediate_graph_degree=intermediate_graph_degree, ) index = mg_cagra.build(build_params, dataset, resources=resources) diff --git a/python/cuvs/cuvs/tests/test_mg_ivf_flat.py b/python/cuvs/cuvs/tests/test_mg_ivf_flat.py index 99dff4e221..04b2ee0fcd 100644 --- a/python/cuvs/cuvs/tests/test_mg_ivf_flat.py +++ b/python/cuvs/cuvs/tests/test_mg_ivf_flat.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # @@ -15,15 +15,66 @@ from cuvs.tests.ann_utils import calc_recall, generate_data -# Check if multi-GPU functionality is available -def has_multiple_gpus(): - """Check if system has multiple GPUs available.""" +MIN_ROWS_PER_SHARDED_LIST = 20 + + +def get_gpu_count(): + """Return the number of visible CUDA devices.""" try: import cupy as cp - return cp.cuda.runtime.getDeviceCount() > 1 + return cp.cuda.runtime.getDeviceCount() except Exception: - return False + return 0 + + +# Check if multi-GPU functionality is available +def has_multiple_gpus(): + """Check if system has multiple GPUs available.""" + return get_gpu_count() > 1 + + +def _n_rows_for_distribution(n_rows, n_lists, distribution_mode): + """Keep sharded IVF indexes large enough as GPU count increases.""" + if distribution_mode != "sharded": + return n_rows + + min_rows = max(1, get_gpu_count()) * n_lists * MIN_ROWS_PER_SHARDED_LIST + return max(n_rows, min_rows) + + +def _default_n_probes(n_lists, compare): + if compare: + return n_lists + + return min(n_lists, max(20, (n_lists * 3) // 4)) + + +def _sharded_append_indices(n_existing_rows, n_new_rows): + """Create local IDs for appending to each shard of a sharded index.""" + n_gpus = max(1, get_gpu_count()) + existing_rows_per_shard = (n_existing_rows + n_gpus - 1) // n_gpus + new_rows_per_shard = (n_new_rows + n_gpus - 1) // n_gpus + indices = np.empty(n_new_rows, dtype=np.int64) + + for rank in range(n_gpus): + new_offset = rank * new_rows_per_shard + n_rank_new_rows = min(new_rows_per_shard, n_new_rows - new_offset) + if n_rank_new_rows <= 0: + continue + + existing_offset = rank * existing_rows_per_shard + n_rank_existing_rows = max( + 0, + min(existing_rows_per_shard, n_existing_rows - existing_offset), + ) + indices[new_offset : new_offset + n_rank_new_rows] = np.arange( + n_rank_existing_rows, + n_rank_existing_rows + n_rank_new_rows, + dtype=np.int64, + ) + + return indices # Mark tests that require multiple GPUs @@ -47,6 +98,7 @@ def run_mg_ivf_flat_build_search_test( add_data_on_build=True, search_params=None, n_lists=None, + min_recall=0.8, ): """ Run a multi-GPU IVF-Flat build and search test. @@ -54,6 +106,15 @@ def run_mg_ivf_flat_build_search_test( Note: Multi-GPU IVF-Flat requires host memory arrays (NumPy), not device arrays. """ + # Build parameters - use fewer clusters for better recall + # with smaller datasets. + if n_lists is None: + # Use fewer clusters for smaller datasets to ensure enough points + # per cluster. + n_lists = min(1024, max(64, n_rows // 50)) + + n_rows = _n_rows_for_distribution(n_rows, n_lists, distribution_mode) + # Generate host memory arrays (NumPy) dataset = generate_data((n_rows, n_cols), dtype) if metric == "inner_product": @@ -66,13 +127,6 @@ def run_mg_ivf_flat_build_search_test( # Multi-GPU resources resources = MultiGpuResources() - # Build parameters - use fewer clusters for better recall - # with smaller datasets - if n_lists is None: - # Use fewer clusters for smaller datasets to ensure enough points - # per cluster - n_lists = min(1024, max(64, n_rows // 50)) - build_params = mg_ivf_flat.IndexParams( metric=metric, distribution_mode=distribution_mode, @@ -86,21 +140,12 @@ def run_mg_ivf_flat_build_search_test( # If not adding data on build, extend the index if not add_data_on_build: - dataset_1 = dataset[: n_rows // 2, :] - dataset_2 = dataset[n_rows // 2 :, :] - indices_1 = np.arange(n_rows // 2, dtype=np.int64) - indices_2 = np.arange(n_rows // 2, n_rows, dtype=np.int64) - - mg_ivf_flat.extend(index, dataset_1, indices_1, resources=resources) - mg_ivf_flat.extend(index, dataset_2, indices_2, resources=resources) + mg_ivf_flat.extend(index, dataset, resources=resources) # Search parameters - if search_params is None: - search_params = {} - # Use higher n_probes for better recall in multi-GPU setting + search_params = dict(search_params or {}) if "n_probes" not in search_params: - # Use many clusters for good recall - search majority of clusters - search_params["n_probes"] = min(n_lists, max(20, (n_lists * 3) // 4)) + search_params["n_probes"] = _default_n_probes(n_lists, compare) search_params_obj = mg_ivf_flat.SearchParams( search_mode=search_mode, merge_mode=merge_mode, @@ -122,6 +167,9 @@ def run_mg_ivf_flat_build_search_test( assert isinstance(neighbors, np.ndarray) assert distances.shape == (n_queries, k) assert neighbors.shape == (n_queries, k) + assert np.all(neighbors >= 0) + assert np.all(neighbors < n_rows) + assert np.all(np.isfinite(distances)) if not compare: return distances, neighbors @@ -141,12 +189,14 @@ def run_mg_ivf_flat_build_search_test( skl_idx = nn_skl.kneighbors(queries, return_distance=False) recall = calc_recall(neighbors, skl_idx) - # Multi-GPU implementation may have lower recall due to data distribution - # across GPUs - # This is acceptable as long as the functionality works correctly - assert recall > 0.3, ( - f"Recall too low: {recall:.3f} (n_lists={n_lists}, " - f"n_probes={search_params.get('n_probes', 'default')})" + # Full-probe IVF-Flat should be close to exact search. Keep the threshold + # below 1.0 to allow for tie-breaking and lower precision dtypes. + assert recall >= min_recall, ( + f"Recall too low: {recall:.3f} (min_recall={min_recall}, " + f"n_rows={n_rows}, n_lists={n_lists}, " + f"n_probes={search_params.get('n_probes', 'default')}, " + f"distribution_mode={distribution_mode}, " + f"num_gpus={get_gpu_count()})" ) return distances, neighbors @@ -221,18 +271,45 @@ def test_mg_ivf_flat_distribution_modes(distribution_mode): @requires_multiple_gpus -@pytest.mark.parametrize("search_mode", ["load_balancer", "round_robin"]) -@pytest.mark.parametrize("merge_mode", ["merge_on_root_rank", "tree_merge"]) -def test_mg_ivf_flat_search_params(search_mode, merge_mode): - """Test different multi-GPU search parameters.""" +@pytest.mark.parametrize("distribution_mode", ["sharded", "replicated"]) +def test_mg_ivf_flat_partial_probes(distribution_mode): + """Test approximate search with partial probes in both distribution modes.""" + n_lists = 30 run_mg_ivf_flat_build_search_test( n_rows=1500, n_cols=8, n_queries=15, k=5, + distribution_mode=distribution_mode, + search_params={"n_probes": n_lists // 2}, + n_lists=n_lists, + compare=False, + ) + + +@requires_multiple_gpus +@pytest.mark.parametrize( + "distribution_mode,search_mode,merge_mode,n_rows_per_batch", + [ + ("replicated", "load_balancer", "tree_merge", 500), + ("replicated", "round_robin", "tree_merge", 2000), + ("sharded", "load_balancer", "merge_on_root_rank", 500), + ("sharded", "load_balancer", "tree_merge", 500), + ], +) +def test_mg_ivf_flat_search_params( + distribution_mode, search_mode, merge_mode, n_rows_per_batch +): + """Test the relevant replicated and sharded search parameters.""" + run_mg_ivf_flat_build_search_test( + n_rows=1500, + n_cols=8, + n_queries=15, + k=5, + distribution_mode=distribution_mode, search_mode=search_mode, merge_mode=merge_mode, - n_rows_per_batch=500, + n_rows_per_batch=n_rows_per_batch, n_lists=30, ) @@ -252,13 +329,15 @@ def test_mg_ivf_flat_metrics(metric): @requires_multiple_gpus -def test_mg_ivf_flat_extend(): +@pytest.mark.parametrize("distribution_mode", ["sharded", "replicated"]) +def test_mg_ivf_flat_extend(distribution_mode): """Test extending multi-GPU IVF-Flat index with new vectors.""" run_mg_ivf_flat_build_search_test( n_rows=1500, n_cols=8, n_queries=15, k=5, + distribution_mode=distribution_mode, add_data_on_build=False, n_lists=30, ) @@ -267,7 +346,9 @@ def test_mg_ivf_flat_extend(): @requires_multiple_gpus def test_mg_ivf_flat_serialize(): """Test save/load functionality for multi-GPU IVF-Flat.""" - n_rows, n_cols = 2000, 8 + n_lists = 50 + n_rows = _n_rows_for_distribution(2000, n_lists, "sharded") + n_cols = 8 k = 5 # Generate data @@ -277,13 +358,13 @@ def test_mg_ivf_flat_serialize(): resources = MultiGpuResources() # Build original index - build_params = mg_ivf_flat.IndexParams(n_lists=50) + build_params = mg_ivf_flat.IndexParams(n_lists=n_lists) original_index = mg_ivf_flat.build( build_params, dataset, resources=resources ) # Search with original index - search_params = mg_ivf_flat.SearchParams(n_probes=37) + search_params = mg_ivf_flat.SearchParams(n_probes=n_lists) orig_distances, orig_neighbors = mg_ivf_flat.search( search_params, original_index, queries, k, resources=resources ) @@ -385,14 +466,16 @@ def test_memory_location_validation(): except ImportError: pytest.skip("CuPy not available for memory location tests") - n_rows, n_cols = 1500, 8 + n_lists = 30 + n_rows = _n_rows_for_distribution(1500, n_lists, "sharded") + n_cols = 8 # Create host and device arrays host_data = generate_data((n_rows, n_cols), np.float32) device_data = cp.asarray(host_data) resources = MultiGpuResources() - build_params = mg_ivf_flat.IndexParams(n_lists=30) + build_params = mg_ivf_flat.IndexParams(n_lists=n_lists) # Test that device arrays are rejected for build with pytest.raises(ValueError, match="host memory"): @@ -404,7 +487,7 @@ def test_memory_location_validation(): # Test that device arrays are rejected for search queries = generate_data((20, n_cols), np.float32) device_queries = cp.asarray(queries) - search_params = mg_ivf_flat.SearchParams(n_probes=22) + search_params = mg_ivf_flat.SearchParams(n_probes=n_lists) with pytest.raises(ValueError, match="host memory"): mg_ivf_flat.search( @@ -482,7 +565,9 @@ def test_untrained_index_error(): @requires_multiple_gpus def test_mg_ivf_flat_with_prealloc_output(): """Test multi-GPU IVF-Flat search with pre-allocated output arrays.""" - n_rows, n_cols = 1500, 8 # Ensure n_rows > n_lists + n_lists = 30 + n_rows = _n_rows_for_distribution(1500, n_lists, "sharded") + n_cols = 8 n_queries = 20 k = 5 @@ -493,7 +578,7 @@ def test_mg_ivf_flat_with_prealloc_output(): resources = MultiGpuResources() # Build index with fewer clusters to avoid n_rows < n_lists error - build_params = mg_ivf_flat.IndexParams(n_lists=30) + build_params = mg_ivf_flat.IndexParams(n_lists=n_lists) index = mg_ivf_flat.build(build_params, dataset, resources=resources) # Pre-allocate output arrays in host memory @@ -501,7 +586,7 @@ def test_mg_ivf_flat_with_prealloc_output(): distances = np.empty((n_queries, k), dtype=np.float32) # Search with pre-allocated arrays - search_params = mg_ivf_flat.SearchParams(n_probes=20) + search_params = mg_ivf_flat.SearchParams(n_probes=n_lists) ret_distances, ret_neighbors = mg_ivf_flat.search( search_params, index, @@ -533,7 +618,9 @@ def test_mg_ivf_flat_simple(): pytest.skip("Multi-GPU tests require multiple GPUs") # Use simple test case that should definitely work - n_rows, n_cols = 1000, 8 + n_lists = 32 + n_rows = _n_rows_for_distribution(1000, n_lists, "sharded") + n_cols = 8 n_queries, k = 20, 5 # Generate data @@ -545,7 +632,7 @@ def test_mg_ivf_flat_simple(): # Use very few clusters for high recall build_params = mg_ivf_flat.IndexParams( metric="sqeuclidean", - n_lists=32, # Very few clusters + n_lists=n_lists, # Very few clusters ) # Build index @@ -553,7 +640,7 @@ def test_mg_ivf_flat_simple(): # Search with many probes for maximum recall search_params = mg_ivf_flat.SearchParams( - n_probes=32 + n_probes=n_lists ) # Search all clusters distances, neighbors = mg_ivf_flat.search( search_params, index, queries, k, resources=resources @@ -581,7 +668,9 @@ def test_mg_ivf_flat_simple(): @requires_multiple_gpus def test_mg_ivf_flat_integration(): """Integration test covering build, search, extend, and serialization.""" - n_rows, n_cols = 2000, 8 + n_lists = 50 + n_rows = _n_rows_for_distribution(2000, n_lists, "sharded") + n_cols = 8 k = 5 # Generate initial dataset @@ -592,13 +681,13 @@ def test_mg_ivf_flat_integration(): # Build initial index build_params = mg_ivf_flat.IndexParams( - distribution_mode="sharded", metric="sqeuclidean", n_lists=50 + distribution_mode="sharded", metric="sqeuclidean", n_lists=n_lists ) index = mg_ivf_flat.build(build_params, dataset, resources=resources) # Initial search search_params = mg_ivf_flat.SearchParams( - n_probes=37, + n_probes=n_lists, search_mode="load_balancer", merge_mode="merge_on_root_rank", ) @@ -608,8 +697,7 @@ def test_mg_ivf_flat_integration(): # Extend index with new vectors new_vectors = generate_data((200, n_cols), np.float32) - # Provide indices for extend operation on non-empty index - new_indices = np.arange(n_rows, n_rows + 200, dtype=np.int64) + new_indices = _sharded_append_indices(n_rows, new_vectors.shape[0]) mg_ivf_flat.extend(index, new_vectors, new_indices, resources=resources) # Search after extend diff --git a/python/cuvs/cuvs/tests/test_mg_ivf_pq.py b/python/cuvs/cuvs/tests/test_mg_ivf_pq.py index 6c6cf8415b..9d5c22da09 100644 --- a/python/cuvs/cuvs/tests/test_mg_ivf_pq.py +++ b/python/cuvs/cuvs/tests/test_mg_ivf_pq.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # @@ -15,15 +15,67 @@ from cuvs.tests.ann_utils import calc_recall, generate_data -# Check if multi-GPU functionality is available -def has_multiple_gpus(): - """Check if system has multiple GPUs available.""" +MIN_IVF_PQ_LISTS = 20 +MIN_ROWS_PER_SHARDED_LIST = 10 + + +def get_gpu_count(): + """Return the number of visible CUDA devices.""" try: import cupy as cp - return cp.cuda.runtime.getDeviceCount() > 1 + return cp.cuda.runtime.getDeviceCount() except Exception: - return False + return 0 + + +# Check if multi-GPU functionality is available +def has_multiple_gpus(): + """Check if system has multiple GPUs available.""" + return get_gpu_count() > 1 + + +def _n_rows_for_distribution(n_rows, n_lists, distribution_mode): + """Keep sharded IVF indexes large enough as GPU count increases.""" + if distribution_mode != "sharded": + return n_rows + + min_rows = max(1, get_gpu_count()) * n_lists * MIN_ROWS_PER_SHARDED_LIST + return max(n_rows, min_rows) + + +def _default_n_probes(n_lists, compare): + if compare: + return n_lists + + return min(n_lists, max(20, (n_lists * 3) // 4)) + + +def _sharded_append_indices(n_existing_rows, n_new_rows): + """Create local IDs for appending to each shard of a sharded index.""" + n_gpus = max(1, get_gpu_count()) + existing_rows_per_shard = (n_existing_rows + n_gpus - 1) // n_gpus + new_rows_per_shard = (n_new_rows + n_gpus - 1) // n_gpus + indices = np.empty(n_new_rows, dtype=np.int64) + + for rank in range(n_gpus): + new_offset = rank * new_rows_per_shard + n_rank_new_rows = min(new_rows_per_shard, n_new_rows - new_offset) + if n_rank_new_rows <= 0: + continue + + existing_offset = rank * existing_rows_per_shard + n_rank_existing_rows = max( + 0, + min(existing_rows_per_shard, n_existing_rows - existing_offset), + ) + indices[new_offset : new_offset + n_rank_new_rows] = np.arange( + n_rank_existing_rows, + n_rank_existing_rows + n_rank_new_rows, + dtype=np.int64, + ) + + return indices # Mark tests that require multiple GPUs @@ -57,6 +109,15 @@ def run_mg_ivf_pq_build_search_test( Note: Multi-GPU IVF-PQ requires host memory arrays (NumPy), not device arrays. """ + # Build parameters - use fewer clusters for better recall with smaller + # datasets. + if n_lists is None: + # Keep helper-driven smoke tests cheap while avoiding very sparse + # sharded IVF lists. + n_lists = min(1024, max(MIN_IVF_PQ_LISTS, n_rows // 50)) + + n_rows = _n_rows_for_distribution(n_rows, n_lists, distribution_mode) + # Generate host memory arrays (NumPy) dataset = generate_data((n_rows, n_cols), dtype) if metric == "inner_product": @@ -69,13 +130,6 @@ def run_mg_ivf_pq_build_search_test( # Multi-GPU resources resources = MultiGpuResources() - # Build parameters - use fewer clusters for better recall with smaller - # datasets - if n_lists is None: - # Use fewer clusters for smaller datasets to ensure enough points per - # cluster - n_lists = min(1024, max(64, n_rows // 50)) - build_params = mg_ivf_pq.IndexParams( metric=metric, distribution_mode=distribution_mode, @@ -92,21 +146,12 @@ def run_mg_ivf_pq_build_search_test( # If not adding data on build, extend the index if not add_data_on_build: - dataset_1 = dataset[: n_rows // 2, :] - dataset_2 = dataset[n_rows // 2 :, :] - indices_1 = np.arange(n_rows // 2, dtype=np.int64) - indices_2 = np.arange(n_rows // 2, n_rows, dtype=np.int64) - - mg_ivf_pq.extend(index, dataset_1, indices_1, resources=resources) - mg_ivf_pq.extend(index, dataset_2, indices_2, resources=resources) + mg_ivf_pq.extend(index, dataset, resources=resources) # Search parameters - if search_params is None: - search_params = {} - # Use higher n_probes for better recall in multi-GPU setting + search_params = dict(search_params or {}) if "n_probes" not in search_params: - # Use many clusters for good recall - search majority of clusters - search_params["n_probes"] = min(n_lists, max(20, (n_lists * 3) // 4)) + search_params["n_probes"] = _default_n_probes(n_lists, compare) search_params_obj = mg_ivf_pq.SearchParams( search_mode=search_mode, merge_mode=merge_mode, @@ -128,6 +173,9 @@ def run_mg_ivf_pq_build_search_test( assert isinstance(neighbors, np.ndarray) assert distances.shape == (n_queries, k) assert neighbors.shape == (n_queries, k) + assert np.all(neighbors >= 0) + assert np.all(neighbors < n_rows) + assert np.all(np.isfinite(distances)) if not compare: return distances, neighbors @@ -224,18 +272,45 @@ def test_mg_ivf_pq_distribution_modes(distribution_mode): @requires_multiple_gpus -@pytest.mark.parametrize("search_mode", ["load_balancer", "round_robin"]) -@pytest.mark.parametrize("merge_mode", ["merge_on_root_rank", "tree_merge"]) -def test_mg_ivf_pq_search_params(search_mode, merge_mode): - """Test different multi-GPU search parameters for IVF-PQ.""" +@pytest.mark.parametrize("distribution_mode", ["sharded", "replicated"]) +def test_mg_ivf_pq_partial_probes(distribution_mode): + """Test approximate search with partial probes in both distribution modes.""" + n_lists = 30 run_mg_ivf_pq_build_search_test( n_rows=1500, n_cols=32, n_queries=15, k=5, + distribution_mode=distribution_mode, + search_params={"n_probes": n_lists // 2}, + n_lists=n_lists, + compare=False, + ) + + +@requires_multiple_gpus +@pytest.mark.parametrize( + "distribution_mode,search_mode,merge_mode,n_rows_per_batch", + [ + ("replicated", "load_balancer", "tree_merge", 500), + ("replicated", "round_robin", "tree_merge", 2000), + ("sharded", "load_balancer", "merge_on_root_rank", 500), + ("sharded", "load_balancer", "tree_merge", 500), + ], +) +def test_mg_ivf_pq_search_params( + distribution_mode, search_mode, merge_mode, n_rows_per_batch +): + """Test the relevant replicated and sharded search parameters for IVF-PQ.""" + run_mg_ivf_pq_build_search_test( + n_rows=1500, + n_cols=32, + n_queries=15, + k=5, + distribution_mode=distribution_mode, search_mode=search_mode, merge_mode=merge_mode, - n_rows_per_batch=500, + n_rows_per_batch=n_rows_per_batch, n_lists=30, compare=False, ) @@ -275,13 +350,15 @@ def test_mg_ivf_pq_metrics(metric): @requires_multiple_gpus -def test_mg_ivf_pq_extend(): +@pytest.mark.parametrize("distribution_mode", ["sharded", "replicated"]) +def test_mg_ivf_pq_extend(distribution_mode): """Test extending index with new vectors.""" run_mg_ivf_pq_build_search_test( n_rows=1000, n_cols=32, n_queries=100, k=10, + distribution_mode=distribution_mode, add_data_on_build=False, # This triggers extend functionality compare=False, ) @@ -291,7 +368,9 @@ def test_mg_ivf_pq_extend(): def test_mg_ivf_pq_serialize(): """Test serialization and deserialization.""" # Generate data - n_rows, n_cols = 1000, 32 + n_lists = 50 + n_rows = _n_rows_for_distribution(1000, n_lists, "sharded") + n_cols = 32 dataset = generate_data((n_rows, n_cols), np.float32) queries = generate_data((100, n_cols), np.float32) @@ -300,14 +379,14 @@ def test_mg_ivf_pq_serialize(): # Build index build_params = mg_ivf_pq.IndexParams( metric="euclidean", - n_lists=100, + n_lists=n_lists, pq_bits=8, pq_dim=16, ) index = mg_ivf_pq.build(build_params, dataset, resources=resources) # Search before serialization - search_params = mg_ivf_pq.SearchParams(n_probes=50) + search_params = mg_ivf_pq.SearchParams(n_probes=n_lists) distances_1, neighbors_1 = mg_ivf_pq.search( search_params, index, queries, 10, resources=resources ) @@ -398,6 +477,9 @@ def test_mg_ivf_pq_distribute(): # Verify results shape assert distances.shape == (100, k) assert neighbors.shape == (100, k) + assert np.all(neighbors >= 0) + assert np.all(neighbors < n_rows) + assert np.all(np.isfinite(distances)) finally: if os.path.exists(temp_filename): @@ -411,14 +493,15 @@ def test_memory_location_validation(): except ImportError: pytest.skip("CuPy not available") + n_lists = 20 + n_rows = _n_rows_for_distribution(1000, n_lists, "sharded") + # Generate device arrays (should fail) - use enough data points for n_lists - dataset_gpu = cp.random.random((1000, 32), dtype=cp.float32) + dataset_gpu = cp.random.random((n_rows, 32), dtype=cp.float32) queries_gpu = cp.random.random((100, 32), dtype=cp.float32) - # Create parameters with smaller n_lists for the small dataset - build_params = mg_ivf_pq.IndexParams( - n_lists=20 - ) # Smaller n_lists for 1000 points + # Create parameters with smaller n_lists for the validation dataset + build_params = mg_ivf_pq.IndexParams(n_lists=n_lists) search_params = mg_ivf_pq.SearchParams() # These should raise ValueError about memory location @@ -510,7 +593,9 @@ def test_untrained_index_error(): @requires_multiple_gpus def test_mg_ivf_pq_with_prealloc_output(): """Test multi-GPU IVF-PQ search with pre-allocated output arrays.""" - n_rows, n_cols = 1500, 32 # Ensure n_rows > n_lists + n_lists = 30 + n_rows = _n_rows_for_distribution(1500, n_lists, "sharded") + n_cols = 32 n_queries = 20 k = 5 @@ -521,7 +606,7 @@ def test_mg_ivf_pq_with_prealloc_output(): resources = MultiGpuResources() # Build index with fewer clusters to avoid n_rows < n_lists error - build_params = mg_ivf_pq.IndexParams(n_lists=30, pq_bits=8, pq_dim=16) + build_params = mg_ivf_pq.IndexParams(n_lists=n_lists, pq_bits=8, pq_dim=16) index = mg_ivf_pq.build(build_params, dataset, resources=resources) # Pre-allocate output arrays in host memory @@ -529,7 +614,7 @@ def test_mg_ivf_pq_with_prealloc_output(): distances = np.empty((n_queries, k), dtype=np.float32) # Search with pre-allocated arrays - search_params = mg_ivf_pq.SearchParams(n_probes=20) + search_params = mg_ivf_pq.SearchParams(n_probes=n_lists) ret_distances, ret_neighbors = mg_ivf_pq.search( search_params, index, @@ -545,6 +630,9 @@ def test_mg_ivf_pq_with_prealloc_output(): assert ret_neighbors is neighbors assert distances.shape == (n_queries, k) assert neighbors.shape == (n_queries, k) + assert np.all(neighbors >= 0) + assert np.all(neighbors < n_rows) + assert np.all(np.isfinite(distances)) def test_index_repr(): @@ -561,7 +649,9 @@ def test_mg_ivf_pq_simple(): pytest.skip("Multi-GPU tests require multiple GPUs") # Use simple test case that should definitely work - n_rows, n_cols = 1000, 32 + n_lists = 32 + n_rows = _n_rows_for_distribution(1000, n_lists, "sharded") + n_cols = 32 n_queries, k = 20, 5 # Generate data @@ -573,7 +663,7 @@ def test_mg_ivf_pq_simple(): # Use very few clusters for high recall build_params = mg_ivf_pq.IndexParams( metric="sqeuclidean", - n_lists=32, # Very few clusters + n_lists=n_lists, # Very few clusters pq_bits=8, pq_dim=16, ) @@ -582,7 +672,9 @@ def test_mg_ivf_pq_simple(): index = mg_ivf_pq.build(build_params, dataset, resources=resources) # Search with many probes for maximum recall - search_params = mg_ivf_pq.SearchParams(n_probes=32) # Search all clusters + search_params = mg_ivf_pq.SearchParams( + n_probes=n_lists + ) # Search all clusters distances, neighbors = mg_ivf_pq.search( search_params, index, queries, k, resources=resources ) @@ -609,7 +701,9 @@ def test_mg_ivf_pq_simple(): @requires_multiple_gpus def test_mg_ivf_pq_integration(): """Integration test covering build, search, extend, and serialization.""" - n_rows, n_cols = 2000, 32 + n_lists = 50 + n_rows = _n_rows_for_distribution(2000, n_lists, "sharded") + n_cols = 32 k = 5 # Generate initial dataset @@ -622,7 +716,7 @@ def test_mg_ivf_pq_integration(): build_params = mg_ivf_pq.IndexParams( distribution_mode="sharded", metric="sqeuclidean", - n_lists=50, + n_lists=n_lists, pq_bits=8, pq_dim=16, ) @@ -630,7 +724,7 @@ def test_mg_ivf_pq_integration(): # Initial search search_params = mg_ivf_pq.SearchParams( - n_probes=37, + n_probes=n_lists, search_mode="load_balancer", merge_mode="merge_on_root_rank", ) @@ -640,8 +734,7 @@ def test_mg_ivf_pq_integration(): # Extend index with new vectors new_vectors = generate_data((200, n_cols), np.float32) - # Provide indices for extend operation on non-empty index - new_indices = np.arange(n_rows, n_rows + 200, dtype=np.int64) + new_indices = _sharded_append_indices(n_rows, new_vectors.shape[0]) mg_ivf_pq.extend(index, new_vectors, new_indices, resources=resources) # Search after extend