Add durable job tracking for ingest and scanner work #177
Conversation
Code Review
This pull request introduces a durable background job system using MongoDB to handle long-running tasks like memory ingestion and repository scanning asynchronously. Key changes include the addition of a DurableJobStore for job persistence, idempotency management, and retry logic, along with the refactoring of existing API routes to utilize this new system. Feedback highlights a critical performance issue where the use of the synchronous pymongo driver could block the FastAPI event loop, and a potential for ID collisions due to truncation of the idempotency hash. Additionally, there are suggestions to deduplicate scheduling logic and centralize hardcoded timeout constants to improve maintainability.
```python
    database: Optional[str] = None,
    collection: str = "durable_jobs",
) -> None:
    from pymongo import MongoClient
```
The DurableJobStore implementation uses pymongo.MongoClient, which is a synchronous driver. In an asynchronous FastAPI application, performing synchronous database operations on the main event loop will block it, preventing other requests from being handled and significantly reducing the application's scalability. It is highly recommended to use an asynchronous driver like motor.
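For illustration, a minimal sketch of what a motor-based variant could look like; only the constructor arguments above come from the PR, and the class name, database name, and `get_job` method are assumptions:

```python
# Minimal sketch, assuming a hypothetical async variant of the store.
from typing import Any, Dict, Optional

from motor.motor_asyncio import AsyncIOMotorClient


class AsyncDurableJobStore:
    def __init__(
        self,
        mongo_uri: str,
        database: Optional[str] = None,
        collection: str = "durable_jobs",
    ) -> None:
        client = AsyncIOMotorClient(mongo_uri)
        self._jobs = client[database or "jobs"][collection]  # "jobs" db name is a placeholder

    async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        # motor returns awaitables, so this lookup no longer blocks the event loop.
        return await self._jobs.find_one({"job_id": job_id})
```

An alternative is to keep pymongo and push each call onto a worker thread, which is the route the author takes later in this thread.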
```python
    max_attempts: int = 3,
) -> Tuple[Dict[str, Any], bool]:
    key = idempotency_key(job_type, idempotency_fields)
    job_id = f"{job_type}:{key[:24]}"
```
Truncating the idempotency hash to 24 characters for the job_id increases the risk of collisions. If two different payloads result in the same 24-character prefix, the enqueue method will incorrectly return the existing job for the first payload, breaking the idempotency contract. Since MongoDB handles longer strings efficiently, it is safer to use the full hash.
```diff
- job_id = f"{job_type}:{key[:24]}"
+ job_id = f"{job_type}:{key}"
```
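For context, a hedged sketch of how an idempotency key built from a full SHA-256 digest might look; `idempotency_key` exists in the PR, but this body is an assumption rather than its actual implementation:

```python
# Sketch only: one plausible shape for idempotency_key(). Keeping the full
# 64-character hex digest avoids the prefix collisions that truncation allows.
import hashlib
import json
from typing import Any, Dict


def idempotency_key(job_type: str, fields: Dict[str, Any]) -> str:
    # Canonical JSON (sorted keys) so logically equal payloads hash identically.
    canonical = json.dumps(fields, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(f"{job_type}:{canonical}".encode("utf-8")).hexdigest()
```

With the suggested change, the job ID carries the whole digest, so distinct payloads can no longer collapse onto the same job record.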
```python
def _schedule_job(job: Dict[str, Any], handler) -> None:
    if job.get("status") in {QUEUED, FAILED}:
        asyncio.create_task(run_job(get_default_job_store(), job["job_id"], handler))
```
This _schedule_job helper is duplicated in src/api/routes/scanner.py (as _schedule_durable_job). To improve maintainability and follow the DRY (Don't Repeat Yourself) principle, consider moving this logic to a shared location, such as a method within DurableJobStore or a utility function in src/jobs/durable.py.
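A sketch of what the shared helper could look like; the names (`QUEUED`, `FAILED`, `run_job`, `get_default_job_store`) come from the quoted snippet, and the assumption is that they are importable from `src/jobs/durable.py`:

```python
# Sketch only: a single schedule_job() that both memory.py and scanner.py import,
# replacing the duplicated _schedule_job/_schedule_durable_job copies.
import asyncio
from typing import Any, Callable, Dict

from src.jobs.durable import FAILED, QUEUED, get_default_job_store, run_job


def schedule_job(job: Dict[str, Any], handler: Callable[..., Any]) -> None:
    """Kick off the in-process runner for any job that still needs work."""
    if job.get("status") in {QUEUED, FAILED}:
        asyncio.create_task(run_job(get_default_job_store(), job["job_id"], handler))
```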
| "scanner_job_id": job_id, | ||
| }, | ||
| user_id=req.username, | ||
| timeout_seconds=1800.0, |
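The review summary also flags hardcoded timeouts such as the `1800.0` above. One hedged option (module path and the ingest value are illustrative, not from the PR) is a small constants module that call sites import:

```python
# Hypothetical src/jobs/constants.py -- centralizes timeout values so they are
# tuned in one place instead of being scattered across route handlers.
SCANNER_JOB_TIMEOUT_SECONDS: float = 1800.0  # value taken from the quoted snippet
INGEST_JOB_TIMEOUT_SECONDS: float = 600.0    # illustrative placeholder

# The call site above would then read: timeout_seconds=SCANNER_JOB_TIMEOUT_SECONDS
```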
Force-pushed from 470d00b to b2b0b40.
Pushed an update addressing the Gemini review notes on the initial commit: Mongo-backed durable job store calls from the async routes/runner are now offloaded via `asyncio.to_thread` so pymongo does not block the FastAPI event loop, and durable job IDs now use the full idempotency hash instead of a truncated prefix. Local verification rerun: `compileall` on the touched Python files, `git diff --check`, and the no-dependency smoke check for idempotency, redaction, retry, and success transitions.
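The offloading pattern described here is roughly the following (a sketch of the pattern, not the PR's exact code; `get_job` is an assumed method name):

```python
# Pattern sketch: keep the synchronous pymongo-backed store, but await its calls
# through asyncio.to_thread so the FastAPI event loop stays responsive.
import asyncio
from typing import Any, Dict, Optional


async def fetch_job_status(store: Any, job_id: str) -> Optional[Dict[str, Any]]:
    # store.get_job is a blocking pymongo call; to_thread runs it on a worker thread.
    return await asyncio.to_thread(store.get_job, job_id)
```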
Force-pushed from b2b0b40 to 9661dac.
Added one more hardening pass after reviewing the PR with Codex Security: pause/resume now derives the scanner job owner from the authenticated API-key user instead of trusting the request-body username, so one API-key user cannot target another user's scanner job by changing the body. Verification rerun: AST parse for `src/api/routes/scanner.py` and `git diff --check`.
Hi @hunterbastian, thank you for the contribution. The PR looks good to me; my main concern is whether we need to make the changes in the /v1/memory routes, or whether we could bump the versioning and make the changes in /v2/memory routes, leaving /v1/memory as it is. What do you think about this? Also, let me know your thoughts on Celery & Redis. Did you try out the ingest endpoint after the job-tracking improvement and check the latency? Has it increased?
Thanks for the review. I agree the versioning point is the main API-design question here. I changed the existing /v1/memory ingest/batch-ingest/scrape routes in place so they enqueue durable jobs instead of running the work synchronously.

If you prefer that direction, I can push a follow-up that restores the v1 memory route behavior and mounts the durable ingest/scrape routes under /v2/memory.

On Celery + Redis: I see that as the right production queue once this needs multi-process workers, separate worker autoscaling, and stronger operational controls. I kept this PR to Mongo durable records plus an in-process async runner so the durable contract lands first with fewer new runtime dependencies.

On latency: I did not run a full live ingest against real Pinecone/Neo4j/model credentials locally, so I do not want to overclaim runtime numbers. Expected behavior is that initial request latency drops to the Mongo enqueue/index-write path plus task scheduling, while total completion time stays roughly the old pipeline runtime plus small status-write overhead. The first Gemini concern was valid, so the current head offloads the synchronous pymongo calls with `asyncio.to_thread`.
Yes, I would prefer that direction: restore the v1 memory routes and introduce /v2/memory. Okay, we can move on with phase 1 & phase 2 after this. Can we do the staging benchmark on /v2/ingest before the merge?
Pushed the versioning follow-up. What changed: the /v1/memory routes are restored to their previous behavior, and the durable ingest/scrape routes are mounted under /v2/memory.
Verification:
For the staging benchmark: I do not have access to your staging URL/API key from this PR, so I should not claim live numbers from it. The endpoint is ready for staging now: measure the initial response latency of the v2 ingest endpoint there, along with the total job completion time.
Added a staging benchmark helper in `scripts/benchmark_v2_ingest.py`. What changed:
Verification rerun locally:
To run it on staging, set the API key privately and run:

```bash
export XMEM_API_KEY=...
python scripts/benchmark_v2_ingest.py --base-url https://your-staging-host
```

I still do not have staging credentials locally, so I have not fabricated live numbers. This should make the benchmark reproducible on your side, or safe to run if credentials are shared privately.
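For readers without access to the script, a rough illustration of the measurement it is meant to produce; the endpoint paths, payload, auth header, and status values below are assumptions, not the script's actual contents:

```python
# Illustration only, not scripts/benchmark_v2_ingest.py itself: time the initial
# enqueue response, then poll the job until it reaches a terminal state.
import os
import time

import requests

BASE_URL = os.environ.get("BASE_URL", "https://your-staging-host")
HEADERS = {"Authorization": f"Bearer {os.environ['XMEM_API_KEY']}"}  # auth scheme assumed

start = time.monotonic()
resp = requests.post(f"{BASE_URL}/v2/memory/ingest", json={"text": "hello"}, headers=HEADERS)
print(f"enqueue latency: {time.monotonic() - start:.3f}s")

job_id = resp.json()["job_id"]
while True:
    job = requests.get(f"{BASE_URL}/v2/memory/jobs/{job_id}", headers=HEADERS).json()
    if job["status"] in {"succeeded", "failed", "dead_letter"}:  # terminal states assumed
        break
    time.sleep(1)
print(f"final status after {time.monotonic() - start:.1f}s: {job['status']}")
```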
Pushed a small follow-up.
Closes #162.
Summary
- Moves `/v1/memory/ingest`, `/v1/memory/batch-ingest`, and `/v1/memory/scrape` onto async durable jobs that return a `job_id` immediately, with polling endpoints

Notes
This implements a focused Phase 1 slice for the bounty: durable job records, immediate enqueue responses, status APIs, idempotency, retries/timeouts, dead-letter state, and scanner job wiring. It does not add Celery/Redis yet; the runner remains in-process but persists lifecycle state in Mongo so request handlers no longer own the work synchronously.
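As a hedged illustration of the retry/dead-letter behavior described above (field names and status strings are assumptions, not the PR's actual constants):

```python
# Sketch of the failure transition: retry until max_attempts is exhausted,
# then park the job in a dead-letter state for inspection.
from typing import Any, Dict


def status_after_failure(job: Dict[str, Any]) -> str:
    attempts = job.get("attempts", 0) + 1
    if attempts >= job.get("max_attempts", 3):
        return "dead_letter"  # no more retries
    return "queued"           # re-eligible for the in-process runner
```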
Verification
- `python3 -m compileall src/jobs/durable.py src/api/routes/memory.py src/api/routes/scanner.py src/scanner/code_store.py tests/test_durable_jobs.py`
- `git diff --check`
- `pytest` was not available in the local Python runtimes I had access to, so I added `tests/test_durable_jobs.py` but could not run it here without installing test dependencies.