
Add low-latency raw memory search #173

Open
strongkeep-debug wants to merge 4 commits into XortexAI:main from strongkeep-debug:codex/163-search-fast-path

Conversation


@strongkeep-debug strongkeep-debug commented May 11, 2026

Addresses #163.

This PR turns memory search into a true low-latency path. Raw search now goes through `RetrievalPipeline.search_raw` and returns ranked profile, temporal, summary, snippet, and code annotation hits without retrieval-plan tool selection. With `answer=true`, the endpoint synthesizes an answer from those already-fetched hits when a caller wants one, and the root `/search` alias is wired up for clients that need the shorter path.
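For clients of the shorter path, a request against `/v1/memory/search` might be assembled as in the sketch below. The `query`, `user_id`, `top_k`, `domains`, and `answer` fields all appear in the PR, but the exact payload shape is an assumption, not taken from the diff.

```python
import json

# Hypothetical request body for POST /v1/memory/search; field names beyond
# query/user_id/top_k/domains/answer are assumptions, not confirmed by the PR.
payload = {
    "query": "where is the retry logic defined?",
    "user_id": "user-123",
    "top_k": 5,
    # "code" is the new domain this PR adds to the default raw search set.
    "domains": ["profile", "temporal", "summary", "snippet", "code"],
    # True would synthesize an answer from the already-fetched raw hits.
    "answer": False,
}
body = json.dumps(payload)
```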

| Area | Change | Evidence |
| --- | --- | --- |
| Raw retrieval | Selected domains are searched directly and ranked by score, including the code annotation domain requested by the issue. | `test_raw_search_returns_ranked_hits_without_tool_selection` confirms no tool-selection call is made and verifies a code hit keeps file and symbol metadata. |
| API contract | `/v1/memory/search` accepts `code` in the domain list and includes it in the default raw search domain set. | `test_memory_search_route_accepts_code_domain` covers the request validator and serialized response shape. |
| Optional answer | `answer=true` synthesizes from collected raw hits without doing agentic retrieval planning first. | `test_root_search_alias_can_synthesize_answer` covers the alias and answer mode. |
| Caching and latency | Profile catalogs and retrieval plans are cached for the agentic path, and bounded p50/p95/p99 latency snapshots are recorded for raw, answer, and agentic modes. | `test_retrieval_pipeline_caches_catalog_and_retrieval_plan` covers cache reuse. |
| Robustness | Raw search now normalizes missing/non-finite backend scores and keeps healthy domain results when another requested domain fails. | `test_raw_search_skips_failed_domains_and_normalizes_scores` and the API route regression cover both pipeline and serialization behavior. |
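The bounded per-mode p50/p95/p99 latency snapshots described above could be recorded along these lines. This is a minimal sketch: the nearest-rank percentile and the 512-sample bound are assumptions, and `LatencyTracker` is a hypothetical name, not the class in the diff.

```python
from collections import defaultdict, deque

_MAX_SAMPLES = 512  # assumed bound; the PR only says the snapshots are "bounded"


class LatencyTracker:
    """Sketch of per-mode latency recording with p50/p95/p99 snapshots."""

    def __init__(self) -> None:
        # One bounded deque per mode ("raw", "answer", "agentic"); old
        # samples fall off automatically once the maxlen is reached.
        self._samples: dict[str, deque] = defaultdict(
            lambda: deque(maxlen=_MAX_SAMPLES)
        )

    def record(self, mode: str, elapsed_ms: float) -> None:
        self._samples[mode].append(elapsed_ms)

    def snapshot(self, mode: str) -> dict:
        data = sorted(self._samples[mode])
        if not data:
            return {}

        def pct(p: float) -> float:
            # Nearest-rank percentile over the sorted samples.
            idx = min(len(data) - 1, int(p / 100 * len(data)))
            return data[idx]

        return {"p50": pct(50), "p95": pct(95), "p99": pct(99)}
```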

Validation was run locally:

```
.\.venv\Scripts\python.exe -m pytest -q
50 passed

.\.venv\Scripts\python.exe -m pytest tests\integration\test_retrieval_pipeline.py tests\api\test_memory_search_routes.py -q
12 passed

.\.venv\Scripts\python.exe -m ruff check src\pipelines\retrieval.py src\api\routes\memory.py tests\integration\test_retrieval_pipeline.py tests\api\test_memory_search_routes.py
All checks passed!

git diff --check
passed
```


@gemini-code-assist (bot) left a comment


Code Review

This pull request introduces a low-latency raw search endpoint and enhances the existing search functionality with optional answer synthesis and latency tracking. Key changes include the implementation of search_raw and answer_from_sources in the retrieval pipeline, the addition of TTL-based caching for profile catalogs and retrieval plans, and the inclusion of detailed latency metrics in search responses. Feedback focuses on optimizing performance by parallelizing domain searches, ensuring the profile catalog retrieval is asynchronous to avoid blocking the event loop, and managing cache memory usage through bounded collections and hashed keys.

Comment thread src/pipelines/retrieval.py Outdated
Comment on lines +280 to +287
```python
if "profile" in domain_set:
    results.extend(await self._search_profile_raw(query, user_id, top_k))
if "temporal" in domain_set:
    results.extend(await self._search_temporal(query, user_id, top_k))
if "summary" in domain_set:
    results.extend(await self._search_summary(query, user_id, top_k))
if "snippet" in domain_set:
    results.extend(await self._search_snippet(query, user_id, top_k))
```


high

The current implementation of search_raw executes searches across different domains sequentially. To achieve true low-latency as intended by this PR, these searches should be executed in parallel using asyncio.gather.

Suggested change

```python
# before: sequential awaits
if "profile" in domain_set:
    results.extend(await self._search_profile_raw(query, user_id, top_k))
if "temporal" in domain_set:
    results.extend(await self._search_temporal(query, user_id, top_k))
if "summary" in domain_set:
    results.extend(await self._search_summary(query, user_id, top_k))
if "snippet" in domain_set:
    results.extend(await self._search_snippet(query, user_id, top_k))
```

```python
# after: concurrent fan-out (assumes `import asyncio` at module top)
domain_set = set(domains)
tasks = []
if "profile" in domain_set:
    tasks.append(self._search_profile_raw(query, user_id, top_k))
if "temporal" in domain_set:
    tasks.append(self._search_temporal(query, user_id, top_k))
if "summary" in domain_set:
    tasks.append(self._search_summary(query, user_id, top_k))
if "snippet" in domain_set:
    tasks.append(self._search_snippet(query, user_id, top_k))
task_results = await asyncio.gather(*tasks)
results: List[SourceRecord] = [item for sublist in task_results for item in sublist]
```
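Combining the reviewer's `asyncio.gather` suggestion with the robustness behavior the PR later claims (skip failed domains, normalize missing or non-finite scores) might look like the sketch below. The `_search_*_stub` coroutines are hypothetical stand-ins, not the real backends from the diff.

```python
import asyncio
import math
from typing import Any

async def _search_snippet_stub(query: str) -> list:
    # Stand-in for a healthy domain backend.
    return [{"domain": "snippet", "score": 0.9}]

async def _search_code_stub(query: str) -> list:
    # Stand-in for a backend that returns a non-finite score.
    return [{"domain": "code", "score": float("nan")}]

async def _search_broken_stub(query: str) -> list:
    # Stand-in for a failing domain backend.
    raise RuntimeError("backend unavailable")

async def search_domains(query: str) -> list:
    # return_exceptions=True keeps healthy results when a sibling task fails.
    outcomes = await asyncio.gather(
        _search_snippet_stub(query),
        _search_code_stub(query),
        _search_broken_stub(query),
        return_exceptions=True,
    )
    results: list = []
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            continue  # drop the failed domain, keep the rest
        for hit in outcome:
            score = hit.get("score")
            # Normalize missing or non-finite backend scores to 0.0.
            if not isinstance(score, (int, float)) or not math.isfinite(score):
                hit["score"] = 0.0
            results.append(hit)
    return sorted(results, key=lambda h: h["score"], reverse=True)

hits = asyncio.run(search_domains("retry logic"))
```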

Comment thread src/pipelines/retrieval.py Outdated
Comment on lines +572 to +584
```python
def _get_profile_catalog(self, user_id: str):
    cached = self._profile_catalog_cache.get(user_id)
    now = time.monotonic()
    if cached and cached[0] > now:
        return cached[1], cached[2]

    catalog, results = self._fetch_profile_catalog(user_id)
    self._profile_catalog_cache[user_id] = (
        now + _CACHE_TTL_SECONDS,
        catalog,
        results,
    )
    return catalog, results
```


high

The _get_profile_catalog method is synchronous and performs network I/O via _fetch_profile_catalog (which calls vector_store.search_by_metadata). Calling this from an async context like run or search_raw will block the entire event loop, significantly impacting performance and defeating the purpose of a low-latency path. This should be made asynchronous.

Suggested change

```python
# before: synchronous method
def _get_profile_catalog(self, user_id: str):
    cached = self._profile_catalog_cache.get(user_id)
    now = time.monotonic()
    if cached and cached[0] > now:
        return cached[1], cached[2]
    catalog, results = self._fetch_profile_catalog(user_id)
    self._profile_catalog_cache[user_id] = (
        now + _CACHE_TTL_SECONDS,
        catalog,
        results,
    )
    return catalog, results
```

```python
# after: async method
async def _get_profile_catalog(self, user_id: str):
    cached = self._profile_catalog_cache.get(user_id)
    now = time.monotonic()
    if cached and cached[0] > now:
        return cached[1], cached[2]
    # Assuming search_by_metadata is made async or wrapped in an executor
    catalog, results = await self._fetch_profile_catalog(user_id)
    self._profile_catalog_cache[user_id] = (
        now + _CACHE_TTL_SECONDS,
        catalog,
        results,
    )
    return catalog, results
```

Comment thread src/pipelines/retrieval.py Outdated
Comment on lines +140 to +142
```python
self._profile_catalog_cache: Dict[str, tuple[float, List[Dict[str, str]], List[Any]]] = {}
self._retrieval_plan_cache: Dict[tuple[str, str, int, str], tuple[float, AIMessage]] = {}
self._latency_samples: Dict[str, List[float]] = {}
```


medium

The caches _profile_catalog_cache and _retrieval_plan_cache are unbounded dictionaries that only expire entries on access. This can lead to a memory leak as entries for users who do not return will persist indefinitely. Consider using a cache with a maximum size and an eviction policy (e.g., cachetools.TTLCache).
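A stdlib sketch of the bounded, TTL-expiring cache the reviewer has in mind (`cachetools.TTLCache` provides the same semantics off the shelf). The `maxsize`/`ttl` values below are illustrative, and the class name is hypothetical.

```python
import time
from collections import OrderedDict
from typing import Any, Hashable, Optional


class BoundedTTLCache:
    """Bounded TTL cache: lazy expiry on access, LRU eviction on insert."""

    def __init__(self, maxsize: int, ttl: float) -> None:
        self._maxsize = maxsize
        self._ttl = ttl
        self._data: "OrderedDict[Hashable, tuple]" = OrderedDict()

    def get(self, key: Hashable) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        expires, value = entry
        if expires <= time.monotonic():
            del self._data[key]  # lazy expiry on access
            return None
        self._data.move_to_end(key)  # mark as recently used
        return value

    def put(self, key: Hashable, value: Any) -> None:
        self._data[key] = (time.monotonic() + self._ttl, value)
        self._data.move_to_end(key)
        while len(self._data) > self._maxsize:
            self._data.popitem(last=False)  # evict least recently used
```

Unlike the unbounded dicts flagged above, entries here are dropped both when they expire and when the cache exceeds its size bound, so inactive users cannot grow memory indefinitely.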

Comment thread src/pipelines/retrieval.py Outdated
```python
        ]

        ai_response: AIMessage = await self.model_with_tools.ainvoke(messages)
        plan_key = (user_id, query.strip(), top_k, catalog_text)
```


medium

Using the entire catalog_text as part of the cache key for _retrieval_plan_cache can be memory-intensive if the catalog is large. Consider using a hash of the catalog_text instead.

```python
import hashlib

catalog_hash = hashlib.sha256(catalog_text.encode()).hexdigest()
plan_key = (user_id, query.strip(), top_k, catalog_hash)
```


strongkeep-debug commented May 11, 2026

Follow-up after the latest push: 5548b63 now includes the requested code domain in the raw search path, preserves repository/file/symbol/type/severity metadata, and hardens raw search against missing or non-finite backend scores. It also keeps healthy domain results if another requested domain fails.

The earlier performance review items are addressed in the same branch: raw domain searches run concurrently, profile catalog lookup is async, and both caches are bounded with hashed plan keys. Current local verification is 50 passed for the full suite, 12 passed for the targeted retrieval/API tests, touched-file Ruff clean, and git diff --check clean. The current PR label check is green.

Update: the GitHub Actions Test Suite is now green as well. The Unit, API, and Integration Tests job and the End-to-End Tests job both passed on the current head, and the PR remains mergeable with a clean merge state.


ved015 commented May 13, 2026

@strongkeep-debug thank you for your contribution. Please review the Gemini suggestions and resolve them, and make sure to also leave a comment on each suggestion :)
