Add low-latency raw memory search #173
Conversation
Code Review
This pull request introduces a low-latency raw search endpoint and enhances the existing search functionality with optional answer synthesis and latency tracking. Key changes include the implementation of search_raw and answer_from_sources in the retrieval pipeline, the addition of TTL-based caching for profile catalogs and retrieval plans, and the inclusion of detailed latency metrics in search responses. Feedback focuses on optimizing performance by parallelizing domain searches, ensuring the profile catalog retrieval is asynchronous to avoid blocking the event loop, and managing cache memory usage through bounded collections and hashed keys.
```python
if "profile" in domain_set:
    results.extend(await self._search_profile_raw(query, user_id, top_k))
if "temporal" in domain_set:
    results.extend(await self._search_temporal(query, user_id, top_k))
if "summary" in domain_set:
    results.extend(await self._search_summary(query, user_id, top_k))
if "snippet" in domain_set:
    results.extend(await self._search_snippet(query, user_id, top_k))
```
The current implementation of `search_raw` executes the domain searches sequentially. To achieve the low latency this PR aims for, these searches should run in parallel via `asyncio.gather`.
```diff
-if "profile" in domain_set:
-    results.extend(await self._search_profile_raw(query, user_id, top_k))
-if "temporal" in domain_set:
-    results.extend(await self._search_temporal(query, user_id, top_k))
-if "summary" in domain_set:
-    results.extend(await self._search_summary(query, user_id, top_k))
-if "snippet" in domain_set:
-    results.extend(await self._search_snippet(query, user_id, top_k))
+domain_set = set(domains)
+tasks = []
+if "profile" in domain_set:
+    tasks.append(self._search_profile_raw(query, user_id, top_k))
+if "temporal" in domain_set:
+    tasks.append(self._search_temporal(query, user_id, top_k))
+if "summary" in domain_set:
+    tasks.append(self._search_summary(query, user_id, top_k))
+if "snippet" in domain_set:
+    tasks.append(self._search_snippet(query, user_id, top_k))
+task_results = await asyncio.gather(*tasks)
+results: List[SourceRecord] = [item for sublist in task_results for item in sublist]
```
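A variant of the suggestion above can also pass `return_exceptions=True` so one failing domain does not abort the whole raw search, which is the behavior `test_raw_search_skips_failed_domains_and_normalizes_scores` seems to exercise. A minimal, self-contained sketch (the `_search_ok` / `_search_broken` coroutines are stand-ins, not the PR's code):

```python
import asyncio
from typing import Any, List

async def _search_ok(query: str) -> List[str]:
    # Stand-in for a domain search that succeeds.
    return [f"hit:{query}"]

async def _search_broken(query: str) -> List[str]:
    # Stand-in for a domain search whose backend is down.
    raise RuntimeError("temporal index unavailable")

async def search_raw(query: str) -> List[str]:
    tasks = [_search_ok(query), _search_broken(query), _search_ok(query)]
    # return_exceptions=True keeps sibling searches alive when one fails;
    # failed domains come back as exception objects instead of raising.
    outcomes: List[Any] = await asyncio.gather(*tasks, return_exceptions=True)
    results: List[str] = []
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            continue  # skip the failed domain, keep the rest
        results.extend(outcome)
    return results

print(asyncio.run(search_raw("redis")))  # two hits survive the broken domain
```

Without `return_exceptions=True`, the first raised exception would propagate out of `gather` and cancel the remaining tasks.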
```python
def _get_profile_catalog(self, user_id: str):
    cached = self._profile_catalog_cache.get(user_id)
    now = time.monotonic()
    if cached and cached[0] > now:
        return cached[1], cached[2]

    catalog, results = self._fetch_profile_catalog(user_id)
    self._profile_catalog_cache[user_id] = (
        now + _CACHE_TTL_SECONDS,
        catalog,
        results,
    )
    return catalog, results
```
The `_get_profile_catalog` method is synchronous and performs network I/O via `_fetch_profile_catalog` (which calls `vector_store.search_by_metadata`). Calling it from an async context such as `run` or `search_raw` blocks the entire event loop, significantly hurting performance and defeating the purpose of a low-latency path. It should be made asynchronous.
```diff
-def _get_profile_catalog(self, user_id: str):
+async def _get_profile_catalog(self, user_id: str):
     cached = self._profile_catalog_cache.get(user_id)
     now = time.monotonic()
     if cached and cached[0] > now:
         return cached[1], cached[2]
-    catalog, results = self._fetch_profile_catalog(user_id)
+    # Assuming search_by_metadata is made async or wrapped in an executor
+    catalog, results = await self._fetch_profile_catalog(user_id)
     self._profile_catalog_cache[user_id] = (
         now + _CACHE_TTL_SECONDS,
         catalog,
         results,
     )
     return catalog, results
```
```python
self._profile_catalog_cache: Dict[str, tuple[float, List[Dict[str, str]], List[Any]]] = {}
self._retrieval_plan_cache: Dict[tuple[str, str, int, str], tuple[float, AIMessage]] = {}
self._latency_samples: Dict[str, List[float]] = {}
```
The caches `_profile_catalog_cache` and `_retrieval_plan_cache` are unbounded dictionaries that only expire entries on access. This can leak memory: entries for users who never return persist indefinitely. Consider a cache with a maximum size and an eviction policy (e.g. `cachetools.TTLCache`).
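`cachetools.TTLCache(maxsize=..., ttl=...)` is the off-the-shelf fix. If the project prefers to stay dependency-free, a small bounded TTL cache over `OrderedDict` gives the same guarantees; this is an illustrative sketch, not the PR's code:

```python
import time
from collections import OrderedDict
from typing import Any, Hashable, Optional, Tuple

class BoundedTTLCache:
    """LRU-evicting cache whose entries also expire after ttl seconds."""

    def __init__(self, maxsize: int, ttl: float) -> None:
        self._maxsize = maxsize
        self._ttl = ttl
        self._data: "OrderedDict[Hashable, Tuple[float, Any]]" = OrderedDict()

    def get(self, key: Hashable) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if expires_at <= time.monotonic():
            del self._data[key]  # lazily drop expired entries on access
            return None
        self._data.move_to_end(key)  # mark as recently used
        return value

    def set(self, key: Hashable, value: Any) -> None:
        self._data[key] = (time.monotonic() + self._ttl, value)
        self._data.move_to_end(key)
        while len(self._data) > self._maxsize:
            self._data.popitem(last=False)  # evict least recently used

cache = BoundedTTLCache(maxsize=2, ttl=60.0)
cache.set("u1", "catalog-1")
cache.set("u2", "catalog-2")
cache.set("u3", "catalog-3")  # exceeds maxsize, evicts "u1"
print(cache.get("u1"), cache.get("u3"))
```

Bounding by `maxsize` caps worst-case memory regardless of how many distinct users hit the endpoint, while the TTL still keeps hot entries fresh.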
```python
ai_response: AIMessage = await self.model_with_tools.ainvoke(messages)
plan_key = (user_id, query.strip(), top_k, catalog_text)
```
Using the entire `catalog_text` as part of the cache key for `_retrieval_plan_cache` can be memory-intensive when the catalog is large. Consider using a hash of `catalog_text` instead.
```python
import hashlib
catalog_hash = hashlib.sha256(catalog_text.encode()).hexdigest()
plan_key = (user_id, query.strip(), top_k, catalog_hash)
```
Follow-up after the latest push: the earlier performance review items are addressed on the same branch. Raw domain searches run concurrently, the profile catalog lookup is async, and both caches are bounded with hashed plan keys. The GitHub Actions test suite is now green as well: Unit, API, and Integration Tests and End-to-End Tests both passed on the current head, and the PR remains mergeable with a clean merge state.
@strongkeep-debug thank you for your contribution! Pls review the Gemini suggestions and resolve them, and pls make sure to also add a comment on each suggestion :)
Addresses #163.
This PR turns memory search into a true low-latency path.

- Raw search now goes through `RetrievalPipeline.search_raw` and returns ranked profile, temporal, summary, snippet, and code annotation hits without retrieval-plan tool selection. `answer=true` synthesizes from those already-fetched hits when a caller wants a generated answer, and the root `/search` alias is wired for clients that need the shorter path. `test_raw_search_returns_ranked_hits_without_tool_selection` confirms no tool-selection call is made and verifies a code hit keeps file and symbol metadata.
- `/v1/memory/search` accepts `code` in the domain list and includes it in the default raw search domain set. `test_memory_search_route_accepts_code_domain` covers the request validator and serialized response shape.
- `answer=true` synthesizes from collected raw hits without doing agentic retrieval planning first. `test_root_search_alias_can_synthesize_answer` covers the alias and answer mode.
- `test_retrieval_pipeline_caches_catalog_and_retrieval_plan` covers cache reuse.
- `test_raw_search_skips_failed_domains_and_normalizes_scores` and the API route regression cover both pipeline and serialization behavior.

Validation was run locally:
RetrievalPipeline.search_rawand returns ranked profile, temporal, summary, snippet, and code annotation hits without retrieval-plan tool selection.answer=truesynthesizes from those already-fetched hits when a caller wants a generated answer, and the root/searchalias is wired for clients that need the shorter path.test_raw_search_returns_ranked_hits_without_tool_selectionconfirms no tool-selection call is made and verifies a code hit keeps file and symbol metadata./v1/memory/searchacceptscodein the domain list and includes it in the default raw search domain set.test_memory_search_route_accepts_code_domaincovers the request validator and serialized response shape.answer=truesynthesizes from collected raw hits without doing agentic retrieval planning first.test_root_search_alias_can_synthesize_answercovers the alias and answer mode.test_retrieval_pipeline_caches_catalog_and_retrieval_plancovers cache reuse.test_raw_search_skips_failed_domains_and_normalizes_scoresand the API route regression cover both pipeline and serialization behavior.Validation was run locally: