Feat/pull based job dispatch #52

Draft
as535364 wants to merge 15 commits into main from
feat/pull-based-job-dispatch

Conversation

@as535364 as535364 commented Apr 28, 2026

Pull-Based Job Dispatch — Runner / Infra Implementation Plan (Plan B)

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Refactor the Normal-OJ Sandbox process from "passive HTTP receiver" to "active GH-style runner that polls Backend for jobs", plus all infra changes (docker-compose Redis AOF, new env vars) needed for the big-bang cutover.

Architecture: Add Sandbox/agent/ with 4 daemon-thread modules (registration, heartbeat, poller, result_sender). Modify Sandbox/dispatcher/dispatcher.py to accept a job_id per submission and push results to an in-memory queue (instead of POSTing inline). Replace Flask Sandbox/app.py entrypoint with a new Sandbox/main.py that wires all threads together. Update Dockerfile + docker-compose.
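The thread wiring described above can be sketched as follows. This is a minimal pattern, not the real entrypoint (that lands in Phase 4); `wire_agent`, `run_forever`, and the factory signature are illustrative names only, standing in for the HeartbeatThread/PollerThread/ResultSenderThread constructors defined later in this plan.

```python
# Hypothetical sketch of the main.py wiring: one shared shutdown Event and
# one shared result Queue across all agent daemon threads.
import queue
import signal
import threading


def wire_agent(thread_factories):
    """Build all agent threads around shared primitives.

    Each factory receives (result_queue, shutdown_event) and returns an
    unstarted threading.Thread.
    """
    shutdown = threading.Event()
    result_queue: queue.Queue = queue.Queue()
    threads = [make(result_queue, shutdown) for make in thread_factories]
    return shutdown, result_queue, threads


def run_forever(shutdown, threads):
    for t in threads:
        t.start()
    # docker stop sends SIGTERM; flip the event so every loop exits cleanly.
    signal.signal(signal.SIGTERM, lambda signum, frame: shutdown.set())
    shutdown.wait()
    for t in threads:
        t.join(timeout=5)
```

The point of the shared Event is that every thread's wait becomes interruptible, so shutdown is prompt regardless of interval length.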

Tech Stack: Python 3.13 (runtime), Python 3.10 (CI tests), pip (no Poetry), pytest, requests, Docker.

Working directories:

  • Sandbox/ — most changes
  • meta-repo root (Normal-OJ/) — docker-compose.yml, .secret.example/*.env

Prerequisite: Plan A (docs/superpowers/plans/2026-04-28-pull-based-job-dispatch-backend.md) must be merged into Back-End first. Plan B's smoke test depends on Backend exposing the new /runners/* endpoints.


Spec Reference

Source design: docs/superpowers/specs/2026-04-28-pull-based-job-dispatch-design.md

This Plan B covers Sections 10 (Runner internal structure), 11 (env vars — runner side), 12 (infra), and 14 (Migration steps).


Naming clarification: runner/ vs agent/

The existing Sandbox/runner/ directory contains code that runs submissions inside Docker containers (SubmissionRunner class) — it's the execution engine. This is unrelated to "GitHub-style runner" terminology.

To avoid name collision, all new pull-loop modules go in Sandbox/agent/ (agent = the worker process that polls + executes). The existing Sandbox/runner/ stays untouched.


File Structure

New files (Sandbox)

Sandbox/
├── main.py                          # NEW entrypoint (replaces app.py CMD)
├── agent/
│   ├── __init__.py
│   ├── config.py                    # Env vars: BACKEND_URL, REGISTRATION_TOKEN, etc.
│   ├── registration.py              # Self-register on startup
│   ├── heartbeat.py                 # Heartbeat thread (15s interval)
│   ├── poller.py                    # Poller thread (3s interval, dispatches to dispatcher)
│   ├── result_sender.py             # Result delivery thread with retry
│   └── client.py                    # Thin HTTP client to backend (shared by above)
└── tests/
    └── agent/
        ├── __init__.py
        ├── test_registration.py
        ├── test_heartbeat.py
        ├── test_poller.py
        ├── test_result_sender.py
        └── test_client.py

Modified files

Sandbox/dispatcher/dispatcher.py     # handle() accepts job_id; completion pushes to queue; add has_capacity()
Sandbox/dispatcher/config.py         # Remove SANDBOX_TOKEN, BACKEND_API; runner gets these dynamically
Sandbox/Dockerfile                   # CMD changes from gunicorn to "python main.py"
Sandbox/requirements.txt             # Drop flask + gunicorn (no longer needed)
docker-compose.yml                   # Redis --appendonly yes; new env vars
.secret.example/sandbox.env          # New env vars
.secret.example/web.env              # New env vars (RUNNER_REGISTRATION_TOKEN)
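As a sketch of the docker-compose.yml changes listed above (service names and env-var values are assumptions drawn from agent/config.py; Tasks 13-14 hold the authoritative edits):

```yaml
# Sketch only: service names assumed.
services:
  redis:
    command: ["redis-server", "--appendonly", "yes"]   # AOF so queued jobs survive restart
  sandbox:
    environment:
      BACKEND_URL: http://web:8080
      RUNNER_REGISTRATION_TOKEN: ${RUNNER_REGISTRATION_TOKEN}
      RUNNER_NAME: sandbox-1
  web:
    environment:
      RUNNER_REGISTRATION_TOKEN: ${RUNNER_REGISTRATION_TOKEN}  # must match sandbox
```

The registration token is the only value that must agree between the two services.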

Deleted files

Sandbox/app.py                       # Flask routes obsolete
Sandbox/gunicorn.conf.py             # No more gunicorn

Phases

  • Phase 1: agent/ scaffold + HTTP client (Tasks 1-2)
  • Phase 2: Per-component TDD (Tasks 3-6)
  • Phase 3: Dispatcher modifications (Tasks 7-9)
  • Phase 4: Entrypoint swap (Tasks 10-12)
  • Phase 5: Infra changes (Tasks 13-14)
  • Phase 6: Smoke test (Task 15)

Phase 1: agent/ scaffold + HTTP client

Task 1: Create agent/ package + config

Files:

  • Create: Sandbox/agent/__init__.py
  • Create: Sandbox/agent/config.py
  • Create: Sandbox/tests/agent/__init__.py

Working dir for git commands: /Users/as535364/Downloads/Project/Normal-OJ/Sandbox

  • Step 1: Create empty package init
# Sandbox/agent/__init__.py
"""GH-style runner agent: polls Backend, dispatches jobs, sends results.

Distinct from Sandbox/runner/ which contains the Docker execution code.
"""
  • Step 2: Create config module
# Sandbox/agent/config.py
"""Environment-driven configuration for the runner agent."""
import os
from pathlib import Path

# Backend URL — where to register, poll, send results
BACKEND_URL: str = os.getenv("BACKEND_URL", "http://web:8080")

# Shared registration secret (must match backend's RUNNER_REGISTRATION_TOKEN).
RUNNER_REGISTRATION_TOKEN: str = os.getenv(
    "RUNNER_REGISTRATION_TOKEN",
    "dev-only-registration-token-change-me",
)

# Display name shown in admin/listing. Defaults to hostname.
RUNNER_NAME: str = os.getenv("RUNNER_NAME", os.uname().nodename)

# Tunings (defaults match what backend returns from register; override here is rarely needed)
HEARTBEAT_INTERVAL_SEC: int = int(os.getenv("HEARTBEAT_INTERVAL_SEC", "15"))
POLL_INTERVAL_SEC: int = int(os.getenv("POLL_INTERVAL_SEC", "3"))

# Result delivery retry policy
RESULT_RETRY_MAX_ATTEMPTS: int = 5
RESULT_RETRY_INITIAL_BACKOFF_SEC: float = 1.0
RESULT_RETRY_MAX_BACKOFF_SEC: float = 16.0

# HTTP timeouts
HTTP_REQUEST_TIMEOUT_SEC: int = 10

# Where to download code zip to (per-job temp dir)
CODE_DOWNLOAD_DIR: Path = Path(os.getenv("CODE_DOWNLOAD_DIR", "/tmp/runner-code"))
CODE_DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)
  • Step 3: Create test package init
# Sandbox/tests/agent/__init__.py
  • Step 4: Verify imports

Run: cd Sandbox && python -c "from agent import config; print(config.BACKEND_URL)"
Expected: prints http://web:8080 (or whatever BACKEND_URL is in env)

  • Step 5: Commit
cd Sandbox
git add agent/__init__.py agent/config.py tests/agent/__init__.py
git commit -m "feat(agent): scaffold agent package with config"

Task 2: Implement agent/client.py — thin HTTP client to backend

Files:

  • Test: Sandbox/tests/agent/test_client.py
  • Create: Sandbox/agent/client.py

A small wrapper around requests that handles auth headers, base URL, JSON encoding, and exception mapping. All other agent modules use it (avoids each module re-implementing HTTP boilerplate).

  • Step 1: Write failing tests
# Sandbox/tests/agent/test_client.py
"""Tests for the BackendClient HTTP wrapper."""
import pytest
import responses

from agent.client import BackendClient
from agent import config


@pytest.fixture
def client():
    return BackendClient(base_url="http://test-backend", rk_token="rk_test")


@responses.activate
def test_register_posts_to_correct_url(client):
    responses.add(
        responses.POST,
        "http://test-backend/runners/register",
        json={"data": {"runner_id": "rn_1", "token": "rk_xyz", "config": {}}},
        status=201,
    )
    rv = client.register(name="r1", registration_token="dev-token")
    assert rv["runner_id"] == "rn_1"
    assert rv["token"] == "rk_xyz"
    # Verify request shape
    req = responses.calls[0].request
    assert req.headers["Content-Type"] == "application/json"


@responses.activate
def test_register_raises_on_401():
    c = BackendClient(base_url="http://test-backend", rk_token=None)
    responses.add(
        responses.POST,
        "http://test-backend/runners/register",
        json={"message": "invalid"},
        status=401,
    )
    with pytest.raises(BackendClient.AuthError):
        c.register(name="r1", registration_token="wrong")


@responses.activate
def test_heartbeat_sends_bearer_token(client):
    responses.add(
        responses.POST,
        "http://test-backend/runners/rn_1/heartbeat",
        status=204,
    )
    client.heartbeat(runner_id="rn_1")
    req = responses.calls[0].request
    assert req.headers["Authorization"] == "Bearer rk_test"


@responses.activate
def test_next_job_returns_payload_when_200(client):
    responses.add(
        responses.GET,
        "http://test-backend/runners/rn_1/next-job",
        json={"data": {"job_id": "jb_1", "submission_id": "sub_1",
                       "problem_id": 42, "language": 0, "code_url": "http://...",
                       "checker": "", "tasks": []}},
        status=200,
    )
    job = client.next_job(runner_id="rn_1")
    assert job["job_id"] == "jb_1"


@responses.activate
def test_next_job_returns_none_when_204(client):
    responses.add(
        responses.GET,
        "http://test-backend/runners/rn_1/next-job",
        status=204,
    )
    assert client.next_job(runner_id="rn_1") is None


@responses.activate
def test_complete_job_returns_status_string(client):
    responses.add(
        responses.PUT,
        "http://test-backend/runners/rn_1/jobs/jb_1/complete",
        status=204,
    )
    assert client.complete_job("rn_1", "jb_1", tasks=[]) == "ok"


@responses.activate
def test_complete_job_returns_reclaimed_on_409(client):
    responses.add(
        responses.PUT,
        "http://test-backend/runners/rn_1/jobs/jb_1/complete",
        status=409,
    )
    assert client.complete_job("rn_1", "jb_1", tasks=[]) == "reclaimed"


@responses.activate
def test_complete_job_returns_not_found_on_404(client):
    responses.add(
        responses.PUT,
        "http://test-backend/runners/rn_1/jobs/jb_1/complete",
        status=404,
    )
    assert client.complete_job("rn_1", "jb_1", tasks=[]) == "not_found"


@responses.activate
def test_complete_job_raises_on_5xx(client):
    responses.add(
        responses.PUT,
        "http://test-backend/runners/rn_1/jobs/jb_1/complete",
        status=503,
    )
    with pytest.raises(BackendClient.TransientError):
        client.complete_job("rn_1", "jb_1", tasks=[])
  • Step 2: Run tests to verify failure

Run: cd Sandbox && pip install responses && pytest tests/agent/test_client.py -v
Expected: FAIL with import error (agent.client doesn't exist yet)

  • Step 3: Add responses to test requirements

Edit Sandbox/tests/requirements.txt (create if missing) to add:

responses~=0.25

If tests/requirements.txt exists, append the line. If not, create it (the existing CI workflow installs from this path per CLAUDE.md).

  • Step 4: Implement agent/client.py
# Sandbox/agent/client.py
"""HTTP client for the Backend runner API."""
import requests

from . import config


class BackendClient:
    """Thin wrapper around requests, adding auth, base URL, error mapping."""

    class AuthError(Exception):
        """Raised when backend rejects auth (401)."""

    class TransientError(Exception):
        """Raised on 5xx or network errors — caller should retry."""

    def __init__(self, base_url: str | None = None, rk_token: str | None = None):
        self.base_url = base_url or config.BACKEND_URL
        self.rk_token = rk_token

    # ------- Public API -------

    def register(self, name: str, registration_token: str) -> dict:
        """Register this runner. Returns the `data` payload from backend.

        Raises AuthError on 401, TransientError on network/5xx.
        """
        rv = self._request(
            "POST", "/runners/register",
            json_body={"registration_token": registration_token, "name": name},
            need_auth=False,
            expected_statuses=(201,),
        )
        return rv.json()["data"]

    def heartbeat(self, runner_id: str) -> None:
        """Send a heartbeat. Raises AuthError on 401."""
        self._request(
            "POST", f"/runners/{runner_id}/heartbeat",
            expected_statuses=(204,),
        )

    def next_job(self, runner_id: str) -> dict | None:
        """Poll for next job. Returns None if no job available (204)."""
        rv = self._request(
            "GET", f"/runners/{runner_id}/next-job",
            expected_statuses=(200, 204),
        )
        if rv.status_code == 204:
            return None
        return rv.json()["data"]

    def complete_job(self, runner_id: str, job_id: str, tasks: list) -> str:
        """Send result. Returns 'ok' / 'reclaimed' / 'not_found'.

        Raises TransientError on 5xx or network — caller should retry.
        """
        rv = self._request(
            "PUT", f"/runners/{runner_id}/jobs/{job_id}/complete",
            json_body={"tasks": tasks},
            expected_statuses=(204, 409, 404),
        )
        return {204: "ok", 409: "reclaimed", 404: "not_found"}[rv.status_code]

    def download_code(self, code_url: str, dest_path: str) -> None:
        """Download code zip from a presigned URL."""
        try:
            with requests.get(code_url, stream=True,
                              timeout=config.HTTP_REQUEST_TIMEOUT_SEC) as r:
                r.raise_for_status()
                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=64 * 1024):
                        f.write(chunk)
        except requests.RequestException as e:
            raise self.TransientError(f"code download failed: {e}") from e

    # ------- Internals -------

    def _request(self, method: str, path: str, *,
                 json_body=None, need_auth=True,
                 expected_statuses=(200,)) -> requests.Response:
        headers = {}
        if need_auth:
            if not self.rk_token:
                raise self.AuthError("rk_token not set")
            headers["Authorization"] = f"Bearer {self.rk_token}"
        if json_body is not None:
            headers["Content-Type"] = "application/json"
        try:
            rv = requests.request(
                method=method,
                url=f"{self.base_url}{path}",
                json=json_body,
                headers=headers,
                timeout=config.HTTP_REQUEST_TIMEOUT_SEC,
            )
        except requests.RequestException as e:
            raise self.TransientError(f"network error: {e}") from e

        if rv.status_code == 401:
            raise self.AuthError(rv.text)
        if rv.status_code >= 500:
            raise self.TransientError(f"backend {rv.status_code}: {rv.text}")
        if rv.status_code not in expected_statuses:
            raise self.TransientError(
                f"unexpected status {rv.status_code}: {rv.text}"
            )
        return rv
  • Step 5: Run tests to verify pass

Run: cd Sandbox && pytest tests/agent/test_client.py -v
Expected: All 9 tests PASS

  • Step 6: Commit
cd Sandbox
git add agent/client.py tests/agent/test_client.py tests/requirements.txt
git commit -m "feat(agent): add BackendClient with auth, error mapping, retry hints"

Phase 2: Per-component TDD

Task 3: registration.py — register on startup

Files:

  • Test: Sandbox/tests/agent/test_registration.py

  • Create: Sandbox/agent/registration.py

  • Step 1: Write failing tests

# Sandbox/tests/agent/test_registration.py
from unittest.mock import MagicMock
from agent.registration import register_runner
from agent.client import BackendClient


def test_register_runner_returns_credentials_and_config():
    fake_client = MagicMock(spec=BackendClient)
    fake_client.register.return_value = {
        "runner_id": "rn_xyz",
        "token": "rk_xyz",
        "config": {
            "heartbeat_interval_sec": 10,
            "poll_interval_sec": 5,
            "max_concurrent_jobs": 4,
        },
    }

    result = register_runner(fake_client, name="r1",
                             registration_token="dev-token")

    assert result.runner_id == "rn_xyz"
    assert result.token == "rk_xyz"
    assert result.heartbeat_interval_sec == 10
    assert result.poll_interval_sec == 5
    assert result.max_concurrent_jobs == 4

    fake_client.register.assert_called_once_with(
        name="r1", registration_token="dev-token"
    )


def test_register_runner_uses_defaults_for_missing_config_fields():
    fake_client = MagicMock(spec=BackendClient)
    fake_client.register.return_value = {
        "runner_id": "rn_a", "token": "rk_a", "config": {},
    }

    result = register_runner(fake_client, name="r1",
                             registration_token="t")

    # Backend should always send config, but defensively use sensible defaults
    assert result.heartbeat_interval_sec == 15
    assert result.poll_interval_sec == 3
    assert result.max_concurrent_jobs == 8
  • Step 2: Run tests to verify failure

Run: cd Sandbox && pytest tests/agent/test_registration.py -v
Expected: FAIL — agent.registration not found

  • Step 3: Implement registration.py
# Sandbox/agent/registration.py
"""Self-registration on startup."""
from dataclasses import dataclass

from .client import BackendClient


@dataclass(frozen=True)
class RunnerCredentials:
    runner_id: str
    token: str
    heartbeat_interval_sec: int
    poll_interval_sec: int
    max_concurrent_jobs: int


def register_runner(
    client: BackendClient,
    name: str,
    registration_token: str,
) -> RunnerCredentials:
    """Call backend's register endpoint and return RunnerCredentials.

    Raises BackendClient.AuthError or TransientError on failure.
    """
    rv = client.register(name=name, registration_token=registration_token)
    cfg = rv.get("config", {})
    return RunnerCredentials(
        runner_id=rv["runner_id"],
        token=rv["token"],
        heartbeat_interval_sec=cfg.get("heartbeat_interval_sec", 15),
        poll_interval_sec=cfg.get("poll_interval_sec", 3),
        max_concurrent_jobs=cfg.get("max_concurrent_jobs", 8),
    )
  • Step 4: Run tests to verify pass

Run: cd Sandbox && pytest tests/agent/test_registration.py -v
Expected: 2 tests PASS

  • Step 5: Commit
cd Sandbox
git add agent/registration.py tests/agent/test_registration.py
git commit -m "feat(agent): add startup registration with backend"

Task 4: heartbeat.py — daemon thread sending heartbeat at interval

Files:

  • Test: Sandbox/tests/agent/test_heartbeat.py

  • Create: Sandbox/agent/heartbeat.py

  • Step 1: Write failing tests

# Sandbox/tests/agent/test_heartbeat.py
import threading
import time
from unittest.mock import MagicMock

from agent.client import BackendClient
from agent.heartbeat import HeartbeatThread


def test_heartbeat_calls_client_at_interval():
    client = MagicMock(spec=BackendClient)
    shutdown = threading.Event()

    hb = HeartbeatThread(
        client=client, runner_id="rn_1",
        interval_sec=0.05, shutdown_event=shutdown,
    )
    hb.start()
    time.sleep(0.18)  # ~3 intervals
    shutdown.set()
    hb.join(timeout=1)

    # Should have been called 3-4 times
    assert 2 <= client.heartbeat.call_count <= 5
    client.heartbeat.assert_called_with(runner_id="rn_1")


def test_heartbeat_swallows_transient_errors_and_keeps_going():
    client = MagicMock(spec=BackendClient)
    # Fail only the first call; a finite side_effect list would raise
    # StopIteration if the thread polls more often than expected.
    errors = [BackendClient.TransientError("boom")]

    def flaky_heartbeat(**kwargs):
        if errors:
            raise errors.pop()

    client.heartbeat.side_effect = flaky_heartbeat
    shutdown = threading.Event()

    hb = HeartbeatThread(
        client=client, runner_id="rn_1",
        interval_sec=0.05, shutdown_event=shutdown,
    )
    hb.start()
    time.sleep(0.2)
    shutdown.set()
    hb.join(timeout=1)

    # Despite first call raising, subsequent calls happened
    assert client.heartbeat.call_count >= 3


def test_heartbeat_stops_promptly_on_shutdown():
    client = MagicMock(spec=BackendClient)
    shutdown = threading.Event()
    hb = HeartbeatThread(
        client=client, runner_id="rn_1",
        interval_sec=10.0,  # long interval
        shutdown_event=shutdown,
    )
    hb.start()
    time.sleep(0.05)
    shutdown.set()
    hb.join(timeout=0.5)
    assert not hb.is_alive(), "heartbeat thread should exit promptly"
  • Step 2: Run tests to verify failure

Run: cd Sandbox && pytest tests/agent/test_heartbeat.py -v
Expected: FAIL — HeartbeatThread not found

  • Step 3: Implement heartbeat.py
# Sandbox/agent/heartbeat.py
"""Heartbeat daemon thread: refreshes runner alive TTL on backend."""
import logging
import threading

from .client import BackendClient

log = logging.getLogger(__name__)


class HeartbeatThread(threading.Thread):
    """Periodically POSTs heartbeat. Tolerates transient errors silently."""

    def __init__(
        self,
        client: BackendClient,
        runner_id: str,
        interval_sec: float,
        shutdown_event: threading.Event,
    ):
        super().__init__(daemon=True, name="heartbeat")
        self.client = client
        self.runner_id = runner_id
        self.interval_sec = interval_sec
        self.shutdown_event = shutdown_event

    def run(self) -> None:
        while not self.shutdown_event.is_set():
            try:
                self.client.heartbeat(runner_id=self.runner_id)
            except BackendClient.TransientError as e:
                log.warning(f"heartbeat failed (transient): {e}")
            except BackendClient.AuthError as e:
                # Auth fail means the backend forgot us (e.g., Redis loss).
                # Caller will need to re-register; for now just log.
                log.error(f"heartbeat auth failed: {e}")
            except Exception as e:  # defensive — never let thread die
                log.exception(f"heartbeat unexpected error: {e}")
            # Wait, but break early on shutdown
            self.shutdown_event.wait(timeout=self.interval_sec)
  • Step 4: Run tests to verify pass

Run: cd Sandbox && pytest tests/agent/test_heartbeat.py -v
Expected: 3 tests PASS

  • Step 5: Commit
cd Sandbox
git add agent/heartbeat.py tests/agent/test_heartbeat.py
git commit -m "feat(agent): add heartbeat daemon thread"

Task 5: result_sender.py — daemon thread delivering results with retry

Files:

  • Test: Sandbox/tests/agent/test_result_sender.py

  • Create: Sandbox/agent/result_sender.py

  • Step 1: Write failing tests

# Sandbox/tests/agent/test_result_sender.py
import queue
import threading
import time
from unittest.mock import MagicMock

from agent.client import BackendClient
from agent.result_sender import ResultSenderThread, JobResult


def test_result_sender_delivers_one_result():
    client = MagicMock(spec=BackendClient)
    client.complete_job.return_value = "ok"
    result_queue: queue.Queue = queue.Queue()
    shutdown = threading.Event()

    sender = ResultSenderThread(
        client=client, runner_id="rn_1",
        result_queue=result_queue, shutdown_event=shutdown,
        retry_max_attempts=3, retry_initial_backoff_sec=0.01,
        retry_max_backoff_sec=0.1,
    )
    sender.start()
    result_queue.put(JobResult(job_id="jb_1", tasks=[{"status": "AC"}]))
    time.sleep(0.05)
    shutdown.set()
    sender.join(timeout=1)

    client.complete_job.assert_called_once_with(
        runner_id="rn_1", job_id="jb_1", tasks=[{"status": "AC"}]
    )


def test_result_sender_retries_on_transient_error():
    client = MagicMock(spec=BackendClient)
    client.complete_job.side_effect = [
        BackendClient.TransientError("first"),
        BackendClient.TransientError("second"),
        "ok",
    ]
    rq: queue.Queue = queue.Queue()
    shutdown = threading.Event()

    sender = ResultSenderThread(
        client=client, runner_id="rn_1",
        result_queue=rq, shutdown_event=shutdown,
        retry_max_attempts=5, retry_initial_backoff_sec=0.01,
        retry_max_backoff_sec=0.1,
    )
    sender.start()
    rq.put(JobResult(job_id="jb_1", tasks=[]))
    time.sleep(0.5)
    shutdown.set()
    sender.join(timeout=1)

    assert client.complete_job.call_count == 3


def test_result_sender_drops_on_reclaimed():
    """If backend says reclaimed (409), drop the result silently — no retry."""
    client = MagicMock(spec=BackendClient)
    client.complete_job.return_value = "reclaimed"
    rq: queue.Queue = queue.Queue()
    shutdown = threading.Event()

    sender = ResultSenderThread(
        client=client, runner_id="rn_1",
        result_queue=rq, shutdown_event=shutdown,
        retry_max_attempts=3, retry_initial_backoff_sec=0.01,
        retry_max_backoff_sec=0.1,
    )
    sender.start()
    rq.put(JobResult(job_id="jb_1", tasks=[]))
    time.sleep(0.05)
    shutdown.set()
    sender.join(timeout=1)

    assert client.complete_job.call_count == 1  # no retry


def test_result_sender_drops_on_not_found():
    """404 also drops — submission may have been deleted."""
    client = MagicMock(spec=BackendClient)
    client.complete_job.return_value = "not_found"
    rq: queue.Queue = queue.Queue()
    shutdown = threading.Event()
    sender = ResultSenderThread(
        client=client, runner_id="rn_1", result_queue=rq,
        shutdown_event=shutdown, retry_max_attempts=3,
        retry_initial_backoff_sec=0.01, retry_max_backoff_sec=0.1,
    )
    sender.start()
    rq.put(JobResult(job_id="jb_1", tasks=[]))
    time.sleep(0.05)
    shutdown.set()
    sender.join(timeout=1)
    assert client.complete_job.call_count == 1


def test_result_sender_gives_up_after_max_attempts():
    """After exhausting retries, give up but keep the thread alive for next job."""
    client = MagicMock(spec=BackendClient)
    client.complete_job.side_effect = BackendClient.TransientError("always")
    rq: queue.Queue = queue.Queue()
    shutdown = threading.Event()
    sender = ResultSenderThread(
        client=client, runner_id="rn_1", result_queue=rq,
        shutdown_event=shutdown, retry_max_attempts=3,
        retry_initial_backoff_sec=0.01, retry_max_backoff_sec=0.1,
    )
    sender.start()
    rq.put(JobResult(job_id="jb_1", tasks=[]))
    time.sleep(0.5)

    # Should have given up
    assert client.complete_job.call_count == 3

    # Thread still alive, ready for next job
    assert sender.is_alive()
    shutdown.set()
    sender.join(timeout=1)
  • Step 2: Run tests to verify failure

Run: cd Sandbox && pytest tests/agent/test_result_sender.py -v
Expected: FAIL

  • Step 3: Implement result_sender.py
# Sandbox/agent/result_sender.py
"""Result delivery daemon thread with exponential backoff retry."""
import logging
import queue
import threading
from dataclasses import dataclass
from typing import List

from .client import BackendClient

log = logging.getLogger(__name__)


@dataclass(frozen=True)
class JobResult:
    """Pending result waiting to be sent to backend."""
    job_id: str
    tasks: List[dict]


class ResultSenderThread(threading.Thread):
    """Drains result_queue, POSTs to backend, retries on transient errors."""

    def __init__(
        self,
        client: BackendClient,
        runner_id: str,
        result_queue: queue.Queue,
        shutdown_event: threading.Event,
        retry_max_attempts: int,
        retry_initial_backoff_sec: float,
        retry_max_backoff_sec: float,
    ):
        super().__init__(daemon=True, name="result_sender")
        self.client = client
        self.runner_id = runner_id
        self.result_queue = result_queue
        self.shutdown_event = shutdown_event
        self.retry_max_attempts = retry_max_attempts
        self.retry_initial_backoff_sec = retry_initial_backoff_sec
        self.retry_max_backoff_sec = retry_max_backoff_sec

    def run(self) -> None:
        while not (self.shutdown_event.is_set() and self.result_queue.empty()):
            try:
                job_result = self.result_queue.get(timeout=0.5)
            except queue.Empty:
                continue
            try:
                self._deliver_with_retry(job_result)
            except Exception as e:  # defensive
                log.exception(
                    f"result delivery for {job_result.job_id} crashed: {e}"
                )
            finally:
                self.result_queue.task_done()

    def _deliver_with_retry(self, jr: JobResult) -> None:
        backoff = self.retry_initial_backoff_sec
        for attempt in range(1, self.retry_max_attempts + 1):
            if self.shutdown_event.is_set() and attempt > 1:
                # Best-effort on shutdown: the first attempt already ran,
                # so stop retrying rather than block process exit.
                log.warning(f"dropping {jr.job_id}: shutdown during retry")
                return
            try:
                outcome = self.client.complete_job(
                    runner_id=self.runner_id,
                    job_id=jr.job_id,
                    tasks=jr.tasks,
                )
            except BackendClient.TransientError as e:
                log.warning(
                    f"complete_job {jr.job_id} attempt {attempt} failed: {e}"
                )
                if attempt == self.retry_max_attempts:
                    log.error(
                        f"giving up on {jr.job_id} after {attempt} attempts"
                    )
                    return
                self.shutdown_event.wait(timeout=backoff)
                backoff = min(backoff * 2, self.retry_max_backoff_sec)
                continue
            except BackendClient.AuthError as e:
                log.error(f"complete_job {jr.job_id} auth failed: {e}")
                return  # cannot retry without re-register
            # Outcome handling
            if outcome == "ok":
                log.info(f"delivered {jr.job_id}")
                return
            if outcome == "reclaimed":
                log.warning(f"{jr.job_id} was reclaimed; dropping result")
                return
            if outcome == "not_found":
                log.warning(f"{jr.job_id} not found on backend; dropping result")
                return
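The retry policy in `_deliver_with_retry` doubles the backoff and caps it at `RESULT_RETRY_MAX_BACKOFF_SEC`. With the defaults from agent/config.py (5 attempts, 1.0 s initial, 16.0 s cap), the worst-case delay schedule can be sanity-checked with a quick sketch (`backoff_schedule` is an illustrative helper, not part of the plan's code):

```python
# Worked example: waits produced by doubling from `initial`, capped at `cap`,
# with no wait after the final attempt (it either succeeds or gives up).
def backoff_schedule(max_attempts=5, initial=1.0, cap=16.0):
    delays, backoff = [], initial
    for _ in range(max_attempts - 1):
        delays.append(backoff)
        backoff = min(backoff * 2, cap)
    return delays

print(backoff_schedule())   # [1.0, 2.0, 4.0, 8.0] -> ~15s worst case before giving up
print(backoff_schedule(7))  # caps: [1.0, 2.0, 4.0, 8.0, 16.0, 16.0]
```

So with the defaults, a result is held for at most about 15 seconds of backoff before the sender drops it and moves on.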
  • Step 4: Run tests to verify pass

Run: cd Sandbox && pytest tests/agent/test_result_sender.py -v
Expected: 5 tests PASS

  • Step 5: Commit
cd Sandbox
git add agent/result_sender.py tests/agent/test_result_sender.py
git commit -m "feat(agent): add result_sender thread with retry/backoff"

Task 6: poller.py — daemon thread polling backend for jobs

Files:

  • Test: Sandbox/tests/agent/test_poller.py

  • Create: Sandbox/agent/poller.py

  • Step 1: Write failing tests

# Sandbox/tests/agent/test_poller.py
import threading
import time
from unittest.mock import MagicMock, patch

from agent.client import BackendClient
from agent.poller import PollerThread


def _make_dispatcher(can_accept=True):
    d = MagicMock()
    d.has_capacity.return_value = can_accept
    return d


def test_poller_does_nothing_when_no_jobs():
    client = MagicMock(spec=BackendClient)
    client.next_job.return_value = None
    dispatcher = _make_dispatcher()
    shutdown = threading.Event()

    poller = PollerThread(
        client=client, runner_id="rn_1",
        dispatcher=dispatcher, poll_interval_sec=0.05,
        shutdown_event=shutdown,
    )
    poller.start()
    time.sleep(0.15)
    shutdown.set()
    poller.join(timeout=1)

    assert client.next_job.call_count >= 2
    dispatcher.handle.assert_not_called()


def test_poller_dispatches_job_when_received(tmp_path):
    """When a job comes back, poller downloads code and calls dispatcher."""
    client = MagicMock(spec=BackendClient)
    job_payload = {
        "job_id": "jb_1", "submission_id": "sub_1",
        "problem_id": 42, "language": 0,
        "code_url": "http://minio/code.zip", "checker": "",
        "tasks": [{"task_id": 0, "case_count": 1,
                   "memory_limit": 1024, "time_limit": 1000}],
    }
    jobs = [job_payload]  # serve the job once, then report "no job" forever
    client.next_job.side_effect = lambda **kwargs: jobs.pop() if jobs else None
    dispatcher = _make_dispatcher()
    shutdown = threading.Event()

    with patch("agent.poller.prepare_submission_dir_for_job") as prepare:
        poller = PollerThread(
            client=client, runner_id="rn_1",
            dispatcher=dispatcher, poll_interval_sec=0.05,
            shutdown_event=shutdown,
        )
        poller.start()
        time.sleep(0.15)
        shutdown.set()
        poller.join(timeout=1)

    prepare.assert_called_once()
    dispatcher.handle.assert_called_once_with(
        submission_id="sub_1", job_id="jb_1",
    )


def test_poller_skips_when_dispatcher_full():
    client = MagicMock(spec=BackendClient)
    dispatcher = _make_dispatcher(can_accept=False)
    shutdown = threading.Event()

    poller = PollerThread(
        client=client, runner_id="rn_1",
        dispatcher=dispatcher, poll_interval_sec=0.05,
        shutdown_event=shutdown,
    )
    poller.start()
    time.sleep(0.15)
    shutdown.set()
    poller.join(timeout=1)

    # When at capacity, poller should NOT call next_job
    client.next_job.assert_not_called()


def test_poller_swallows_transient_errors():
    client = MagicMock(spec=BackendClient)
    client.next_job.side_effect = [
        BackendClient.TransientError("boom"),
        None,
        None,
    ]
    dispatcher = _make_dispatcher()
    shutdown = threading.Event()

    poller = PollerThread(
        client=client, runner_id="rn_1",
        dispatcher=dispatcher, poll_interval_sec=0.05,
        shutdown_event=shutdown,
    )
    poller.start()
    time.sleep(0.2)
    shutdown.set()
    poller.join(timeout=1)

    # Despite first call failing, subsequent polls happen
    assert client.next_job.call_count >= 2
  • Step 2: Run tests to verify failure

Run: cd Sandbox && pytest tests/agent/test_poller.py -v
Expected: FAIL

  • Step 3: Implement poller.py

The poller needs a helper prepare_submission_dir_for_job that:

  • Downloads the code zip
  • Writes a meta.json that the existing dispatcher expects
  • Calls existing dispatcher.prepare_submission_dir(...) with the right args

Look at existing Sandbox/dispatcher/dispatcher.py::prepare_submission_dir (already there) and Sandbox/dispatcher/testdata.py for get_problem_root / get_problem_meta to understand testdata fetching. The new helper wires these together with the downloaded code.

# Sandbox/agent/poller.py
"""Poller daemon thread: pulls jobs from backend, hands to dispatcher."""
import logging
import tempfile
import threading

from dispatcher.constant import Language
from dispatcher.testdata import (
    ensure_testdata,
    get_problem_meta,
    get_problem_root,
)
from .client import BackendClient

log = logging.getLogger(__name__)


def prepare_submission_dir_for_job(dispatcher, job: dict, client: BackendClient):
    """Download code + ensure testdata + extract into dispatcher's submission dir.

    Reuses the existing dispatcher.prepare_submission_dir() — same testdata
    fetching path as the old POST /submit handler.
    """
    submission_id = job["submission_id"]
    problem_id = job["problem_id"]
    language = Language(job["language"])

    ensure_testdata(problem_id)
    meta = get_problem_meta(problem_id, language)

    with tempfile.NamedTemporaryFile(suffix=".zip") as tmp:
        client.download_code(job["code_url"], tmp.name)
        with open(tmp.name, "rb") as src:
            dispatcher.prepare_submission_dir(
                root_dir=dispatcher.SUBMISSION_DIR,
                submission_id=submission_id,
                meta=meta,
                source=src,
                testdata=get_problem_root(problem_id),
            )


class PollerThread(threading.Thread):
    """Polls backend for jobs and dispatches them to the internal dispatcher."""

    def __init__(
        self,
        client: BackendClient,
        runner_id: str,
        dispatcher,                 # existing Dispatcher instance
        poll_interval_sec: float,
        shutdown_event: threading.Event,
    ):
        super().__init__(daemon=True, name="poller")
        self.client = client
        self.runner_id = runner_id
        self.dispatcher = dispatcher
        self.poll_interval_sec = poll_interval_sec
        self.shutdown_event = shutdown_event

    def run(self) -> None:
        while not self.shutdown_event.is_set():
            if not self.dispatcher.has_capacity():
                self.shutdown_event.wait(timeout=0.5)
                continue
            try:
                job = self.client.next_job(runner_id=self.runner_id)
            except BackendClient.TransientError as e:
                log.warning(f"next_job failed: {e}")
                self.shutdown_event.wait(timeout=self.poll_interval_sec)
                continue
            except BackendClient.AuthError as e:
                log.error(f"next_job auth failed: {e}")
                self.shutdown_event.wait(timeout=self.poll_interval_sec)
                continue

            if job is None:
                self.shutdown_event.wait(timeout=self.poll_interval_sec)
                continue

            try:
                prepare_submission_dir_for_job(self.dispatcher, job, self.client)
                self.dispatcher.handle(
                    submission_id=job["submission_id"],
                    job_id=job["job_id"],
                )
                log.info(
                    f"dispatched submission={job['submission_id']} "
                    f"job={job['job_id']}"
                )
            except Exception as e:
                log.exception(
                    f"failed to dispatch job {job.get('job_id')}: {e}"
                )
                # Don't retry — just don't ack. Backend will reclaim after lease expiry.
  • Step 4: Run tests to verify pass

Run: cd Sandbox && pytest tests/agent/test_poller.py -v
Expected: 4 tests PASS

  • Step 5: Commit
cd Sandbox
git add agent/poller.py tests/agent/test_poller.py
git commit -m "feat(agent): add poller thread that pulls jobs and dispatches"

Phase 3: Dispatcher modifications

Task 7: Add has_capacity() and result_queue to Dispatcher

Files:

  • Modify: Sandbox/dispatcher/dispatcher.py

  • Modify: Sandbox/tests/test_dispatcher.py

  • Step 1: Write failing test

Add to Sandbox/tests/test_dispatcher.py:

def test_dispatcher_has_capacity_returns_true_when_queue_empty(docker_dispatcher):
    assert docker_dispatcher.has_capacity() is True


def test_dispatcher_exposes_result_queue(docker_dispatcher):
    """Dispatcher should expose a result_queue that result_sender drains."""
    import queue
    assert isinstance(docker_dispatcher.result_queue, queue.Queue)
  • Step 2: Run tests to verify failure

Run: cd Sandbox && pytest tests/test_dispatcher.py::test_dispatcher_has_capacity_returns_true_when_queue_empty tests/test_dispatcher.py::test_dispatcher_exposes_result_queue -v
Expected: FAIL — methods don't exist

  • Step 3: Add to Dispatcher.init

In Sandbox/dispatcher/dispatcher.py::Dispatcher.__init__, add (and make sure import queue is present at the top of the file):

# Result queue: completed submissions are pushed here for the agent's
# result_sender thread to deliver to backend.
self.result_queue: queue.Queue = queue.Queue()

# Map submission_id → job_id (set when handle() is called by poller)
self.job_ids: dict[str, str] = {}
  • Step 4: Add has_capacity method to Dispatcher
def has_capacity(self) -> bool:
    """Whether dispatcher can accept a new submission right now.

    Leaves 30% headroom — when queue gets near full, stop pulling new work.
    """
    return self.queue.qsize() < int(self.MAX_TASK_COUNT * 0.7)
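The headroom rule can be sanity-checked in isolation. This standalone copy of the same formula is illustrative only (the real method reads self.queue.qsize() and self.MAX_TASK_COUNT):

```python
def has_capacity(qsize: int, max_task_count: int) -> bool:
    # Same 30% headroom rule: stop pulling once the queue hits 70% of max.
    return qsize < int(max_task_count * 0.7)

# With max_task_count=16 the cutoff is int(16 * 0.7) = int(11.2) = 11:
# qsize 10 -> True (keep pulling), qsize 11 -> False (back off).
```

The int() truncation means the threshold rounds down, so the runner errs on the side of pulling slightly less work.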
  • Step 5: Run tests to verify pass

Run: cd Sandbox && pytest tests/test_dispatcher.py -v -k "has_capacity or result_queue"
Expected: PASS

  • Step 6: Commit
cd Sandbox
git add dispatcher/dispatcher.py tests/test_dispatcher.py
git commit -m "feat(dispatcher): add has_capacity() and result_queue"

Task 8: Modify handle() to accept job_id

Files:

  • Modify: Sandbox/dispatcher/dispatcher.py

  • Modify: Sandbox/tests/test_dispatcher.py

  • Step 1: Write failing test

def test_handle_records_job_id(docker_dispatcher, submission_generator):
    sg = submission_generator
    sub_id = sg.gen_random_submission_id()  # check existing test for actual API

    # Existing test uses handle(submission_id) — we add job_id param
    docker_dispatcher.handle(submission_id=sub_id, job_id="jb_xyz")

    assert docker_dispatcher.job_ids[sub_id] == "jb_xyz"

(If the existing test fixture pattern differs, adapt to whatever submission_generator.py provides for setting up a submission dir.)

  • Step 2: Run test to verify failure

Expected: FAIL — handle() doesn't accept job_id

  • Step 3: Modify handle() signature

In Sandbox/dispatcher/dispatcher.py::Dispatcher.handle:

def handle(self, submission_id: str, job_id: str | None = None):
    # ... existing body unchanged ...
    # After self.created_at[submission_id] = datetime.now():
    if job_id is not None:
        self.job_ids[submission_id] = job_id
    # ... rest unchanged

job_id=None default keeps any existing internal callers working (the in-process test fixtures don't go through poller).

  • Step 4: Run test to verify pass

Run: cd Sandbox && pytest tests/test_dispatcher.py -v -k "job_id"
Expected: PASS

  • Step 5: Commit
cd Sandbox
git add dispatcher/dispatcher.py tests/test_dispatcher.py
git commit -m "feat(dispatcher): accept job_id in handle() and track per submission"

Task 9: Replace direct PUT in on_submission_complete with result_queue push

Files:

  • Modify: Sandbox/dispatcher/dispatcher.py

  • Modify: Sandbox/tests/test_dispatcher.py

  • Step 1: Write failing test

def test_on_submission_complete_pushes_to_result_queue(docker_dispatcher):
    """When a submission is done, result lands in result_queue (no HTTP)."""
    from agent.result_sender import JobResult

    # Manually set up a complete submission (use existing test fixtures or
    # craft self.result and self.job_ids directly)
    docker_dispatcher.testing = False  # we want full path
    docker_dispatcher.job_ids["sub_1"] = "jb_1"
    docker_dispatcher.result["sub_1"] = (
        MagicMock(language=2),  # python doesn't compile
        {"0000": {"stdout": "", "stderr": "", "exitCode": 0,
                  "execTime": 1, "memoryUsage": 1, "status": "AC"}},
    )
    docker_dispatcher.locks["sub_1"] = threading.Lock()

    docker_dispatcher.on_submission_complete("sub_1")

    pushed = docker_dispatcher.result_queue.get_nowait()
    assert isinstance(pushed, JobResult)
    assert pushed.job_id == "jb_1"
    assert isinstance(pushed.tasks, list)
  • Step 2: Run test to verify failure

Expected: FAIL

  • Step 3: Replace on_submission_complete body

Find existing on_submission_complete in Sandbox/dispatcher/dispatcher.py (~lines 352-402). Replace its body:

def on_submission_complete(self, submission_id: str):
    if not self.contains(submission_id):
        raise SubmissionIdNotFoundError(f'{submission_id} not found!')
    if self.testing:
        logger().info(
            f'skip submission post processing in testing [submission_id={submission_id}]'
        )
        return True

    _, results = self.result[submission_id]
    # parse results into nested list-of-tasks shape (existing logic)
    submission_result = {}
    for no, r in results.items():
        task_no = int(no[:2])
        case_no = int(no[2:])
        submission_result.setdefault(task_no, {})[case_no] = r
    for task_no, cases in submission_result.items():
        assert [*cases.keys()] == [*range(len(cases))]
        submission_result[task_no] = [*cases.values()]
    assert [*submission_result.keys()] == [*range(len(submission_result))]
    submission_result = [*submission_result.values()]

    # Push to result_queue for agent's result_sender to deliver
    from agent.result_sender import JobResult
    job_id = self.job_ids.get(submission_id)
    if job_id is None:
        logger().error(
            f"submission_complete with no job_id mapping [submission_id={submission_id}]"
        )
        # Don't release — let lease expire and reclaim happen
        return

    self.result_queue.put(JobResult(job_id=job_id, tasks=submission_result))

    # Clean up local state immediately (v1 simplification). A fancier design
    # would keep files on disk until result_sender confirms delivery (or call
    # file_manager.backup_data on permanent failure); we accept that a failed
    # delivery loses the local files, with rejudge as the fallback.
    file_manager.clean_data(submission_id)
    self.release(submission_id)
    if submission_id in self.job_ids:
        del self.job_ids[submission_id]

This:

  • Removes direct requests.put call (now done by result_sender)

  • Removes the with tempfile.NamedTemporaryFile dance

  • Removes the SANDBOX_TOKEN reference

  • Removes file_manager.backup_data (rare failure mode; user has rejudge as fallback)
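The key-grouping step above turns the flat result dict (keys like "0102" meaning task 1, case 2) into a list of tasks, each a list of case results. A standalone sketch of that transform (the helper name is illustrative; in the plan the logic stays inline in on_submission_complete):

```python
def group_results(results: dict[str, dict]) -> list[list[dict]]:
    """Group flat "TTCC" case keys into a list of tasks, each a list of cases."""
    grouped: dict[int, dict[int, dict]] = {}
    for no, case_result in results.items():
        task_no, case_no = int(no[:2]), int(no[2:])
        grouped.setdefault(task_no, {})[case_no] = case_result
    # Task numbers must be contiguous from 0, same invariant as the asserts above.
    assert sorted(grouped) == list(range(len(grouped)))
    return [
        [grouped[t][c] for c in range(len(grouped[t]))]
        for t in range(len(grouped))
    ]
```

This makes the contiguity assumption explicit: both task and case numbers must run 0..n-1 with no gaps, or the transform fails loudly rather than silently reordering results.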

  • Step 4: Remove now-unused imports

In Sandbox/dispatcher/dispatcher.py top:

  • Remove import requests (or check it's still used elsewhere)

  • Remove import tempfile (same check)

  • Remove from .config import SANDBOX_TOKEN, BACKEND_API (no longer used)

  • Step 5: Run tests

Run: cd Sandbox && pytest tests/test_dispatcher.py -v
Expected: All pass (or document any pre-existing failures)

  • Step 6: Commit
cd Sandbox
git add dispatcher/dispatcher.py tests/test_dispatcher.py
git commit -m "refactor(dispatcher): push completed submissions to result_queue (was inline PUT)"

Phase 4: Entrypoint swap

Task 10: Create Sandbox/main.py — new entrypoint

Files:

  • Create: Sandbox/main.py

  • Step 1: Write the entrypoint

# Sandbox/main.py
"""Runner agent entrypoint.

Replaces the old Flask app — this process actively polls Backend instead of
listening for incoming HTTP. Spawns 4 daemon threads:
  - dispatcher (existing)
  - heartbeat
  - poller
  - result_sender

Graceful shutdown on SIGTERM/SIGINT: stop polling, drain result queue, exit.
"""
import logging
import os
import signal
import threading
import time

from agent import config as agent_config
from agent.client import BackendClient
from agent.heartbeat import HeartbeatThread
from agent.poller import PollerThread
from agent.registration import register_runner
from agent.result_sender import ResultSenderThread
from dispatcher.dispatcher import Dispatcher

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s/%(threadName)s] %(levelname)s: %(message)s",
    handlers=[
        logging.FileHandler("logs/runner.log"),
        logging.StreamHandler(),
    ],
)
log = logging.getLogger("main")


def main():
    log.info("runner agent starting")
    shutdown_event = threading.Event()

    # 1. Register
    bootstrap_client = BackendClient()  # no token yet
    creds = register_runner(
        client=bootstrap_client,
        name=agent_config.RUNNER_NAME,
        registration_token=agent_config.RUNNER_REGISTRATION_TOKEN,
    )
    log.info(f"registered as {creds.runner_id}")

    # 2. Authenticated client used by all daemon threads
    client = BackendClient(rk_token=creds.token)

    # 3. Start dispatcher (existing)
    dispatcher_config_path = os.getenv(
        "DISPATCHER_CONFIG", ".config/dispatcher.json.example"
    )
    dispatcher = Dispatcher(dispatcher_config_path)
    dispatcher.start()
    log.info("dispatcher started")

    # 4. Start daemon threads
    heartbeat = HeartbeatThread(
        client=client,
        runner_id=creds.runner_id,
        interval_sec=creds.heartbeat_interval_sec,
        shutdown_event=shutdown_event,
    )
    poller = PollerThread(
        client=client,
        runner_id=creds.runner_id,
        dispatcher=dispatcher,
        poll_interval_sec=creds.poll_interval_sec,
        shutdown_event=shutdown_event,
    )
    sender = ResultSenderThread(
        client=client,
        runner_id=creds.runner_id,
        result_queue=dispatcher.result_queue,
        shutdown_event=shutdown_event,
        retry_max_attempts=agent_config.RESULT_RETRY_MAX_ATTEMPTS,
        retry_initial_backoff_sec=agent_config.RESULT_RETRY_INITIAL_BACKOFF_SEC,
        retry_max_backoff_sec=agent_config.RESULT_RETRY_MAX_BACKOFF_SEC,
    )
    heartbeat.start()
    poller.start()
    sender.start()
    log.info("all threads started")

    # 5. Wait for shutdown signal
    def handle_sig(signum, frame):
        log.info(f"received signal {signum}, shutting down")
        shutdown_event.set()
        dispatcher.stop()

    signal.signal(signal.SIGTERM, handle_sig)
    signal.signal(signal.SIGINT, handle_sig)

    while not shutdown_event.is_set():
        time.sleep(1)

    # 6. Graceful drain
    log.info("waiting for in-flight work to complete (max 60s)")
    sender.join(timeout=60)
    log.info("runner agent exiting")


if __name__ == "__main__":
    main()
  • Step 2: Verify it imports cleanly (won't actually start because no backend)

Run: cd Sandbox && mkdir -p logs && python -c "import main; print('ok')"
Expected: prints ok. (The mkdir is needed because logging.basicConfig runs at import time, and logging.FileHandler("logs/runner.log") fails if logs/ does not exist.)
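main.py hands RESULT_RETRY_MAX_ATTEMPTS / RESULT_RETRY_INITIAL_BACKOFF_SEC / RESULT_RETRY_MAX_BACKOFF_SEC to ResultSenderThread. A capped exponential backoff is the natural reading of those names; this sketch of the schedule is illustrative only (the real retry logic lives in agent/result_sender.py):

```python
def backoff_schedule(initial_sec: float, max_sec: float, max_attempts: int) -> list[float]:
    """Delay before each retry: initial, 2x, 4x, ... capped at max_sec."""
    return [min(initial_sec * (2 ** attempt), max_sec) for attempt in range(max_attempts)]

# With initial=1s, max=30s, 6 attempts the delays are 1, 2, 4, 8, 16, 30 seconds.
```

Capping at max_sec keeps a long backend outage from pushing retry gaps into minutes while the 60s drain window in step 6 is ticking.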

  • Step 3: Commit
cd Sandbox
git add main.py
git commit -m "feat(agent): add main.py entrypoint wiring all threads"

Task 11: Delete Sandbox/app.py and Sandbox/gunicorn.conf.py

Files:

  • Delete: Sandbox/app.py

  • Delete: Sandbox/gunicorn.conf.py

  • Step 1: Verify nothing else imports from app.py

Run: cd Sandbox && grep -rn "from app\|import app" --include="*.py" .
Expected: No results (or only in main.py if you accidentally referenced it — fix that)

  • Step 2: Delete files
cd Sandbox
git rm app.py gunicorn.conf.py
  • Step 3: Run tests to ensure nothing broke

Run: cd Sandbox && pytest -v
Expected: All non-Docker-requiring tests pass. (Some tests need ./build.sh to have run — those would skip or fail in a clean env; check existing CI workflow for what's expected.)

  • Step 4: Commit
cd Sandbox
git commit -m "refactor: remove Flask app.py and gunicorn config (replaced by main.py)"

(No separate git add is needed: git rm in Step 2 already staged the deletions.)

Task 12: Update Dockerfiles to use new entrypoint

Files:

  • Modify: Sandbox/Dockerfile

  • Modify: Sandbox/Dockerfile.prod (if exists)

  • Modify: Sandbox/requirements.txt (drop flask + gunicorn)

  • Step 1: Update Dockerfile CMD

Replace Sandbox/Dockerfile:

FROM python:3.13-alpine

WORKDIR /app

# install dependencies
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# copy the application code (agent/, dispatcher/, main.py, ...)
COPY . .

# ensure the logs directory exists
RUN mkdir -p logs

CMD ["python", "main.py"]
  • Step 2: Update Dockerfile.prod (same change)

Check if Sandbox/Dockerfile.prod exists; if so apply the same CMD change.

  • Step 3: Drop flask + gunicorn from requirements.txt

Edit Sandbox/requirements.txt — remove these two lines:

gunicorn~=20.1
flask~=2.0

Keep: docker, requests, yapf, pydantic, redis (verify each is still imported somewhere before keeping it). Then confirm nothing still references Flask: grep -rn "import flask\|from flask".

  • Step 4: Build the image to verify

Run from meta-repo root: docker compose build sandbox
Expected: Image builds successfully

  • Step 5: Commit
cd Sandbox
git add Dockerfile Dockerfile.prod requirements.txt
git commit -m "build(sandbox): use main.py entrypoint; drop flask + gunicorn"

Phase 5: Infra changes (meta-repo)

Task 13: Update docker-compose.yml — Redis AOF + new env vars

Files:

  • Modify: docker-compose.yml (meta-repo root)
  • Modify: .secret.example/sandbox.env
  • Modify: .secret.example/web.env

Working dir for git commands: meta-repo root /Users/as535364/Downloads/Project/Normal-OJ. This commits to the meta-repo, not a submodule.

  • Step 1: Modify Redis service in docker-compose.yml
# In docker-compose.yml — modify the redis service:
  redis:
    image: redis:7-alpine
    volumes:
      - ./redis-data:/data
    command: ["redis-server", "--appendonly", "yes", "--appendfsync", "everysec"]
  • Step 2: Add env vars to web service
# In docker-compose.yml — modify the web service environment:
  web:
    build: ./Back-End
    environment:
      - MONGO_HOST=mongo
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - RUNNER_REGISTRATION_TOKEN=${RUNNER_REGISTRATION_TOKEN:-dev-only-registration-token-change-me}
  • Step 3: Add env vars to sandbox service
  sandbox:
    build: ./Sandbox
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    environment:
      - BACKEND_URL=http://web:8080
      - RUNNER_REGISTRATION_TOKEN=${RUNNER_REGISTRATION_TOKEN:-dev-only-registration-token-change-me}
      - RUNNER_NAME=sandbox-default
  • Step 4: Update .secret.example/web.env

Append (or modify if existing):

RUNNER_REGISTRATION_TOKEN=<generate a long random secret>
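One common way to produce such a secret, assuming openssl is available (any high-entropy generator works):

```shell
# 32 random bytes, hex-encoded: yields a 64-character token
openssl rand -hex 32
```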
  • Step 5: Update .secret.example/sandbox.env

Append:

BACKEND_URL=http://web:8080
RUNNER_REGISTRATION_TOKEN=<same value as web.env>
RUNNER_NAME=sandbox-prod-1
  • Step 6: Verify compose config

Run: docker compose config | grep -A5 redis
Expected: Shows the appendonly command

  • Step 7: Commit (meta-repo)
cd /Users/as535364/Downloads/Project/Normal-OJ
git add docker-compose.yml .secret.example/web.env .secret.example/sandbox.env
git commit -m "chore(infra): enable Redis AOF; add RUNNER_REGISTRATION_TOKEN env vars"

Task 14: Submodule pointer bumps in meta-repo

Files:

  • Modify: Back-End submodule pointer
  • Modify: Sandbox submodule pointer

After Plan A and Plan B are merged into their respective submodule's main branch, the meta-repo needs its pointer bumped.

  • Step 1: Update Back-End pointer
cd /Users/as535364/Downloads/Project/Normal-OJ/Back-End
git checkout main
git pull
cd ..
git add Back-End
  • Step 2: Update Sandbox pointer
cd /Users/as535364/Downloads/Project/Normal-OJ/Sandbox
git checkout main
git pull
cd ..
git add Sandbox
  • Step 3: Commit (meta-repo)
cd /Users/as535364/Downloads/Project/Normal-OJ
git commit -m "chore: bump Back-End and Sandbox to pull-based dispatch"

Phase 6: Smoke test

Task 15: End-to-end smoke test (manual, with checklist)

No code changes — verification only.

This task verifies the whole stack works after both plans. Run in dev environment.

  • Step 1: Bring up the stack
cd /Users/as535364/Downloads/Project/Normal-OJ
docker compose down
docker compose up --build -d
sleep 30  # let everything boot
  • Step 2: Verify backend health
curl http://localhost:8080/api/health

Expected: 200 with {"mongo": true, "redis": true}

  • Step 3: Verify sandbox registered as runner
docker compose logs sandbox | grep "registered as"

Expected: log line registered as rn_...

  • Step 4: Verify runner appears in Redis
docker compose exec redis redis-cli SMEMBERS runners:registered

Expected: At least one rn_... ID

  • Step 5: Submit a test submission

Log in as first_admin / firstpasswordforadmin via the Vue app at http://localhost:8080. Create a problem with one simple testcase. Submit a trivial AC solution.

Watch its status:

  • Initial: Pending

  • Within ~10s: should transition to AC (or whatever is appropriate)

  • Step 6: Verify the job flow in logs

docker compose logs sandbox | tail -50
docker compose logs web | grep "complete"

Expected:

  • Sandbox log shows dispatched submission=... then delivered jb_...

  • Backend log shows the complete endpoint being hit

  • Step 7: Failure injection — kill sandbox mid-submission

In one terminal:

docker compose kill sandbox

In another, submit a long-running submission. Wait 30 seconds. Restart:

docker compose up -d sandbox

Within ~60s, the submission should be reclaimed and completed (its status moves from Pending to a final status).

  • Step 8: Document results in commit message and PR description

If anything fails, file specific issues with logs. Otherwise, mark Plan B complete.


Plan B Done — Verification Checklist

  • Tasks 1-14 committed in their respective repos
  • Sandbox/: pytest -v passes (excluding tests requiring Docker)
  • Sandbox/: yapf -rd . shows no diff
  • Smoke test (Task 15) passed
  • Spec sections 10, 11, 12, 14 all implemented
  • No leftover flask / gunicorn imports in Sandbox

Post-Plan Tasks (out of scope here)

  • Frontend "Sandbox 設定" admin page → "Runner list" page (Spec section 17 future work; can wait)
  • Per-job progress reporting for zombie detection (v2)
  • Long polling upgrade (v2)

as535364 added 14 commits April 28, 2026 04:17
Addresses code review findings:
- yapf -ir on client.py and test_client.py to satisfy CI
- 3 new tests for download_code (happy path, 4xx, network error)
… inline PUT)

Replace on_submission_complete's inline HTTP PUT with a push to result_queue
so the agent's result_sender thread handles delivery. Remove now-unused
imports (requests, tempfile). Add two TDD tests verifying queue push and
state cleanup behaviour.
- delete app.py and gunicorn.conf.py
- Dockerfile CMD: python main.py (+ mkdir -p logs)
- requirements.txt: drop flask + gunicorn (no longer needed)
- dispatcher/utils.py: update logger fallback from 'gunicorn.error' to 'dispatcher'

Sandbox is now an active runner agent that polls Backend, not a passive
HTTP server. The pull-loop modules in agent/ replace the old Flask routes.
After Plan B's removal of SANDBOX_TOKEN, dispatcher/testdata.py needs to
authenticate to backend's testdata endpoints with the runner's own
rk_token (set after registration).

- Module-level _RK_TOKEN with set_runner_token() setter
- fetch_problem_meta, fetch_testdata, get_checksum use it
- main.py calls dispatcher_testdata.set_runner_token(creds.token) after registration
main.py is no longer a Flask app — current_app fallback is dead code.
Caused ModuleNotFoundError when running the sandbox container.
@as535364 as535364 marked this pull request as draft April 28, 2026 06:34