
Add durable job tracking for ingest and scanner work#177

Open
hunterbastian wants to merge 4 commits into XortexAI:main from hunterbastian:codex-durable-jobs

Conversation

@hunterbastian

Closes #162.

Summary

  • add a Mongo-backed durable job store with idempotency keys, retry counts, timeout fields, error state, result persistence, and dead-letter transitions
  • move /v1/memory/ingest, /v1/memory/batch-ingest, and /v1/memory/scrape onto async durable jobs that return a job_id immediately with polling endpoints
  • wrap scanner scan/resume execution in durable jobs while preserving existing scanner status responses, now including durable job metadata
  • redact token/password/PAT-style fields from persisted job payloads
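As a rough illustration of the idempotency-key idea described above (the function name `idempotency_key` appears in the diff excerpts later in this thread, but the exact field canonicalization is an assumption):

```python
import hashlib
import json


def idempotency_key(job_type: str, fields: dict) -> str:
    # Canonicalize the payload so key order does not change the hash.
    canonical = json.dumps(fields, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(f"{job_type}:{canonical}".encode("utf-8")).hexdigest()


# Identical payloads with different key order map to the same key.
a = idempotency_key("ingest", {"url": "https://example.com", "user": "u1"})
b = idempotency_key("ingest", {"user": "u1", "url": "https://example.com"})
```

Deriving the key from a canonical JSON form means a retried request with the same logical payload finds the existing job rather than enqueuing a duplicate.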

Notes

This implements a focused Phase 1 slice for the bounty: durable job records, immediate enqueue responses, status APIs, idempotency, retries/timeouts, dead-letter state, and scanner job wiring. It does not add Celery/Redis yet; the runner remains in-process but persists lifecycle state in Mongo so request handlers no longer own the work synchronously.
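A rough sketch of the per-attempt lifecycle such a runner persists (field names mirror the summary above — attempts, error state, dead-letter — but the exact schema is an assumption):

```python
# Illustrative job states for an in-process durable runner.
QUEUED, RUNNING, SUCCEEDED, FAILED, DEAD = (
    "queued", "running", "succeeded", "failed", "dead",
)


def run_once(job: dict, handler) -> dict:
    # One lifecycle pass: run the handler, record result or error,
    # and dead-letter the job once attempts are exhausted.
    job["status"] = RUNNING
    job["attempts"] = job.get("attempts", 0) + 1
    try:
        job["result"] = handler(job["payload"])
        job["status"] = SUCCEEDED
    except Exception as exc:
        job["error"] = str(exc)
        retryable = job["attempts"] < job.get("max_attempts", 3)
        job["status"] = FAILED if retryable else DEAD
    return job
```

In the PR each transition is written to Mongo, so a crashed process leaves an inspectable record instead of a silently lost request.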

Verification

  • python3 -m compileall src/jobs/durable.py src/api/routes/memory.py src/api/routes/scanner.py src/scanner/code_store.py tests/test_durable_jobs.py
  • git diff --check
  • local no-dependency smoke check for idempotency, payload redaction, retry, and success transition

pytest was not available in the local Python runtimes I had access to, so I added tests/test_durable_jobs.py but could not run it here without installing test dependencies.


@gemini-code-assist (bot) left a comment


Code Review

This pull request introduces a durable background job system using MongoDB to handle long-running tasks like memory ingestion and repository scanning asynchronously. Key changes include the addition of a DurableJobStore for job persistence, idempotency management, and retry logic, along with the refactoring of existing API routes to utilize this new system. Feedback highlights a critical performance issue where the use of the synchronous pymongo driver could block the FastAPI event loop, and a potential for ID collisions due to truncation of the idempotency hash. Additionally, there are suggestions to deduplicate scheduling logic and centralize hardcoded timeout constants to improve maintainability.

Comment thread src/jobs/durable.py
database: Optional[str] = None,
collection: str = "durable_jobs",
) -> None:
from pymongo import MongoClient

high

The DurableJobStore implementation uses pymongo.MongoClient, which is a synchronous driver. In an asynchronous FastAPI application, performing synchronous database operations on the main event loop will block it, preventing other requests from being handled and significantly reducing the application's scalability. It is highly recommended to use an asynchronous driver like motor.
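Besides switching to motor, one way to keep synchronous pymongo calls off the event loop is `asyncio.to_thread` — the approach the author later adopts. A minimal sketch with a stand-in blocking function (not the PR's actual store code):

```python
import asyncio
import time


def blocking_find_one(job_id: str) -> dict:
    # Stand-in for a synchronous pymongo call such as collection.find_one(...).
    time.sleep(0.05)
    return {"job_id": job_id, "status": "queued"}


async def get_job(job_id: str) -> dict:
    # Run the blocking call in a worker thread so the event loop stays free
    # to serve other requests while the driver waits on the database.
    return await asyncio.to_thread(blocking_find_one, job_id)


doc = asyncio.run(get_job("ingest:abc"))
```

The trade-off versus motor is thread-pool pressure under high concurrency, but it avoids introducing a second driver dependency.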

Comment thread src/jobs/durable.py Outdated
max_attempts: int = 3,
) -> Tuple[Dict[str, Any], bool]:
key = idempotency_key(job_type, idempotency_fields)
job_id = f"{job_type}:{key[:24]}"

high

Truncating the idempotency hash to 24 characters for the job_id increases the risk of collisions. If two different payloads result in the same 24-character prefix, the enqueue method will incorrectly return the existing job for the first payload, breaking the idempotency contract. Since MongoDB handles longer strings efficiently, it is safer to use the full hash.

Suggested change
job_id = f"{job_type}:{key[:24]}"
job_id = f"{job_type}:{key}"

Comment thread src/api/routes/memory.py
Comment on lines +218 to +220
def _schedule_job(job: Dict[str, Any], handler) -> None:
if job.get("status") in {QUEUED, FAILED}:
asyncio.create_task(run_job(get_default_job_store(), job["job_id"], handler))

medium

This _schedule_job helper is duplicated in src/api/routes/scanner.py (as _schedule_durable_job). To improve maintainability and follow the DRY (Don't Repeat Yourself) principle, consider moving this logic to a shared location, such as a method within DurableJobStore or a utility function in src/jobs/durable.py.
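A possible shared form of the helper, e.g. in src/jobs/durable.py, which both route modules could import (the demo harness below is illustrative, not from the diff):

```python
import asyncio

QUEUED = "queued"
FAILED = "failed"


def schedule_job(store, job: dict, handler, run_job) -> None:
    # Single shared scheduler: only (re)schedule jobs that still need work.
    if job.get("status") in {QUEUED, FAILED}:
        asyncio.create_task(run_job(store, job["job_id"], handler))


async def demo() -> list:
    done = []

    async def run_job(store, job_id, handler):
        done.append(handler(job_id))

    schedule_job(None, {"job_id": "j1", "status": QUEUED}, lambda j: j, run_job)
    schedule_job(None, {"job_id": "j2", "status": "succeeded"}, lambda j: j, run_job)
    await asyncio.sleep(0)  # yield so the scheduled task can run
    return done


result = asyncio.run(demo())
```

Centralizing the status gate also means a future Celery swap-in only has to change one call site.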

Comment thread src/api/routes/scanner.py Outdated
"scanner_job_id": job_id,
},
user_id=req.username,
timeout_seconds=1800.0,

medium

The timeout value of 1800.0 seconds is hardcoded in multiple places throughout this file (lines 809, 850, 975). It would be better to define this as a constant or a configuration setting in src/config.py to make it easier to manage and update.
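One hedged way to centralize it, assuming a src/config.py-style module (the env-var override is an illustrative choice, not something the diff specifies):

```python
import os

# src/config.py style constant: one place to tune the scanner job timeout,
# overridable via environment without code changes.
SCANNER_DURABLE_TIMEOUT_SECONDS: float = float(
    os.environ.get("SCANNER_DURABLE_TIMEOUT_SECONDS", "1800")
)
```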

@hunterbastian force-pushed the codex-durable-jobs branch 2 times, most recently from 470d00b to b2b0b40 on May 12, 2026 at 01:42
@hunterbastian
Author

Pushed an update addressing the Gemini review notes on the initial commit: Mongo-backed durable job store calls from async routes/runner are now offloaded via asyncio.to_thread so pymongo does not block the FastAPI event loop, and durable job IDs now use the full idempotency hash instead of a truncated prefix. Local verification rerun: compileall on touched Python files, git diff --check, and the no-dependency smoke check for idempotency, redaction, retry, and success transitions.

@hunterbastian
Author

Added one more hardening pass after reviewing the PR with Codex Security: pause/resume now derives the scanner job owner from the authenticated API key user instead of trusting the request body username, so one API-key user cannot target another user’s scanner job by changing the body. Verification rerun: AST parse for src/api/routes/scanner.py and git diff --check.

@ishaanxgupta
Member

Hi @hunterbastian, thank you for the contribution. The PR looks good to me; my main concern is whether we should make the changes in the /v1/memory routes, or bump the versioning and make the changes in /v2/memory routes, leaving /v1/memory as it is. What do you think? Also, let me know your thoughts on Celery & Redis. Did you try the ingest endpoint after the job-tracking improvement and check the latency — has it increased?

@hunterbastian
Author

Thanks for the review. I agree the versioning point is the main API-design question here.

I changed /v1/memory/ingest because #162 explicitly called out that endpoint returning a job_id immediately. That said, the response contract does change from a completed ingest result to an accepted/job-status envelope, so from a client-compatibility perspective I think the cleaner production shape is:

  • keep /v1/memory/* synchronous/backwards-compatible if existing clients depend on the completed ingest response;
  • expose the durable async behavior as /v2/memory/ingest, /v2/memory/batch-ingest, /v2/memory/scrape, plus matching status endpoints;
  • keep the same durable job backend and payload/status schema this PR adds, so the implementation does not fork beyond routing/compatibility.

If you prefer that direction, I can push a follow-up that restores the v1 memory route behavior and mounts the durable ingest/scrape routes under /v2/memory. If you want to keep #162 literal, the current v1 behavior matches the issue acceptance criteria.

On Celery + Redis: I see that as the right production queue once this needs multi-process workers, separate worker autoscaling, and stronger operational controls. I kept this PR to Mongo durable records + an in-process async runner to land the durable contract first with fewer new runtime dependencies. The DurableJobStore boundary should make Celery/Redis a later swap-in: enqueue writes the durable record, Celery workers claim/process jobs, and the same status/dead-letter fields are updated in Mongo. Redis step persistence for LangGraph node checkpoints still feels like Phase 2, as you described in the issue comment.

On latency: I did not run a full live ingest against real Pinecone/Neo4j/model credentials locally, so I do not want to overclaim runtime numbers. Expected behavior is that initial request latency should drop to the Mongo enqueue/index-write path plus task scheduling, while total completion time should be roughly the old pipeline runtime plus small status-write overhead. The first Gemini concern was valid, so the current head offloads synchronous pymongo calls with asyncio.to_thread to avoid blocking the FastAPI event loop. CI is now passing on the branch, but a real staging benchmark on /ingest would be the best confirmation before merge.

@ishaanxgupta
Member

Yes, I would prefer the direction that restores the v1 memory routes and introduces /v2/memory. Okay, we can move on to Phase 1 & Phase 2 after this. Can we do the staging benchmark on /v2/ingest before the merge?

@hunterbastian
Author

Pushed the versioning follow-up in 4277468.

What changed:

  • restored the existing /v1/memory/ingest, /v1/memory/batch-ingest, and /v1/memory/scrape behavior so v1 returns the completed response shape again;
  • mounted the durable async contract under /v2/memory/ingest, /v2/memory/batch-ingest, and /v2/memory/scrape;
  • moved the durable polling URLs to /v2/memory/ingest/{job_id}/status, /v2/memory/jobs/{job_id}/status, and /v2/memory/scrape/{job_id}/status;
  • added route-versioning tests that assert v1 still returns completed ingest data while v2 returns the durable job envelope.

Verification:

  • python3 -m compileall src/api/routes/memory.py src/api/app.py tests/api/test_memory_versioning.py
  • git diff --check
  • uv run --extra dev pytest tests/api/test_memory_versioning.py tests/test_durable_jobs.py -> 6 passed
  • uv run --extra dev pytest -> 46 passed

For the staging benchmark: I do not have access to your staging URL/API key from this PR, so I should not claim live numbers from it. The endpoint is ready for staging now: measure the initial POST /v2/memory/ingest latency as enqueue latency, then poll the returned status_url for completion latency. If you want me to run it, please provide staging access through a private channel rather than posting credentials here.

@github-actions (bot) added the docs label on May 12, 2026
@hunterbastian
Author

Added a staging benchmark helper in 41f9a94 so the /v2/memory ingest latency can be measured without posting staging credentials in the PR.

What changed:

  • added scripts/benchmark_v2_ingest.py, which reads the Bearer token from an env var, posts one payload to POST /v2/memory/ingest, then polls the returned status_url;
  • reports enqueue latency separately from total durable-job completion latency;
  • keeps the API key out of stdout/log output;
  • documented usage in docs/benchmarks.md.
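The measurement loop the script performs can be sketched roughly as follows, with the HTTP calls injected as callables so the timing logic is shown without a live server (the status-payload shape is an assumption; the real script uses the URLs from this PR):

```python
import time


def benchmark_ingest(post, get, payload: dict) -> tuple:
    """Measure enqueue latency separately from total completion latency.

    `post` and `get` are injected callables (thin wrappers over an HTTP
    client in the real script) returning parsed JSON bodies.
    """
    t0 = time.monotonic()
    body = post("/v2/memory/ingest", payload)  # durable job envelope
    enqueue_s = time.monotonic() - t0

    # Poll the returned status URL until the job leaves the running states.
    while True:
        status = get(body["status_url"])
        if status["status"] not in {"queued", "running"}:
            break
        time.sleep(0.05)
    total_s = time.monotonic() - t0
    return body["job_id"], enqueue_s, total_s
```

The enqueue figure is what the async migration is expected to shrink; the total figure should stay close to the old synchronous pipeline runtime plus status-write overhead.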

Verification rerun locally:

  • python3 -m compileall scripts/benchmark_v2_ingest.py src/api/routes/memory.py src/api/app.py tests/api/test_memory_versioning.py tests/test_durable_jobs.py
  • git diff --check
  • uv run ruff check scripts/benchmark_v2_ingest.py
  • uv run --extra dev pytest tests/api/test_memory_versioning.py tests/test_durable_jobs.py -> 6 passed
  • uv run --extra dev pytest -> 46 passed

To run it on staging, set the API key privately and run:

export XMEM_API_KEY=...
python scripts/benchmark_v2_ingest.py --base-url https://your-staging-host

I still do not have staging credentials locally, so I have not fabricated live numbers. This should make the benchmark reproducible on your side or safe to run if credentials are shared privately.

@hunterbastian
Author

Pushed a small follow-up in 67761f2 for the remaining maintainability note from the earlier Gemini review: scanner durable job timeout handling now uses a single SCANNER_DURABLE_TIMEOUT_SECONDS constant and helper instead of repeating the 1800.0 fallback across resume/start paths.

Verification rerun locally:

  • python3 -m compileall src/api/routes/scanner.py
  • git diff --check
  • uv run --extra dev pytest tests/api/test_memory_versioning.py tests/test_durable_jobs.py -> 6 passed

Development

Successfully merging this pull request may close these issues.

Move ingestion and scanner work to durable jobs/workflows

2 participants