Fix split-topology result retrieval #2278

Open
charlesbluca wants to merge 18 commits into NVIDIA:main from charlesbluca:codex/shared-result-store

@charlesbluca charlesbluca commented Jun 29, 2026

Collaborator

Description

Fix split-topology result retrieval when multiple worker replicas are present.

NVBug 6348464 exposed silent result loss because workers retained completed rows only in pod-local memory, while the gateway fetched them through a load-balanced ClusterIP Service with no document-to-pod affinity. Requests routed to a non-owning pod returned 404, so clients received incomplete result data even though every document completed successfully.

This PR:

  • Persists retained result rows atomically in the shared retriever-results volume.
  • Lets gateway status routes consume shared rows directly before using the legacy worker Service fallback.
  • Mounts and configures the results PVC on gateway, realtime, and batch pods while retaining the in-memory fallback for non-Helm deployments.
  • Opportunistically removes expired result and interrupted temporary files, using a configurable eight-hour default TTL that covers the job tracker's stale and terminal retention windows.
  • Adds concurrency, lifecycle, gateway-route, and Helm rendering coverage.
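
The opportunistic cleanup described above can be pictured with a minimal sketch. It is not the PR's implementation: the function name `sweep_expired`, the constant name, and the choice to judge expiry by file modification time are all assumptions made for illustration. Only the eight-hour default comes from the description.

```python
import os
import time
from pathlib import Path
from typing import List, Optional

# Eight-hour default TTL, as stated in the PR description.
DEFAULT_TTL_SECONDS = 8 * 60 * 60


def sweep_expired(
    results_dir: Path,
    ttl_seconds: float = DEFAULT_TTL_SECONDS,
    now: Optional[float] = None,
) -> List[Path]:
    """Delete files older than the TTL and return the paths removed.

    Hypothetical helper: expiry is judged by mtime, which is an assumption.
    """
    now = time.time() if now is None else now
    removed: List[Path] = []
    for entry in sorted(results_dir.iterdir()):
        if not entry.is_file():
            continue
        try:
            if now - entry.stat().st_mtime > ttl_seconds:
                entry.unlink()
                removed.append(entry)
        except FileNotFoundError:
            # Another pod sharing the volume removed the file first.
            continue
    return removed
```

Because several pods share the volume, the sketch treats a file that has already vanished as a normal outcome rather than an error.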

Result retrieval is now deterministic across split-topology replicas. Multi-node deployments must use an RWX-capable storage class; RWO remains suitable when all consumers run on one node.

Validation:

  • 81 focused and adjacent pytest tests passed.
  • Helm lint and split-mode template rendering passed.
  • Python compilation and git diff --check passed.
  • Live two-replica Helm verification improved from 4/20 rows returned before the fix to 20/20 after the fix, with the workload distributed evenly across both batch pods (10 documents each).

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

@charlesbluca charlesbluca marked this pull request as ready for review June 30, 2026 21:37
@charlesbluca charlesbluca requested review from a team as code owners June 30, 2026 21:37
@charlesbluca charlesbluca requested a review from drobison00 June 30, 2026 21:37
@charlesbluca charlesbluca changed the title [codex] Fix split-topology result retrieval Fix split-topology result retrieval Jun 30, 2026

greptile-apps Bot commented Jun 30, 2026

Contributor

Greptile Summary

This PR fixes silent result loss in split-topology multi-replica deployments by replacing the load-balanced proxy fetch with a direct pod-IP handoff. Workers advertise POD_IP in completion callbacks; the gateway fetches rows from the exact owning pod and writes them atomically to a shared PVC as immutable UUID-named generations; status routes then read from the gateway's local filesystem copy rather than proxying.

  • worker_result_store.py is rewritten with an atomic write-temp → fsync → rename → fsync-dir protocol, TTL-based sweep of expired generations, and an in-memory fallback for non-Helm deployments.
  • pipeline_pool.py gains a bounded, backpressured deferred-retry loop (BoundedSemaphore(num_workers)) for callback delivery, so transient gateway unavailability no longer silently drops the callback.
  • job_tracker.py changes consume_result_data to idempotent get_result_data and adds opportunistic eviction on every read path via _get_live_job_locked/_get_live_document_locked.
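
The write-temp → fsync → rename → fsync-dir protocol named in the first bullet can be sketched roughly as follows. This is an illustration under assumptions, not the code in worker_result_store.py: the function name `write_generation`, the JSON encoding, and the one-directory-per-document layout are hypothetical. The directory fsync as written assumes a POSIX filesystem.

```python
import contextlib
import json
import os
import tempfile
import uuid
from pathlib import Path


def write_generation(doc_dir: Path, rows: list) -> Path:
    """Atomically publish rows as a new UUID-named file (hypothetical helper)."""
    doc_dir.mkdir(parents=True, exist_ok=True)
    final_path = doc_dir / f"{uuid.uuid4().hex}.json"

    # 1. Write to a temporary file in the same directory, so the later
    #    rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=doc_dir, prefix=".tmp-")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            json.dump(rows, tmp)
            tmp.flush()
            # 2. Force the file contents to stable storage.
            os.fsync(tmp.fileno())
        # 3. Rename into place; readers see the old state or the new file,
        #    never a partial write.
        os.replace(tmp_name, final_path)
    except BaseException:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(tmp_name)
        raise

    # 4. fsync the directory so the new directory entry is durable too.
    dir_fd = os.open(doc_dir, os.O_RDONLY)
    try:
        os.fsync(dir_fd)
    finally:
        os.close(dir_fd)
    return final_path
```

A crash between steps 1 and 3 leaves only a `.tmp-` file behind, which is the kind of interrupted temporary file a TTL sweep would later remove.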

Confidence Score: 4/5

Safe to merge for clean deployments; a gateway-pod restart while retained-result callbacks are in-flight leaves worker semaphore slots occupied for up to the result retention window.

The callback handler now raises HTTP 503 when the gateway has no record of a document and a result_worker_ip is present. Pre-PR, that path fell through to mark_completed, which returned MarkOutcome.UNKNOWN_DOCUMENT and acknowledged the callback with HTTP 200, letting the worker discard its local copy and move on. Post-PR, those callbacks enter the deferred retry loop and hold a BoundedSemaphore slot for up to result_retention_seconds() (default 8 h). In a cluster where a gateway restart occurs mid-flight, all N worker slots can be occupied by stale retries, delaying processing of new completions that also need a retry slot.

nemo_retriever/src/nemo_retriever/service/routers/ingest.py — the pre_rec is None and result_worker_ip guard in job_callback

Important Files Changed

| Filename | Overview |
| --- | --- |
| nemo_retriever/src/nemo_retriever/service/routers/ingest.py | Reworks result retrieval from proxy-based fetch to gateway-local filesystem read, adds result handoff in callback handler. The new 503 guard for pre_rec is None with result_worker_ip breaks the pre-PR graceful gateway-restart fallback. |
| nemo_retriever/src/nemo_retriever/service/services/worker_result_store.py | New module implementing immutable-generation filesystem store for retained result rows; well-structured with atomic rename protocol, TTL sweep, and in-memory fallback. |
| nemo_retriever/src/nemo_retriever/service/services/pipeline_pool.py | Adds bounded deferred-retry loop for gateway callbacks with semaphore-controlled concurrency and exponential backoff. Worker loop can stall when all N slots are occupied; otherwise correct and well-tested. |
| nemo_retriever/src/nemo_retriever/service/services/job_tracker.py | Replaces consume-on-read consume_result_data with idempotent get_result_data; adds opportunistic eviction on every read path. Logic is consistent and deepcopy of result_data prevents aliasing. |
| nemo_retriever/helm/templates/deployment.yaml | Correctly wires NEMO_RETRIEVER_RESULTS_DIR and TTL env vars to standalone and gateway pods only; PVC volume mounted only on gateway in split mode. |
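
The difference between a consume-on-read accessor and an idempotent one, and why a deep copy prevents aliasing, can be shown with a small sketch. The `ResultCache` class and its method names are hypothetical and are not the job tracker's API.

```python
import copy
import threading


class ResultCache:
    """Hypothetical in-memory cache contrasting consume() with get()."""

    def __init__(self):
        self._lock = threading.Lock()
        self._rows = {}

    def put(self, doc_id, rows):
        with self._lock:
            self._rows[doc_id] = copy.deepcopy(rows)

    def consume(self, doc_id):
        # Consume-on-read: the first caller gets the rows, later callers get None.
        with self._lock:
            return self._rows.pop(doc_id, None)

    def get(self, doc_id):
        # Idempotent read: every caller gets an independent copy, so mutating
        # the returned rows cannot corrupt the cached ones.
        with self._lock:
            rows = self._rows.get(doc_id)
            return None if rows is None else copy.deepcopy(rows)
```

With `get`, a client that retries a status request after a dropped connection still receives the rows, which is not true of `consume`.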

Comments Outside Diff (1)

  1. nemo_retriever/src/nemo_retriever/service/routers/ingest.py, lines 389-394

    P1 Gateway-restart resilience regression for retained-result callbacks

    The new guard raises HTTP 503 when pre_rec is None and result_worker_ip is present, bypassing the mark_completed call. Pre-PR, mark_completed for an unknown document returned MarkOutcome.UNKNOWN_DOCUMENT and the endpoint returned HTTP 200, so the worker acknowledged the callback, called discard_local_result_data, and moved on. Post-PR, those callbacks keep retrying (starting at 1 s, then backing off exponentially up to 300 s) until result_retention_seconds() (8 h by default) expires. During that window each pre-restart document occupies one of the _num_workers semaphore slots in _schedule_gateway_callback_retry. If N pre-restart documents each hold one slot, new completions whose callbacks also fail (e.g. during a rolling restart) block on await slots.acquire(), stalling queue draining until a slot is freed.
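
The slot-holding behaviour described in this comment can be illustrated with a minimal asyncio sketch. It is not the code in pipeline_pool.py: `backoff_delays`, `retry_until_deadline`, the doubling factor, and the injectable `sleep` and `clock` parameters are assumptions for illustration. Only the 1 s start, the 300 s cap, and the bounded semaphore come from the review text.

```python
import asyncio
import time
from typing import Awaitable, Callable, Iterator


def backoff_delays(
    initial: float = 1.0, cap: float = 300.0, factor: float = 2.0
) -> Iterator[float]:
    """Yield 1 s, 2 s, 4 s, ... capped at 300 s (the factor is an assumption)."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, cap)


async def retry_until_deadline(
    send: Callable[[], Awaitable[bool]],
    slots: asyncio.BoundedSemaphore,
    deadline_seconds: float,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Retry send() with backoff while holding one slot for the whole window."""
    await slots.acquire()  # blocks when every slot is held by another retry
    try:
        deadline = clock() + deadline_seconds
        for delay in backoff_delays():
            if await send():
                return True
            if clock() + delay > deadline:
                return False  # give up once the retention window would pass
            await sleep(delay)
        return False  # unreachable: backoff_delays() never ends
    finally:
        slots.release()
```

In this sketch a callback that keeps receiving 503 holds its slot until the deadline, so with `BoundedSemaphore(n)` the (n+1)-th failing callback waits in `acquire()`, which is the stall the comment describes.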


Reviews (10): Last reviewed commit: "Bound deferred callback retries"

