From d45b8f51a65c0a11f0744e723d18cbb6c20ac6fd Mon Sep 17 00:00:00 2001 From: Lucas Parzianello Date: Mon, 1 Jun 2026 17:59:41 -0400 Subject: [PATCH 1/3] gwy: opsearch queries in bulk for capture listing --- .../api_methods/helpers/search_captures.py | 16 +- gateway/sds_gateway/api_methods/models.py | 91 +++- .../serializers/capture_serializers.py | 14 +- .../tests/test_capture_endpoints.py | 501 ++++++++++++++++++ .../api_methods/views/capture_endpoints.py | 23 +- gateway/sds_gateway/users/views/captures.py | 14 +- gateway/sds_gateway/users/views_deprecated.py | 16 +- 7 files changed, 660 insertions(+), 15 deletions(-) diff --git a/gateway/sds_gateway/api_methods/helpers/search_captures.py b/gateway/sds_gateway/api_methods/helpers/search_captures.py index 0d3e604b..e1ef4379 100644 --- a/gateway/sds_gateway/api_methods/helpers/search_captures.py +++ b/gateway/sds_gateway/api_methods/helpers/search_captures.py @@ -348,25 +348,37 @@ def _build_os_query_for_captures( # Need to paginate/limit OpenSearch results list before grouping # and then paginate/limit the grouped captures def get_composite_captures( - captures: QuerySet[Capture], request: Request | None = None + captures: QuerySet[Capture], + request: Request | None = None, + bulk_metadata: dict[str, dict[str, Any]] | None = None, ) -> list[dict[str, Any]]: """Get captures as composite objects, grouping multi-channel captures. Args: captures: QuerySet of Capture objects request: Optional Django REST framework request for serializer context + bulk_metadata: Optional pre-loaded OpenSearch metadata mapping + ``uuid_str → metadata_dict``. When provided, the + serialization path populates related capture + instances' internal cache so ``get_opensearch_metadata()`` + returns without additional round-trips. Returns: list: List of composite capture data """ grouped_captures = group_captures_by_top_level_dir(captures) composite_captures = [] - context = {"request": request} if request else {} + context: dict[str, Any] = {"request": request} if request else {} + if bulk_metadata is not None: + context["bulk_metadata"] = bulk_metadata for capture_list in grouped_captures.values(): if len(capture_list) > 1: # Multiple captures with same top_level_dir - create composite composite_data = build_composite_capture_data(capture_list) + # Inject original instances so CompositeCaptureSerializer finds them + # with tier-1 cache already populated, avoiding extra DB query. + composite_data["_captures_by_uuid"] = {str(c.uuid): c for c in capture_list} composite_captures.append(composite_data) else: # Single capture - serialize normally diff --git a/gateway/sds_gateway/api_methods/models.py b/gateway/sds_gateway/api_methods/models.py index 608ca39c..07c4b9dc 100644 --- a/gateway/sds_gateway/api_methods/models.py +++ b/gateway/sds_gateway/api_methods/models.py @@ -3,6 +3,7 @@ import datetime import json import logging +import threading import uuid from enum import StrEnum from pathlib import Path @@ -12,6 +13,7 @@ from blake3 import blake3 as Blake3 # noqa: N812 from django.conf import settings +from django.core.signals import request_started from django.db import models from django.db.models import Count from django.db.models import ProtectedError @@ -30,6 +32,22 @@ log = logging.getLogger(__name__) +# Thread-local storage for request-scoped OpenSearch metadata cache. +# Used as a safety net in get_opensearch_metadata() when the per-instance +# cache (_opensearch_metadata_cache) is missing (e.g. on fresh model +# instances created by get_capture()'s list(related_captures)). +_request_cache = threading.local() + + +def _clear_request_cache(sender, **kwargs): + """Clear thread-local OpenSearch metadata cache at start of each request.""" + for attr in ("opensearch_metadata",): + if hasattr(_request_cache, attr): + delattr(_request_cache, attr) + + +request_started.connect(_clear_request_cache, weak=False) + DRF_RF_FILENAME_REGEX_STR = r"^rf@\d+\.\d+\.h5$" @@ -455,13 +473,14 @@ def get_capture(self) -> dict[str, Any]: is_deleted=False, ).order_by("channel") - if related_captures.count() <= 1: + capture_list = list(related_captures) + if len(capture_list) <= 1: # Only one capture found, return as single capture return {"capture": self, "is_composite": False} # Multiple captures found, create composite return { - "captures": list(related_captures), + "captures": capture_list, "is_composite": True, "top_level_dir": self.top_level_dir, "capture_type": self.capture_type, @@ -571,8 +590,23 @@ def get_opensearch_metadata(self) -> dict[str, Any]: dict: Frequency metadata (center_frequency, sample_rate, etc.) """ if hasattr(self, "_opensearch_metadata_cache"): + log.trace("meta_cache HIT for %s", self.uuid) return self._opensearch_metadata_cache + # Fallback: thread-local cache (populated by set_bulk_metadata_cache). + # Catches fresh model instances (e.g. from get_capture() → + # list(related_captures)) that don't share the original queryset + # instance identity. + tl = getattr(_request_cache, "opensearch_metadata", None) + if tl is not None: + cached = tl.get(str(self.uuid)) + if cached is not None: + log.trace("meta_cache thread-local HIT for %s", self.uuid) + self._opensearch_metadata_cache = cached + return cached + + log.trace("meta_cache MISS for %s", self.uuid) + result: dict[str, Any] = {} try: client = get_opensearch_client() @@ -591,7 +625,7 @@ def get_opensearch_metadata(self) -> dict[str, Any]: # It's already a string value index_name = f"captures-{self.capture_type}" - log.info( + log.debug( "Querying OpenSearch index '%s' for capture %s", index_name, self.uuid ) @@ -617,7 +651,7 @@ def _extract_metadata_from_source(self, source: dict[str, Any]) -> dict[str, Any """Extract frequency metadata from OpenSearch source data.""" search_props = source.get("search_props", {}) - log.info("OpenSearch data for %s: search_props=%s", self.uuid, search_props) + log.debug("OpenSearch data for %s: search_props=%s", self.uuid, search_props) # Try search_props first (preferred) center_frequency = search_props.get("center_frequency") @@ -710,6 +744,48 @@ def bulk_load_frequency_metadata( else: return frequency_data + @classmethod + def set_bulk_metadata_cache( + cls, captures: list["Capture"], metadata: dict[str, dict[str, Any]] + ) -> None: + """Attach bulk-loaded metadata to each capture instance. + + Sets ``_opensearch_metadata_cache`` on each capture so that + ``get_opensearch_metadata()`` returns early without an additional + OpenSearch round-trip. + + Args: + captures: List of Capture instances to attach metadata to. + metadata: Mapping from ``uuid_str`` to metadata dict + (as returned by ``bulk_load_frequency_metadata``). + """ + # Clear any stale thread-local data before setting fresh data + if hasattr(_request_cache, "opensearch_metadata"): + del _request_cache.opensearch_metadata + + loaded = 0 + missing = 0 + for capture in captures: + cache_key = str(capture.uuid) + if cache_key in metadata: + capture._opensearch_metadata_cache = metadata[cache_key] # noqa: SLF001 + loaded += 1 + else: + missing += 1 + capture._opensearch_metadata_cache = {} # noqa: SLF001 + + # Also store in thread-local cache so fresh instances (e.g. from + # get_capture() → list(related_captures)) can still find their + # metadata without an individual round-trip. + _request_cache.opensearch_metadata = metadata + + log.debug( + "set_bulk_metadata_cache: loaded=%d, missing=%d, total=%d", + loaded, + missing, + len(captures), + ) + def debug_opensearch_response(self) -> dict[str, Any] | None: """ Debug method to see exactly what OpenSearch returns for this capture. @@ -1571,7 +1647,7 @@ def _extract_drf_sample_rate(capture_props: dict[str, Any]) -> float | None: if numerator and denominator and denominator != 0: # try sample_rate_numerator/denominator first sample_rate = numerator / denominator - log.info( + log.debug( "Calculated DRF sample_rate: %s/%s = %s", numerator, denominator, @@ -1580,7 +1656,7 @@ def _extract_drf_sample_rate(capture_props: dict[str, Any]) -> float | None: elif capture_props.get("samples_per_second"): # fallback to samples_per_second if numerator/denominator missing sample_rate = capture_props.get("samples_per_second") - log.info("Using DRF samples_per_second: %s", sample_rate) + log.debug("Using DRF samples_per_second: %s", sample_rate) return sample_rate @@ -1686,6 +1762,9 @@ def _extract_bulk_frequency_data( "sample_rate": sample_rate, "frequency_min": search_props.get("frequency_min"), "frequency_max": search_props.get("frequency_max"), + "start_time": search_props.get("start_time"), + "end_time": search_props.get("end_time"), + "file_cadence": None, # requires per-capture file count; skip in bulk } diff --git a/gateway/sds_gateway/api_methods/serializers/capture_serializers.py b/gateway/sds_gateway/api_methods/serializers/capture_serializers.py index 4235bff0..fd5c4cb4 100644 --- a/gateway/sds_gateway/api_methods/serializers/capture_serializers.py +++ b/gateway/sds_gateway/api_methods/serializers/capture_serializers.py @@ -7,6 +7,7 @@ from django.utils import timezone as django_timezone from drf_spectacular.utils import extend_schema_field +from loguru import logger as log from rest_framework import serializers from rest_framework.utils.serializer_helpers import ReturnList @@ -862,8 +863,19 @@ def serialize_capture_or_composite( if capture_data["is_composite"]: # Serialize as composite + captures = capture_data["captures"] # fresh DB instances (no cache yet) + # Populate cache from bulk-loaded metadata to avoid individual + # OpenSearch round-trips for each related capture. + bulk_meta: dict[str, Any] | None = (context or {}).get("bulk_metadata") + if bulk_meta is not None: + Capture.set_bulk_metadata_cache(captures, bulk_meta) + log.debug( + "set_bulk_metadata_cache applied to %d related captures " + "via serialize_capture_or_composite (multi-channel)", + len(captures), + ) composite_data = build_composite_capture_data( - capture_data["captures"], + captures, include_serializer_aux=True, ) serializer = CompositeCaptureSerializer(composite_data, context=context) diff --git a/gateway/sds_gateway/api_methods/tests/test_capture_endpoints.py b/gateway/sds_gateway/api_methods/tests/test_capture_endpoints.py index 8358e70d..405eb93b 100644 --- a/gateway/sds_gateway/api_methods/tests/test_capture_endpoints.py +++ b/gateway/sds_gateway/api_methods/tests/test_capture_endpoints.py @@ -8,7 +8,9 @@ import uuid from pathlib import Path from typing import TYPE_CHECKING +from typing import Any from typing import cast +from unittest.mock import MagicMock from unittest.mock import patch from django.contrib.auth import get_user_model @@ -29,6 +31,13 @@ from sds_gateway.api_methods.models import File from sds_gateway.api_methods.models import ItemType from sds_gateway.api_methods.models import UserSharePermission +from sds_gateway.api_methods.models import _request_cache as test_request_cache +from sds_gateway.api_methods.serializers.capture_serializers import ( + CompositeCaptureSerializer, +) +from sds_gateway.api_methods.serializers.capture_serializers import ( + build_composite_capture_data, +) from sds_gateway.api_methods.tests.factories import UserSharePermissionFactory from sds_gateway.api_methods.utils.metadata_schemas import get_mapping_by_capture_type from sds_gateway.api_methods.utils.opensearch_client import get_opensearch_client @@ -2430,3 +2439,495 @@ def test_list_captures_opensearch_general_error(self, mock_retrieve) -> None: assert "detail" in res_json, "'detail' field missing from failed response" detail = res_json["detail"] assert "opensearch" not in detail.lower(), f"Unexpected detail: '{detail}'" + + +class CaptureBulkMetadataLoadingTests(APITestCase): + """Tests for the bulk OpenSearch metadata loading optimization. + + Verifies that ``set_bulk_metadata_cache`` and + ``bulk_load_frequency_metadata`` correctly replace O(n) individual + round-trips with 2 bulk queries (one per capture type). + """ + + def setUp(self) -> None: + self.user = User.objects.create( + email="bulk-test@example.com", + password="testpass123", # noqa: S106 + is_approved=True, + ) + api_key, key = UserAPIKey.objects.create_key( + name="bulk-test-key", + user=self.user, + ) + self.api_key = cast("AbstractAPIKey", api_key) + self.key = cast("str", key) + self.client.credentials(HTTP_AUTHORIZATION=f"Api-Key: {self.key}") + self.list_url = reverse("api:captures-list") + + def test_set_bulk_metadata_cache_attaches_to_instances(self) -> None: + """set_bulk_metadata_cache populates _opensearch_metadata_cache.""" + cap1 = Capture.objects.create( + capture_type=CaptureType.DigitalRF, + channel="ch0", + index_name="captures-test-bulk", + owner=self.user, + top_level_dir=_normalize_top_level_dir("bulk-test-dir-1"), + ) + cap2 = Capture.objects.create( + capture_type=CaptureType.DigitalRF, + channel="ch1", + index_name="captures-test-bulk", + owner=self.user, + top_level_dir=_normalize_top_level_dir("bulk-test-dir-2"), + ) + + bulk_meta = { + str(cap1.uuid): { + "center_frequency": 1_000_000_000.0, + "sample_rate": 10_000_000, + "start_time": 1_000_000_000, + "end_time": 1_000_000_010, + "file_cadence": None, + }, + str(cap2.uuid): { + "center_frequency": 2_000_000_000.0, + "sample_rate": 20_000_000, + "start_time": 1_000_000_010, + "end_time": 1_000_000_020, + "file_cadence": None, + }, + } + + Capture.set_bulk_metadata_cache([cap1, cap2], bulk_meta) + + assert cap1._opensearch_metadata_cache["center_frequency"] == 1_000_000_000.0 # noqa: PLR2004,SLF001 + assert cap1._opensearch_metadata_cache["start_time"] == 1_000_000_000 # noqa: PLR2004,SLF001 + assert cap2._opensearch_metadata_cache["center_frequency"] == 2_000_000_000.0 # noqa: PLR2004,SLF001 + assert cap2._opensearch_metadata_cache["start_time"] == 1_000_000_010 # noqa: PLR2004,SLF001 + + def test_set_bulk_metadata_cache_handles_missing_uuid(self) -> None: + """Missing UUIDs get empty dict, not KeyError.""" + cap = Capture.objects.create( + capture_type=CaptureType.DigitalRF, + channel="ch0", + index_name="captures-test-bulk", + owner=self.user, + top_level_dir=_normalize_top_level_dir("bulk-test-dir-3"), + ) + bulk_meta: dict[str, dict[str, Any]] = {} + + Capture.set_bulk_metadata_cache([cap], bulk_meta) + + assert cap._opensearch_metadata_cache == {} # noqa: SLF001 + # Should not raise on subsequent get_opensearch_metadata calls + meta = cap.get_opensearch_metadata() + assert meta == {} + + @patch( + "sds_gateway.api_methods.models.get_opensearch_client", + ) + def test_get_opensearch_metadata_hits_bulk_cache( + self, + mock_get_client, + ) -> None: + """When bulk cache is populated, get_opensearch_metadata skips OpenSearch.""" + cap = Capture.objects.create( + capture_type=CaptureType.DigitalRF, + channel="ch0", + index_name="captures-test-bulk", + owner=self.user, + top_level_dir=_normalize_top_level_dir("bulk-test-dir-4"), + ) + + # Populate bulk cache + Capture.set_bulk_metadata_cache( + [cap], + { + str(cap.uuid): { + "center_frequency": 3_000_000_000.0, + "sample_rate": 15_000_000, + "start_time": 2_000_000_000, + "end_time": 2_000_000_005, + "file_cadence": None, + }, + }, + ) + + # Now call get_opensearch_metadata - should NOT hit OpenSearch + meta = cap.get_opensearch_metadata() + + assert meta["center_frequency"] == 3_000_000_000.0 # noqa: PLR2004 + assert meta["sample_rate"] == 15_000_000 # noqa: PLR2004 + assert mock_get_client.return_value.search.call_count == 0 + + @patch( + "sds_gateway.api_methods.models.get_opensearch_client", + ) + def test_get_opensearch_metadata_misses_without_cache( + self, + mock_get_client, + ) -> None: + """Without bulk cache, get_opensearch_metadata queries OpenSearch.""" + cap = Capture.objects.create( + capture_type=CaptureType.DigitalRF, + channel="ch0", + index_name="captures-test-bulk", + owner=self.user, + top_level_dir=_normalize_top_level_dir("bulk-test-dir-5"), + ) + + _ = cap.get_opensearch_metadata() + + assert mock_get_client.return_value.search.call_count == 1 + + @patch( + "sds_gateway.api_methods.serializers.capture_serializers.retrieve_indexed_metadata", + return_value={}, + ) + @patch( + "sds_gateway.api_methods.views.capture_endpoints.Capture.set_bulk_metadata_cache", + new_callable=MagicMock, + ) + @patch( + "sds_gateway.api_methods.views.capture_endpoints.Capture.bulk_load_frequency_metadata", + new_callable=lambda: MagicMock(return_value={}), + ) + def test_list_endpoint_calls_bulk_load_once( + self, + mock_bulk_load, + mock_set_cache, + mock_retrieve, + ) -> None: + """GET /captures/ calls bulk_load_frequency_metadata once, not per capture.""" + # Create 5 DRF captures + for i in range(5): + Capture.objects.create( + capture_type=CaptureType.DigitalRF, + channel=f"ch{i}", + index_name="captures-test-bulk", + owner=self.user, + top_level_dir=_normalize_top_level_dir(f"bulk-test-dir-{i}"), + ) + + response = self.client.get(self.list_url) + assert response.status_code == status.HTTP_200_OK + + # bulk_load_frequency_metadata should be called exactly once + mock_bulk_load.assert_called_once() + # set_bulk_metadata_cache should be called exactly once + mock_set_cache.assert_called_once() + + @patch( + "sds_gateway.api_methods.views.capture_endpoints.Capture.set_bulk_metadata_cache", + new_callable=MagicMock, + ) + @patch( + "sds_gateway.api_methods.views.capture_endpoints.Capture.bulk_load_frequency_metadata", + new_callable=lambda: MagicMock(return_value={}), + ) + def test_bulk_load_receives_all_captures( + self, + mock_bulk_load, + mock_set_cache, + ) -> None: + """bulk_load_frequency_metadata receives the full queryset (not just page).""" + for i in range(3): + Capture.objects.create( + capture_type=CaptureType.DigitalRF, + channel=f"ch{i}", + index_name="captures-test-bulk", + owner=self.user, + top_level_dir=_normalize_top_level_dir(f"bulk-test-dir-{i}"), + ) + + self.client.get(self.list_url) + + # Verify bulk_load was called with a QuerySet-like object + call_args = mock_bulk_load.call_args + captured_captures = call_args[0][0] # first positional arg + # Should be the full queryset (not paginated) + min_count = 3 + assert len(captured_captures) >= min_count + + @patch( + "sds_gateway.api_methods.models.get_opensearch_client", + ) + def test_thread_local_cache_hits_with_fresh_instance( + self, + mock_get_client, + ) -> None: + """Fresh Capture instances find metadata in thread-local cache.""" + cap = Capture.objects.create( + capture_type=CaptureType.DigitalRF, + channel="ch0", + index_name="captures-test-bulk", + owner=self.user, + top_level_dir=_normalize_top_level_dir("bulk-thread-local-test"), + ) + + # Populate bulk cache (also sets thread-local cache) + Capture.set_bulk_metadata_cache( + [cap], + { + str(cap.uuid): { + "center_frequency": 3_000_000_000.0, + "sample_rate": 15_000_000, + "start_time": 2_000_000_000, + "end_time": 2_000_000_005, + "file_cadence": None, + }, + }, + ) + + # Get a FRESH instance (no _opensearch_metadata_cache) + fresh_cap = Capture.objects.get(uuid=cap.uuid) + assert not hasattr(fresh_cap, "_opensearch_metadata_cache") + + # Should hit thread-local cache, not OpenSearch + meta = fresh_cap.get_opensearch_metadata() + assert meta["center_frequency"] == 3_000_000_000.0 # noqa: PLR2004 + assert meta["sample_rate"] == 15_000_000 # noqa: PLR2004 + assert mock_get_client.return_value.search.call_count == 0 + + @patch( + "sds_gateway.api_methods.models.get_opensearch_client", + ) + def test_thread_local_cache_cleared_between_requests( + self, + mock_get_client, + ) -> None: + """Thread-local cache is cleared between requests — no stale metadata leak.""" + # Mock OpenSearch to return a known value so we can detect when it IS called + mock_get_client.return_value.search.return_value = { + "hits": { + "total": {"value": 1}, + "hits": [ + { + "_id": "mock", + "_source": {"search_props": {}, "capture_props": {}}, + } + ], + }, + } + + cap_a = Capture.objects.create( + capture_type=CaptureType.DigitalRF, + channel="chA", + index_name="captures-test-bulk", + owner=self.user, + top_level_dir=_normalize_top_level_dir("bulk-clear-test-a"), + ) + cap_b = Capture.objects.create( + capture_type=CaptureType.DigitalRF, + channel="chB", + index_name="captures-test-bulk", + owner=self.user, + top_level_dir=_normalize_top_level_dir("bulk-clear-test-b"), + ) + + # Request 1: set bulk cache with metadata for cap_a only + meta_for_a = { + str(cap_a.uuid): { + "center_frequency": 1_000_000_000.0, + "sample_rate": 10_000_000, + "start_time": 1_000_000_000, + "end_time": 1_000_000_010, + "file_cadence": None, + }, + } + Capture.set_bulk_metadata_cache([cap_a, cap_b], meta_for_a) + + # Verify thread-local cache was populated + assert hasattr(test_request_cache, "opensearch_metadata") + assert str(cap_a.uuid) in test_request_cache.opensearch_metadata + + # Simulate request_started signal: clear thread-local cache + if hasattr(test_request_cache, "opensearch_metadata"): + del test_request_cache.opensearch_metadata + + # Verify thread-local cache is gone + assert not hasattr(test_request_cache, "opensearch_metadata") + + # Request 2: get fresh instance of cap_b (no instance cache) + cap_b_fresh = Capture.objects.get(uuid=cap_b.uuid) + assert not hasattr(cap_b_fresh, "_opensearch_metadata_cache") + + # Should fall through to OpenSearch since thread-local is empty + _ = cap_b_fresh.get_opensearch_metadata() + assert mock_get_client.return_value.search.call_count >= 1 + + @patch( + "sds_gateway.api_methods.models.get_opensearch_client", + ) + def test_enriched_channels_uses_bulk_metadata( + self, + mock_get_client, + ) -> None: + """_captures_by_uuid avoids redundant OpenSearch queries.""" + multi_channel_dir = "/bulk-enriched-channels-test" + + # Create multi-channel captures (same top_level_dir) + cap0 = Capture.objects.create( + capture_type=CaptureType.DigitalRF, + channel="ch0", + index_name="captures-test-bulk", + owner=self.user, + top_level_dir=multi_channel_dir, + ) + cap1 = Capture.objects.create( + capture_type=CaptureType.DigitalRF, + channel="ch1", + index_name="captures-test-bulk", + owner=self.user, + top_level_dir=multi_channel_dir, + ) + + captures = [cap0, cap1] + + # Set bulk cache with metadata for both captures + bulk_meta = { + str(cap0.uuid): { + "center_frequency": 2_400_000_000.0, + "sample_rate": 20_000_000, + "start_time": 1_000_000_000, + "end_time": 1_000_000_100, + "file_cadence": None, + }, + str(cap1.uuid): { + "center_frequency": 2_400_000_000.0, + "sample_rate": 20_000_000, + "start_time": 1_000_000_100, + "end_time": 1_000_000_200, + "file_cadence": None, + }, + } + Capture.set_bulk_metadata_cache(captures, bulk_meta) + + # Build composite WITHOUT include_serializer_aux (default) + composite_data = build_composite_capture_data(captures) + + # Inject _captures_by_uuid (as done by the Fix B in search_captures.py) + composite_data["_captures_by_uuid"] = {str(c.uuid): c for c in captures} + + # Serialize with CompositeCaptureSerializer + serializer = CompositeCaptureSerializer(composite_data) + result = serializer.data + + # Verify channels present + assert "channels" in result + channel_count = 2 + assert len(result["channels"]) == channel_count + + # Verify metadata enriched per channel + for channel in result["channels"]: + assert channel["capture_start_epoch_sec"] is not None + assert channel["capture_end_epoch_sec"] is not None + assert channel["length_of_capture_ms"] is not None + assert channel["capture_start_iso_utc"] is not None + + # No OpenSearch calls should have been made + assert mock_get_client.return_value.search.call_count == 0 + + +class CaptureListWithBulkMetadataTests(APITestCase): + """Integration tests verifying capture list endpoint works with bulk loading. + + Uses mocking to simulate OpenSearch responses and verify the end-to-end + flow: bulk load -> cache -> serialize. + """ + + def setUp(self) -> None: + self.user = User.objects.create( + email="bulk-int@example.com", + password="testpass123", # noqa: S106 + is_approved=True, + ) + api_key, key = UserAPIKey.objects.create_key( + name="bulk-int-key", + user=self.user, + ) + self.api_key = cast("AbstractAPIKey", api_key) + self.key = cast("str", key) + self.client.credentials(HTTP_AUTHORIZATION=f"Api-Key: {self.key}") + self.list_url = reverse("api:captures-list") + + @patch( + "sds_gateway.api_methods.models.get_opensearch_client", + ) + def test_list_returns_metadata_from_bulk_query( + self, + mock_get_client, + ) -> None: + """Bulk_load_frequency_metadata populates serialized capture fields.""" + cap = Capture.objects.create( + capture_type=CaptureType.DigitalRF, + channel="ch0", + index_name="captures-test-bulk", + owner=self.user, + top_level_dir=_normalize_top_level_dir("bulk-int-dir-1"), + ) + + mock_get_client.return_value.search.return_value = { + "hits": { + "total": {"value": 1}, + "hits": [ + { + "_id": str(cap.uuid), + "_source": { + "search_props": { + "center_frequency": 5_000_000_000.0, + "sample_rate": 25_000_000, + "start_time": 3_000_000_000, + "end_time": 3_000_000_015, + }, + "capture_props": {}, + }, + }, + ], + }, + } + + response = self.client.get(self.list_url) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["count"] >= 1 + + @patch( + "sds_gateway.api_methods.models.get_opensearch_client", + ) + def test_bulk_load_does_not_crash_on_mismatched_hits( + self, + mock_get_client, + ) -> None: + """Fewer bulk hits than requested leaves remaining captures with empty cache.""" + cap1 = Capture.objects.create( + capture_type=CaptureType.DigitalRF, + channel="ch0", + index_name="captures-test-bulk", + owner=self.user, + top_level_dir=_normalize_top_level_dir("bulk-int-dir-2"), + ) + # Simulate bulk query returning only 1 of 2 hits + mock_get_client.return_value.search.return_value = { + "hits": { + "total": {"value": 1}, + "hits": [ + { + "_id": str(cap1.uuid), + "_source": { + "search_props": { + "center_frequency": 1_000_000_000.0, + "sample_rate": 10_000_000, + "start_time": 1_000_000_000, + "end_time": 1_000_000_005, + }, + "capture_props": {}, + }, + }, + ], + }, + } + + # Should not raise - cap2 gets empty cache + response = self.client.get(self.list_url) + assert response.status_code == status.HTTP_200_OK diff --git a/gateway/sds_gateway/api_methods/views/capture_endpoints.py b/gateway/sds_gateway/api_methods/views/capture_endpoints.py index 3515c7ff..aad1708a 100644 --- a/gateway/sds_gateway/api_methods/views/capture_endpoints.py +++ b/gateway/sds_gateway/api_methods/views/capture_endpoints.py @@ -672,8 +672,27 @@ def _paginate_composite_captures( ) -> Response: """Paginate and serialize composite capture results.""" - # Get composite captures - composite_captures = get_composite_captures(captures, request=request) + # Bulk-load OpenSearch metadata for all captures before + # grouping and serialization. This replaces O(n) individual + # round-trips (one per capture) with 2 bulk queries + # (DRF + RadioHound) and caches results on each instance. + capture_list = list(captures) + log.debug( + "Bulk-loading OpenSearch metadata for %d captures", + len(capture_list), + ) + bulk_metadata = Capture.bulk_load_frequency_metadata(captures) + Capture.set_bulk_metadata_cache(capture_list, bulk_metadata) + + # Get composite captures — pass the materialized list to + # guarantee instance identity so the per-instance cache + # populated by set_bulk_metadata_cache is visible in the + # serialization path. + composite_captures = get_composite_captures( + capture_list, + request=request, + bulk_metadata=bulk_metadata, + ) # Manual pagination for composite captures paginator = CapturePagination() diff --git a/gateway/sds_gateway/users/views/captures.py b/gateway/sds_gateway/users/views/captures.py index 1973f257..e29486ea 100644 --- a/gateway/sds_gateway/users/views/captures.py +++ b/gateway/sds_gateway/users/views/captures.py @@ -133,10 +133,20 @@ def _get_captures_for_template( ) -> list[dict[str, Any]]: """Get enhanced captures for the template.""" enhanced_captures = [] - for capture in captures: + + # Bulk-load OpenSearch metadata before serialization loop + captures_list = list(captures) + if captures_list: + temp_qs = Capture.objects.filter(uuid__in=[c.uuid for c in captures_list]) + bulk_metadata = Capture.bulk_load_frequency_metadata(temp_qs) + Capture.set_bulk_metadata_cache(captures_list, bulk_metadata) + else: + bulk_metadata = {} + + for capture in captures_list: # Use composite serialization to handle multi-channel captures properly capture_data = serialize_capture_or_composite( - capture, context={"request": request} + capture, context={"request": request, "bulk_metadata": bulk_metadata} ) # Add ownership flags for template display diff --git a/gateway/sds_gateway/users/views_deprecated.py b/gateway/sds_gateway/users/views_deprecated.py index 169aed84..ab67417b 100644 --- a/gateway/sds_gateway/users/views_deprecated.py +++ b/gateway/sds_gateway/users/views_deprecated.py @@ -1286,9 +1286,21 @@ def _get_captures_for_template( ) -> list[dict[str, Any]]: """Get enhanced captures for the template.""" enhanced_captures = [] - for capture in captures: + + # Bulk-load OpenSearch metadata before serialization loop + captures_list = list(captures) + if captures_list: + temp_qs = Capture.objects.filter(uuid__in=[c.uuid for c in captures_list]) + bulk_metadata = Capture.bulk_load_frequency_metadata(temp_qs) + Capture.set_bulk_metadata_cache(captures_list, bulk_metadata) + else: + bulk_metadata = {} + + for capture in captures_list: # Use composite serialization to handle multi-channel captures properly - capture_data = serialize_capture_or_composite(capture) + capture_data = serialize_capture_or_composite( + capture, context={"bulk_metadata": bulk_metadata} + ) # Composite serialization omits top-level name; # templates and API need a display name From 4f0b6345f16111f6c1a97a00ac2f42174f1a24f6 Mon Sep 17 00:00:00 2001 From: Lucas Parzianello Date: Mon, 1 Jun 2026 18:13:51 -0400 Subject: [PATCH 2/3] models.py using loguru --- gateway/sds_gateway/api_methods/models.py | 87 ++++++++++------------- 1 file changed, 36 insertions(+), 51 deletions(-) diff --git a/gateway/sds_gateway/api_methods/models.py b/gateway/sds_gateway/api_methods/models.py index 07c4b9dc..8c56b3c8 100644 --- a/gateway/sds_gateway/api_methods/models.py +++ b/gateway/sds_gateway/api_methods/models.py @@ -2,7 +2,6 @@ import datetime import json -import logging import threading import uuid from enum import StrEnum @@ -24,14 +23,13 @@ from django.db.models.signals import pre_delete from django.dispatch import receiver from django.template.defaultfilters import slugify +from loguru import logger as log from .utils.opensearch_client import get_opensearch_client if TYPE_CHECKING: from sds_gateway.users.models import User -log = logging.getLogger(__name__) - # Thread-local storage for request-scoped OpenSearch metadata cache. # Used as a safety net in get_opensearch_metadata() when the per-instance # cache (_opensearch_metadata_cache) is missing (e.g. on fresh model @@ -494,7 +492,7 @@ def get_drf_data_files_queryset(self) -> QuerySet[File]: ) if self.capture_type != CaptureType.DigitalRF: - log.warning("Capture %s is not a DigitalRF capture", self.uuid) + log.warning(f"Capture {self.uuid} is not a DigitalRF capture") return File.objects.none() return get_capture_files(self, include_deleted=False).filter( @@ -565,13 +563,9 @@ def get_files_summary(self) -> dict[str, Any]: } if summary["total_size"] < size: log.warning( - ( - "Capture %s: total_size (%s) < data_files total_size (%s); " - "using data total." - ), - str(self.uuid), - summary["total_size"], - size, + f"Capture {self.uuid}: total_size ({summary['total_size']}) " + f"< data_files total_size ({size}); " + "using data total." ) summary["total_size"] = size @@ -590,7 +584,7 @@ def get_opensearch_metadata(self) -> dict[str, Any]: dict: Frequency metadata (center_frequency, sample_rate, etc.) """ if hasattr(self, "_opensearch_metadata_cache"): - log.trace("meta_cache HIT for %s", self.uuid) + log.trace(f"meta_cache HIT for {self.uuid}") return self._opensearch_metadata_cache # Fallback: thread-local cache (populated by set_bulk_metadata_cache). @@ -601,11 +595,11 @@ def get_opensearch_metadata(self) -> dict[str, Any]: if tl is not None: cached = tl.get(str(self.uuid)) if cached is not None: - log.trace("meta_cache thread-local HIT for %s", self.uuid) + log.trace(f"meta_cache thread-local HIT for {self.uuid}") self._opensearch_metadata_cache = cached return cached - log.trace("meta_cache MISS for %s", self.uuid) + log.trace(f"meta_cache MISS for {self.uuid}") result: dict[str, Any] = {} try: @@ -626,7 +620,7 @@ def get_opensearch_metadata(self) -> dict[str, Any]: index_name = f"captures-{self.capture_type}" log.debug( - "Querying OpenSearch index '%s' for capture %s", index_name, self.uuid + f"Querying OpenSearch index '{index_name}' for capture {self.uuid}" ) response = client.search( @@ -639,10 +633,10 @@ def get_opensearch_metadata(self) -> dict[str, Any]: source = response["hits"]["hits"][0]["_source"] result = self._extract_metadata_from_source(source) else: - log.warning("No OpenSearch data found for capture %s", self.uuid) + log.warning(f"No OpenSearch data found for capture {self.uuid}") - except Exception: - log.exception("Error querying OpenSearch for capture %s", self.uuid) + except Exception: # noqa: BLE001 + log.exception(f"Error querying OpenSearch for capture {self.uuid}") self._opensearch_metadata_cache = result return self._opensearch_metadata_cache @@ -651,7 +645,7 @@ def _extract_metadata_from_source(self, source: dict[str, Any]) -> dict[str, Any """Extract frequency metadata from OpenSearch source data.""" search_props = source.get("search_props", {}) - log.debug("OpenSearch data for %s: search_props=%s", self.uuid, search_props) + log.debug(f"OpenSearch data for {self.uuid}: search_props={search_props}") # Try search_props first (preferred) center_frequency = search_props.get("center_frequency") @@ -662,8 +656,8 @@ def _extract_metadata_from_source(self, source: dict[str, Any]) -> dict[str, Any if not center_frequency or not sample_rate: capture_props = source.get("capture_props", {}) log.info( - "search_props incomplete, checking capture_props: %s", - list(capture_props.keys()), + f"search_props incomplete, checking capture_props: " + f"{list(capture_props.keys())}" ) if self.capture_type == CaptureType.DigitalRF: @@ -699,7 +693,7 @@ def _extract_drf_file_cadence_from_search_props( end_time = search_props.get("end_time") if start_time is None or end_time is None: - log.warning("Start or end time not found for DRF capture %s", self.uuid) + log.warning(f"Start or end time not found for DRF capture {self.uuid}") return None if count == 0: @@ -738,7 +732,7 @@ def bulk_load_frequency_metadata( ) frequency_data.update(type_frequency_data) - except Exception: + except Exception: # noqa: BLE001 log.exception("Error bulk loading frequency metadata") return {} else: @@ -780,10 +774,8 @@ def set_bulk_metadata_cache( _request_cache.opensearch_metadata = metadata log.debug( - "set_bulk_metadata_cache: loaded=%d, missing=%d, total=%d", - loaded, - missing, - len(captures), + f"set_bulk_metadata_cache: loaded={loaded}, missing={missing}, " + f"total={len(captures)}" ) def debug_opensearch_response(self) -> dict[str, Any] | None: @@ -806,9 +798,9 @@ def debug_opensearch_response(self) -> dict[str, Any] | None: query = {"query": {"term": {"_id": str(self.uuid)}}} log.debug("=== DEBUG: OpenSearch Query ===") - log.debug("Index: %s", index_name) - log.debug("UUID: %s", self.uuid) - log.debug("Query: %s", query) + log.debug(f"Index: {index_name}") + log.debug(f"UUID: {self.uuid}") + log.debug(f"Query: {query}") response = client.search( index=index_name, @@ -817,21 +809,21 @@ def debug_opensearch_response(self) -> dict[str, Any] | None: ) log.debug("=== DEBUG: OpenSearch Response ===") - log.debug("Total hits: %s", response["hits"]["total"]["value"]) + log.debug(f"Total hits: {response['hits']['total']['value']}") if response["hits"]["total"]["value"] > 0: source = response["hits"]["hits"][0]["_source"] log.debug("=== DEBUG: Full Source Data ===") - log.debug("Source keys: %s", list(source.keys())) + log.debug(f"Source keys: {list(source.keys())}") search_props = source.get("search_props", {}) log.debug("=== DEBUG: search_props ===") - log.debug("search_props keys: %s", list(search_props.keys())) - log.debug("search_props content: %s", search_props) + log.debug(f"search_props keys: {list(search_props.keys())}") + log.debug(f"search_props content: {search_props}") capture_props = source.get("capture_props", {}) log.debug("=== DEBUG: capture_props ===") - log.debug("capture_props keys: %s", list(capture_props.keys())) + log.debug(f"capture_props keys: {list(capture_props.keys())}") if capture_props: # Just show a few key fields to avoid log spam key_fields = [ @@ -844,12 +836,10 @@ def debug_opensearch_response(self) -> dict[str, Any] | None: ] for field in key_fields: if field in capture_props: - log.debug( - "capture_props.%s = %s", field, capture_props[field] - ) + log.debug(f"capture_props.{field} = {capture_props[field]}") return source - except Exception: + except Exception: # noqa: BLE001 log.exception("=== DEBUG: Exception occurred ===") log.exception("Error occurred during frequency metadata extraction") return None @@ -1104,17 +1094,16 @@ def get_authors_display(self): # from_db should have already converted JSON string to list if not isinstance(self.authors, list): log.warning( - "Dataset %s: authors field is not a list (type: %s)", - self.uuid, - type(self.authors).__name__, + f"Dataset {self.uuid}: authors field is not a list " + f"(type: {type(self.authors).__name__})", ) return [] # Check if authors are in old string format and need conversion if self.authors and isinstance(self.authors[0], str): log.warning( - "Dataset %s: authors still in old string format, needs migration", - self.uuid, + f"Dataset {self.uuid}: authors still in old string format, " + f"needs migration", ) # Convert old format for backward compatibility return [{"name": author, "orcid_id": ""} for author in self.authors] @@ -1604,8 +1593,7 @@ def _extract_drf_capture_props( # if still no center_frequency, log warning but continue if not center_frequency: log.warning( - "No center frequency found for DRF capture %s", - capture_uuid, + f"No center frequency found for DRF capture {capture_uuid}", ) return center_frequency, sample_rate @@ -1648,15 +1636,12 @@ def _extract_drf_sample_rate(capture_props: dict[str, Any]) -> float | None: # try sample_rate_numerator/denominator first sample_rate = numerator / denominator log.debug( - "Calculated DRF sample_rate: %s/%s = %s", - numerator, - denominator, - sample_rate, + f"Calculated DRF sample_rate: {numerator}/{denominator} = {sample_rate}", ) elif capture_props.get("samples_per_second"): # fallback to samples_per_second if numerator/denominator missing sample_rate = capture_props.get("samples_per_second") - log.debug("Using DRF samples_per_second: %s", sample_rate) + log.debug(f"Using DRF samples_per_second: {sample_rate}") return sample_rate From be169e6b1a38eb9574bd3e5cfeaee437fae72eef Mon Sep 17 00:00:00 2001 From: Lucas Parzianello Date: Mon, 1 Jun 2026 18:29:32 -0400 Subject: [PATCH 3/3] gwy: fixed failing test --- gateway/compose.local.yaml | 2 +- gateway/sds_gateway/api_methods/helpers/search_captures.py | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/gateway/compose.local.yaml b/gateway/compose.local.yaml index 9dd1d14e..2f18cf9e 100644 --- a/gateway/compose.local.yaml +++ b/gateway/compose.local.yaml @@ -26,7 +26,7 @@ networks: sds-gateway-local-opensearch-net: driver: bridge sds-network-local: - external: true # make it external if running with traefik on this machine + # external: true # make it external if running with traefik on this machine # should match traefik's network name name: sds-network-local driver: bridge diff --git a/gateway/sds_gateway/api_methods/helpers/search_captures.py b/gateway/sds_gateway/api_methods/helpers/search_captures.py index e1ef4379..6cfa408b 100644 --- a/gateway/sds_gateway/api_methods/helpers/search_captures.py +++ b/gateway/sds_gateway/api_methods/helpers/search_captures.py @@ -376,9 +376,6 @@ def get_composite_captures( if len(capture_list) > 1: # Multiple captures with same top_level_dir - create composite composite_data = build_composite_capture_data(capture_list) - # Inject original instances so CompositeCaptureSerializer finds them - # with tier-1 cache already populated, avoiding extra DB query. - composite_data["_captures_by_uuid"] = {str(c.uuid): c for c in capture_list} composite_captures.append(composite_data) else: # Single capture - serialize normally