From 9661daca4910a38dd5530c9e1b1dfc174e4e0f38 Mon Sep 17 00:00:00 2001 From: hunterbastian <81837742+hunterbastian@users.noreply.github.com> Date: Mon, 11 May 2026 21:12:28 -0400 Subject: [PATCH 1/4] Add durable job tracking for ingest and scanner work --- src/api/routes/memory.py | 318 +++++++++++++++++++++++++++++-------- src/api/routes/scanner.py | 193 ++++++++++++++++++++-- src/jobs/__init__.py | 1 + src/jobs/durable.py | 295 ++++++++++++++++++++++++++++++++++ src/scanner/code_store.py | 12 ++ tests/test_durable_jobs.py | 130 +++++++++++++++ 6 files changed, 866 insertions(+), 83 deletions(-) create mode 100644 src/jobs/__init__.py create mode 100644 src/jobs/durable.py create mode 100644 tests/test_durable_jobs.py diff --git a/src/api/routes/memory.py b/src/api/routes/memory.py index b4be36d..4da7fc3 100644 --- a/src/api/routes/memory.py +++ b/src/api/routes/memory.py @@ -24,7 +24,6 @@ from src.api.schemas import ( APIResponse, BatchIngestRequest, - BatchIngestResponse, DomainResult, IngestRequest, IngestResponse, @@ -47,6 +46,14 @@ import re from playwright.sync_api import sync_playwright +from src.jobs.durable import ( + FAILED, + QUEUED, + get_default_job_store, + run_job, + serialize_job, +) + logger = logging.getLogger("xmem.api.routes.memory") _ingest_semaphore = asyncio.Semaphore(5) @@ -98,7 +105,12 @@ def _wrap(request: Request, data: Any, elapsed_ms: float) -> JSONResponse: return resp -def _error(request: Request, detail: str, code: int, elapsed_ms: float = 0) -> JSONResponse: +def _error( + request: Request, + detail: str, + code: int, + elapsed_ms: float = 0, +) -> JSONResponse: body = APIResponse( status=StatusEnum.ERROR, request_id=getattr(request.state, "request_id", None), @@ -108,6 +120,106 @@ def _error(request: Request, detail: str, code: int, elapsed_ms: float = 0) -> J return JSONResponse(content=body.model_dump(), status_code=code) +def _current_user_id(user: dict) -> str: + return user.get("username") or user.get("name") or user["id"] + + +def _job_status_data(job: Dict[str, Any]) -> Dict[str, Any]: + public = serialize_job(job) or {} + return { + "job_id": public.get("job_id"), + "job_type": public.get("job_type"), + "status": public.get("status"), + "retry_count": public.get("retry_count", 0), + "max_attempts": public.get("max_attempts", 0), + "timeout_seconds": public.get("timeout_seconds"), + "error": public.get("error"), + "error_state": public.get("error_state"), + "result": public.get("result"), + "created_at": public.get("created_at"), + "updated_at": public.get("updated_at"), + "started_at": public.get("started_at"), + "completed_at": public.get("completed_at"), + "dead_lettered_at": public.get("dead_lettered_at"), + } + + +def _job_accepted( + request: Request, + job: Dict[str, Any], + created: bool, + status_url: str, + elapsed_ms: float, +) -> JSONResponse: + data = { + "job_id": job["job_id"], + "status": job.get("status", QUEUED), + "created": created, + "status_url": status_url, + } + return _wrap(request, data, elapsed_ms) + + +async def _run_ingest_payload( + payload: Dict[str, Any], + user_id: str, +) -> Dict[str, Any]: + pipeline = get_ingest_pipeline() + async with _ingest_semaphore: + result = await pipeline.run( + user_query=payload["user_query"], + agent_response=payload.get("agent_response") or "Acknowledged.", + user_id=user_id, + session_datetime=payload.get("session_datetime", ""), + image_url=payload.get("image_url", ""), + effort_level=payload.get("effort_level", "low"), + ) + data = IngestResponse( + 
model=_model_name(pipeline.model), + classification=_safe_classifications(result), + profile=_build_domain_result( + result.get("profile_judge"), + result.get("profile_weaver"), + ), + temporal=_build_domain_result( + result.get("temporal_judge"), + result.get("temporal_weaver"), + ), + summary=_build_domain_result( + result.get("summary_judge"), + result.get("summary_weaver"), + ), + image=_build_domain_result( + result.get("image_judge"), + result.get("image_weaver"), + ), + ) + return data.model_dump() + + +async def _run_batch_ingest_payload( + payload: Dict[str, Any], + user_id: str, +) -> Dict[str, Any]: + results = [] + for item in payload["items"]: + results.append(await _run_ingest_payload(item, user_id)) + return {"results": results} + + +async def _run_scrape_payload(payload: Dict[str, Any]) -> Dict[str, Any]: + result = await _scrape_chat_share(payload["url"]) + pairs = result["pairs"] + if not pairs: + raise ValueError(_chat_share_error_message(result)) + return ScrapeResponse(pairs=pairs).model_dump() + + +def _schedule_job(job: Dict[str, Any], handler) -> None: + if job.get("status") in {QUEUED, FAILED}: + asyncio.create_task(run_job(get_default_job_store(), job["job_id"], handler)) + + def _detect_chat_provider(*urls: str) -> str: for url in urls: lowered = (url or "").lower() @@ -554,38 +666,44 @@ async def _scrape_chat_share(url: str) -> Dict[str, Any]: ) async def ingest_memory(req: IngestRequest, request: Request, user: dict = Depends(require_api_key)): start = time.perf_counter() - pipeline = get_ingest_pipeline() - - # Get username from authenticated user - user_id = user.get("username") or user.get("name") or user["id"] + user_id = _current_user_id(user) + payload = req.model_dump() + payload["user_id"] = user_id try: - async with _ingest_semaphore: - result = await asyncio.wait_for( - pipeline.run( - user_query=req.user_query, - agent_response=req.agent_response or "Acknowledged.", - user_id=user_id, - session_datetime=req.session_datetime, - image_url=req.image_url, - effort_level=req.effort_level, - ), - timeout=120.0 - ) - data = IngestResponse( - model=_model_name(pipeline.model), - classification=_safe_classifications(result), - profile=_build_domain_result(result.get("profile_judge"), result.get("profile_weaver")), - temporal=_build_domain_result(result.get("temporal_judge"), result.get("temporal_weaver")), - summary=_build_domain_result(result.get("summary_judge"), result.get("summary_weaver")), - image=_build_domain_result(result.get("image_judge"), result.get("image_weaver")), + store = get_default_job_store() + job, created = await asyncio.to_thread( + store.enqueue, + job_type="memory_ingest", + payload=payload, + idempotency_fields={ + "user_id": user_id, + "user_query": req.user_query, + "agent_response": req.agent_response or "", + "session_datetime": req.session_datetime, + "image_url": req.image_url, + "effort_level": req.effort_level, + }, + user_id=user_id, + timeout_seconds=120.0, + max_attempts=3, + ) + _schedule_job( + job, + lambda: _run_ingest_payload(payload, user_id), ) elapsed = round((time.perf_counter() - start) * 1000, 2) - return _wrap(request, data, elapsed) + return _job_accepted( + request, + job, + created, + f"/v1/memory/ingest/{job['job_id']}/status", + elapsed, + ) except Exception as exc: elapsed = round((time.perf_counter() - start) * 1000, 2) - logger.exception("Ingest failed for user=%s", user_id) + logger.exception("Ingest enqueue failed for user=%s", user_id) return _error(request, str(exc), 500, elapsed) @@ -596,6 +714,45 
@@ def _safe_classifications(result: Dict[str, Any]) -> list: return [] +async def _read_user_job(job_id: str, user_id: str) -> Dict[str, Any] | None: + job = await asyncio.to_thread(get_default_job_store().get, job_id) + if not job: + return None + if job.get("user_id") != user_id: + return None + return job + + +@router.get( + "/ingest/{job_id}/status", + response_model=APIResponse, + summary="Poll an async memory ingest job", +) +async def ingest_job_status(job_id: str, request: Request, user: dict = Depends(require_api_key)): + start = time.perf_counter() + job = await _read_user_job(job_id, _current_user_id(user)) + if not job: + elapsed = round((time.perf_counter() - start) * 1000, 2) + return _error(request, "Job not found.", 404, elapsed) + elapsed = round((time.perf_counter() - start) * 1000, 2) + return _wrap(request, _job_status_data(job), elapsed) + + +@router.get( + "/jobs/{job_id}/status", + response_model=APIResponse, + summary="Poll an async memory job", +) +async def memory_job_status(job_id: str, request: Request, user: dict = Depends(require_api_key)): + start = time.perf_counter() + job = await _read_user_job(job_id, _current_user_id(user)) + if not job: + elapsed = round((time.perf_counter() - start) * 1000, 2) + return _error(request, "Job not found.", 404, elapsed) + elapsed = round((time.perf_counter() - start) * 1000, 2) + return _wrap(request, _job_status_data(job), elapsed) + + # POST /v1/memory/batch-ingest @router.post( "/batch-ingest", @@ -604,38 +761,41 @@ def _safe_classifications(result: Dict[str, Any]) -> list: ) async def batch_ingest_memory(req: BatchIngestRequest, request: Request, user: dict = Depends(require_api_key)): start = time.perf_counter() - pipeline = get_ingest_pipeline() - user_id = user.get("username") or user.get("name") or user["id"] - - results = [] + user_id = _current_user_id(user) + payload = req.model_dump() + payload["user_id"] = user_id - for item in req.items: - result = await asyncio.wait_for( - pipeline.run( - user_query=item.user_query, - agent_response=item.agent_response or "Acknowledged.", - user_id=user_id, - session_datetime=item.session_datetime, - image_url=item.image_url, - effort_level=item.effort_level, - ), - timeout=120.0 + try: + store = get_default_job_store() + job, created = await asyncio.to_thread( + store.enqueue, + job_type="memory_batch_ingest", + payload=payload, + idempotency_fields={ + "user_id": user_id, + "items": payload["items"], + }, + user_id=user_id, + timeout_seconds=max(120.0, min(len(req.items) * 120.0, 3600.0)), + max_attempts=3, ) - - data = IngestResponse( - model=_model_name(pipeline.model), - classification=_safe_classifications(result), - profile=_build_domain_result(result.get("profile_judge"), result.get("profile_weaver")), - temporal=_build_domain_result(result.get("temporal_judge"), result.get("temporal_weaver")), - summary=_build_domain_result(result.get("summary_judge"), result.get("summary_weaver")), - image=_build_domain_result(result.get("image_judge"), result.get("image_weaver")), + _schedule_job( + job, + lambda: _run_batch_ingest_payload(payload, user_id), + ) + elapsed = round((time.perf_counter() - start) * 1000, 2) + return _job_accepted( + request, + job, + created, + f"/v1/memory/jobs/{job['job_id']}/status", + elapsed, ) - results.append(data) - response_data = BatchIngestResponse(results=results) - - elapsed = round((time.perf_counter() - start) * 1000, 2) - return _wrap(request, response_data, elapsed) + except Exception as exc: + elapsed = round((time.perf_counter() - 
start) * 1000, 2) + logger.exception("Batch ingest enqueue failed for user=%s", user_id) + return _error(request, str(exc), 500, elapsed) @@ -771,26 +931,50 @@ async def _search_summary(pipeline: RetrievalPipeline, query: str, user_id: str, ) async def scrape_chat_link(req: ScrapeRequest, request: Request): start = time.perf_counter() - url = req.url - - try: - result = await _scrape_chat_share(url) - pairs = result["pairs"] - - if not pairs: - elapsed = round((time.perf_counter() - start) * 1000, 2) - return _error(request, _chat_share_error_message(result), 400, elapsed) + payload = req.model_dump() - data = ScrapeResponse(pairs=pairs) + try: + store = get_default_job_store() + job, created = await asyncio.to_thread( + store.enqueue, + job_type="memory_scrape", + payload=payload, + idempotency_fields={"url": req.url}, + user_id="anonymous", + timeout_seconds=60.0, + max_attempts=2, + ) + _schedule_job(job, lambda: _run_scrape_payload(payload)) elapsed = round((time.perf_counter() - start) * 1000, 2) - return _wrap(request, data, elapsed) + return _job_accepted( + request, + job, + created, + f"/v1/memory/scrape/{job['job_id']}/status", + elapsed, + ) except Exception as exc: elapsed = round((time.perf_counter() - start) * 1000, 2) - logger.exception("Scrape failed for url=%s", url) + logger.exception("Scrape enqueue failed for url=%s", req.url) return _error(request, str(exc) or repr(exc), 500, elapsed) +@scrape_router.get( + "/scrape/{job_id}/status", + response_model=APIResponse, + summary="Poll an async scrape job", +) +async def scrape_job_status(job_id: str, request: Request): + start = time.perf_counter() + job = await asyncio.to_thread(get_default_job_store().get, job_id) + if not job or job.get("user_id") != "anonymous": + elapsed = round((time.perf_counter() - start) * 1000, 2) + return _error(request, "Job not found.", 404, elapsed) + elapsed = round((time.perf_counter() - start) * 1000, 2) + return _wrap(request, _job_status_data(job), elapsed) + + # POST /v1/memory/parse_transcript @scrape_router.post( diff --git a/src/api/routes/scanner.py b/src/api/routes/scanner.py index 12a0b96..22d2d97 100644 --- a/src/api/routes/scanner.py +++ b/src/api/routes/scanner.py @@ -39,6 +39,14 @@ from src.api.dependencies import require_api_key from src.config import settings +from src.jobs.durable import ( + FAILED, + QUEUED, + get_default_job_store, + new_attempt_id, + run_job, + serialize_job, +) logger = logging.getLogger("xmem.api.routes.scanner") @@ -59,6 +67,29 @@ def _get_code_store(): return _code_store_singleton +def _schedule_durable_job(job: Dict[str, Any], handler) -> None: + if job.get("status") in {QUEUED, FAILED}: + asyncio.create_task(run_job(get_default_job_store(), job["job_id"], handler)) + + +async def _public_durable_job(job_id: str) -> Optional[Dict[str, Any]]: + job = await asyncio.to_thread(get_default_job_store().get, job_id) + public = serialize_job(job) + if not public: + return None + return { + "job_id": public.get("job_id"), + "status": public.get("status"), + "retry_count": public.get("retry_count", 0), + "max_attempts": public.get("max_attempts", 0), + "timeout_seconds": public.get("timeout_seconds"), + "error": public.get("error"), + "error_state": public.get("error_state"), + "updated_at": public.get("updated_at"), + "dead_lettered_at": public.get("dead_lettered_at"), + } + + # ═══════════════════════════════════════════════════════════════════════════ # Request schemas # ═══════════════════════════════════════════════════════════════════════════ @@ -583,6 
+614,46 @@ async def _run_phase2_pipeline_only( _cleanup_clone(org, repo) + +async def _run_scan_job( + job_id: str, + username: str, + org: str, + repo: str, + url: str, + branch: str, + pat: str, + force_full: bool, +) -> Dict[str, Any]: + await _run_scan_pipeline(job_id, username, org, repo, url, branch, pat, force_full) + job = _get_code_store().get_scanner_job(job_id) or {} + if job.get("phase1_status") == "failed" or job.get("phase2_status") == "failed": + raise RuntimeError(job.get("error") or "Scanner job failed") + return { + "scanner_job_id": job_id, + "phase1_status": job.get("phase1_status"), + "phase2_status": job.get("phase2_status"), + } + + +async def _run_phase2_job( + job_id: str, + username: str, + org: str, + repo: str, + url: str, + branch: str, +) -> Dict[str, Any]: + await _run_phase2_pipeline_only(job_id, username, org, repo, url, branch) + job = _get_code_store().get_scanner_job(job_id) or {} + if job.get("phase2_status") == "failed": + raise RuntimeError(job.get("error") or "Scanner Phase 2 job failed") + return { + "scanner_job_id": job_id, + "phase1_status": job.get("phase1_status"), + "phase2_status": job.get("phase2_status"), + } + @router.post( "/validate-url", summary="Validate a GitHub URL and check accessibility", @@ -659,7 +730,8 @@ class PauseScanRequest(BaseModel): @router.post("/pause", summary="Pause an in-progress scan") async def pause_scan(req: PauseScanRequest, user: dict = Depends(require_api_key)): - job_id = f"{req.username}:{req.org_id}:{req.repo}" + username = user.get("username") or user.get("name") or user["id"] + job_id = f"{username}:{req.org_id}:{req.repo}" store = _get_code_store() job = store.get_scanner_job(job_id) @@ -697,7 +769,8 @@ class ResumeScanRequest(BaseModel): @router.post("/resume", summary="Resume a paused scan") async def resume_scan(req: ResumeScanRequest, user: dict = Depends(require_api_key)): - job_id = f"{req.username}:{req.org_id}:{req.repo}" + username = user.get("username") or user.get("name") or user["id"] + job_id = f"{username}:{req.org_id}:{req.repo}" store = _get_code_store() job = store.get_scanner_job(job_id) @@ -721,35 +794,83 @@ async def resume_scan(req: ResumeScanRequest, user: dict = Depends(require_api_k started_at = job.get("started_at", time.time()) if p1 == "paused": + durable_job, _ = await asyncio.to_thread( + get_default_job_store().enqueue, + job_type="scanner_scan_resume", + payload={ + "scanner_job_id": job_id, + "org": req.org_id, + "repo": req.repo, + "branch": branch, + "github_url": url, + }, + idempotency_fields={ + "resume_attempt_id": new_attempt_id(), + "scanner_job_id": job_id, + }, + user_id=username, + timeout_seconds=1800.0, + max_attempts=2, + ) store.upsert_scanner_job( - job_id=job_id, username=req.username, org=req.org_id, repo=req.repo, + job_id=job_id, username=username, org=req.org_id, repo=req.repo, branch=branch, url=url, phase1_status="running", phase2_status="pending", - started_at=started_at, error=None, + started_at=started_at, error=None, durable_job_id=durable_job["job_id"], + retry_count=int(durable_job.get("retry_count") or 0), + timeout_seconds=float(durable_job.get("timeout_seconds") or 1800.0), ) - asyncio.create_task( - _run_scan_pipeline(job_id, req.username, req.org_id, req.repo, url, branch, ""), + _schedule_durable_job( + durable_job, + lambda: _run_scan_job( + job_id, username, req.org_id, req.repo, url, branch, "", True, + ), ) logger.info("Resumed Phase 1 for %s/%s", req.org_id, req.repo) return JSONResponse({ "status": "ok", "message": "Scan resumed 
from Phase 1.", + "durable_job_id": durable_job["job_id"], + "durable_status": durable_job.get("status"), "phase1_status": "running", "phase2_status": "pending", }) # p2 == "paused" + durable_job, _ = await asyncio.to_thread( + get_default_job_store().enqueue, + job_type="scanner_phase2_resume", + payload={ + "scanner_job_id": job_id, + "org": req.org_id, + "repo": req.repo, + "branch": branch, + "github_url": url, + }, + idempotency_fields={ + "resume_attempt_id": new_attempt_id(), + "scanner_job_id": job_id, + }, + user_id=username, + timeout_seconds=1800.0, + max_attempts=2, + ) store.upsert_scanner_job( - job_id=job_id, username=req.username, org=req.org_id, repo=req.repo, + job_id=job_id, username=username, org=req.org_id, repo=req.repo, branch=branch, url=url, phase1_status="complete", phase2_status="running", - started_at=started_at, error=None, + started_at=started_at, error=None, durable_job_id=durable_job["job_id"], + retry_count=int(durable_job.get("retry_count") or 0), + timeout_seconds=float(durable_job.get("timeout_seconds") or 1800.0), ) - asyncio.create_task( - _run_phase2_pipeline_only(job_id, req.username, req.org_id, req.repo, url, branch), + _schedule_durable_job( + durable_job, + lambda: _run_phase2_job(job_id, username, req.org_id, req.repo, url, branch), ) logger.info("Resumed Phase 2 for %s/%s", req.org_id, req.repo) return JSONResponse({ "status": "ok", "message": "Scan resumed from Phase 2.", + "durable_job_id": durable_job["job_id"], + "durable_status": durable_job.get("status"), "phase1_status": "complete", "phase2_status": "running", }) @@ -832,6 +953,33 @@ async def start_scan(req: ScanRequest, user: dict = Depends(require_api_key)): }) started_at = time.time() + durable_type = "scanner_phase2" if phase2_only else "scanner_scan" + durable_payload = { + "scanner_job_id": job_id, + "org": org, + "repo": repo, + "branch": branch, + "github_url": clone_url, + "force_full": req.force_full, + "remote_sha": remote_sha, + } + durable_job, _ = await asyncio.to_thread( + get_default_job_store().enqueue, + job_type=durable_type, + payload=durable_payload, + idempotency_fields={ + "user_id": username, + "org": org, + "repo": repo, + "branch": branch, + "remote_sha": remote_sha, + "phase2_only": phase2_only, + "force_full": req.force_full, + }, + user_id=username, + timeout_seconds=1800.0, + max_attempts=2, + ) store.upsert_scanner_job( job_id=job_id, username=username, @@ -843,18 +991,22 @@ async def start_scan(req: ScanRequest, user: dict = Depends(require_api_key)): phase2_status="running" if phase2_only else "pending", started_at=started_at, error=None, + durable_job_id=durable_job["job_id"], + retry_count=int(durable_job.get("retry_count") or 0), + timeout_seconds=float(durable_job.get("timeout_seconds") or 1800.0), ) store.upsert_user_repo_entry(username, org, repo, branch) if phase2_only: - asyncio.create_task( - _run_phase2_pipeline_only( - job_id, username, org, repo, clone_url, branch, - ), + _schedule_durable_job( + durable_job, + lambda: _run_phase2_job(job_id, username, org, repo, clone_url, branch), ) return JSONResponse({ "status": "ok", "job_id": job_id, + "durable_job_id": durable_job["job_id"], + "durable_status": durable_job.get("status"), "org": org, "repo": repo, "reused": False, @@ -864,8 +1016,9 @@ async def start_scan(req: ScanRequest, user: dict = Depends(require_api_key)): "phase2_status": "running", }) - asyncio.create_task( - _run_scan_pipeline( + _schedule_durable_job( + durable_job, + lambda: _run_scan_job( job_id, username, org, repo, 
clone_url, branch, req.pat, req.force_full, ), ) @@ -873,6 +1026,8 @@ async def start_scan(req: ScanRequest, user: dict = Depends(require_api_key)): return JSONResponse({ "status": "ok", "job_id": job_id, + "durable_job_id": durable_job["job_id"], + "durable_status": durable_job.get("status"), "org": org, "repo": repo, "reused": False, @@ -906,7 +1061,13 @@ async def scan_status( "phase2_status": job.get("phase2_status", "not_started"), "elapsed_seconds": round(elapsed, 1), "error": job.get("error"), + "error_state": job.get("error_state"), + "retry_count": job.get("retry_count", 0), + "timeout_seconds": job.get("timeout_seconds"), + "durable_job_id": job.get("durable_job_id"), } + if job.get("durable_job_id"): + resp["durable_job"] = await _public_durable_job(job["durable_job_id"]) pr = job.get("phase1_result") if isinstance(pr, dict) and pr.get("stats"): diff --git a/src/jobs/__init__.py b/src/jobs/__init__.py new file mode 100644 index 0000000..c69c4a7 --- /dev/null +++ b/src/jobs/__init__.py @@ -0,0 +1 @@ +"""Durable background job helpers.""" diff --git a/src/jobs/durable.py b/src/jobs/durable.py new file mode 100644 index 0000000..63dba53 --- /dev/null +++ b/src/jobs/durable.py @@ -0,0 +1,295 @@ +"""Mongo-backed durable job records and async runner helpers. + +The API routes still own the actual domain work. This module provides the +shared persistence contract: idempotency keys, status transitions, retry +counts, timeouts, error state, and dead-letter marking. +""" + +from __future__ import annotations + +import asyncio +import hashlib +import json +import logging +import time +import uuid +from datetime import datetime, timezone +from typing import Any, Awaitable, Callable, Dict, Mapping, Optional, Tuple + +logger = logging.getLogger("xmem.jobs.durable") + +JobHandler = Callable[[], Awaitable[Dict[str, Any] | None]] + +QUEUED = "queued" +RUNNING = "running" +SUCCEEDED = "succeeded" +FAILED = "failed" +DEAD_LETTER = "dead_letter" + +TERMINAL_STATUSES = {SUCCEEDED, DEAD_LETTER} +REDACTED_KEYS = { + "authorization", + "cookie", + "gh_token", + "pat", + "password", + "secret", + "token", +} + +_default_store: Optional["DurableJobStore"] = None + + +def utc_now() -> datetime: + return datetime.now(timezone.utc) + + +def _normalise(value: Any) -> Any: + if isinstance(value, Mapping): + return {str(k): _normalise(v) for k, v in sorted(value.items())} + if isinstance(value, (list, tuple)): + return [_normalise(v) for v in value] + if isinstance(value, datetime): + return value.isoformat() + return value + + +def stable_hash(value: Mapping[str, Any]) -> str: + encoded = json.dumps( + _normalise(value), + sort_keys=True, + separators=(",", ":"), + default=str, + ) + return hashlib.sha256(encoded.encode("utf-8")).hexdigest() + + +def idempotency_key(job_type: str, fields: Mapping[str, Any]) -> str: + return stable_hash({"job_type": job_type, "fields": fields}) + + +def redact_payload(value: Any) -> Any: + if isinstance(value, Mapping): + redacted: Dict[str, Any] = {} + for key, item in value.items(): + lowered = str(key).lower() + if any(secret_key in lowered for secret_key in REDACTED_KEYS): + redacted[str(key)] = "[redacted]" + else: + redacted[str(key)] = redact_payload(item) + return redacted + if isinstance(value, list): + return [redact_payload(item) for item in value] + return value + + +def serialize_job(doc: Optional[Mapping[str, Any]]) -> Optional[Dict[str, Any]]: + if doc is None: + return None + + def convert(value: Any) -> Any: + if value.__class__.__name__ == "ObjectId": + 
return str(value) + if isinstance(value, datetime): + return value.isoformat() + if isinstance(value, Mapping): + return {str(k): convert(v) for k, v in value.items()} + if isinstance(value, list): + return [convert(item) for item in value] + return value + + return convert(dict(doc)) + + +class DurableJobStore: + """Persistence layer for queue/workflow jobs.""" + + def __init__( + self, + uri: Optional[str] = None, + database: Optional[str] = None, + collection: str = "durable_jobs", + ) -> None: + from pymongo import MongoClient + from src.config import settings + + uri = uri or settings.mongodb_uri + database = database or settings.mongodb_database + + self._client = MongoClient(uri) + self._db = self._client[database] + self.jobs = self._db[collection] + self._ensure_indexes() + + def _ensure_indexes(self) -> None: + self.jobs.create_index([("job_id", 1)], unique=True) + self.jobs.create_index([("job_type", 1), ("idempotency_key", 1)], unique=True) + self.jobs.create_index([("user_id", 1), ("updated_at", -1)]) + self.jobs.create_index([("status", 1), ("updated_at", 1)]) + + def enqueue( + self, + *, + job_type: str, + payload: Mapping[str, Any], + idempotency_fields: Mapping[str, Any], + user_id: str, + timeout_seconds: float = 120.0, + max_attempts: int = 3, + ) -> Tuple[Dict[str, Any], bool]: + key = idempotency_key(job_type, idempotency_fields) + job_id = f"{job_type}:{key}" + now = utc_now() + doc = { + "job_id": job_id, + "job_type": job_type, + "idempotency_key": key, + "user_id": user_id, + "payload": redact_payload(payload), + "status": QUEUED, + "retry_count": 0, + "max_attempts": max_attempts, + "timeout_seconds": timeout_seconds, + "error": None, + "error_state": None, + "result": None, + "created_at": now, + "updated_at": now, + "started_at": None, + "completed_at": None, + "dead_lettered_at": None, + } + try: + self.jobs.insert_one(doc) + return doc, True + except Exception as exc: + if exc.__class__.__name__ != "DuplicateKeyError": + raise + existing = self.get(job_id) + if existing is None: + existing = self.jobs.find_one({ + "job_type": job_type, + "idempotency_key": key, + }) + if existing is None: + raise + return existing, False + + def get(self, job_id: str) -> Optional[Dict[str, Any]]: + return self.jobs.find_one({"job_id": job_id}) + + def mark_running(self, job_id: str) -> None: + self.jobs.update_one( + {"job_id": job_id}, + { + "$set": { + "status": RUNNING, + "started_at": utc_now(), + "updated_at": utc_now(), + "error": None, + "error_state": None, + }, + "$inc": {"retry_count": 1}, + }, + ) + + def mark_succeeded( + self, + job_id: str, + result: Mapping[str, Any] | None = None, + ) -> None: + self.jobs.update_one( + {"job_id": job_id}, + { + "$set": { + "status": SUCCEEDED, + "result": _normalise(result or {}), + "error": None, + "error_state": None, + "completed_at": utc_now(), + "updated_at": utc_now(), + }, + }, + ) + + def mark_failed(self, job_id: str, error: str) -> str: + job = self.get(job_id) or {} + retry_count = int(job.get("retry_count") or 0) + max_attempts = int(job.get("max_attempts") or 1) + status = DEAD_LETTER if retry_count >= max_attempts else FAILED + update: Dict[str, Any] = { + "status": status, + "error": error, + "error_state": { + "message": error, + "failed_at": utc_now(), + "attempt": retry_count, + }, + "updated_at": utc_now(), + } + if status == DEAD_LETTER: + update["dead_lettered_at"] = utc_now() + update["completed_at"] = utc_now() + self.jobs.update_one({"job_id": job_id}, {"$set": update}) + return status + + def 
reset_for_retry(self, job_id: str) -> None: + self.jobs.update_one( + {"job_id": job_id}, + {"$set": {"status": QUEUED, "updated_at": utc_now()}}, + ) + + +def get_default_job_store() -> DurableJobStore: + global _default_store + if _default_store is None: + _default_store = DurableJobStore() + return _default_store + + +async def run_job( + store: DurableJobStore, + job_id: str, + handler: JobHandler, + *, + retry_base_seconds: float = 1.0, +) -> None: + """Run one queued job and persist lifecycle transitions.""" + while True: + job = await asyncio.to_thread(store.get, job_id) + if not job: + logger.warning("Durable job %s disappeared before execution", job_id) + return + if job.get("status") in TERMINAL_STATUSES: + return + + timeout_seconds = float(job.get("timeout_seconds") or 120.0) + await asyncio.to_thread(store.mark_running, job_id) + started = time.perf_counter() + try: + result = await asyncio.wait_for(handler(), timeout=timeout_seconds) + payload = dict(result or {}) + payload["elapsed_ms"] = round((time.perf_counter() - started) * 1000, 2) + await asyncio.to_thread(store.mark_succeeded, job_id, payload) + return + except Exception as exc: + error = str(exc) or exc.__class__.__name__ + status = await asyncio.to_thread(store.mark_failed, job_id, error) + if status == DEAD_LETTER: + logger.exception("Durable job %s dead-lettered: %s", job_id, error) + return + job = await asyncio.to_thread(store.get, job_id) or {} + retry_count = int(job.get("retry_count") or 1) + delay = min(retry_base_seconds * (2 ** max(retry_count - 1, 0)), 30.0) + logger.warning( + "Durable job %s failed; retrying in %.1fs: %s", + job_id, + delay, + error, + ) + await asyncio.sleep(delay) + await asyncio.to_thread(store.reset_for_retry, job_id) + + +def new_attempt_id() -> str: + return uuid.uuid4().hex diff --git a/src/scanner/code_store.py b/src/scanner/code_store.py index 213eb15..b7204f2 100644 --- a/src/scanner/code_store.py +++ b/src/scanner/code_store.py @@ -149,8 +149,12 @@ def upsert_scanner_job( error: Optional[str] = None, phase1_result: Optional[Dict[str, Any]] = None, phase2_result: Optional[Dict[str, Any]] = None, + durable_job_id: Optional[str] = None, + retry_count: Optional[int] = None, + timeout_seconds: Optional[float] = None, ) -> None: """Persist or update scanner dashboard job state.""" + rollup_status = phase2_status if phase1_status == "complete" else phase1_status doc: Dict[str, Any] = { "job_id": job_id, "username": username, @@ -160,14 +164,22 @@ def upsert_scanner_job( "url": url, "phase1_status": phase1_status, "phase2_status": phase2_status, + "status": rollup_status, "started_at": started_at, "updated_at": datetime.now(timezone.utc), "error": error, + "error_state": {"message": error} if error else None, } if phase1_result is not None: doc["phase1_result"] = phase1_result if phase2_result is not None: doc["phase2_result"] = phase2_result + if durable_job_id is not None: + doc["durable_job_id"] = durable_job_id + if retry_count is not None: + doc["retry_count"] = retry_count + if timeout_seconds is not None: + doc["timeout_seconds"] = timeout_seconds self.scanner_jobs.update_one( {"job_id": job_id}, diff --git a/tests/test_durable_jobs.py b/tests/test_durable_jobs.py new file mode 100644 index 0000000..b6fbda6 --- /dev/null +++ b/tests/test_durable_jobs.py @@ -0,0 +1,130 @@ +import os + +import pytest + +os.environ.setdefault("PINECONE_API_KEY", "test-pinecone-key") +os.environ.setdefault("NEO4J_PASSWORD", "test-neo4j-password") +os.environ.setdefault("GEMINI_API_KEY", 
"test-gemini-key") + +from src.jobs.durable import ( + DEAD_LETTER, + FAILED, + QUEUED, + RUNNING, + SUCCEEDED, + idempotency_key, + redact_payload, + run_job, + utc_now, +) + + +def test_idempotency_key_is_stable_for_equivalent_payloads(): + left = idempotency_key("memory_ingest", {"b": 2, "a": {"z": 1, "y": 0}}) + right = idempotency_key("memory_ingest", {"a": {"y": 0, "z": 1}, "b": 2}) + + assert left == right + + +def test_redact_payload_masks_nested_secret_fields(): + payload = { + "github_url": "https://github.com/acme/repo", + "pat": "ghp_secret", + "nested": { + "Authorization": "Bearer token", + "client_secret": "secret", + "ok": "visible", + }, + } + + redacted = redact_payload(payload) + + assert redacted["pat"] == "[redacted]" + assert redacted["nested"]["Authorization"] == "[redacted]" + assert redacted["nested"]["client_secret"] == "[redacted]" + assert redacted["nested"]["ok"] == "visible" + + +class FakeJobStore: + def __init__(self, job): + self.job = job + + def get(self, job_id): + assert job_id == self.job["job_id"] + return dict(self.job) + + def mark_running(self, job_id): + assert job_id == self.job["job_id"] + self.job["status"] = RUNNING + self.job["retry_count"] = self.job.get("retry_count", 0) + 1 + + def mark_succeeded(self, job_id, result=None): + assert job_id == self.job["job_id"] + self.job["status"] = SUCCEEDED + self.job["result"] = dict(result or {}) + + def mark_failed(self, job_id, error): + assert job_id == self.job["job_id"] + status = ( + DEAD_LETTER + if self.job.get("retry_count", 0) >= self.job.get("max_attempts", 1) + else FAILED + ) + self.job["status"] = status + self.job["error"] = error + self.job["error_state"] = { + "message": error, + "failed_at": utc_now(), + "attempt": self.job.get("retry_count", 0), + } + return status + + def reset_for_retry(self, job_id): + assert job_id == self.job["job_id"] + self.job["status"] = QUEUED + + +@pytest.mark.asyncio +async def test_run_job_retries_then_succeeds(): + store = FakeJobStore({ + "job_id": "job-1", + "status": QUEUED, + "retry_count": 0, + "max_attempts": 2, + "timeout_seconds": 1, + }) + attempts = 0 + + async def handler(): + nonlocal attempts + attempts += 1 + if attempts == 1: + raise RuntimeError("transient failure") + return {"ok": True} + + await run_job(store, "job-1", handler, retry_base_seconds=0) + + assert attempts == 2 + assert store.job["status"] == SUCCEEDED + assert store.job["retry_count"] == 2 + assert store.job["result"]["ok"] is True + + +@pytest.mark.asyncio +async def test_run_job_dead_letters_after_max_attempts(): + store = FakeJobStore({ + "job_id": "job-2", + "status": QUEUED, + "retry_count": 0, + "max_attempts": 1, + "timeout_seconds": 1, + }) + + async def handler(): + raise RuntimeError("permanent failure") + + await run_job(store, "job-2", handler, retry_base_seconds=0) + + assert store.job["status"] == DEAD_LETTER + assert store.job["retry_count"] == 1 + assert store.job["error"] == "permanent failure" From 427746829e6459dd7e70c327351a30e2abbd89be Mon Sep 17 00:00:00 2001 From: hunterbastian <81837742+hunterbastian@users.noreply.github.com> Date: Tue, 12 May 2026 00:25:47 -0400 Subject: [PATCH 2/4] Restore v1 memory routes and add v2 durable ingest --- src/api/app.py | 4 + src/api/routes/memory.py | 109 +++++++++++++++++++++++-- tests/api/test_memory_versioning.py | 121 ++++++++++++++++++++++++++++ 3 files changed, 227 insertions(+), 7 deletions(-) create mode 100644 tests/api/test_memory_versioning.py diff --git a/src/api/app.py b/src/api/app.py index 
7b9dc79..889151e 100644 --- a/src/api/app.py +++ b/src/api/app.py @@ -34,6 +34,8 @@ from src.api.routes.health import router as health_router from src.api.routes.memory import router as memory_router from src.api.routes.memory import scrape_router as memory_scrape_router +from src.api.routes.memory import v2_router as memory_v2_router +from src.api.routes.memory import v2_scrape_router as memory_v2_scrape_router from src.api.routes.memory_graph import router as memory_graph_router from src.api.routes.scanner import router as scanner_router from src.api.routes.telemetry import router as telemetry_router @@ -156,6 +158,8 @@ async def lifespan(app: FastAPI): app.include_router(health_router) app.include_router(memory_scrape_router) app.include_router(memory_router) + app.include_router(memory_v2_scrape_router) + app.include_router(memory_v2_router) app.include_router(memory_graph_router) app.include_router(code_router) app.include_router(scanner_router) diff --git a/src/api/routes/memory.py b/src/api/routes/memory.py index 4da7fc3..592973c 100644 --- a/src/api/routes/memory.py +++ b/src/api/routes/memory.py @@ -1,5 +1,5 @@ """ -/v1/memory/* routes — production endpoints for XMem memory operations. +/v1/memory/* and /v2/memory/* routes - production endpoints for XMem memory operations. All routes require a valid Bearer API key and respect the per-key rate limit. """ @@ -24,6 +24,7 @@ from src.api.schemas import ( APIResponse, BatchIngestRequest, + BatchIngestResponse, DomainResult, IngestRequest, IngestResponse, @@ -70,6 +71,18 @@ dependencies=[Depends(enforce_rate_limit)], ) +v2_router = APIRouter( + prefix="/v2/memory", + tags=["memory"], + dependencies=[Depends(require_ready), Depends(enforce_rate_limit)], +) + +v2_scrape_router = APIRouter( + prefix="/v2/memory", + tags=["memory"], + dependencies=[Depends(enforce_rate_limit)], +) + # Helpers def _model_name(model: Any) -> str: @@ -668,6 +681,31 @@ async def ingest_memory(req: IngestRequest, request: Request, user: dict = Depen start = time.perf_counter() user_id = _current_user_id(user) payload = req.model_dump() + + try: + data = await asyncio.wait_for( + _run_ingest_payload(payload, user_id), + timeout=120.0, + ) + elapsed = round((time.perf_counter() - start) * 1000, 2) + return _wrap(request, data, elapsed) + + except Exception as exc: + elapsed = round((time.perf_counter() - start) * 1000, 2) + logger.exception("Ingest failed for user=%s", user_id) + return _error(request, str(exc), 500, elapsed) + + +# POST /v2/memory/ingest +@v2_router.post( + "/ingest", + response_model=APIResponse, + summary="Start an async durable memory ingest job", +) +async def ingest_memory_v2(req: IngestRequest, request: Request, user: dict = Depends(require_api_key)): + start = time.perf_counter() + user_id = _current_user_id(user) + payload = req.model_dump() payload["user_id"] = user_id try: @@ -697,7 +735,7 @@ async def ingest_memory(req: IngestRequest, request: Request, user: dict = Depen request, job, created, - f"/v1/memory/ingest/{job['job_id']}/status", + f"/v2/memory/ingest/{job['job_id']}/status", elapsed, ) @@ -723,7 +761,7 @@ async def _read_user_job(job_id: str, user_id: str) -> Dict[str, Any] | None: return job -@router.get( +@v2_router.get( "/ingest/{job_id}/status", response_model=APIResponse, summary="Poll an async memory ingest job", @@ -738,7 +776,7 @@ async def ingest_job_status(job_id: str, request: Request, user: dict = Depends( return _wrap(request, _job_status_data(job), elapsed) -@router.get( +@v2_router.get( 
"/jobs/{job_id}/status", response_model=APIResponse, summary="Poll an async memory job", @@ -762,6 +800,35 @@ async def memory_job_status(job_id: str, request: Request, user: dict = Depends( async def batch_ingest_memory(req: BatchIngestRequest, request: Request, user: dict = Depends(require_api_key)): start = time.perf_counter() user_id = _current_user_id(user) + + try: + results = [] + for item in req.items: + data = await asyncio.wait_for( + _run_ingest_payload(item.model_dump(), user_id), + timeout=120.0, + ) + results.append(IngestResponse(**data)) + + response_data = BatchIngestResponse(results=results) + elapsed = round((time.perf_counter() - start) * 1000, 2) + return _wrap(request, response_data, elapsed) + + except Exception as exc: + elapsed = round((time.perf_counter() - start) * 1000, 2) + logger.exception("Batch ingest failed for user=%s", user_id) + return _error(request, str(exc), 500, elapsed) + + +# POST /v2/memory/batch-ingest +@v2_router.post( + "/batch-ingest", + response_model=APIResponse, + summary="Start an async durable batch memory ingest job", +) +async def batch_ingest_memory_v2(req: BatchIngestRequest, request: Request, user: dict = Depends(require_api_key)): + start = time.perf_counter() + user_id = _current_user_id(user) payload = req.model_dump() payload["user_id"] = user_id @@ -788,7 +855,7 @@ async def batch_ingest_memory(req: BatchIngestRequest, request: Request, user: d request, job, created, - f"/v1/memory/jobs/{job['job_id']}/status", + f"/v2/memory/jobs/{job['job_id']}/status", elapsed, ) @@ -930,6 +997,34 @@ async def _search_summary(pipeline: RetrievalPipeline, query: str, user_id: str, summary="Scrape a shared AI chat link into message pairs", ) async def scrape_chat_link(req: ScrapeRequest, request: Request): + start = time.perf_counter() + url = req.url + + try: + result = await _scrape_chat_share(url) + pairs = result["pairs"] + + if not pairs: + elapsed = round((time.perf_counter() - start) * 1000, 2) + return _error(request, _chat_share_error_message(result), 400, elapsed) + + data = ScrapeResponse(pairs=pairs) + elapsed = round((time.perf_counter() - start) * 1000, 2) + return _wrap(request, data, elapsed) + + except Exception as exc: + elapsed = round((time.perf_counter() - start) * 1000, 2) + logger.exception("Scrape failed for url=%s", url) + return _error(request, str(exc) or repr(exc), 500, elapsed) + + +# POST /v2/memory/scrape +@v2_scrape_router.post( + "/scrape", + response_model=APIResponse, + summary="Start an async durable scrape job", +) +async def scrape_chat_link_v2(req: ScrapeRequest, request: Request): start = time.perf_counter() payload = req.model_dump() @@ -950,7 +1045,7 @@ async def scrape_chat_link(req: ScrapeRequest, request: Request): request, job, created, - f"/v1/memory/scrape/{job['job_id']}/status", + f"/v2/memory/scrape/{job['job_id']}/status", elapsed, ) @@ -960,7 +1055,7 @@ async def scrape_chat_link(req: ScrapeRequest, request: Request): return _error(request, str(exc) or repr(exc), 500, elapsed) -@scrape_router.get( +@v2_scrape_router.get( "/scrape/{job_id}/status", response_model=APIResponse, summary="Poll an async scrape job", diff --git a/tests/api/test_memory_versioning.py b/tests/api/test_memory_versioning.py new file mode 100644 index 0000000..a7091cb --- /dev/null +++ b/tests/api/test_memory_versioning.py @@ -0,0 +1,121 @@ +from __future__ import annotations + +from types import SimpleNamespace + +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from src.api import dependencies 
as deps +from src.api.routes import memory + + +class FakeIngestPipeline: + model = SimpleNamespace(model="fake-ingest") + + def __init__(self): + self.calls = [] + + async def run(self, **kwargs): + self.calls.append(kwargs) + return {"classification_result": SimpleNamespace(classifications=[])} + + +class FakeRetrievalPipeline: + model = SimpleNamespace(model="fake-retrieval") + + +class FakeJobStore: + def __init__(self): + self.jobs = {} + + def enqueue(self, *, job_type, payload, user_id, timeout_seconds, max_attempts, **_): + job = { + "job_id": f"{job_type}:fake", + "job_type": job_type, + "payload": payload, + "user_id": user_id, + "status": "queued", + "timeout_seconds": timeout_seconds, + "max_attempts": max_attempts, + "retry_count": 0, + } + self.jobs[job["job_id"]] = job + return job, True + + def get(self, job_id): + return self.jobs.get(job_id) + + +def _build_app(monkeypatch): + ingest = FakeIngestPipeline() + deps._init_error = None + deps._pipelines_ready.set() + deps.set_pipelines(ingest, FakeRetrievalPipeline()) + + async def fake_user(): + return {"id": "user-1", "username": "hunter"} + + async def fake_ready(): + return None + + async def fake_rate_limit(): + return {"id": "user-1", "username": "hunter"} + + app = FastAPI() + app.dependency_overrides[deps.require_api_key] = fake_user + app.dependency_overrides[deps.require_ready] = fake_ready + app.dependency_overrides[deps.enforce_rate_limit] = fake_rate_limit + app.include_router(memory.scrape_router) + app.include_router(memory.router) + app.include_router(memory.v2_scrape_router) + app.include_router(memory.v2_router) + return app, ingest + + +def test_v1_ingest_keeps_synchronous_response_contract(monkeypatch): + app, ingest = _build_app(monkeypatch) + payload = { + "user_query": "remember this", + "agent_response": "done", + "user_id": "body-user", + } + + response = TestClient(app).post("/v1/memory/ingest", json=payload) + + assert response.status_code == 200 + body = response.json() + assert body["status"] == "ok" + assert body["data"]["model"] == "fake-ingest" + assert "job_id" not in body["data"] + assert ingest.calls[0]["user_id"] == "hunter" + + +def test_v2_ingest_returns_durable_job_envelope(monkeypatch): + app, ingest = _build_app(monkeypatch) + store = FakeJobStore() + scheduled = [] + monkeypatch.setattr(memory, "get_default_job_store", lambda: store) + monkeypatch.setattr( + memory, + "_schedule_job", + lambda job, handler: scheduled.append(job["job_id"]), + ) + payload = { + "user_query": "remember this", + "agent_response": "done", + "user_id": "body-user", + } + + response = TestClient(app).post("/v2/memory/ingest", json=payload) + + assert response.status_code == 200 + body = response.json() + assert body["status"] == "ok" + assert body["data"] == { + "job_id": "memory_ingest:fake", + "status": "queued", + "created": True, + "status_url": "/v2/memory/ingest/memory_ingest:fake/status", + } + assert scheduled == ["memory_ingest:fake"] + assert ingest.calls == [] From 41f9a94bd5effed35a3b4021e4b876756ca4470a Mon Sep 17 00:00:00 2001 From: hunterbastian <81837742+hunterbastian@users.noreply.github.com> Date: Tue, 12 May 2026 07:15:57 -0400 Subject: [PATCH 3/4] Add v2 ingest benchmark helper --- .gitignore | 2 + docs/benchmarks.md | 33 ++++++ scripts/benchmark_v2_ingest.py | 196 +++++++++++++++++++++++++++++++++ 3 files changed, 231 insertions(+) create mode 100644 scripts/benchmark_v2_ingest.py diff --git a/.gitignore b/.gitignore index fc64ccb..880aaf7 100644 --- a/.gitignore +++ b/.gitignore @@ 
-53,6 +53,8 @@ logs/ # Dev-only folders frontend/ scripts/ +!scripts/ +!scripts/benchmark_v2_ingest.py tests/ !tests/ !tests/**/*.py diff --git a/docs/benchmarks.md b/docs/benchmarks.md index e69de29..b2f24f3 100644 --- a/docs/benchmarks.md +++ b/docs/benchmarks.md @@ -0,0 +1,33 @@ +# Benchmarks + +## v2 Durable Ingest Staging Check + +Use `scripts/benchmark_v2_ingest.py` to measure the new durable ingest route in +a staging environment without putting credentials in shell history, PR comments, +or logs. The script reads the Bearer token from an environment variable, posts a +single low-effort ingest payload to `/v2/memory/ingest`, then polls the returned +status URL until the durable job reaches a terminal state or the timeout expires. + +```bash +export XMEM_API_KEY="..." +python scripts/benchmark_v2_ingest.py --base-url https://staging.example.com +``` + +For a production-like sample, pass an explicit payload file: + +```bash +python scripts/benchmark_v2_ingest.py \ + --base-url https://staging.example.com \ + --payload-file ./benchmark-payload.json +``` + +The output intentionally excludes the API key and reports: + +- `enqueue_latency_ms`: latency for the initial `POST /v2/memory/ingest` request. +- `completion_latency_ms`: elapsed time until the durable job is terminal. +- `final_status`: `succeeded`, `dead_letter`, or the last observed nonterminal + status if the timeout expires. +- `poll_count`: number of status polling requests made after enqueue. + +This separates the expected fast enqueue path from the total pipeline completion +time, which is the useful comparison for the v1 synchronous route. diff --git a/scripts/benchmark_v2_ingest.py b/scripts/benchmark_v2_ingest.py new file mode 100644 index 0000000..204e247 --- /dev/null +++ b/scripts/benchmark_v2_ingest.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python3 +"""Benchmark the durable v2 memory ingest endpoint against a live environment.""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Any +from urllib.parse import urljoin + +import httpx + +TERMINAL_STATUSES = {"succeeded", "dead_letter"} + + +def _default_payload() -> dict[str, Any]: + return { + "user_query": "Benchmark the v2 durable ingest endpoint.", + "agent_response": "This is a staging benchmark payload.", + "session_datetime": datetime.now(timezone.utc).isoformat(), + "effort_level": "low", + } + + +def _load_payload(path: str | None) -> dict[str, Any]: + if path is None: + return _default_payload() + with Path(path).open("r", encoding="utf-8") as handle: + payload = json.load(handle) + if not isinstance(payload, dict): + raise ValueError("payload file must contain a JSON object") + return payload + + +def _api_data(body: dict[str, Any]) -> dict[str, Any]: + data = body.get("data", body) + if not isinstance(data, dict): + raise ValueError("API response did not contain an object data field") + return data + + +def _status_endpoint(base_url: str, status_url: str) -> str: + if status_url.startswith(("http://", "https://")): + return status_url + return urljoin(base_url.rstrip("/") + "/", status_url.lstrip("/")) + + +def _request_json( + client: httpx.Client, + method: str, + url: str, + **kwargs: Any, +) -> tuple[dict[str, Any], float]: + started = time.perf_counter() + response = client.request(method, url, **kwargs) + elapsed_ms = (time.perf_counter() - started) * 1000 + response.raise_for_status() + body = response.json() + if 
not isinstance(body, dict): + raise ValueError("API response was not a JSON object") + return body, elapsed_ms + + +def run_benchmark(args: argparse.Namespace) -> dict[str, Any]: + api_key = os.environ.get(args.api_key_env, "").strip() + if not api_key: + raise ValueError(f"{args.api_key_env} is required") + + base_url = args.base_url.rstrip("/") + ingest_url = f"{base_url}/v2/memory/ingest" + payload = _load_payload(args.payload_file) + headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + } + + overall_started = time.perf_counter() + with httpx.Client(timeout=args.request_timeout) as client: + accepted_body, enqueue_latency_ms = _request_json( + client, + "POST", + ingest_url, + headers=headers, + json=payload, + ) + accepted = _api_data(accepted_body) + job_id = accepted.get("job_id") + status_url = accepted.get("status_url") + if not isinstance(job_id, str) or not isinstance(status_url, str): + raise ValueError("v2 ingest response must include job_id and status_url") + + status_endpoint = _status_endpoint(base_url, status_url) + final_data = accepted + completion_latency_ms: float | None = None + poll_count = 0 + deadline = time.monotonic() + args.completion_timeout + + while time.monotonic() < deadline: + status = str(final_data.get("status", "unknown")) + if status in TERMINAL_STATUSES: + completion_latency_ms = ( + time.perf_counter() - overall_started + ) * 1000 + break + + time.sleep(args.poll_interval) + poll_count += 1 + status_body, _ = _request_json( + client, + "GET", + status_endpoint, + headers=headers, + ) + final_data = _api_data(status_body) + + if completion_latency_ms is None: + completion_latency_ms = (time.perf_counter() - overall_started) * 1000 + + return { + "base_url": base_url, + "endpoint": "/v2/memory/ingest", + "job_id": job_id, + "initial_status": accepted.get("status"), + "final_status": final_data.get("status"), + "enqueue_latency_ms": round(enqueue_latency_ms, 2), + "completion_latency_ms": round(completion_latency_ms, 2), + "poll_count": poll_count, + "created": accepted.get("created"), + "status_url": status_url, + "timed_out": str(final_data.get("status")) not in TERMINAL_STATUSES, + "final_error": final_data.get("error"), + } + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description=( + "Measure POST /v2/memory/ingest enqueue latency and durable job " + "completion latency for a staging XMem API." 
+ ), + ) + parser.add_argument( + "--base-url", + required=True, + help="Base URL for the target XMem API, for example https://staging.example.com", + ) + parser.add_argument( + "--api-key-env", + default="XMEM_API_KEY", + help="Environment variable containing the Bearer API key.", + ) + parser.add_argument( + "--payload-file", + help="Optional JSON file to use instead of the default low-effort payload.", + ) + parser.add_argument( + "--poll-interval", + type=float, + default=2.0, + help="Seconds between status polling requests.", + ) + parser.add_argument( + "--completion-timeout", + type=float, + default=300.0, + help="Maximum seconds to wait for the durable job to finish.", + ) + parser.add_argument( + "--request-timeout", + type=float, + default=30.0, + help="HTTP timeout in seconds for each request.", + ) + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + try: + result = run_benchmark(args) + except Exception as exc: + print(f"benchmark failed: {exc}", file=sys.stderr) + return 1 + + print(json.dumps(result, indent=2, sort_keys=True)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) From 67761f204111777201f03cf7cb1180649ed6dc52 Mon Sep 17 00:00:00 2001 From: hunterbastian <81837742+hunterbastian@users.noreply.github.com> Date: Tue, 12 May 2026 14:29:19 -0400 Subject: [PATCH 4/4] Centralize scanner durable job timeout --- src/api/routes/scanner.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/api/routes/scanner.py b/src/api/routes/scanner.py index 22d2d97..655a48f 100644 --- a/src/api/routes/scanner.py +++ b/src/api/routes/scanner.py @@ -53,6 +53,7 @@ router = APIRouter(prefix="/v1/scanner", tags=["scanner"]) _code_store_singleton: Any = None +SCANNER_DURABLE_TIMEOUT_SECONDS = 1800.0 def _get_code_store(): @@ -72,6 +73,10 @@ def _schedule_durable_job(job: Dict[str, Any], handler) -> None: asyncio.create_task(run_job(get_default_job_store(), job["job_id"], handler)) +def _durable_timeout_seconds(job: Dict[str, Any]) -> float: + return float(job.get("timeout_seconds") or SCANNER_DURABLE_TIMEOUT_SECONDS) + + async def _public_durable_job(job_id: str) -> Optional[Dict[str, Any]]: job = await asyncio.to_thread(get_default_job_store().get, job_id) public = serialize_job(job) @@ -809,7 +814,7 @@ async def resume_scan(req: ResumeScanRequest, user: dict = Depends(require_api_k "scanner_job_id": job_id, }, user_id=username, - timeout_seconds=1800.0, + timeout_seconds=SCANNER_DURABLE_TIMEOUT_SECONDS, max_attempts=2, ) store.upsert_scanner_job( @@ -817,7 +822,7 @@ async def resume_scan(req: ResumeScanRequest, user: dict = Depends(require_api_k branch=branch, url=url, phase1_status="running", phase2_status="pending", started_at=started_at, error=None, durable_job_id=durable_job["job_id"], retry_count=int(durable_job.get("retry_count") or 0), - timeout_seconds=float(durable_job.get("timeout_seconds") or 1800.0), + timeout_seconds=_durable_timeout_seconds(durable_job), ) _schedule_durable_job( durable_job, @@ -851,7 +856,7 @@ async def resume_scan(req: ResumeScanRequest, user: dict = Depends(require_api_k "scanner_job_id": job_id, }, user_id=username, - timeout_seconds=1800.0, + timeout_seconds=SCANNER_DURABLE_TIMEOUT_SECONDS, max_attempts=2, ) store.upsert_scanner_job( @@ -859,7 +864,7 @@ async def resume_scan(req: ResumeScanRequest, user: dict = Depends(require_api_k branch=branch, url=url, phase1_status="complete", phase2_status="running", 
started_at=started_at, error=None, durable_job_id=durable_job["job_id"], retry_count=int(durable_job.get("retry_count") or 0), - timeout_seconds=float(durable_job.get("timeout_seconds") or 1800.0), + timeout_seconds=_durable_timeout_seconds(durable_job), ) _schedule_durable_job( durable_job, @@ -977,7 +982,7 @@ async def start_scan(req: ScanRequest, user: dict = Depends(require_api_key)): "force_full": req.force_full, }, user_id=username, - timeout_seconds=1800.0, + timeout_seconds=SCANNER_DURABLE_TIMEOUT_SECONDS, max_attempts=2, ) store.upsert_scanner_job( @@ -993,7 +998,7 @@ async def start_scan(req: ScanRequest, user: dict = Depends(require_api_key)): error=None, durable_job_id=durable_job["job_id"], retry_count=int(durable_job.get("retry_count") or 0), - timeout_seconds=float(durable_job.get("timeout_seconds") or 1800.0), + timeout_seconds=_durable_timeout_seconds(durable_job), ) store.upsert_user_repo_entry(username, org, repo, branch)
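
For reviewers, a minimal client-side sketch of the flow this series introduces: enqueue one ingest through the new `/v2/memory/ingest` route, then poll the returned `status_url` until the durable job reaches a terminal state. This is illustrative only and not part of the patches; the staging base URL and the use of the `XMEM_API_KEY` environment variable follow the examples in `docs/benchmarks.md`, and the response envelope (`data.job_id`, `data.status_url`, `data.status`) matches the handlers and tests above.

```python
# Hypothetical polling client for the v2 durable ingest route (sketch, not in the patch).
import os
import time

import httpx

TERMINAL_STATUSES = {"succeeded", "dead_letter"}  # terminal statuses from src/jobs/durable.py

base_url = "https://staging.example.com"  # assumed staging host, as in docs/benchmarks.md
headers = {"Authorization": f"Bearer {os.environ['XMEM_API_KEY']}"}

with httpx.Client(base_url=base_url, headers=headers, timeout=30.0) as client:
    # Enqueue: the route returns the accepted job envelope, not the ingest result.
    accepted = client.post(
        "/v2/memory/ingest",
        json={
            "user_query": "remember this",
            "agent_response": "done",
            "effort_level": "low",
        },
    )
    accepted.raise_for_status()
    data = accepted.json()["data"]
    status_url = data["status_url"]  # e.g. /v2/memory/ingest/<job_id>/status

    # Poll until the durable job succeeds or dead-letters, or give up after ~2 minutes.
    for _ in range(60):
        job = client.get(status_url).json()["data"]
        if job["status"] in TERMINAL_STATUSES:
            print(job["status"], job.get("error"))
            break
        time.sleep(2.0)
```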