Skip to content

Feat/pull based job dispatch#336

Draft
as535364 wants to merge 22 commits intomainfrom
feat/pull-based-job-dispatch
Draft

Feat/pull based job dispatch#336
as535364 wants to merge 22 commits intomainfrom
feat/pull-based-job-dispatch

Conversation

@as535364
Copy link
Copy Markdown
Member

@as535364 as535364 commented Apr 28, 2026

Pull-Based Job Dispatch — Backend Implementation Plan (Plan A)

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Refactor Back-End submission dispatch from "push to Sandbox" to "Sandbox-pulls-from-Redis-queue", with self-registration, heartbeat, and orphan-job reclaim. Runner-side changes deferred to Plan B.

Architecture: Add dispatch/ module (Redis-backed runner registry + job queue) and model/runner.py blueprint exposing 4 endpoints (register / heartbeat / next-job / complete). Modify mongo/submission.py::submit() to enqueue to Redis instead of POSTing to Sandbox. Remove all push-related code paths. Tests use a "mock runner" (HTTP client simulating Sandbox) to verify the full flow without needing the real Sandbox.

Tech Stack: Python 3.11, Flask, mongoengine, redis-py (with fakeredis in tests), Pydantic, ULID, pytest, mongomock, testcontainers.

Working directory for this plan: Back-End/ (all file paths in this plan are relative to Back-End/ unless stated otherwise).


Spec Reference

Source design: docs/superpowers/specs/2026-04-28-pull-based-job-dispatch-design.md

This Plan A covers Sections 5, 6, 7, 8, 9, 13 (Backend pieces only). Plan B will cover Section 10 (Runner refactor) + Sections 11-12 (Infra/Migration).


File Structure

New files (Back-End)

dispatch/
├── __init__.py             # Re-export public API
├── config.py               # Env vars + constants (RUNNER_REGISTRATION_TOKEN, intervals, MAX_ATTEMPTS)
├── redis_keys.py           # Centralized Redis key naming
├── runner.py               # Runner: register, verify_token, renew_alive, is_alive, list_runners
├── job.py                  # Job: enqueue, claim_next_job, complete_job, internal helpers
└── scripts.py              # Lua reclaim script

model/
├── runner.py               # Blueprint with 4 endpoints
├── schemas/
│   └── runner.py           # Pydantic schemas
└── utils/
    └── runner_auth.py      # @require_runner_token decorator

tests/
├── unittest/
│   ├── dispatch/
│   │   ├── __init__.py
│   │   ├── test_runner.py          # Unit tests for dispatch/runner.py
│   │   ├── test_job.py             # Unit tests for dispatch/job.py
│   │   └── test_reclaim_script.py  # Unit tests for Lua script atomicity
│   └── test_runner_api.py          # Tests for model/runner.py blueprint
└── integration/
    ├── __init__.py
    └── test_runner_flow.py         # End-to-end mock runner tests

Modified files

mongo/submission.py         # submit() + rejudge() use enqueue_job; remove send/target_sandbox/etc
mongo/engine.py             # Remove uses of SubmissionConfig.sandbox_instances (keep field)
model/submission.py         # Remove PUT /<submission>/complete + PUT /config endpoints
model/schemas/submission.py # Remove OnSubmissionCompleteBody + UpdateConfigBody
model/schemas/__init__.py   # Remove exports of removed schemas
app.py                      # Register runner_api blueprint
tests/conftest.py           # Add redis cleanup fixture

Deleted files

mongo/sandbox.py            # find_by_token no longer needed

Phases

  • Phase 1: dispatch/ module foundation (Tasks 1-5)
  • Phase 2: Runner HTTP API blueprint (Tasks 6-9)
  • Phase 3: Submission flow integration (Tasks 10-12)
  • Phase 4: Cleanup — remove old push code (Tasks 13-18)
  • Phase 5: End-to-end integration tests (Tasks 19-21)

Phase 1: dispatch/ module foundation

Task 1: Create dispatch package skeleton + config + redis_keys

Files:

  • Create: dispatch/__init__.py

  • Create: dispatch/config.py

  • Create: dispatch/redis_keys.py

  • Create: tests/unittest/dispatch/__init__.py

  • Step 1: Create empty package init

# dispatch/__init__.py
"""Dispatch module: Redis-backed runner registry and job queue.

Public API re-exported from submodules.
"""
  • Step 2: Create config module
# dispatch/config.py
"""Configuration for dispatch module — env vars and tuning constants."""
import os

# Shared secret used by runners to register with backend.
# In production, set via env var (RUNNER_REGISTRATION_TOKEN).
# Default value is for tests/dev only.
RUNNER_REGISTRATION_TOKEN: str = os.getenv(
    "RUNNER_REGISTRATION_TOKEN",
    "dev-only-registration-token-change-me",
)

# Heartbeat / lease parameters
HEARTBEAT_INTERVAL_SEC: int = 15
RUNNER_ALIVE_TTL_SEC: int = 30           # 2x heartbeat
POLL_INTERVAL_SEC: int = 3
MAX_CONCURRENT_JOBS_PER_RUNNER: int = 8

# Job retry policy
MAX_ATTEMPTS: int = 3                     # 1 initial + 2 reclaims, then mark JE

# Presigned URL TTL for code download (seconds)
CODE_PRESIGNED_URL_TTL_SEC: int = 3600    # 1 hour
  • Step 3: Create redis_keys module
# dispatch/redis_keys.py
"""Centralized Redis key naming. All Redis keys used by dispatch live here."""

# Runner namespace
RUNNERS_REGISTERED = "runners:registered"

def runner_meta_key(rn_id: str) -> str:
    return f"runner:{rn_id}:meta"

def runner_token_key(rn_id: str) -> str:
    return f"runner:{rn_id}:token_hash"

def runner_alive_key(rn_id: str) -> str:
    return f"runner:{rn_id}:alive"

# Job namespace
JOBS_PENDING = "jobs:pending"
JOBS_LEASED = "jobs:leased"

def job_key(jb_id: str) -> str:
    return f"job:{jb_id}"
  • Step 4: Create test package init
# tests/unittest/dispatch/__init__.py
  • Step 5: Commit
cd Back-End
git add dispatch/__init__.py dispatch/config.py dispatch/redis_keys.py tests/unittest/dispatch/__init__.py
git commit -m "feat(dispatch): scaffold dispatch package with config and redis_keys"

Task 2: Implement dispatch/runner.py with TDD

Files:

  • Test: tests/unittest/dispatch/test_runner.py

  • Create: dispatch/runner.py

  • Step 1: Write failing tests for register / verify_token / renew_alive / is_alive

# tests/unittest/dispatch/test_runner.py
import pytest
from mongo.utils import RedisCache
from dispatch import runner as runner_mod
from dispatch.redis_keys import (
    RUNNERS_REGISTERED,
    runner_meta_key,
    runner_token_key,
    runner_alive_key,
)


@pytest.fixture(autouse=True)
def clear_redis():
    """Clear Redis state between tests (fakeredis persists across tests)."""
    RedisCache().client.flushdb()
    yield
    RedisCache().client.flushdb()


def test_register_returns_id_and_token():
    rn_id, rk_token = runner_mod.register(name="my-runner", registration_ip="1.2.3.4")
    assert rn_id.startswith("rn_")
    assert rk_token.startswith("rk_")
    assert len(rk_token) > 30  # actually random


def test_register_persists_to_redis():
    rn_id, rk_token = runner_mod.register(name="my-runner", registration_ip="1.2.3.4")
    rds = RedisCache().client

    # Meta hash exists with name + ip
    meta = rds.hgetall(runner_meta_key(rn_id))
    assert meta[b"name"] == b"my-runner"
    assert meta[b"registration_ip"] == b"1.2.3.4"
    assert b"registered_at" in meta

    # Token hash exists (NOT plaintext token)
    stored_hash = rds.get(runner_token_key(rn_id))
    assert stored_hash is not None
    assert stored_hash.decode() != rk_token  # plaintext NOT stored

    # In registered set
    assert rds.sismember(RUNNERS_REGISTERED, rn_id)

    # Initial alive key exists with TTL
    assert rds.exists(runner_alive_key(rn_id))
    assert 0 < rds.ttl(runner_alive_key(rn_id)) <= 30


def test_register_each_call_unique_token():
    _, t1 = runner_mod.register(name="a", registration_ip="1.1.1.1")
    _, t2 = runner_mod.register(name="b", registration_ip="1.1.1.1")
    assert t1 != t2


def test_verify_token_returns_true_for_correct_token():
    rn_id, rk_token = runner_mod.register(name="x", registration_ip="1.1.1.1")
    assert runner_mod.verify_token(rn_id, rk_token) is True


def test_verify_token_returns_false_for_wrong_token():
    rn_id, _ = runner_mod.register(name="x", registration_ip="1.1.1.1")
    assert runner_mod.verify_token(rn_id, "rk_wrongtoken") is False


def test_verify_token_returns_false_for_unknown_runner():
    assert runner_mod.verify_token("rn_nonexistent", "rk_anything") is False


def test_renew_alive_resets_ttl():
    rn_id, _ = runner_mod.register(name="x", registration_ip="1.1.1.1")
    rds = RedisCache().client
    # Manually set short TTL
    rds.expire(runner_alive_key(rn_id), 5)
    assert rds.ttl(runner_alive_key(rn_id)) <= 5

    runner_mod.renew_alive(rn_id)
    assert 25 < rds.ttl(runner_alive_key(rn_id)) <= 30


def test_is_alive_returns_true_when_key_exists():
    rn_id, _ = runner_mod.register(name="x", registration_ip="1.1.1.1")
    assert runner_mod.is_alive(rn_id) is True


def test_is_alive_returns_false_when_key_expired():
    rn_id, _ = runner_mod.register(name="x", registration_ip="1.1.1.1")
    RedisCache().client.delete(runner_alive_key(rn_id))
    assert runner_mod.is_alive(rn_id) is False
  • Step 2: Run tests to verify failure

Run: cd Back-End && poetry run pytest tests/unittest/dispatch/test_runner.py -v
Expected: FAIL with ImportError: cannot import name 'runner' from 'dispatch' (or similar)

  • Step 3: Implement dispatch/runner.py
# dispatch/runner.py
"""Runner lifecycle: registration, token verification, liveness tracking."""
import hashlib
import secrets
from datetime import datetime, timezone

from ulid import ULID

from mongo.utils import RedisCache
from .config import RUNNER_ALIVE_TTL_SEC
from .redis_keys import (
    RUNNERS_REGISTERED,
    runner_alive_key,
    runner_meta_key,
    runner_token_key,
)


def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def _hash_token(rk_token: str) -> str:
    return hashlib.sha256(rk_token.encode()).hexdigest()


def register(name: str, registration_ip: str) -> tuple[str, str]:
    """Register a new runner. Returns (rn_id, rk_token).

    rk_token is returned in plaintext only here — backend stores only the hash.
    """
    rn_id = f"rn_{ULID()}"
    rk_token = f"rk_{secrets.token_urlsafe(32)}"
    rds = RedisCache().client

    rds.hset(
        runner_meta_key(rn_id),
        mapping={
            "name": name,
            "registered_at": _now_iso(),
            "registration_ip": registration_ip,
        },
    )
    rds.set(runner_token_key(rn_id), _hash_token(rk_token))
    rds.sadd(RUNNERS_REGISTERED, rn_id)
    rds.set(runner_alive_key(rn_id), "1", ex=RUNNER_ALIVE_TTL_SEC)
    return rn_id, rk_token


def verify_token(rn_id: str, rk_token: str) -> bool:
    """Verify a runner's token. Returns False if rn_id unknown or token mismatch."""
    rds = RedisCache().client
    stored = rds.get(runner_token_key(rn_id))
    if stored is None:
        return False
    expected_hash = stored.decode() if isinstance(stored, bytes) else stored
    actual_hash = _hash_token(rk_token)
    return secrets.compare_digest(expected_hash, actual_hash)


def renew_alive(rn_id: str) -> None:
    """Refresh runner alive TTL. Called on every heartbeat."""
    RedisCache().client.set(
        runner_alive_key(rn_id), "1", ex=RUNNER_ALIVE_TTL_SEC
    )


def is_alive(rn_id: str) -> bool:
    """True if this runner has a non-expired alive key."""
    return bool(RedisCache().client.exists(runner_alive_key(rn_id)))
  • Step 4: Run tests to verify pass

Run: cd Back-End && poetry run pytest tests/unittest/dispatch/test_runner.py -v
Expected: All 8 tests PASS

  • Step 5: Commit
cd Back-End
git add dispatch/runner.py tests/unittest/dispatch/test_runner.py
git commit -m "feat(dispatch): add runner registration and liveness tracking"

Task 3: Implement dispatch/scripts.py (Lua reclaim) with TDD

Files:

  • Test: tests/unittest/dispatch/test_reclaim_script.py

  • Create: dispatch/scripts.py

  • Step 1: Write failing tests

# tests/unittest/dispatch/test_reclaim_script.py
"""Tests for the atomic reclaim Lua script.

The script must be atomic: if two runners try to reclaim the same orphan job,
only one should succeed. It must also enforce the max-attempts limit.
"""
import pytest
from datetime import datetime, timezone
from mongo.utils import RedisCache
from dispatch.scripts import reclaim_orphan_atomic
from dispatch.redis_keys import job_key, JOBS_LEASED


@pytest.fixture(autouse=True)
def clear_redis():
    RedisCache().client.flushdb()
    yield
    RedisCache().client.flushdb()


def _seed_leased_job(jb_id: str, owner: str, attempts: int = 1):
    rds = RedisCache().client
    rds.hset(job_key(jb_id), mapping={
        "leased_by": owner,
        "leased_at": datetime.now(timezone.utc).isoformat(),
        "attempts": attempts,
    })
    rds.sadd(JOBS_LEASED, jb_id)


def test_reclaim_succeeds_when_owner_matches():
    _seed_leased_job("jb_1", owner="rn_old", attempts=1)

    result = reclaim_orphan_atomic(
        jb_id="jb_1",
        expected_owner="rn_old",
        new_owner="rn_new",
        max_attempts=3,
    )

    assert result == 1  # success
    rds = RedisCache().client
    assert rds.hget(job_key("jb_1"), "leased_by") == b"rn_new"
    assert int(rds.hget(job_key("jb_1"), "attempts")) == 2  # incremented


def test_reclaim_fails_when_owner_changed():
    """Another runner already reclaimed it before us."""
    _seed_leased_job("jb_1", owner="rn_someone_else", attempts=1)

    result = reclaim_orphan_atomic(
        jb_id="jb_1",
        expected_owner="rn_old",       # we expected rn_old
        new_owner="rn_new",
        max_attempts=3,
    )

    assert result == 0  # not reclaimed
    assert RedisCache().client.hget(job_key("jb_1"), "leased_by") == b"rn_someone_else"


def test_reclaim_returns_negative_when_max_attempts_reached():
    _seed_leased_job("jb_1", owner="rn_old", attempts=3)

    result = reclaim_orphan_atomic(
        jb_id="jb_1",
        expected_owner="rn_old",
        new_owner="rn_new",
        max_attempts=3,
    )

    assert result == -1  # exhausted
    # Job removed from leased set so caller can mark Submission JE
    assert not RedisCache().client.sismember(JOBS_LEASED, "jb_1")


def test_reclaim_is_atomic_under_concurrent_calls():
    """Simulate two runners reclaiming the same orphan at once.

    Lua scripts are atomic in Redis, so even back-to-back calls can't both succeed.
    """
    _seed_leased_job("jb_1", owner="rn_old", attempts=1)

    r1 = reclaim_orphan_atomic("jb_1", "rn_old", "rn_new1", max_attempts=3)
    r2 = reclaim_orphan_atomic("jb_1", "rn_old", "rn_new2", max_attempts=3)

    assert r1 == 1   # first wins
    assert r2 == 0   # second sees owner already changed
    assert RedisCache().client.hget(job_key("jb_1"), "leased_by") == b"rn_new1"
  • Step 2: Run tests to verify failure

Run: cd Back-End && poetry run pytest tests/unittest/dispatch/test_reclaim_script.py -v
Expected: FAIL with ImportError: cannot import name 'reclaim_orphan_atomic'

  • Step 3: Implement dispatch/scripts.py
# dispatch/scripts.py
"""Lua scripts for atomic Redis operations."""
from datetime import datetime, timezone

from mongo.utils import RedisCache
from .redis_keys import job_key, JOBS_LEASED


# Lua reclaim script:
#   KEYS[1] = job:<jb_id>
#   ARGV[1] = expected_owner (rn_id we expect to currently hold the lease)
#   ARGV[2] = new_owner      (rn_id taking over)
#   ARGV[3] = leased_at      (ISO timestamp string)
#   ARGV[4] = max_attempts   (string-encoded int)
#   ARGV[5] = jobs_leased_set_name
#
# Return:
#    1 = success (lease transferred, attempts incremented)
#    0 = failed (owner changed before we could reclaim)
#   -1 = exhausted (attempts >= max_attempts; job removed from leased set)
_RECLAIM_LUA = """
local current_owner = redis.call('HGET', KEYS[1], 'leased_by')
if current_owner ~= ARGV[1] then
    return 0
end

local attempts = tonumber(redis.call('HGET', KEYS[1], 'attempts')) or 0
local max_attempts = tonumber(ARGV[4])
if attempts >= max_attempts then
    redis.call('SREM', ARGV[5], string.sub(KEYS[1], 5))
    return -1
end

redis.call('HSET', KEYS[1], 'leased_by', ARGV[2], 'leased_at', ARGV[3])
redis.call('HINCRBY', KEYS[1], 'attempts', 1)
return 1
"""


_script_cache = {}


def _get_reclaim_script():
    """Lazily register the script (so we get a fresh script per Redis client)."""
    rds = RedisCache().client
    cache_key = id(rds)
    if cache_key not in _script_cache:
        _script_cache[cache_key] = rds.register_script(_RECLAIM_LUA)
    return _script_cache[cache_key]


def reclaim_orphan_atomic(
    jb_id: str,
    expected_owner: str,
    new_owner: str,
    max_attempts: int,
) -> int:
    """Atomically transfer lease from expected_owner to new_owner.

    Returns:
        1 if reclaimed
        0 if owner changed before we could reclaim
       -1 if attempts exhausted (job removed from JOBS_LEASED set)
    """
    script = _get_reclaim_script()
    leased_at = datetime.now(timezone.utc).isoformat()
    return int(script(
        keys=[job_key(jb_id)],
        args=[expected_owner, new_owner, leased_at, max_attempts, JOBS_LEASED],
    ))
  • Step 4: Run tests to verify pass

Run: cd Back-End && poetry run pytest tests/unittest/dispatch/test_reclaim_script.py -v
Expected: All 4 tests PASS

  • Step 5: Commit
cd Back-End
git add dispatch/scripts.py tests/unittest/dispatch/test_reclaim_script.py
git commit -m "feat(dispatch): add atomic Lua reclaim script for orphan jobs"

Task 4: Implement dispatch/job.py — enqueue + claim_next_job (pending path)

Files:

  • Test: tests/unittest/dispatch/test_job.py
  • Create: dispatch/job.py

This task covers the basic enqueue + claim from pending queue. Orphan reclaim path is added in Task 5.

  • Step 1: Write failing tests for enqueue + claim from pending
# tests/unittest/dispatch/test_job.py
import json
from unittest.mock import MagicMock

import pytest
from mongo.utils import RedisCache
from dispatch import job as job_mod
from dispatch.redis_keys import job_key, JOBS_PENDING, JOBS_LEASED


@pytest.fixture(autouse=True)
def clear_redis():
    RedisCache().client.flushdb()
    yield
    RedisCache().client.flushdb()


def _make_submission(submission_id="sub_1", problem_id=42, language=1,
                     code_minio_path="submissions/sub_1.zip"):
    """Build a minimal submission-like object for enqueue_job tests."""
    sub = MagicMock()
    sub.id = submission_id
    sub.problem_id = problem_id
    sub.language = language
    sub.code_minio_path = code_minio_path
    sub.problem.test_case_info = {
        "tasks": [{"caseCount": 3, "memoryLimit": 65536, "timeLimit": 1000}]
    }
    return sub


def test_enqueue_job_creates_hash_and_pushes_to_pending():
    sub = _make_submission()

    jb_id = job_mod.enqueue_job(sub)

    assert jb_id.startswith("jb_")
    rds = RedisCache().client

    # Hash created with all expected fields
    h = rds.hgetall(job_key(jb_id))
    assert h[b"submission_id"] == b"sub_1"
    assert h[b"problem_id"] == b"42"
    assert h[b"language"] == b"1"
    assert h[b"code_minio_path"] == b"submissions/sub_1.zip"
    assert h[b"attempts"] == b"0"
    assert b"created_at" in h
    assert b"tasks_meta_json" in h

    # Pushed to pending queue
    assert rds.lrange(JOBS_PENDING, 0, -1) == [jb_id.encode()]


def test_claim_next_job_from_empty_queue_returns_none():
    assert job_mod.claim_next_job(rn_id="rn_1") is None


def test_claim_next_job_from_pending_queue():
    sub = _make_submission()
    jb_id = job_mod.enqueue_job(sub)

    payload = job_mod.claim_next_job(rn_id="rn_1")

    assert payload is not None
    assert payload["job_id"] == jb_id
    assert payload["submission_id"] == "sub_1"
    assert payload["problem_id"] == 42
    assert payload["language"] == 1
    assert payload["code_minio_path"] == "submissions/sub_1.zip"
    assert "tasks" in payload  # parsed from tasks_meta_json

    # Side effects in Redis
    rds = RedisCache().client
    assert rds.hget(job_key(jb_id), "leased_by") == b"rn_1"
    assert rds.sismember(JOBS_LEASED, jb_id)
    assert int(rds.hget(job_key(jb_id), "attempts")) == 1
    # Removed from pending
    assert rds.llen(JOBS_PENDING) == 0


def test_claim_next_job_is_fifo():
    """First enqueued = first claimed."""
    j1 = job_mod.enqueue_job(_make_submission(submission_id="sub_1"))
    j2 = job_mod.enqueue_job(_make_submission(submission_id="sub_2"))

    p1 = job_mod.claim_next_job(rn_id="rn_a")
    p2 = job_mod.claim_next_job(rn_id="rn_b")

    assert p1["job_id"] == j1
    assert p2["job_id"] == j2
  • Step 2: Run tests to verify failure

Run: cd Back-End && poetry run pytest tests/unittest/dispatch/test_job.py -v
Expected: FAIL with ImportError: cannot import name 'job' or similar

  • Step 3: Implement dispatch/job.py (pending path only)
# dispatch/job.py
"""Job lifecycle: enqueue, claim (pending + orphan reclaim), complete."""
import json
from datetime import datetime, timezone
from typing import Optional

from ulid import ULID

from mongo.utils import RedisCache
from .config import MAX_ATTEMPTS
from .redis_keys import job_key, JOBS_PENDING, JOBS_LEASED
from .runner import is_alive
from .scripts import reclaim_orphan_atomic


def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def _build_tasks_meta(submission) -> list[dict]:
    """Extract testcase metadata from submission for runner."""
    tasks = submission.problem.test_case_info.get("tasks", [])
    return [
        {
            "task_id": idx,
            "case_count": t.get("caseCount", 0),
            "memory_limit": t.get("memoryLimit", 0),
            "time_limit": t.get("timeLimit", 0),
        }
        for idx, t in enumerate(tasks)
    ]


def enqueue_job(submission) -> str:
    """Create a Job from a Submission and push to pending queue. Returns jb_id."""
    jb_id = f"jb_{ULID()}"
    rds = RedisCache().client

    rds.hset(job_key(jb_id), mapping={
        "submission_id": str(submission.id),
        "problem_id": submission.problem_id,
        "language": submission.language,
        "code_minio_path": submission.code_minio_path,
        "checker": 'print("not implement yet. qaq")',
        "tasks_meta_json": json.dumps(_build_tasks_meta(submission)),
        "attempts": 0,
        "created_at": _now_iso(),
    })
    rds.lpush(JOBS_PENDING, jb_id)
    return jb_id


def claim_next_job(rn_id: str) -> Optional[dict]:
    """Try to claim next job for this runner. Returns None if no job available.

    Strategy:
        1. Try pending queue (RPOP for FIFO; LPUSH+RPOP gives FIFO).
        2. If empty, scan leased jobs for orphans (owner not alive) and try to
           reclaim one atomically.
        3. If neither yields a job, return None.
    """
    rds = RedisCache().client

    # Step 1: pending queue
    jb_id_bytes = rds.rpop(JOBS_PENDING)
    if jb_id_bytes is not None:
        jb_id = jb_id_bytes.decode()
        _assign_to_runner(jb_id, rn_id)
        return _build_payload(jb_id)

    # Step 2: orphan reclaim — implemented in Task 5
    return None


def _assign_to_runner(jb_id: str, rn_id: str) -> None:
    """Mark job as leased to this runner (called after RPOP from pending)."""
    rds = RedisCache().client
    rds.hset(job_key(jb_id), mapping={
        "leased_by": rn_id,
        "leased_at": _now_iso(),
    })
    rds.hincrby(job_key(jb_id), "attempts", 1)
    rds.sadd(JOBS_LEASED, jb_id)


def _build_payload(jb_id: str) -> dict:
    """Build the payload returned to the runner via next-job endpoint.

    Note: code_minio_path is returned as-is here. The blueprint layer is
    responsible for converting it to a presigned URL before sending to runner.
    """
    rds = RedisCache().client
    h = rds.hgetall(job_key(jb_id))
    # Decode bytes to str
    h = {k.decode(): v.decode() for k, v in h.items()}
    return {
        "job_id": jb_id,
        "submission_id": h["submission_id"],
        "problem_id": int(h["problem_id"]),
        "language": int(h["language"]),
        "code_minio_path": h["code_minio_path"],
        "checker": h["checker"],
        "tasks": json.loads(h["tasks_meta_json"]),
    }
  • Step 4: Run tests to verify pass

Run: cd Back-End && poetry run pytest tests/unittest/dispatch/test_job.py -v
Expected: All 4 tests PASS

  • Step 5: Commit
cd Back-End
git add dispatch/job.py tests/unittest/dispatch/test_job.py
git commit -m "feat(dispatch): add job enqueue and claim from pending queue"

Task 5: Add orphan reclaim path to claim_next_job + complete_job

Files:

  • Modify: tests/unittest/dispatch/test_job.py (add tests)

  • Modify: dispatch/job.py (add orphan reclaim + complete_job)

  • Step 1: Write failing tests for orphan reclaim and complete_job

Append to tests/unittest/dispatch/test_job.py:

# Append after existing tests in test_job.py

from dispatch.redis_keys import runner_alive_key
from dispatch import runner as runner_mod


def test_claim_next_job_reclaims_orphan_when_owner_dead():
    """Job leased to a runner whose alive key expired should be reclaimable."""
    # Setup: enqueue + claim by runner1, then expire runner1's alive key
    sub = _make_submission()
    jb_id = job_mod.enqueue_job(sub)
    rn_id, _ = runner_mod.register(name="r1", registration_ip="1.1.1.1")
    job_mod.claim_next_job(rn_id=rn_id)  # rn1 takes the job
    # Simulate runner1 dying:
    RedisCache().client.delete(runner_alive_key(rn_id))

    # Now another runner polls
    rn2_id, _ = runner_mod.register(name="r2", registration_ip="1.1.1.2")
    payload = job_mod.claim_next_job(rn_id=rn2_id)

    assert payload is not None
    assert payload["job_id"] == jb_id
    rds = RedisCache().client
    assert rds.hget(job_key(jb_id), "leased_by") == rn2_id.encode()
    assert int(rds.hget(job_key(jb_id), "attempts")) == 2  # incremented


def test_claim_next_job_skips_orphan_with_alive_owner():
    sub = _make_submission()
    jb_id = job_mod.enqueue_job(sub)
    rn1, _ = runner_mod.register(name="r1", registration_ip="1.1.1.1")
    job_mod.claim_next_job(rn_id=rn1)  # rn1 takes the job; rn1 still alive

    rn2, _ = runner_mod.register(name="r2", registration_ip="1.1.1.2")
    payload = job_mod.claim_next_job(rn_id=rn2)

    assert payload is None  # no work for rn2 — rn1 is still alive
    # rn1 still owns the job
    assert RedisCache().client.hget(job_key(jb_id), "leased_by") == rn1.encode()


def test_claim_next_job_reclaim_at_max_attempts_returns_signal(monkeypatch):
    """When attempts == MAX_ATTEMPTS, reclaim should signal exhaustion."""
    sub = _make_submission()
    jb_id = job_mod.enqueue_job(sub)
    rn1, _ = runner_mod.register(name="r1", registration_ip="1.1.1.1")
    job_mod.claim_next_job(rn_id=rn1)
    # Manually bump attempts to MAX-1 then kill rn1
    RedisCache().client.hset(job_key(jb_id), "attempts", 3)
    RedisCache().client.delete(runner_alive_key(rn1))

    rn2, _ = runner_mod.register(name="r2", registration_ip="1.1.1.2")
    payload = job_mod.claim_next_job(rn_id=rn2)

    # The exhausted job is removed from JOBS_LEASED but the caller (blueprint)
    # is responsible for marking Submission JE. claim_next_job itself returns None.
    assert payload is None
    assert not RedisCache().client.sismember(JOBS_LEASED, jb_id)


def test_complete_job_with_correct_owner_succeeds():
    sub = _make_submission()
    jb_id = job_mod.enqueue_job(sub)
    rn_id, _ = runner_mod.register(name="r1", registration_ip="1.1.1.1")
    job_mod.claim_next_job(rn_id=rn_id)

    process_calls = []

    def fake_process(submission_id, tasks):
        process_calls.append((submission_id, tasks))

    result = job_mod.complete_job(
        rn_id=rn_id, jb_id=jb_id, tasks=[{"status": "AC"}],
        process_result=fake_process,
    )

    assert result == "ok"
    assert process_calls == [("sub_1", [{"status": "AC"}])]
    rds = RedisCache().client
    # Job cleaned up
    assert rds.hgetall(job_key(jb_id)) == {}
    assert not rds.sismember(JOBS_LEASED, jb_id)


def test_complete_job_with_wrong_owner_returns_wrong_owner():
    sub = _make_submission()
    jb_id = job_mod.enqueue_job(sub)
    rn1, _ = runner_mod.register(name="r1", registration_ip="1.1.1.1")
    job_mod.claim_next_job(rn_id=rn1)

    rn2, _ = runner_mod.register(name="r2", registration_ip="1.1.1.2")
    result = job_mod.complete_job(
        rn_id=rn2, jb_id=jb_id, tasks=[],
        process_result=lambda *a, **k: None,
    )

    assert result == "wrong_owner"
    # Job NOT cleaned up — still belongs to rn1
    assert RedisCache().client.exists(job_key(jb_id))


def test_complete_job_with_unknown_id_returns_not_found():
    result = job_mod.complete_job(
        rn_id="rn_x", jb_id="jb_nonexistent", tasks=[],
        process_result=lambda *a, **k: None,
    )
    assert result == "not_found"
  • Step 2: Run tests to verify failure

Run: cd Back-End && poetry run pytest tests/unittest/dispatch/test_job.py -v
Expected: New tests FAIL (existing 4 still pass)

  • Step 3: Implement orphan reclaim + complete_job in dispatch/job.py

Replace the existing claim_next_job and add complete_job in dispatch/job.py:

# Replace the existing claim_next_job in dispatch/job.py:
def claim_next_job(rn_id: str) -> Optional[dict]:
    """Try to claim next job for this runner. Returns None if no job available."""
    rds = RedisCache().client

    # Step 1: pending queue
    jb_id_bytes = rds.rpop(JOBS_PENDING)
    if jb_id_bytes is not None:
        jb_id = jb_id_bytes.decode()
        _assign_to_runner(jb_id, rn_id)
        return _build_payload(jb_id)

    # Step 2: orphan reclaim — scan leased jobs for dead owners
    leased_ids = rds.smembers(JOBS_LEASED)
    for orphan_id_bytes in leased_ids:
        orphan_id = orphan_id_bytes.decode()
        owner_bytes = rds.hget(job_key(orphan_id), "leased_by")
        if owner_bytes is None:
            continue
        owner = owner_bytes.decode()
        if is_alive(owner):
            continue
        # Owner is dead. Try to atomically reclaim.
        reclaim_result = reclaim_orphan_atomic(
            jb_id=orphan_id,
            expected_owner=owner,
            new_owner=rn_id,
            max_attempts=MAX_ATTEMPTS,
        )
        if reclaim_result == 1:
            return _build_payload(orphan_id)
        # 0 = lost the race, try next orphan
        # -1 = exhausted (already cleaned up by Lua); blueprint marks Submission JE
        #      separately via the exhaustion notification path (Task 8)
    return None


# Add to dispatch/job.py:
def complete_job(
    rn_id: str,
    jb_id: str,
    tasks: list,
    process_result,
) -> str:
    """Process a completed job. Returns one of: 'ok', 'wrong_owner', 'not_found'.

    Args:
        rn_id: The runner claiming completion.
        jb_id: The job id.
        tasks: Result tasks array (passed through to process_result).
        process_result: Callable(submission_id_str, tasks) -> None. Injected so
            this module doesn't depend directly on mongo.Submission.
    """
    rds = RedisCache().client
    h = rds.hgetall(job_key(jb_id))
    if not h:
        return "not_found"
    leased_by = h.get(b"leased_by")
    if leased_by is None or leased_by.decode() != rn_id:
        return "wrong_owner"

    submission_id = h[b"submission_id"].decode()
    process_result(submission_id, tasks)

    rds.delete(job_key(jb_id))
    rds.srem(JOBS_LEASED, jb_id)
    return "ok"
  • Step 4: Run tests to verify pass

Run: cd Back-End && poetry run pytest tests/unittest/dispatch/test_job.py -v
Expected: All 9 tests PASS

  • Step 5: Commit
cd Back-End
git add dispatch/job.py tests/unittest/dispatch/test_job.py
git commit -m "feat(dispatch): add orphan reclaim and complete_job paths"

Phase 2: Runner HTTP API blueprint

Task 6: Add Pydantic schemas for runner API

Files:

  • Create: model/schemas/runner.py

  • Modify: model/schemas/__init__.py

  • Step 1: Create schema file

# model/schemas/runner.py
from typing import Any, List, Optional
from .base import BaseSchema


class RegisterRunnerBody(BaseSchema):
    registration_token: str
    name: Optional[str] = None


class CompleteJobBody(BaseSchema):
    tasks: List[Any]
  • Step 2: Export from schemas package

model/schemas/__init__.py uses explicit imports (no wildcard). Add this block at the end:

# In model/schemas/__init__.py — append:
from .runner import (
    RegisterRunnerBody,
    CompleteJobBody,
)
  • Step 3: Verify import works

Run: cd Back-End && poetry run python -c "from model.schemas import RegisterRunnerBody, CompleteJobBody; print('ok')"
Expected: prints ok

  • Step 4: Commit
cd Back-End
git add model/schemas/runner.py model/schemas/__init__.py
git commit -m "feat(api): add runner API request schemas"

Task 7: Add @require_runner_token decorator

Files:

  • Test: tests/unittest/test_runner_auth.py

  • Create: model/utils/runner_auth.py

  • Modify: model/utils/__init__.py (export the decorator)

  • Step 1: Write failing tests

# tests/unittest/test_runner_auth.py
import pytest
from flask import Flask
from mongo.utils import RedisCache
from model.utils.runner_auth import require_runner_token
from dispatch import runner as runner_mod


@pytest.fixture(autouse=True)
def clear_redis():
    RedisCache().client.flushdb()
    yield
    RedisCache().client.flushdb()


@pytest.fixture
def app_with_protected_route():
    app = Flask(__name__)

    @app.get("/protected/<runner_id>")
    @require_runner_token
    def protected(runner_id):
        return {"runner_id": runner_id}

    return app


def test_protected_route_with_valid_token_passes(app_with_protected_route):
    rn_id, rk_token = runner_mod.register(name="r", registration_ip="1.1.1.1")
    client = app_with_protected_route.test_client()
    rv = client.get(f"/protected/{rn_id}", headers={"Authorization": f"Bearer {rk_token}"})
    assert rv.status_code == 200
    assert rv.get_json() == {"runner_id": rn_id}


def test_protected_route_missing_header_returns_401(app_with_protected_route):
    rn_id, _ = runner_mod.register(name="r", registration_ip="1.1.1.1")
    client = app_with_protected_route.test_client()
    rv = client.get(f"/protected/{rn_id}")
    assert rv.status_code == 401


def test_protected_route_wrong_token_returns_401(app_with_protected_route):
    rn_id, _ = runner_mod.register(name="r", registration_ip="1.1.1.1")
    client = app_with_protected_route.test_client()
    rv = client.get(f"/protected/{rn_id}", headers={"Authorization": "Bearer rk_wrong"})
    assert rv.status_code == 401


def test_protected_route_unknown_runner_returns_401(app_with_protected_route):
    client = app_with_protected_route.test_client()
    rv = client.get("/protected/rn_nonexistent",
                    headers={"Authorization": "Bearer rk_anything"})
    assert rv.status_code == 401


def test_protected_route_non_bearer_scheme_returns_401(app_with_protected_route):
    rn_id, rk_token = runner_mod.register(name="r", registration_ip="1.1.1.1")
    client = app_with_protected_route.test_client()
    rv = client.get(f"/protected/{rn_id}", headers={"Authorization": rk_token})
    assert rv.status_code == 401
  • Step 2: Run tests to verify failure

Run: cd Back-End && poetry run pytest tests/unittest/test_runner_auth.py -v
Expected: FAIL with import error

  • Step 3: Implement the decorator
# model/utils/runner_auth.py
"""@require_runner_token decorator for runner API endpoints."""
from functools import wraps
from flask import request

from dispatch import runner as runner_mod
from .response import HTTPError


_BEARER_PREFIX = "Bearer "


def require_runner_token(f):
    """Verify Authorization: Bearer rk_xxx against the runner_id in URL.

    Routes using this decorator must accept a `runner_id` URL path parameter.
    """
    @wraps(f)
    def wrapper(runner_id, *args, **kwargs):
        auth = request.headers.get("Authorization", "")
        if not auth.startswith(_BEARER_PREFIX):
            return HTTPError("missing or malformed Authorization header", 401)
        token = auth[len(_BEARER_PREFIX):]
        if not runner_mod.verify_token(runner_id, token):
            return HTTPError("invalid runner token", 401)
        return f(runner_id, *args, **kwargs)
    return wrapper
  • Step 4: Add to model/utils/init.py exports (so other modules can import)

Add to model/utils/__init__.py:

from . import runner_auth
from .runner_auth import require_runner_token

# Update __all__ if present, e.g.:
# __all__ = [..., 'require_runner_token']
  • Step 5: Run tests to verify pass

Run: cd Back-End && poetry run pytest tests/unittest/test_runner_auth.py -v
Expected: All 5 tests PASS

  • Step 6: Commit
cd Back-End
git add model/utils/runner_auth.py model/utils/__init__.py tests/unittest/test_runner_auth.py
git commit -m "feat(api): add require_runner_token decorator"

Task 8: Implement model/runner.py blueprint with all 4 endpoints

Files:

  • Test: tests/unittest/test_runner_api.py

  • Create: model/runner.py

  • Modify: model/__init__.py (export runner_api)

  • Step 1: Write failing tests for all 4 endpoints

# tests/unittest/test_runner_api.py
"""End-to-end HTTP tests for the runner API blueprint.

Uses the standard `client` fixture from conftest.py (Flask test client) and
exercises the blueprint via real HTTP-style calls.
"""
import json
import pytest
from flask import Flask
from mongo.utils import RedisCache
from dispatch import runner as runner_mod
from dispatch.redis_keys import runner_alive_key
from dispatch.config import RUNNER_REGISTRATION_TOKEN


@pytest.fixture(autouse=True)
def clear_redis():
    RedisCache().client.flushdb()
    yield
    RedisCache().client.flushdb()


# --- POST /api/runners/register ---

def test_register_with_valid_token_returns_201_with_credentials(client):
    rv = client.post("/api/runners/register", json={
        "registration_token": RUNNER_REGISTRATION_TOKEN,
        "name": "test-runner",
    })
    assert rv.status_code == 201
    body = rv.get_json()["data"]
    assert body["runner_id"].startswith("rn_")
    assert body["token"].startswith("rk_")
    assert "config" in body
    assert body["config"]["heartbeat_interval_sec"] == 15


def test_register_with_invalid_token_returns_401(client):
    rv = client.post("/api/runners/register", json={
        "registration_token": "wrong-token",
        "name": "test-runner",
    })
    assert rv.status_code == 401


# --- POST /api/runners/<id>/heartbeat ---

def test_heartbeat_with_valid_token_returns_204_and_renews(client):
    rn_id, rk = runner_mod.register(name="r", registration_ip="1.1.1.1")
    rds = RedisCache().client
    rds.expire(runner_alive_key(rn_id), 5)
    rv = client.post(f"/api/runners/{rn_id}/heartbeat",
                     headers={"Authorization": f"Bearer {rk}"})
    assert rv.status_code == 204
    assert 25 < rds.ttl(runner_alive_key(rn_id)) <= 30


def test_heartbeat_with_invalid_token_returns_401(client):
    rn_id, _ = runner_mod.register(name="r", registration_ip="1.1.1.1")
    rv = client.post(f"/api/runners/{rn_id}/heartbeat",
                     headers={"Authorization": "Bearer wrong"})
    assert rv.status_code == 401


# --- GET /api/runners/<id>/next-job ---

def test_next_job_returns_204_when_no_jobs(client):
    rn_id, rk = runner_mod.register(name="r", registration_ip="1.1.1.1")
    rv = client.get(f"/api/runners/{rn_id}/next-job",
                    headers={"Authorization": f"Bearer {rk}"})
    assert rv.status_code == 204


def test_next_job_returns_200_with_payload_when_pending(
    client, app, problem_ids, save_source, get_source,
):
    """Submit a real submission, then have a runner pull it via next-job."""
    from mongo import Submission, User, engine
    rn_id, rk = runner_mod.register(name="r", registration_ip="1.1.1.1")

    with app.app_context():
        # Create user, problem, submission with code
        save_source("base", b"int main(){}", lang=0)
        # Use existing test fixtures (from tests/utils/submission.py)
        from tests.utils.submission import create_submission
        sub = create_submission(user="admin", problem=None, lang=0)

    rv = client.get(f"/api/runners/{rn_id}/next-job",
                    headers={"Authorization": f"Bearer {rk}"})
    assert rv.status_code == 200
    body = rv.get_json()["data"]
    assert body["job_id"].startswith("jb_")
    assert body["submission_id"] == str(sub.id)
    # code_url is presigned (contains amz signing params or testcontainers minio host)
    assert "code_url" in body and len(body["code_url"]) > 0


# --- PUT /api/runners/<rn>/jobs/<jb>/complete ---

def test_complete_with_valid_owner_returns_204(
    client, app, problem_ids, save_source, get_source,
):
    from tests.utils.submission import create_submission
    rn_id, rk = runner_mod.register(name="r", registration_ip="1.1.1.1")
    with app.app_context():
        save_source("base", b"int main(){}", lang=0)
        sub = create_submission(user="admin", problem=None, lang=0)

    # Pull job
    rv = client.get(f"/api/runners/{rn_id}/next-job",
                    headers={"Authorization": f"Bearer {rk}"})
    jb_id = rv.get_json()["data"]["job_id"]

    # Complete
    rv = client.put(
        f"/api/runners/{rn_id}/jobs/{jb_id}/complete",
        headers={"Authorization": f"Bearer {rk}"},
        json={"tasks": []},  # empty result; process_result handles
    )
    assert rv.status_code == 204


def test_complete_with_wrong_owner_returns_409(
    client, app, save_source,
):
    from tests.utils.submission import create_submission
    rn1, rk1 = runner_mod.register(name="r1", registration_ip="1.1.1.1")
    rn2, rk2 = runner_mod.register(name="r2", registration_ip="1.1.1.2")
    with app.app_context():
        save_source("base", b"int main(){}", lang=0)
        create_submission(user="admin", problem=None, lang=0)

    rv = client.get(f"/api/runners/{rn1}/next-job",
                    headers={"Authorization": f"Bearer {rk1}"})
    jb_id = rv.get_json()["data"]["job_id"]

    # rn2 tries to complete rn1's job
    rv = client.put(
        f"/api/runners/{rn2}/jobs/{jb_id}/complete",
        headers={"Authorization": f"Bearer {rk2}"},
        json={"tasks": []},
    )
    assert rv.status_code == 409


def test_complete_with_unknown_job_returns_404(client):
    rn_id, rk = runner_mod.register(name="r", registration_ip="1.1.1.1")
    rv = client.put(
        f"/api/runners/{rn_id}/jobs/jb_nonexistent/complete",
        headers={"Authorization": f"Bearer {rk}"},
        json={"tasks": []},
    )
    assert rv.status_code == 404
  • Step 2: Run tests to verify failure

Run: cd Back-End && poetry run pytest tests/unittest/test_runner_api.py -v
Expected: FAIL — runner_api not registered, blueprint doesn't exist

  • Step 3: Implement model/runner.py
# model/runner.py
"""Runner API blueprint: registration, heartbeat, job pickup, completion."""
from datetime import timedelta

from flask import Blueprint, request

from mongo import Submission
from mongo.utils import MinioClient
from .schemas import RegisterRunnerBody, CompleteJobBody
from .utils import HTTPError, HTTPResponse, parse_body, require_runner_token

from dispatch import runner as runner_mod
from dispatch import job as job_mod
from dispatch.config import (
    RUNNER_REGISTRATION_TOKEN,
    HEARTBEAT_INTERVAL_SEC,
    POLL_INTERVAL_SEC,
    MAX_CONCURRENT_JOBS_PER_RUNNER,
    CODE_PRESIGNED_URL_TTL_SEC,
)

import secrets


__all__ = ["runner_api"]
runner_api = Blueprint("runner_api", __name__)


@runner_api.post("/register")
@parse_body(RegisterRunnerBody)
def register(body: RegisterRunnerBody):
    if not secrets.compare_digest(body.registration_token, RUNNER_REGISTRATION_TOKEN):
        return HTTPError("invalid registration token", 401)
    rn_id, rk_token = runner_mod.register(
        name=body.name or "unnamed",
        registration_ip=request.remote_addr or "unknown",
    )
    return HTTPResponse(
        data={
            "runner_id": rn_id,
            "token": rk_token,
            "config": {
                "heartbeat_interval_sec": HEARTBEAT_INTERVAL_SEC,
                "poll_interval_sec": POLL_INTERVAL_SEC,
                "max_concurrent_jobs": MAX_CONCURRENT_JOBS_PER_RUNNER,
            },
        },
        status_code=201,
    )


@runner_api.post("/<runner_id>/heartbeat")
@require_runner_token
def heartbeat(runner_id):
    runner_mod.renew_alive(runner_id)
    return "", 204


@runner_api.get("/<runner_id>/next-job")
@require_runner_token
def next_job(runner_id):
    payload = job_mod.claim_next_job(runner_id)
    if payload is None:
        return "", 204
    # Convert code_minio_path to presigned URL just before sending
    minio_path = payload.pop("code_minio_path")
    minio = MinioClient()
    payload["code_url"] = minio.client.presigned_get_object(
        minio.bucket,
        minio_path,
        expires=timedelta(seconds=CODE_PRESIGNED_URL_TTL_SEC),
    )
    return HTTPResponse(data=payload)


@runner_api.put("/<runner_id>/jobs/<job_id>/complete")
@require_runner_token
@parse_body(CompleteJobBody)
def complete(runner_id, job_id, body: CompleteJobBody):
    def process(submission_id_str: str, tasks: list) -> None:
        Submission(submission_id_str).process_result(tasks)

    result = job_mod.complete_job(
        rn_id=runner_id,
        jb_id=job_id,
        tasks=body.tasks,
        process_result=process,
    )
    if result == "wrong_owner":
        return HTTPError("job has been reclaimed by another runner", 409)
    if result == "not_found":
        return HTTPError("job not found", 404)
    return "", 204
  • Step 4: Export from model/init.py

model/__init__.py uses both from . import X and from .X import *, with an aggregated __all__. Add three changes:

# In model/__init__.py:
# 1. Top section (with other `from . import X`):
from . import runner

# 2. Wildcard import section:
from .runner import *

# 3. __all__ aggregation:
__all__ = [
    *auth.__all__,
    # ... existing entries ...
    *runner.__all__,        # NEW
]

The new model/runner.py already defines __all__ = ["runner_api"], so this propagates correctly.

  • Step 5: Register blueprint in app.py

Modify app.py's api2prefix list to add the runner_api at prefix /runners:

api2prefix = [
    # ... existing entries (auth_api, profile_api, etc) ...
    (runner_api, '/runners'),          # NEW
]

URL convention note: Other blueprints register without /api/ because Caddy rewrites /api/* → backend's /*. So in production, runners hit https://noj.tw/api/runners/register, but inside Flask the route is just /runners/register. Tests use the Flask test client (no Caddy), so test URLs use /runners/... (already correct in Step 1).

  • Step 6: Run tests to verify pass

Run: cd Back-End && poetry run pytest tests/unittest/test_runner_api.py -v
Expected: All 9 tests PASS (after URL adjustment)

  • Step 7: Commit
cd Back-End
git add model/runner.py model/__init__.py app.py tests/unittest/test_runner_api.py
git commit -m "feat(api): add runner API blueprint with 4 endpoints"

Task 9: Wire up exhaustion handling (max attempts → mark Submission JE)

Files:

  • Modify: dispatch/job.py (claim_next_job: handle reclaim_result == -1 by marking JE)
  • Modify: tests/unittest/dispatch/test_job.py (verify JE marking happens)

The Lua script returns -1 when attempts exhausted. The Python wrapper currently swallows this — we need to mark the affected Submission as JE.

  • Step 1: Write failing test

Append to tests/unittest/dispatch/test_job.py:

def test_claim_next_job_marks_submission_je_when_exhausted(monkeypatch, app):
    """When orphan reclaim hits max_attempts, Submission must be marked JE (status=6)."""
    from mongo import Submission, engine
    from tests.utils.submission import create_submission

    with app.app_context():
        # Create real Submission (status starts at -1)
        sub = create_submission(user="admin", problem=None, lang=0)
        # Submission has been enqueued by create_submission via submit() — pull jb_id
        # Look up the job we just enqueued
        rds = RedisCache().client
        # The jb_id is the most recent push to JOBS_PENDING
        # Setup: claim it, set attempts to MAX, then kill the runner
        rn1, _ = runner_mod.register(name="r1", registration_ip="1.1.1.1")
        payload = job_mod.claim_next_job(rn_id=rn1)
        jb_id = payload["job_id"]
        rds.hset(job_key(jb_id), "attempts", 3)
        rds.delete(runner_alive_key(rn1))

        # Now another runner polls — should observe exhaustion
        rn2, _ = runner_mod.register(name="r2", registration_ip="1.1.1.2")
        result = job_mod.claim_next_job(rn_id=rn2)

        assert result is None
        # Submission status must be 6 (JE)
        sub.reload()
        assert sub.status == 6
  • Step 2: Run tests to verify failure

Run: cd Back-End && poetry run pytest tests/unittest/dispatch/test_job.py::test_claim_next_job_marks_submission_je_when_exhausted -v
Expected: FAIL — Submission.status not changed

  • Step 3: Implement exhaustion handling in dispatch/job.py

Modify the orphan reclaim loop in claim_next_job to call into a callback when result is -1:

# In dispatch/job.py — add helper at module level:
def _on_attempts_exhausted(jb_id: str) -> None:
    """When a job exhausts max_attempts, mark its Submission as Judge Error."""
    from mongo import Submission   # local import to avoid circular dep at module load

    rds = RedisCache().client
    submission_id_bytes = rds.hget(job_key(jb_id), "submission_id")
    if submission_id_bytes is None:
        return
    submission_id = submission_id_bytes.decode()
    try:
        sub = Submission(submission_id)
        if sub:
            sub.update(status=6)  # JE
    finally:
        # Also clear job hash since Lua only removed it from JOBS_LEASED
        rds.delete(job_key(jb_id))


# Modify the orphan reclaim block in claim_next_job:
        if reclaim_result == 1:
            return _build_payload(orphan_id)
        if reclaim_result == -1:
            _on_attempts_exhausted(orphan_id)
            # continue to look at other orphans
  • Step 4: Run tests to verify pass

Run: cd Back-End && poetry run pytest tests/unittest/dispatch/test_job.py -v
Expected: All 10 tests PASS

  • Step 5: Commit
cd Back-End
git add dispatch/job.py tests/unittest/dispatch/test_job.py
git commit -m "feat(dispatch): mark Submission as JE when reclaim attempts exhausted"

Phase 3: Submission flow integration

Task 10: Refactor Submission.submit() to use enqueue_job

Files:

  • Modify: mongo/submission.py (submit method)

  • Modify: tests/test_submission.py (verify enqueue happens, not HTTP POST)

  • Step 1: Write failing test

Add to tests/test_submission.py (new test, doesn't replace existing yet):

def test_submit_enqueues_job_to_redis_pending(
    app, save_source, problem_ids,
):
    """After submit(), a job hash should appear in Redis pending queue."""
    from mongo import Submission
    from mongo.utils import RedisCache
    from dispatch.redis_keys import JOBS_PENDING, job_key
    from tests.utils.submission import create_submission

    rds = RedisCache().client
    rds.flushdb()

    with app.app_context():
        save_source("base", b"int main(){}", lang=0)
        sub = create_submission(user="admin", problem=None, lang=0)

        # One job in pending queue
        pending = rds.lrange(JOBS_PENDING, 0, -1)
        assert len(pending) == 1
        jb_id = pending[0].decode()
        # Job hash references our submission
        assert rds.hget(job_key(jb_id), "submission_id") == str(sub.id).encode()
  • Step 2: Run test to verify failure

Run: cd Back-End && poetry run pytest tests/test_submission.py::test_submit_enqueues_job_to_redis_pending -v
Expected: FAIL — pending list is empty (current submit() POSTs to sandbox; in TESTING mode it returns True without doing anything)

  • Step 3: Modify mongo/submission.py::submit()

Find the existing submit method (around line 384) and replace its tail:

# In mongo/submission.py, modify submit():
def submit(self, code_file) -> bool:
    if not self:
        raise engine.DoesNotExist(f'{self}')
    self.update(
        status=-1,
        last_send=datetime.now(),
        code_minio_path=self._put_code(code_file),
    )
    self.reload()
    self.logger.debug(f'{self} code updated.')
    # delete old handwritten submission (existing logic — keep)
    if self.handwritten:
        # ... keep existing handwritten cleanup code ...
        ...
    if self.handwritten:
        return True
    # NEW: enqueue to Redis instead of POST to sandbox.
    # In TESTING mode, also enqueue (so tests verify the flow end-to-end).
    from dispatch.job import enqueue_job
    enqueue_job(self)
    return True

The key changes:

  • Remove the if current_app.config['TESTING'] or self.handwritten: shortcut for non-handwritten.

  • Replace return self.send() with enqueue_job(self); return True.

  • Step 4: Run test to verify pass

Run: cd Back-End && poetry run pytest tests/test_submission.py::test_submit_enqueues_job_to_redis_pending -v
Expected: PASS

  • Step 5: Run the broader test suite to find regressions

Run: cd Back-End && poetry run pytest tests/test_submission.py -v
Expected: Some tests may fail because they assumed submit() was a no-op in TESTING mode and now it enqueues. Review failures.

For each failing test that calls submit() and then asserts something about state:

  • If the test wants to simulate "judging completed", it should call add_fake_output() after submit (this fixture already exists in tests/utils/submission.py).
  • If the test expected the sandbox POST to happen, the test is now obsolete — delete or refactor.

Document any test changes needed in commit message.

  • Step 6: Commit
cd Back-End
git add mongo/submission.py tests/test_submission.py
git commit -m "refactor(submission): submit() enqueues to Redis instead of POSTing to sandbox"

Task 11: Refactor Submission.rejudge() to use enqueue_job

Files:

  • Modify: mongo/submission.py (rejudge method)

  • Modify: tests/test_submission.py (test for rejudge enqueuing)

  • Step 1: Write failing test

# Add to tests/test_submission.py
def test_rejudge_enqueues_new_job_to_pending(app, save_source):
    from mongo import Submission
    from mongo.utils import RedisCache
    from dispatch.redis_keys import JOBS_PENDING
    from tests.utils.submission import create_submission, add_fake_output

    rds = RedisCache().client
    rds.flushdb()

    with app.app_context():
        save_source("base", b"int main(){}", lang=0)
        sub = create_submission(user="admin", problem=None, lang=0, status=0)  # AC

        # Clear pending (the submit during create_submission would have enqueued one)
        rds.delete(JOBS_PENDING)

        # Rejudge
        sub.rejudge()

        # New job in pending
        assert rds.llen(JOBS_PENDING) == 1
        # Submission status reset to -1
        sub.reload()
        assert sub.status == -1
  • Step 2: Run test to verify failure

Run: cd Back-End && poetry run pytest tests/test_submission.py::test_rejudge_enqueues_new_job_to_pending -v
Expected: FAIL

  • Step 3: Modify mongo/submission.py::rejudge()

Find rejudge() (around line 346) and replace its body with this (note: NO TESTING shortcut — we want enqueue to happen even in tests so we can verify the flow):

def rejudge(self) -> bool:
    self.delete_output()
    self.update(
        status=-1,
        last_send=datetime.now(),
        tasks=[],
    )
    from dispatch.job import enqueue_job
    enqueue_job(self)
    return True
  • Step 4: Run tests to verify pass

Run: cd Back-End && poetry run pytest tests/test_submission.py::test_rejudge_enqueues_new_job_to_pending -v
Expected: PASS

  • Step 5: Run rejudge-related existing tests

Run: cd Back-End && poetry run pytest tests/test_submission.py -k rejudge -v
Expected: All PASS, or document any required test updates

  • Step 6: Commit
cd Back-End
git add mongo/submission.py tests/test_submission.py
git commit -m "refactor(submission): rejudge() enqueues to Redis"

Task 12: Add Redis cleanup fixture to conftest.py

Files:

  • Modify: tests/conftest.py

Without this, fakeredis state leaks across tests, causing flakes.

  • Step 1: Add autouse fixture to clear Redis

Modify tests/conftest.py — add after existing fixtures:

# In tests/conftest.py, near other autouse fixtures
@pytest.fixture(autouse=True)
def clear_redis_between_tests():
    """Clear Redis (fakeredis) state between every test to prevent leakage."""
    from mongo.utils import RedisCache
    RedisCache().client.flushdb()
    yield
    RedisCache().client.flushdb()
  • Step 2: Run full test suite to ensure no regressions

Run: cd Back-End && poetry run pytest -x
Expected: Full suite passes (or document any pre-existing issues separately)

  • Step 3: Commit
cd Back-End
git add tests/conftest.py
git commit -m "test: add autouse redis cleanup fixture to prevent state leakage"

Phase 4: Cleanup — remove old push code

Task 13: Remove old push methods from mongo/submission.py

Files:

  • Modify: mongo/submission.py

Remove these methods (no longer reachable):

  • target_sandbox(self) (~lines 292-307)
  • send(self) (~lines 423-459)
  • sandbox_resp_handler(self, resp) (~lines 263-290)
  • assign_token classmethod
  • verify_token classmethod

Also remove the import of requests as rq if unused after removals.

  • Step 1: Remove the methods

Open mongo/submission.py and delete the methods listed above. Keep process_result() — it's still used by the new complete endpoint.

  • Step 2: Run full test suite

Run: cd Back-End && poetry run pytest -x
Expected: All pass. If anything imports the removed methods, update those imports.

  • Step 3: Search for remaining references

Run: cd Back-End && grep -rn "target_sandbox\|sandbox_resp_handler\|assign_token\|verify_token" --include="*.py" .
Expected: No results (or only in deleted/test code about to be cleaned up)

  • Step 4: Commit
cd Back-End
git add mongo/submission.py
git commit -m "refactor(submission): remove push-based sandbox dispatch methods"

Task 14: Delete mongo/sandbox.py

Files:

  • Delete: mongo/sandbox.py

  • Modify: mongo/__init__.py (remove import if any)

  • Step 1: Search for usages of find_by_token

Run: cd Back-End && grep -rn "find_by_token\|from mongo.sandbox\|from mongo import sandbox" --include="*.py" .

  • Step 2: If any usages exist, remove them first

Update each file to no longer import find_by_token. If a route used it for the old callback, that route is being removed in Task 15 anyway.

  • Step 3: Delete the file
cd Back-End
git rm mongo/sandbox.py
  • Step 4: Remove from mongo/init.py if exported

Check mongo/__init__.py for any line referring to sandbox and remove it.

  • Step 5: Run tests

Run: cd Back-End && poetry run pytest -x
Expected: All pass

  • Step 6: Commit
cd Back-End
git add mongo/sandbox.py mongo/__init__.py
git commit -m "refactor: delete mongo/sandbox.py — find_by_token replaced by per-runner token verification"

Task 15: Remove old PUT /<submission>/complete route from model/submission.py

Files:

  • Modify: model/submission.py

  • Modify: model/schemas/submission.py (remove OnSubmissionCompleteBody)

  • Modify: model/schemas/__init__.py (remove export)

  • Step 1: Locate the route

Find @submission_api.put('/<submission>/complete') in model/submission.py (around line 345-362). Read the function body to confirm it's the old callback.

  • Step 2: Delete the route function

Remove the entire route handler function from model/submission.py.

  • Step 3: Remove OnSubmissionCompleteBody schema

Delete OnSubmissionCompleteBody class from model/schemas/submission.py.

Also remove its name from model/schemas/__init__.py — find the explicit import block:

from .submission import (
    CreateSubmissionBody,
    GetSubmissionListQuery,
    OnSubmissionCompleteBody,    # ← remove this line
    GradeSubmissionBody,
    UpdateConfigBody,
)
  • Step 4: Remove import from model/submission.py

Find from .schemas import (... OnSubmissionCompleteBody ...) and remove that name from the import.

  • Step 5: Run tests

Run: cd Back-End && poetry run pytest -x
Expected: Tests that hit the old endpoint now fail with 404. Update or delete those tests.

  • Step 6: Commit
cd Back-End
git add model/submission.py model/schemas/submission.py model/schemas/__init__.py tests/
git commit -m "refactor(api): remove old PUT /<submission>/complete callback endpoint"

Task 16: Remove PUT /api/submission/config (sandbox_instances) endpoint

Files:

  • Modify: model/submission.py (remove update_config function)

  • Modify: model/schemas/submission.py (remove UpdateConfigBody)

  • Modify: model/schemas/__init__.py (remove export)

  • Step 1: Locate the route

Find @submission_api.put('/config') (or similar) decorating update_config in model/submission.py (~lines 489-537).

  • Step 2: Decide: delete entire endpoint, or strip just sandbox_instances?

The endpoint also handles rate_limit. Two options:
a. Delete entire endpoint (simpler — rate_limit could be a config file / env var instead)
b. Strip out only sandbox_instances handling, keep rate_limit

Choose (b): keep the endpoint for rate_limit to preserve admin functionality, just remove the sandbox_instances field handling.

  • Step 3: Modify update_config to drop sandbox_instances

In model/submission.py::update_config:

@submission_api.put('/config')
@login_required
@admin_required
@parse_body(UpdateConfigBody)
def update_config(user, body: UpdateConfigBody):
    config = Submission.config()
    try:
        config.update(rate_limit=body.rate_limit)
    except ValidationError as e:
        return HTTPError(str(e), 400)
    return HTTPResponse('config updated')
  • Step 4: Update UpdateConfigBody schema
# model/schemas/submission.py
class UpdateConfigBody(BaseSchema):
    rate_limit: int
    # sandbox_instances removed
  • Step 5: Run tests

Run: cd Back-End && poetry run pytest -x -k config
Expected: Tests that exercise sandbox_instances part fail. Update or remove them.

  • Step 6: Commit
cd Back-End
git add model/submission.py model/schemas/submission.py tests/
git commit -m "refactor(api): drop sandbox_instances handling from PUT /submission/config"

Task 17: Clean references to SubmissionConfig.sandbox_instances in engine.py

Files:

  • Modify: mongo/engine.py

The field itself stays (to avoid mongo schema conflicts in production), but any default values referencing the (now-removed) Sandbox EmbeddedDocument should be cleaned.

  • Step 1: Find the field definition

Look in mongo/engine.py for sandbox_instances = EmbeddedDocumentListField(...) (~line 432).

  • Step 2: Change to ListField with no schema

Replace:

sandbox_instances = EmbeddedDocumentListField(
    Sandbox,
    default=[Sandbox(name='Sandbox-0', url='http://sandbox:1450', token='KoNoSandboxDa')],
    db_field='sandboxInstances',
)

With:

# DEPRECATED: this field is no longer used since the pull-based dispatch
# refactor (2026-04-28). Kept here as ListField (no schema) only to avoid
# breaking existing production documents. Remove during PG migration.
sandbox_instances = ListField(default=[], db_field='sandboxInstances')
  • Step 3: Delete the Sandbox EmbeddedDocument class

Find class Sandbox(EmbeddedDocument) (~line 422) and delete it entirely.

  • Step 4: Run tests

Run: cd Back-End && poetry run pytest -x
Expected: All pass. If anything imported engine.Sandbox, those imports must be updated/removed.

  • Step 5: Commit
cd Back-End
git add mongo/engine.py
git commit -m "refactor(engine): deprecate Sandbox EmbeddedDocument; keep field as untyped ListField"

Task 18: Verification pass — full test suite + lint

No code changes — verification only.

  • Step 1: Run full test suite

Run: cd Back-End && poetry run pytest --cov=./ --cov-config=.coveragerc -v
Expected: All tests pass; coverage report shows new dispatch module covered

  • Step 2: Run yapf in check mode

Run: cd Back-End && poetry run yapf --recursive --parallel --diff .
Expected: No diff (CI compliance)

  • Step 3: If any diff, format

Run: cd Back-End && poetry run yapf -ir .
Then verify diff with git, commit:

cd Back-End
git add .
git commit -m "style: yapf formatting"

Phase 5: End-to-end integration tests

Task 19: Mock-runner happy-path integration test

Files:

  • Create: tests/integration/__init__.py
  • Create: tests/integration/test_runner_flow.py

This test exercises the full Backend dispatch path via HTTP, simulating a runner with a Python test client.

  • Step 1: Create test package init
# tests/integration/__init__.py
  • Step 2: Write the integration test
# tests/integration/test_runner_flow.py
"""End-to-end mock-runner integration tests.

These tests use the Flask test client to simulate a complete runner lifecycle:
register → poll → claim → complete. Verifies the entire Backend pipeline works
without needing a real Sandbox container.
"""
import pytest
from mongo import Submission
from mongo.utils import RedisCache
from dispatch.config import RUNNER_REGISTRATION_TOKEN


@pytest.fixture
def mock_runner(client):
    """Register a runner and return helpers for runner-side actions."""
    rv = client.post("/runners/register", json={
        "registration_token": RUNNER_REGISTRATION_TOKEN,
        "name": "mock-runner-1",
    })
    assert rv.status_code == 201
    body = rv.get_json()["data"]
    rn_id = body["runner_id"]
    rk = body["token"]

    headers = {"Authorization": f"Bearer {rk}"}

    class Runner:
        runner_id = rn_id
        token = rk

        def heartbeat(self):
            return client.post(f"/runners/{rn_id}/heartbeat", headers=headers)

        def next_job(self):
            return client.get(f"/runners/{rn_id}/next-job", headers=headers)

        def complete(self, jb_id, tasks):
            return client.put(
                f"/runners/{rn_id}/jobs/{jb_id}/complete",
                headers=headers,
                json={"tasks": tasks},
            )

    return Runner()


def test_full_flow_submit_pull_complete(app, mock_runner, save_source):
    """Submit code → runner pulls → runner submits result → submission shows AC."""
    from tests.utils.submission import create_submission

    with app.app_context():
        save_source("base", b"int main(){}", lang=0)
        sub = create_submission(user="admin", problem=None, lang=0)
        assert sub.status == -1  # Pending after submit

        # Runner pulls
        rv = mock_runner.next_job()
        assert rv.status_code == 200
        payload = rv.get_json()["data"]
        assert payload["submission_id"] == str(sub.id)
        jb_id = payload["job_id"]

        # Runner sends back AC result
        ac_tasks = [
            {
                "exitCode": 0, "status": "AC",
                "tasks": [{"exitCode": 0, "status": "AC", "execTime": 10, "memoryUsage": 1024, "stdout": "", "stderr": ""}],
            },
        ]
        rv = mock_runner.complete(jb_id, ac_tasks)
        assert rv.status_code == 204

        sub.reload()
        assert sub.status == 0  # AC


def test_no_pending_returns_204(mock_runner):
    rv = mock_runner.next_job()
    assert rv.status_code == 204


def test_heartbeat_keeps_runner_alive(mock_runner):
    for _ in range(3):
        rv = mock_runner.heartbeat()
        assert rv.status_code == 204
  • Step 3: Run test

Run: cd Back-End && poetry run pytest tests/integration/test_runner_flow.py -v
Expected: All 3 tests PASS

  • Step 4: Commit
cd Back-End
git add tests/integration/
git commit -m "test(integration): add end-to-end mock-runner happy-path tests"

Task 20: Mock-runner orphan reclaim integration test

Files:

  • Modify: tests/integration/test_runner_flow.py

  • Step 1: Add test for orphan reclaim end-to-end

Append to tests/integration/test_runner_flow.py:

def test_orphan_reclaim_when_runner_dies(app, client, save_source):
    """Runner1 takes a job and dies; runner2 should reclaim and complete it."""
    from tests.utils.submission import create_submission
    from dispatch.redis_keys import runner_alive_key

    # Setup: 2 runners + 1 submission
    rv = client.post("/runners/register", json={
        "registration_token": RUNNER_REGISTRATION_TOKEN, "name": "rn1"})
    rn1 = rv.get_json()["data"]
    rv = client.post("/runners/register", json={
        "registration_token": RUNNER_REGISTRATION_TOKEN, "name": "rn2"})
    rn2 = rv.get_json()["data"]

    h1 = {"Authorization": f"Bearer {rn1['token']}"}
    h2 = {"Authorization": f"Bearer {rn2['token']}"}

    with app.app_context():
        save_source("base", b"int main(){}", lang=0)
        sub = create_submission(user="admin", problem=None, lang=0)

    # rn1 takes the job
    rv = client.get(f"/runners/{rn1['runner_id']}/next-job", headers=h1)
    assert rv.status_code == 200
    jb_id = rv.get_json()["data"]["job_id"]

    # Simulate rn1 death
    RedisCache().client.delete(runner_alive_key(rn1["runner_id"]))

    # rn2 polls — should reclaim
    rv = client.get(f"/runners/{rn2['runner_id']}/next-job", headers=h2)
    assert rv.status_code == 200
    reclaimed = rv.get_json()["data"]
    assert reclaimed["job_id"] == jb_id  # same job!

    # rn2 completes it
    rv = client.put(
        f"/runners/{rn2['runner_id']}/jobs/{jb_id}/complete",
        headers=h2,
        json={"tasks": [{"exitCode": 0, "status": "AC",
                         "tasks": [{"exitCode": 0, "status": "AC",
                                    "execTime": 0, "memoryUsage": 0,
                                    "stdout": "", "stderr": ""}]}]},
    )
    assert rv.status_code == 204

    # If rn1 zombie comes back and tries to complete, rejected with 409
    rv = client.put(
        f"/runners/{rn1['runner_id']}/jobs/{jb_id}/complete",
        headers=h1,
        json={"tasks": []},
    )
    # 404 because the job was deleted after rn2 completed it
    assert rv.status_code == 404
  • Step 2: Run test

Run: cd Back-End && poetry run pytest tests/integration/test_runner_flow.py::test_orphan_reclaim_when_runner_dies -v
Expected: PASS

  • Step 3: Commit
cd Back-End
git add tests/integration/test_runner_flow.py
git commit -m "test(integration): add orphan reclaim end-to-end test"

Task 21: Mock-runner max-attempts → JE integration test

Files:

  • Modify: tests/integration/test_runner_flow.py

  • Step 1: Add test for max-attempts exhaustion

Append:

def test_max_attempts_marks_submission_je(app, client, save_source):
    """When a job is reclaimed too many times, Submission must be marked JE."""
    from tests.utils.submission import create_submission
    from dispatch.redis_keys import job_key, runner_alive_key
    from dispatch.config import MAX_ATTEMPTS

    # Setup
    with app.app_context():
        save_source("base", b"int main(){}", lang=0)
        sub = create_submission(user="admin", problem=None, lang=0)
        assert sub.status == -1

    # Simulate MAX_ATTEMPTS rounds of "runner takes job, dies, gets reclaimed"
    runners = []
    for i in range(MAX_ATTEMPTS):
        rv = client.post("/runners/register", json={
            "registration_token": RUNNER_REGISTRATION_TOKEN,
            "name": f"rn{i}",
        })
        runners.append(rv.get_json()["data"])

    # Round 1: runner 0 claims from pending
    rn = runners[0]
    h = {"Authorization": f"Bearer {rn['token']}"}
    rv = client.get(f"/runners/{rn['runner_id']}/next-job", headers=h)
    assert rv.status_code == 200
    jb_id = rv.get_json()["data"]["job_id"]
    # Kill runner 0
    RedisCache().client.delete(runner_alive_key(rn["runner_id"]))

    # Subsequent rounds: each new runner reclaims, then dies
    for i in range(1, MAX_ATTEMPTS - 1):
        rn = runners[i]
        h = {"Authorization": f"Bearer {rn['token']}"}
        rv = client.get(f"/runners/{rn['runner_id']}/next-job", headers=h)
        assert rv.status_code == 200, f"reclaim {i} failed"
        RedisCache().client.delete(runner_alive_key(rn["runner_id"]))

    # The MAX_ATTEMPTS-th poll observes exhaustion and returns 204
    last_rn = runners[MAX_ATTEMPTS - 1]
    h = {"Authorization": f"Bearer {last_rn['token']}"}
    rv = client.get(f"/runners/{last_rn['runner_id']}/next-job", headers=h)
    assert rv.status_code == 204  # exhausted; no job available

    # And the Submission is marked JE
    with app.app_context():
        sub.reload()
        assert sub.status == 6  # JE
  • Step 2: Run test

Run: cd Back-End && poetry run pytest tests/integration/test_runner_flow.py::test_max_attempts_marks_submission_je -v
Expected: PASS

  • Step 3: Run full integration suite

Run: cd Back-End && poetry run pytest tests/integration/ -v
Expected: All tests PASS

  • Step 4: Run full backend test suite as a final check

Run: cd Back-End && poetry run pytest --cov=./ --cov-config=.coveragerc
Expected: All pass; coverage on dispatch/ and model/runner.py at >85%

  • Step 5: Commit
cd Back-End
git add tests/integration/test_runner_flow.py
git commit -m "test(integration): verify max-attempts exhaustion marks Submission JE"

Plan A Done — Verification Checklist

Before declaring Plan A complete, verify:

  • All 21 tasks committed in Back-End submodule
  • poetry run pytest passes 100%
  • poetry run yapf --recursive --parallel --diff . shows no diff
  • Coverage report shows dispatch/ and model/runner.py >85%
  • No imports of removed code (target_sandbox, find_by_token, etc.)
  • Spec section 9 fully implemented (Backend internal structure)
  • Spec section 6 fully implemented (4 endpoints with correct behavior)
  • Spec section 13 failure modes have corresponding tests:
    • Orphan reclaim ✓ (Task 20)
    • Max attempts → JE ✓ (Task 21)
    • Wrong owner complete → 409 ✓ (Task 8 + 20)

Handoff to Plan B

Plan B (Runner refactor) is unblocked once this plan is complete and Backend is deployed in a test/staging environment. Plan B will:

  • Refactor Sandbox/app.py and Sandbox/dispatcher/dispatcher.py
  • Add new threads (poller, heartbeat, result_sender, registration)
  • Update Sandbox/Dockerfile
  • Update docker-compose.yml for Redis AOF + new env vars
  • Add a real-runner integration test that exercises both submodules

as535364 added 22 commits April 28, 2026 04:04
The 'hash' in the old name suggested a Redis HASH data structure, but the
key actually addresses a STRING storing a SHA-256 hex digest. Rename
removes ambiguity. Caught in code review before Task 2 wrote tests
against the old name.
Implement register, verify_token, renew_alive, is_alive in dispatch/runner.py
with TDD (8 test cases). Fix RedisCache fakeredis singleton so cross-instance
state is shared in tests (shared FakeServer).
- tests use RUNNER_ALIVE_TTL_SEC constant instead of hardcoded 30
- renew_alive() docstring documents caller's verify_token responsibility
Adds reclaim_orphan_atomic() backed by a single Lua script so that only
one runner can claim an orphaned job; also enforces max-attempts policy.
Adds lupa dev-dep to enable fakeredis Lua execution in tests.
redis-py's register_script() already handles EVALSHA caching internally.
The Python-side _script_cache + _get_reclaim_script() helpers added
unnecessary complexity. Caught in code review.
Implements POST /runners/register, POST /runners/<id>/heartbeat,
GET /runners/<id>/next-job, PUT /runners/<id>/jobs/<jb>/complete.
Wires blueprint into app.py and adds test_case_info property to
engine.Problem so dispatch layer can read task metadata via
duck-typing interface used in both production and mock tests.
Add _on_attempts_exhausted helper that updates Submission.status to 6
(JE) and deletes the orphaned job hash when Lua reclaim returns -1.
…o sandbox

- mongo/submission.py::submit() removes TESTING shortcut and old send() call;
  now calls dispatch.job.enqueue_job() to push to Redis queue
- Tests updated: removed manual enqueue_job() calls in test_runner_api.py
  that were workarounds before submit() was wired
- New test added: test_submit_enqueues_job_to_redis_pending verifies
  submit() pushes exactly one job to JOBS_PENDING with correct submission_id
…fixture

- mongo/submission.py::rejudge() now calls dispatch.job.enqueue_job()
  (consistent with submit() refactor in previous commit)
- tests/conftest.py: add autouse fixture that flushdb between every test
  to prevent fakeredis state leakage across tests (was causing pre-existing
  flakiness in TestTeacherGetSubmission)
Delete target_sandbox, send, sandbox_resp_handler, assign_token, and
verify_token from Submission class; also drop unused `import requests as rq`.
Drop on_submission_complete handler and OnSubmissionCompleteBody schema;
these were part of the push-based dispatch that is now replaced by the
pull-based runner API.
…onfig

Remove sandbox_instances field from UpdateConfigBody and simplify
update_config to only update rate_limit; runners now self-register
via the pull-based dispatch. Also drop unused imports (requests, current_app).
Update test_edit_config to match new API shape.
…ntyped ListField

Delete class Sandbox(EmbeddedDocument) which is no longer referenced.
Convert SubmissionConfig.sandbox_instances to a plain ListField with no
schema to preserve DB compatibility on production until the PG migration.
…token

- dispatch/runner.py: add verify_any_token() — scans registered runners
- model/problem.py: get_testdata/get_checksum/get_meta now accept any runner's token
- mongo/sandbox.py: deleted (find_by_token replaced)

Resolves the gap where Plan A Task 14 was blocked by these legacy endpoints.
Runners (Sandbox) now authenticate to the testdata fetch path with their
per-runner rk_token instead of the shared SANDBOX_TOKEN.
Covers Plan A spec Section 13 failure modes:
- Happy path: submit → pull → complete → status=AC
- Orphan reclaim: rn1 dies mid-job, rn2 picks up, rn1 zombie complete rejected
- Max attempts: 3 reclaims exhausted → Submission marked JE

These complete Plan A's verification — Backend dispatch path is end-to-end
exercised without needing the real Sandbox.
After task.to_mongo() the task is a SON dict (no .cases attribute).
The same fix is already applied in get_result() above; this method
was missed. Triggered 500 on GET /api/submission/<id> for any
submission with at least one task result.
@as535364 as535364 marked this pull request as draft April 28, 2026 06:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant