Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gateway/compose.local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ networks:
sds-gateway-local-opensearch-net:
driver: bridge
sds-network-local:
external: true # make it external if running with traefik on this machine
# external: true # make it external if running with traefik on this machine
# should match traefik's network name
name: sds-network-local
driver: bridge
Expand Down
13 changes: 11 additions & 2 deletions gateway/sds_gateway/api_methods/helpers/search_captures.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,20 +348,29 @@ def _build_os_query_for_captures(
# Need to paginate/limit OpenSearch results list before grouping
# and then paginate/limit the grouped captures
def get_composite_captures(
captures: QuerySet[Capture], request: Request | None = None
captures: QuerySet[Capture],
request: Request | None = None,
bulk_metadata: dict[str, dict[str, Any]] | None = None,
) -> list[dict[str, Any]]:
"""Get captures as composite objects, grouping multi-channel captures.

Args:
captures: QuerySet of Capture objects
request: Optional Django REST framework request for serializer context
bulk_metadata: Optional pre-loaded OpenSearch metadata mapping
``uuid_str → metadata_dict``. When provided, the
serialization path populates related capture
instances' internal cache so ``get_opensearch_metadata()``
returns without additional round-trips.
Returns:
list: List of composite capture data
"""
grouped_captures = group_captures_by_top_level_dir(captures)
composite_captures = []

context = {"request": request} if request else {}
context: dict[str, Any] = {"request": request} if request else {}
if bulk_metadata is not None:
context["bulk_metadata"] = bulk_metadata

for capture_list in grouped_captures.values():
if len(capture_list) > 1:
Expand Down
158 changes: 111 additions & 47 deletions gateway/sds_gateway/api_methods/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import datetime
import json
import logging
import threading
import uuid
from enum import StrEnum
from pathlib import Path
Expand All @@ -12,6 +12,7 @@

from blake3 import blake3 as Blake3 # noqa: N812
from django.conf import settings
from django.core.signals import request_started
from django.db import models
from django.db.models import Count
from django.db.models import ProtectedError
Expand All @@ -22,13 +23,28 @@
from django.db.models.signals import pre_delete
from django.dispatch import receiver
from django.template.defaultfilters import slugify
from loguru import logger as log

from .utils.opensearch_client import get_opensearch_client

if TYPE_CHECKING:
from sds_gateway.users.models import User

log = logging.getLogger(__name__)
# Thread-local storage for request-scoped OpenSearch metadata cache.
# Used as a safety net in get_opensearch_metadata() when the per-instance
# cache (_opensearch_metadata_cache) is missing (e.g. on fresh model
# instances created by get_capture()'s list(related_captures)).
_request_cache = threading.local()


def _clear_request_cache(sender, **kwargs):
    """Remove cached OpenSearch metadata from the thread-local request cache.

    Written as a signal receiver: ``sender`` and any extra keyword arguments
    are accepted and ignored. Only the attributes named below are removed
    from ``_request_cache``; an attribute that is not currently set on this
    thread is silently skipped.
    """
    for name in ("opensearch_metadata",):
        try:
            delattr(_request_cache, name)
        except AttributeError:
            # Nothing was cached on this thread under that name.
            continue


request_started.connect(_clear_request_cache, weak=False)

DRF_RF_FILENAME_REGEX_STR = r"^rf@\d+\.\d+\.h5$"

Expand Down Expand Up @@ -455,13 +471,14 @@ def get_capture(self) -> dict[str, Any]:
is_deleted=False,
).order_by("channel")

if related_captures.count() <= 1:
capture_list = list(related_captures)
if len(capture_list) <= 1:
# Only one capture found, return as single capture
return {"capture": self, "is_composite": False}

# Multiple captures found, create composite
return {
"captures": list(related_captures),
"captures": capture_list,
"is_composite": True,
"top_level_dir": self.top_level_dir,
"capture_type": self.capture_type,
Expand All @@ -475,7 +492,7 @@ def get_drf_data_files_queryset(self) -> QuerySet[File]:
)

if self.capture_type != CaptureType.DigitalRF:
log.warning("Capture %s is not a DigitalRF capture", self.uuid)
log.warning(f"Capture {self.uuid} is not a DigitalRF capture")
return File.objects.none()

return get_capture_files(self, include_deleted=False).filter(
Expand Down Expand Up @@ -546,13 +563,9 @@ def get_files_summary(self) -> dict[str, Any]:
}
if summary["total_size"] < size:
log.warning(
(
"Capture %s: total_size (%s) < data_files total_size (%s); "
"using data total."
),
str(self.uuid),
summary["total_size"],
size,
f"Capture {self.uuid}: total_size ({summary['total_size']}) "
f"< data_files total_size ({size}); "
"using data total."
)
summary["total_size"] = size

Expand All @@ -571,8 +584,23 @@ def get_opensearch_metadata(self) -> dict[str, Any]:
dict: Frequency metadata (center_frequency, sample_rate, etc.)
"""
if hasattr(self, "_opensearch_metadata_cache"):
log.trace(f"meta_cache HIT for {self.uuid}")
return self._opensearch_metadata_cache

# Fallback: thread-local cache (populated by set_bulk_metadata_cache).
# Catches fresh model instances (e.g. from get_capture() →
# list(related_captures)) that don't share the original queryset
# instance identity.
tl = getattr(_request_cache, "opensearch_metadata", None)
if tl is not None:
cached = tl.get(str(self.uuid))
if cached is not None:
log.trace(f"meta_cache thread-local HIT for {self.uuid}")
self._opensearch_metadata_cache = cached
return cached

log.trace(f"meta_cache MISS for {self.uuid}")

result: dict[str, Any] = {}
try:
client = get_opensearch_client()
Expand All @@ -591,8 +619,8 @@ def get_opensearch_metadata(self) -> dict[str, Any]:
# It's already a string value
index_name = f"captures-{self.capture_type}"

log.info(
"Querying OpenSearch index '%s' for capture %s", index_name, self.uuid
log.debug(
f"Querying OpenSearch index '{index_name}' for capture {self.uuid}"
)

response = client.search(
Expand All @@ -605,10 +633,10 @@ def get_opensearch_metadata(self) -> dict[str, Any]:
source = response["hits"]["hits"][0]["_source"]
result = self._extract_metadata_from_source(source)
else:
log.warning("No OpenSearch data found for capture %s", self.uuid)
log.warning(f"No OpenSearch data found for capture {self.uuid}")

except Exception:
log.exception("Error querying OpenSearch for capture %s", self.uuid)
except Exception: # noqa: BLE001
log.exception(f"Error querying OpenSearch for capture {self.uuid}")

self._opensearch_metadata_cache = result
return self._opensearch_metadata_cache
Expand All @@ -617,7 +645,7 @@ def _extract_metadata_from_source(self, source: dict[str, Any]) -> dict[str, Any
"""Extract frequency metadata from OpenSearch source data."""

search_props = source.get("search_props", {})
log.info("OpenSearch data for %s: search_props=%s", self.uuid, search_props)
log.debug(f"OpenSearch data for {self.uuid}: search_props={search_props}")

# Try search_props first (preferred)
center_frequency = search_props.get("center_frequency")
Expand All @@ -628,8 +656,8 @@ def _extract_metadata_from_source(self, source: dict[str, Any]) -> dict[str, Any
if not center_frequency or not sample_rate:
capture_props = source.get("capture_props", {})
log.info(
"search_props incomplete, checking capture_props: %s",
list(capture_props.keys()),
f"search_props incomplete, checking capture_props: "
f"{list(capture_props.keys())}"
)

if self.capture_type == CaptureType.DigitalRF:
Expand Down Expand Up @@ -665,7 +693,7 @@ def _extract_drf_file_cadence_from_search_props(
end_time = search_props.get("end_time")

if start_time is None or end_time is None:
log.warning("Start or end time not found for DRF capture %s", self.uuid)
log.warning(f"Start or end time not found for DRF capture {self.uuid}")
return None

if count == 0:
Expand Down Expand Up @@ -704,12 +732,52 @@ def bulk_load_frequency_metadata(
)
frequency_data.update(type_frequency_data)

except Exception:
except Exception: # noqa: BLE001
log.exception("Error bulk loading frequency metadata")
return {}
else:
return frequency_data

@classmethod
def set_bulk_metadata_cache(
    cls, captures: list["Capture"], metadata: dict[str, dict[str, Any]]
) -> None:
    """Prime the OpenSearch metadata caches from a bulk-loaded mapping.

    Every capture in ``captures`` gets ``_opensearch_metadata_cache`` set to
    its entry in ``metadata``, or to an empty dict when its UUID has no
    entry. The same ``metadata`` mapping is then stored on the thread-local
    ``_request_cache`` as ``opensearch_metadata``, replacing any value a
    previous call left there.

    Args:
        captures: Capture instances whose per-instance cache is primed.
        metadata: Mapping from ``str(capture.uuid)`` to a metadata dict.
    """
    # Discard whatever an earlier call stored on this thread before priming.
    try:
        del _request_cache.opensearch_metadata
    except AttributeError:
        pass

    loaded = 0
    missing = 0
    for instance in captures:
        uuid_key = str(instance.uuid)
        found = uuid_key in metadata
        # Captures without an entry get an empty dict rather than no cache.
        instance._opensearch_metadata_cache = (  # noqa: SLF001
            metadata[uuid_key] if found else {}
        )
        if found:
            loaded += 1
        else:
            missing += 1

    # Publish the mapping on the thread-local as well, keyed by UUID string,
    # so it can be looked up without holding one of the instances above.
    _request_cache.opensearch_metadata = metadata

    log.debug(
        f"set_bulk_metadata_cache: loaded={loaded}, missing={missing}, "
        f"total={len(captures)}"
    )

def debug_opensearch_response(self) -> dict[str, Any] | None:
"""
Debug method to see exactly what OpenSearch returns for this capture.
Expand All @@ -730,9 +798,9 @@ def debug_opensearch_response(self) -> dict[str, Any] | None:
query = {"query": {"term": {"_id": str(self.uuid)}}}

log.debug("=== DEBUG: OpenSearch Query ===")
log.debug("Index: %s", index_name)
log.debug("UUID: %s", self.uuid)
log.debug("Query: %s", query)
log.debug(f"Index: {index_name}")
log.debug(f"UUID: {self.uuid}")
log.debug(f"Query: {query}")

response = client.search(
index=index_name,
Expand All @@ -741,21 +809,21 @@ def debug_opensearch_response(self) -> dict[str, Any] | None:
)

log.debug("=== DEBUG: OpenSearch Response ===")
log.debug("Total hits: %s", response["hits"]["total"]["value"])
log.debug(f"Total hits: {response['hits']['total']['value']}")

if response["hits"]["total"]["value"] > 0:
source = response["hits"]["hits"][0]["_source"]
log.debug("=== DEBUG: Full Source Data ===")
log.debug("Source keys: %s", list(source.keys()))
log.debug(f"Source keys: {list(source.keys())}")

search_props = source.get("search_props", {})
log.debug("=== DEBUG: search_props ===")
log.debug("search_props keys: %s", list(search_props.keys()))
log.debug("search_props content: %s", search_props)
log.debug(f"search_props keys: {list(search_props.keys())}")
log.debug(f"search_props content: {search_props}")

capture_props = source.get("capture_props", {})
log.debug("=== DEBUG: capture_props ===")
log.debug("capture_props keys: %s", list(capture_props.keys()))
log.debug(f"capture_props keys: {list(capture_props.keys())}")
if capture_props:
# Just show a few key fields to avoid log spam
key_fields = [
Expand All @@ -768,12 +836,10 @@ def debug_opensearch_response(self) -> dict[str, Any] | None:
]
for field in key_fields:
if field in capture_props:
log.debug(
"capture_props.%s = %s", field, capture_props[field]
)
log.debug(f"capture_props.{field} = {capture_props[field]}")

return source
except Exception:
except Exception: # noqa: BLE001
log.exception("=== DEBUG: Exception occurred ===")
log.exception("Error occurred during frequency metadata extraction")
return None
Expand Down Expand Up @@ -1028,17 +1094,16 @@ def get_authors_display(self):
# from_db should have already converted JSON string to list
if not isinstance(self.authors, list):
log.warning(
"Dataset %s: authors field is not a list (type: %s)",
self.uuid,
type(self.authors).__name__,
f"Dataset {self.uuid}: authors field is not a list "
f"(type: {type(self.authors).__name__})",
)
return []

# Check if authors are in old string format and need conversion
if self.authors and isinstance(self.authors[0], str):
log.warning(
"Dataset %s: authors still in old string format, needs migration",
self.uuid,
f"Dataset {self.uuid}: authors still in old string format, "
f"needs migration",
)
# Convert old format for backward compatibility
return [{"name": author, "orcid_id": ""} for author in self.authors]
Expand Down Expand Up @@ -1528,8 +1593,7 @@ def _extract_drf_capture_props(
# if still no center_frequency, log warning but continue
if not center_frequency:
log.warning(
"No center frequency found for DRF capture %s",
capture_uuid,
f"No center frequency found for DRF capture {capture_uuid}",
)

return center_frequency, sample_rate
Expand Down Expand Up @@ -1571,16 +1635,13 @@ def _extract_drf_sample_rate(capture_props: dict[str, Any]) -> float | None:
if numerator and denominator and denominator != 0:
# try sample_rate_numerator/denominator first
sample_rate = numerator / denominator
log.info(
"Calculated DRF sample_rate: %s/%s = %s",
numerator,
denominator,
sample_rate,
log.debug(
f"Calculated DRF sample_rate: {numerator}/{denominator} = {sample_rate}",
)
elif capture_props.get("samples_per_second"):
# fallback to samples_per_second if numerator/denominator missing
sample_rate = capture_props.get("samples_per_second")
log.info("Using DRF samples_per_second: %s", sample_rate)
log.debug(f"Using DRF samples_per_second: {sample_rate}")
return sample_rate


Expand Down Expand Up @@ -1686,6 +1747,9 @@ def _extract_bulk_frequency_data(
"sample_rate": sample_rate,
"frequency_min": search_props.get("frequency_min"),
"frequency_max": search_props.get("frequency_max"),
"start_time": search_props.get("start_time"),
"end_time": search_props.get("end_time"),
"file_cadence": None, # requires per-capture file count; skip in bulk
}


Expand Down
Loading
Loading