From b5c285fc9dbc59febf79a3a2014f7e98af7dcc17 Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Tue, 16 Jun 2026 17:59:56 +0200 Subject: [PATCH 1/4] refactor(node/api): restructure the HTTP API around typed DI and response models Remove the AggregatorController indirection and re-architect the node's HTTP API folder around explicit dependency injection and typed response bodies. Aggregator role: - Delete AggregatorController; the sync service already owns the live flag. - Remove the write-only NetworkService aggregator copy (nothing ever read it). - The admin endpoints read and mutate the sync service flag directly. HTTP API: - Add ApiContext: a frozen dependency bundle (spec, store getter, aggregator role control) with require_store and require_aggregator_role_control guards, replacing the stringly-keyed aiohttp app state and silencing NotAppKeyWarning. - Add typed snake-case Pydantic response models; serialization flows through the field types, so manual hex and JSON building are gone. - Consolidate the per-resource handler functions into one ApiHandlers class bound to an ApiContext, and delete the endpoints package. - Report fork-choice validator_count as null, not zero, when the head state is missing, so consumers cannot misread it. - Drop the unused is_admin route flag and document that admin routes are unauthenticated and must be restricted at the network layer. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/lean_spec/node/api/__init__.py | 2 - .../node/api/aggregator_controller.py | 71 -------- src/lean_spec/node/api/context.py | 48 +++++ src/lean_spec/node/api/endpoints/__init__.py | 1 - .../node/api/endpoints/aggregator.py | 78 -------- .../node/api/endpoints/checkpoints.py | 40 ---- .../node/api/endpoints/fork_choice.py | 74 -------- src/lean_spec/node/api/endpoints/health.py | 33 ---- src/lean_spec/node/api/endpoints/metrics.py | 27 --- src/lean_spec/node/api/endpoints/states.py | 46 ----- src/lean_spec/node/api/handlers.py | 171 ++++++++++++++++++ src/lean_spec/node/api/responses.py | 66 +++++++ src/lean_spec/node/api/routes.py | 37 ++-- src/lean_spec/node/api/server.py | 72 ++------ .../node/networking/service/service.py | 3 - src/lean_spec/node/node.py | 23 +-- tests/node/api/endpoints/conftest.py | 21 +-- tests/node/api/endpoints/test_aggregator.py | 4 +- tests/node/api/endpoints/test_health.py | 6 +- tests/node/api/test_aggregator_controller.py | 98 ---------- tests/node/api/test_server.py | 75 +++----- tests/node/networking/service/test_service.py | 21 --- 22 files changed, 368 insertions(+), 649 deletions(-) delete mode 100644 src/lean_spec/node/api/aggregator_controller.py create mode 100644 src/lean_spec/node/api/context.py delete mode 100644 src/lean_spec/node/api/endpoints/__init__.py delete mode 100644 src/lean_spec/node/api/endpoints/aggregator.py delete mode 100644 src/lean_spec/node/api/endpoints/checkpoints.py delete mode 100644 src/lean_spec/node/api/endpoints/fork_choice.py delete mode 100644 src/lean_spec/node/api/endpoints/health.py delete mode 100644 src/lean_spec/node/api/endpoints/metrics.py delete mode 100644 src/lean_spec/node/api/endpoints/states.py create mode 100644 src/lean_spec/node/api/handlers.py create mode 100644 src/lean_spec/node/api/responses.py delete mode 100644 tests/node/api/test_aggregator_controller.py diff --git a/src/lean_spec/node/api/__init__.py b/src/lean_spec/node/api/__init__.py index 26db171d6..883800152 100644 --- a/src/lean_spec/node/api/__init__.py +++ b/src/lean_spec/node/api/__init__.py @@ -1,10 +1,8 @@ """API server module for various API endpoints.""" -from lean_spec.node.api.aggregator_controller import AggregatorController from lean_spec.node.api.server import ApiServer, ApiServerConfig __all__ = [ - "AggregatorController", "ApiServer", "ApiServerConfig", ] diff --git a/src/lean_spec/node/api/aggregator_controller.py b/src/lean_spec/node/api/aggregator_controller.py deleted file mode 100644 index dbba621af..000000000 --- a/src/lean_spec/node/api/aggregator_controller.py +++ /dev/null @@ -1,71 +0,0 @@ -""" -Runtime controller for the node's aggregator role. - -Exposes get/set operations over the shared is_aggregator flag so the admin -API can rotate aggregator duties across nodes without restarting. - -Toggles are serialized under an asyncio lock so concurrent admin requests -cannot leave the sync and network services disagreeing on the current role. -""" - -from __future__ import annotations - -import asyncio -import logging -from dataclasses import dataclass, field - -from lean_spec.node.networking import NetworkService -from lean_spec.node.sync import SyncService - -logger = logging.getLogger(__name__) - - -@dataclass(slots=True) -class AggregatorController: - """ - Runtime control over the node's aggregator role. - - Operators toggle the flag to rotate aggregation duties across nodes when - an active aggregator becomes unhealthy, without restarting the node. - - The spec-level semantics are unchanged: the sync service reads - is_aggregator on each gossip event and each tick, so flipping the flag - takes effect from the next event or tick onward. - """ - - sync_service: SyncService - """Sync service whose flag drives gossip-side aggregator behavior.""" - - network_service: NetworkService - """Network service whose flag mirrors the sync service for consistency.""" - - _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False, repr=False) - """Serializes concurrent toggles from API handlers.""" - - def is_enabled(self) -> bool: - """Return whether the node is currently acting as aggregator.""" - return self.sync_service.is_aggregator - - async def set_enabled(self, enabled: bool) -> bool: - """ - Update the aggregator role and return the previous value. - - The sync and network services are updated together under a lock so - both views remain consistent from any observer's perspective. - - Args: - enabled: Desired aggregator state. - - Returns: - Aggregator state prior to the update. - """ - async with self._lock: - previous = self.sync_service.is_aggregator - self.sync_service.is_aggregator = enabled - self.network_service.is_aggregator = enabled - if previous != enabled: - logger.info( - "Aggregator role %s via admin API", - "activated" if enabled else "deactivated", - ) - return previous diff --git a/src/lean_spec/node/api/context.py b/src/lean_spec/node/api/context.py new file mode 100644 index 000000000..c341a620b --- /dev/null +++ b/src/lean_spec/node/api/context.py @@ -0,0 +1,48 @@ +"""Typed dependencies the HTTP API handlers receive.""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from typing import Protocol + +from aiohttp import web + +from lean_spec.spec.forks import LstarSpec, Store + + +class AggregatorRoleControl(Protocol): + """The slice of the sync service the admin endpoints read and write.""" + + is_aggregator: bool + + +@dataclass(frozen=True, slots=True) +class ApiContext: + """Dependencies shared across handlers, resolved once at startup.""" + + spec: LstarSpec + """Fork spec driving consensus computations such as fork-choice weights.""" + + store_getter: Callable[[], Store | None] | None + """Callable returning the live store, or None before the store exists.""" + + aggregator_role_control: AggregatorRoleControl | None + """Holder of the aggregator flag, or None when aggregator control is unwired.""" + + def require_store(self) -> Store: + """ + Return the live store, or raise 503 when the node has no store yet. + + The store is a frozen snapshot, so all reads in one handler stay consistent. + """ + store = self.store_getter() if self.store_getter else None + if store is None: + raise web.HTTPServiceUnavailable(reason="Store not initialized") + return store + + def require_aggregator_role_control(self) -> AggregatorRoleControl: + """Return the aggregator role control, or raise 503 when it is unwired.""" + if self.aggregator_role_control is None: + raise web.HTTPServiceUnavailable(reason="Aggregator role control not available") + return self.aggregator_role_control diff --git a/src/lean_spec/node/api/endpoints/__init__.py b/src/lean_spec/node/api/endpoints/__init__.py deleted file mode 100644 index 78ef5b6ae..000000000 --- a/src/lean_spec/node/api/endpoints/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""API endpoint specifications.""" diff --git a/src/lean_spec/node/api/endpoints/aggregator.py b/src/lean_spec/node/api/endpoints/aggregator.py deleted file mode 100644 index 69f2a9580..000000000 --- a/src/lean_spec/node/api/endpoints/aggregator.py +++ /dev/null @@ -1,78 +0,0 @@ -"""Admin endpoints for toggling the aggregator role at runtime.""" - -from __future__ import annotations - -import json -import logging - -from aiohttp import web - -logger = logging.getLogger(__name__) - - -async def handle_status(request: web.Request) -> web.Response: - """ - Handle aggregator status request. - - Returns whether the node is currently acting as an aggregator. - - Response: JSON object with fields: - - is_aggregator (bool): Whether the node is currently acting as aggregator. - - Status Codes: - 200 OK: Status returned successfully. - 503 Service Unavailable: Aggregator controller not wired. - """ - controller = request.app.get("aggregator_controller") - if controller is None: - raise web.HTTPServiceUnavailable(reason="Aggregator controller not available") - - return web.Response( - body=json.dumps({"is_aggregator": controller.is_enabled()}), - content_type="application/json", - ) - - -async def handle_toggle(request: web.Request) -> web.Response: - """ - Handle aggregator toggle request. - - Activates or deactivates the aggregator role at runtime so operators can - rotate aggregator duties across nodes without restarting. - - Request body: JSON object with fields: - - enabled (bool): Desired aggregator state. - - Response: JSON object with fields: - - is_aggregator (bool): Aggregator state after the update. - - previous (bool): Aggregator state before the update. - - Status Codes: - 200 OK: Role updated successfully. - 400 Bad Request: Body missing, malformed, or with wrong field types. - 503 Service Unavailable: Aggregator controller not wired. - """ - controller = request.app.get("aggregator_controller") - if controller is None: - raise web.HTTPServiceUnavailable(reason="Aggregator controller not available") - - try: - request_body = await request.json() - except json.JSONDecodeError as exception: - raise web.HTTPBadRequest(reason="Invalid JSON body") from exception - - if not isinstance(request_body, dict) or "enabled" not in request_body: - raise web.HTTPBadRequest(reason="Missing 'enabled' field in body") - - enabled = request_body["enabled"] - # Explicit bool check rejects ints like 0/1, which JSON does not distinguish - # from booleans in loose parsers but Python does. - if not isinstance(enabled, bool): - raise web.HTTPBadRequest(reason="'enabled' must be a boolean") - - previous_aggregator_state = await controller.set_enabled(enabled) - - return web.Response( - body=json.dumps({"is_aggregator": enabled, "previous": previous_aggregator_state}), - content_type="application/json", - ) diff --git a/src/lean_spec/node/api/endpoints/checkpoints.py b/src/lean_spec/node/api/endpoints/checkpoints.py deleted file mode 100644 index 371c86c8a..000000000 --- a/src/lean_spec/node/api/endpoints/checkpoints.py +++ /dev/null @@ -1,40 +0,0 @@ -"""Checkpoints endpoint handlers.""" - -from __future__ import annotations - -import json - -from aiohttp import web - - -async def handle_justified(request: web.Request) -> web.Response: - """ - Handle justified checkpoint request. - - Returns the latest justified checkpoint for monitoring consensus progress. - - Response: JSON object with fields: - - slot (integer): The slot number of the justified checkpoint. - - root (string): The block root as 0x-prefixed hex string (66 characters total). - - Status Codes: - 200 OK: Checkpoint returned successfully. - 503 Service Unavailable: Store not initialized. - """ - store_getter = request.app.get("store_getter") - store = store_getter() if store_getter else None - - if store is None: - raise web.HTTPServiceUnavailable(reason="Store not initialized") - - justified = store.latest_justified - - return web.Response( - body=json.dumps( - { - "slot": justified.slot, - "root": "0x" + justified.root.hex(), - } - ), - content_type="application/json", - ) diff --git a/src/lean_spec/node/api/endpoints/fork_choice.py b/src/lean_spec/node/api/endpoints/fork_choice.py deleted file mode 100644 index 1867c2d18..000000000 --- a/src/lean_spec/node/api/endpoints/fork_choice.py +++ /dev/null @@ -1,74 +0,0 @@ -"""Fork choice endpoint handler.""" - -from __future__ import annotations - -import json - -from aiohttp import web - - -async def handle(request: web.Request) -> web.Response: - """ - Handle fork choice tree request. - - Returns the fork choice tree snapshot: blocks with weights, head, - checkpoints, safe target, and validator count. - - Response: JSON object with fields: - - nodes (array): Blocks in the tree, each with root, slot, parent_root, - proposer_index, and weight. - - head (string): Current head block root as 0x-prefixed hex. - - justified (object): Latest justified checkpoint (slot, root). - - finalized (object): Latest finalized checkpoint (slot, root). - - safe_target (string): Safe target block root as 0x-prefixed hex. - - validator_count (integer): Number of validators in head state. - - Status Codes: - 200 OK: Fork choice tree returned successfully. - 503 Service Unavailable: Store not initialized. - """ - store_getter = request.app.get("store_getter") - store = store_getter() if store_getter else None - - if store is None: - raise web.HTTPServiceUnavailable(reason="Store not initialized") - - finalized_slot = store.latest_finalized.slot - weights = request.app["spec"].compute_block_weights(store) - - nodes = [] - for root, block in store.blocks.items(): - if block.slot < finalized_slot: - continue - nodes.append( - { - "root": "0x" + root.hex(), - "slot": int(block.slot), - "parent_root": "0x" + block.parent_root.hex(), - "proposer_index": int(block.proposer_index), - "weight": weights.get(root, 0), - } - ) - - head_state = store.states.get(store.head) - validator_count = len(head_state.validators) if head_state is not None else 0 - - response = { - "nodes": nodes, - "head": "0x" + store.head.hex(), - "justified": { - "slot": int(store.latest_justified.slot), - "root": "0x" + store.latest_justified.root.hex(), - }, - "finalized": { - "slot": int(store.latest_finalized.slot), - "root": "0x" + store.latest_finalized.root.hex(), - }, - "safe_target": "0x" + store.safe_target.hex(), - "validator_count": validator_count, - } - - return web.Response( - body=json.dumps(response), - content_type="application/json", - ) diff --git a/src/lean_spec/node/api/endpoints/health.py b/src/lean_spec/node/api/endpoints/health.py deleted file mode 100644 index cd6206fb1..000000000 --- a/src/lean_spec/node/api/endpoints/health.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Health endpoint specification and handler.""" - -from __future__ import annotations - -import json -from typing import Final - -from aiohttp import web - -STATUS_HEALTHY: Final = "healthy" -"""Fixed healthy status returned by the health endpoint.""" - -SERVICE_NAME: Final = "lean-rpc-api" -"""Fixed service identifier returned by the health endpoint.""" - - -async def handle(_request: web.Request) -> web.Response: - """ - Handle health check request. - - Returns server health status to indicate the service is operational. - - Response: JSON object with fields: - - status (string): Always healthy when the endpoint is reachable. - - service (string): Fixed identifier "lean-rpc-api". - - Status Codes: - 200 OK: Server is running. - """ - return web.Response( - body=json.dumps({"status": STATUS_HEALTHY, "service": SERVICE_NAME}), - content_type="application/json", - ) diff --git a/src/lean_spec/node/api/endpoints/metrics.py b/src/lean_spec/node/api/endpoints/metrics.py deleted file mode 100644 index bc1dee81e..000000000 --- a/src/lean_spec/node/api/endpoints/metrics.py +++ /dev/null @@ -1,27 +0,0 @@ -"""Metrics endpoint (Prometheus exposition).""" - -from __future__ import annotations - -from aiohttp import web - -from lean_spec.node.metrics.registry import get_metrics_output - -CONTENT_TYPE = "text/plain; version=0.0.4" -"""Prometheus text exposition media type, without the charset parameter.""" - -CHARSET = "utf-8" -"""Body encoding, passed separately from the media type. - -aiohttp rejects a charset inside the content_type argument. -It must travel through the dedicated charset parameter instead. -""" - - -async def handle(_request: web.Request) -> web.Response: - """ - Handle Prometheus metrics scrape request. - - Returns metrics in Prometheus text exposition format. - """ - body = get_metrics_output() - return web.Response(body=body, content_type=CONTENT_TYPE, charset=CHARSET) diff --git a/src/lean_spec/node/api/endpoints/states.py b/src/lean_spec/node/api/endpoints/states.py deleted file mode 100644 index 555244685..000000000 --- a/src/lean_spec/node/api/endpoints/states.py +++ /dev/null @@ -1,46 +0,0 @@ -"""States endpoint handlers.""" - -from __future__ import annotations - -import asyncio -import logging - -from aiohttp import web - -logger = logging.getLogger(__name__) - - -async def handle_finalized(request: web.Request) -> web.Response: - """ - Handle finalized state request. - - Returns the finalized beacon state as raw SSZ bytes (not snappy compressed). - - Response: SSZ-encoded State (binary, application/octet-stream) - - Status Codes: - 200 OK: State returned successfully. - 404 Not Found: Finalized state not available in store. - 503 Service Unavailable: Store not initialized. - """ - store_getter = request.app.get("store_getter") - store = store_getter() if store_getter else None - - if store is None: - raise web.HTTPServiceUnavailable(reason="Store not initialized") - - finalized = store.latest_finalized - - if finalized.root not in store.states: - raise web.HTTPNotFound(reason="Finalized state not available") - - state = store.states[finalized.root] - - # Implementation detail: offload CPU-intensive encoding to thread pool - try: - ssz_bytes = await asyncio.to_thread(state.encode_bytes) - except Exception as exception: - logger.error("Failed to encode state: %s", exception) - raise web.HTTPInternalServerError(reason="Encoding failed") from exception - - return web.Response(body=ssz_bytes, content_type="application/octet-stream") diff --git a/src/lean_spec/node/api/handlers.py b/src/lean_spec/node/api/handlers.py new file mode 100644 index 000000000..f3c709a5c --- /dev/null +++ b/src/lean_spec/node/api/handlers.py @@ -0,0 +1,171 @@ +"""HTTP handlers exposing the node's API over aiohttp.""" + +from __future__ import annotations + +import asyncio +import json +import logging +from dataclasses import dataclass +from typing import Final + +from aiohttp import web + +from lean_spec.node.api.context import ApiContext +from lean_spec.node.api.responses import ( + AggregatorStatusBody, + AggregatorToggleBody, + CheckpointBody, + ForkChoiceBody, + ForkChoiceNode, + HealthBody, + json_response, +) +from lean_spec.node.metrics.registry import get_metrics_output + +logger = logging.getLogger(__name__) + +STATUS_HEALTHY: Final = "healthy" +"""Fixed healthy status returned by the health endpoint.""" + +SERVICE_NAME: Final = "lean-rpc-api" +"""Fixed service identifier returned by the health endpoint.""" + +METRICS_CONTENT_TYPE: Final = "text/plain; version=0.0.4" +"""Prometheus text exposition media type, without the charset parameter.""" + +METRICS_CHARSET: Final = "utf-8" +""" +Body encoding, passed separately from the media type. + +aiohttp rejects a charset inside the content_type argument. + +It must travel through the dedicated charset parameter instead. +""" + + +@dataclass(frozen=True, slots=True) +class ApiHandlers: + """The aiohttp request handlers, bound to the dependencies they serve.""" + + context: ApiContext + """Dependency bundle resolved once at server startup.""" + + async def health(self, request: web.Request) -> web.Response: + """Report that the service is reachable.""" + return json_response(HealthBody(status=STATUS_HEALTHY, service=SERVICE_NAME)) + + async def metrics(self, request: web.Request) -> web.Response: + """Return node metrics in Prometheus text exposition format.""" + return web.Response( + body=get_metrics_output(), + content_type=METRICS_CONTENT_TYPE, + charset=METRICS_CHARSET, + ) + + async def justified_checkpoint(self, request: web.Request) -> web.Response: + """Return the latest justified checkpoint.""" + store = self.context.require_store() + return json_response( + CheckpointBody(slot=store.latest_justified.slot, root=store.latest_justified.root) + ) + + async def fork_choice(self, request: web.Request) -> web.Response: + """ + Return a snapshot of the fork-choice tree. + + Weights count only the votes currently driving the head. + They exclude attestations seen but not yet counted. + """ + store = self.context.require_store() + weights = self.context.spec.compute_block_weights(store) + + nodes = [ + ForkChoiceNode( + root=root, + slot=block.slot, + parent_root=block.parent_root, + proposer_index=block.proposer_index, + weight=weights.get(root, 0), + ) + for root, block in store.blocks.items() + if block.slot >= store.latest_finalized.slot + ] + + # Report a missing head state as null, not zero. + # Zero would read as "no validators" rather than "state unavailable". + head_state = store.states.get(store.head) + validator_count = len(head_state.validators) if head_state is not None else None + + return json_response( + ForkChoiceBody( + nodes=nodes, + head=store.head, + justified=CheckpointBody( + slot=store.latest_justified.slot, root=store.latest_justified.root + ), + finalized=CheckpointBody( + slot=store.latest_finalized.slot, root=store.latest_finalized.root + ), + safe_target=store.safe_target, + validator_count=validator_count, + ) + ) + + async def finalized_state(self, request: web.Request) -> web.Response: + """Return the finalized beacon state as SSZ bytes.""" + store = self.context.require_store() + + if store.latest_finalized.root not in store.states: + raise web.HTTPNotFound(reason="Finalized state not available") + + state = store.states[store.latest_finalized.root] + + # Encoding a full state is CPU-heavy, so run it off the event loop. + try: + ssz_bytes = await asyncio.to_thread(state.encode_bytes) + except Exception as exception: + logger.error("Failed to encode state: %s", exception) + raise web.HTTPInternalServerError(reason="Encoding failed") from exception + + return web.Response(body=ssz_bytes, content_type="application/octet-stream") + + async def aggregator_status(self, request: web.Request) -> web.Response: + """Report whether the node is acting as an aggregator.""" + aggregator_role_control = self.context.require_aggregator_role_control() + return json_response( + AggregatorStatusBody(is_aggregator=aggregator_role_control.is_aggregator) + ) + + async def aggregator_toggle(self, request: web.Request) -> web.Response: + """ + Set the aggregator role at runtime and report the previous value. + + Raises: + HTTPBadRequest: Body missing, malformed, or 'enabled' not a boolean. + """ + aggregator_role_control = self.context.require_aggregator_role_control() + + try: + request_body = await request.json() + except json.JSONDecodeError as exception: + raise web.HTTPBadRequest(reason="Invalid JSON body") from exception + + if not isinstance(request_body, dict) or "enabled" not in request_body: + raise web.HTTPBadRequest(reason="Missing 'enabled' field in body") + + enabled = request_body["enabled"] + # Reject ints like 0 and 1, which loose JSON parsers blur with booleans but Python does not. + if not isinstance(enabled, bool): + raise web.HTTPBadRequest(reason="'enabled' must be a boolean") + + previous_aggregator_state = aggregator_role_control.is_aggregator + aggregator_role_control.is_aggregator = enabled + if previous_aggregator_state != enabled: + logger.info( + "Aggregator role %s via admin API", + "activated" if enabled else "deactivated", + ) + + return json_response( + AggregatorToggleBody(is_aggregator=enabled, previous=previous_aggregator_state) + ) diff --git a/src/lean_spec/node/api/responses.py b/src/lean_spec/node/api/responses.py new file mode 100644 index 000000000..a182f643a --- /dev/null +++ b/src/lean_spec/node/api/responses.py @@ -0,0 +1,66 @@ +"""Typed response bodies for the HTTP API wire contract.""" + +from aiohttp import web +from pydantic import BaseModel, ConfigDict + +from lean_spec.spec.forks.lstar.containers import Slot +from lean_spec.spec.ssz import Bytes32 + + +class ApiResponseBody(BaseModel): + """Base for every JSON response body, keyed snake_case, never camelCase-aliased.""" + + model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True) + + +class HealthBody(ApiResponseBody): + """Liveness probe response.""" + + status: str + service: str + + +class CheckpointBody(ApiResponseBody): + """A checkpoint on the wire: its slot and block root.""" + + slot: Slot + root: Bytes32 + + +class ForkChoiceNode(ApiResponseBody): + """One block in the fork-choice tree view.""" + + root: Bytes32 + slot: Slot + parent_root: Bytes32 + proposer_index: int + weight: int + + +class ForkChoiceBody(ApiResponseBody): + """A snapshot of the fork-choice tree.""" + + nodes: list[ForkChoiceNode] + head: Bytes32 + justified: CheckpointBody + finalized: CheckpointBody + safe_target: Bytes32 + validator_count: int | None + + +class AggregatorStatusBody(ApiResponseBody): + """The node's current aggregator role.""" + + is_aggregator: bool + + +class AggregatorToggleBody(ApiResponseBody): + """The result of a runtime aggregator-role change.""" + + is_aggregator: bool + previous: bool + + +def json_response(body: ApiResponseBody) -> web.Response: + """Serialize a response model to a JSON HTTP response.""" + return web.Response(body=body.model_dump_json(), content_type="application/json") diff --git a/src/lean_spec/node/api/routes.py b/src/lean_spec/node/api/routes.py index b7199cb39..7a303fff2 100644 --- a/src/lean_spec/node/api/routes.py +++ b/src/lean_spec/node/api/routes.py @@ -7,21 +7,14 @@ from aiohttp import web -from lean_spec.node.api.endpoints import ( - aggregator, - checkpoints, - fork_choice, - health, - metrics, - states, -) +from lean_spec.node.api.handlers import ApiHandlers Handler = Callable[[web.Request], Awaitable[web.Response]] -"""Type alias for aiohttp request handlers.""" +"""Request handler already bound to its dependencies.""" class Route(NamedTuple): - """One API route: its verb, path, handler, and access tier.""" + """One API route: its verb, path, and handler.""" method: str """HTTP verb the route responds to.""" @@ -32,17 +25,15 @@ class Route(NamedTuple): handler: Handler """Coroutine that serves requests to this route.""" - is_admin: bool - """True for privileged admin routes, False for public read-only routes.""" - -ROUTES: list[Route] = [ - Route("GET", "/lean/v0/health", health.handle, is_admin=False), - Route("GET", "/lean/v0/states/finalized", states.handle_finalized, is_admin=False), - Route("GET", "/lean/v0/checkpoints/justified", checkpoints.handle_justified, is_admin=False), - Route("GET", "/lean/v0/fork_choice", fork_choice.handle, is_admin=False), - Route("GET", "/metrics", metrics.handle, is_admin=False), - Route("GET", "/lean/v0/admin/aggregator", aggregator.handle_status, is_admin=True), - Route("POST", "/lean/v0/admin/aggregator", aggregator.handle_toggle, is_admin=True), -] -"""Every API route, public and admin alike, tagged by access tier.""" +def build_routes(handlers: ApiHandlers) -> list[Route]: + """Bind every API route to its handler method.""" + return [ + Route("GET", "/lean/v0/health", handlers.health), + Route("GET", "/lean/v0/states/finalized", handlers.finalized_state), + Route("GET", "/lean/v0/checkpoints/justified", handlers.justified_checkpoint), + Route("GET", "/lean/v0/fork_choice", handlers.fork_choice), + Route("GET", "/metrics", handlers.metrics), + Route("GET", "/lean/v0/admin/aggregator", handlers.aggregator_status), + Route("POST", "/lean/v0/admin/aggregator", handlers.aggregator_toggle), + ] diff --git a/src/lean_spec/node/api/server.py b/src/lean_spec/node/api/server.py index 123fc435f..efbd58105 100644 --- a/src/lean_spec/node/api/server.py +++ b/src/lean_spec/node/api/server.py @@ -1,9 +1,4 @@ -""" -API server implementation using aiohttp. - -Provides the HTTP server that serves routes defined in routes.py. -See endpoints/ for endpoint specifications and handlers. -""" +"""API server implementation using aiohttp.""" from __future__ import annotations @@ -14,25 +9,17 @@ from aiohttp import web -from lean_spec.node.api.aggregator_controller import AggregatorController -from lean_spec.node.api.routes import ROUTES +from lean_spec.node.api.context import AggregatorRoleControl, ApiContext +from lean_spec.node.api.handlers import ApiHandlers +from lean_spec.node.api.routes import build_routes from lean_spec.spec.forks import LstarSpec, Store logger = logging.getLogger(__name__) -# The following classes are implementation details. -# Other implementations may structure their code differently. - - @dataclass(frozen=True, slots=True) class ApiServerConfig: - """ - Configuration for the API server. - - Implementation-specific. Other implementations may use different - configuration patterns (env vars, config files, CLI args, etc.). - """ + """Configuration for the API server.""" host: str = "0.0.0.0" """Host address to bind to.""" @@ -43,16 +30,7 @@ class ApiServerConfig: @dataclass(slots=True) class ApiServer: - """ - HTTP API server using aiohttp. - - Implementation-specific. This class handles: - - Server lifecycle (start, stop, run) - - Route registration - - Store access via callable getter - - Other implementations may use different frameworks or patterns. - """ + """HTTP API server using aiohttp.""" config: ApiServerConfig """Server configuration.""" @@ -63,13 +41,8 @@ class ApiServer: store_getter: Callable[[], Store | None] | None = None """Callable that returns the current Store instance.""" - aggregator_controller: AggregatorController | None = None - """ - Optional controller for toggling the aggregator role at runtime. - - When present, the admin aggregator endpoints can query and mutate the - node's aggregator flag. When absent, those endpoints return 503. - """ + aggregator_role_control: AggregatorRoleControl | None = None + """Optional runtime accessor for the node's aggregator role.""" _runner: web.AppRunner | None = field(default=None, init=False) """aiohttp application runner.""" @@ -89,18 +62,17 @@ async def start(self) -> None: """Start the API server in the background.""" app = web.Application() - # Store the store_getter in app for handlers that need store access - app["store_getter"] = self.store_getter - - # Expose the fork spec for handlers that drive consensus computations. - app["spec"] = self.spec - - # Expose the aggregator controller to admin endpoints. - # Absence is fine; endpoints return 503 when unset. - app["aggregator_controller"] = self.aggregator_controller - - # Register every route from the unified table, keyed by its verb. - routes = [web.route(route.method, route.path, route.handler) for route in ROUTES] + # Resolve the shared dependencies once and bind them to the handlers. + # Every route then serves through a handler method that reads them. + context = ApiContext( + spec=self.spec, + store_getter=self.store_getter, + aggregator_role_control=self.aggregator_role_control, + ) + handlers = ApiHandlers(context) + routes = [ + web.route(route.method, route.path, route.handler) for route in build_routes(handlers) + ] app.add_routes(routes) self._runner = web.AppRunner(app) @@ -112,11 +84,7 @@ async def start(self) -> None: logger.info("API server listening on %s:%d", self.config.host, self.config.port) async def run(self) -> None: - """ - Run the API server until shutdown. - - Blocks until stop() is called. - """ + """Run the API server until it is asked to stop.""" await self.start() await self._stop_event.wait() diff --git a/src/lean_spec/node/networking/service/service.py b/src/lean_spec/node/networking/service/service.py index 63bb52990..178662a22 100644 --- a/src/lean_spec/node/networking/service/service.py +++ b/src/lean_spec/node/networking/service/service.py @@ -76,9 +76,6 @@ class NetworkService: network_name: str = field(default="0x00000000") """Network name for gossip topics (4-byte hex string).""" - is_aggregator: bool = field(default=False) - """Whether this node functions as an aggregator.""" - _running: bool = field(default=False, repr=False) """Whether the event loop is running.""" diff --git a/src/lean_spec/node/node.py b/src/lean_spec/node/node.py index 0d210a31a..6b10db1b7 100644 --- a/src/lean_spec/node/node.py +++ b/src/lean_spec/node/node.py @@ -19,7 +19,7 @@ from pathlib import Path from typing import Final -from lean_spec.node.api import AggregatorController, ApiServer, ApiServerConfig +from lean_spec.node.api import ApiServer, ApiServerConfig from lean_spec.node.chain import SlotClock from lean_spec.node.chain.service import ChainService from lean_spec.node.metrics import registry as metrics @@ -128,10 +128,10 @@ class NodeConfig: When False (default): - The node runs in standard validator or passive mode - Seeds the initial value of the live aggregator flag on SyncService and - NetworkService. The flag can be toggled at runtime via the admin API - (see AggregatorController). Runtime toggles do not persist across - restarts and do not update the local ENR or subnet subscriptions. + Seeds the initial value of the live aggregator flag on the sync service. + The flag can be toggled at runtime via the admin API. + Runtime toggles do not persist across restarts. + They do not update the local ENR or subnet subscriptions. """ anchor_store: Store | None = field(default=None) @@ -299,7 +299,6 @@ def from_genesis(cls, config: NodeConfig) -> Node: sync_service=sync_service, event_source=config.event_source, network_name=config.network_name, - is_aggregator=config.is_aggregator, ) # Wire up aggregated attestation publishing. @@ -311,18 +310,14 @@ def from_genesis(cls, config: NodeConfig) -> Node: # Create API server if configured api_server: ApiServer | None = None if config.api_config is not None: - # Controller lets the admin API rotate the aggregator role at - # runtime when another aggregator becomes unhealthy. - aggregator_controller = AggregatorController( - sync_service=sync_service, - network_service=network_service, - ) - # Store getter captures sync_service to get the live store + # The admin API reads and mutates the sync service aggregator flag, + # letting operators rotate the role at runtime without a restart. + # Store getter captures sync_service to get the live store. api_server = ApiServer( config=config.api_config, spec=fork, store_getter=lambda: sync_service.store, - aggregator_controller=aggregator_controller, + aggregator_role_control=sync_service, ) # Create validator service if registry provided. diff --git a/tests/node/api/endpoints/conftest.py b/tests/node/api/endpoints/conftest.py index d834914ae..dcecaaaa9 100644 --- a/tests/node/api/endpoints/conftest.py +++ b/tests/node/api/endpoints/conftest.py @@ -10,29 +10,19 @@ import pytest from consensus_testing import make_genesis_store -from lean_spec.node.api import AggregatorController, ApiServer, ApiServerConfig +from lean_spec.node.api import ApiServer, ApiServerConfig # Default port for auto-started local server DEFAULT_PORT = 15099 @dataclass(slots=True) -class _AggregatorStub: - """Minimal stub exposing only the is_aggregator flag.""" +class _AggregatorRoleStub: + """Minimal stand-in exposing only the aggregator flag the endpoints touch.""" is_aggregator: bool = field(default=False) -def _make_conformance_controller(initial: bool = False) -> AggregatorController: - """Build an AggregatorController backed by lightweight stubs.""" - sync_stub = _AggregatorStub(is_aggregator=initial) - network_stub = _AggregatorStub(is_aggregator=initial) - return AggregatorController( - sync_service=sync_stub, # type: ignore[arg-type] - network_service=network_stub, # type: ignore[arg-type] - ) - - class _ServerThread(threading.Thread): """Thread that runs the API server in its own event loop.""" @@ -64,15 +54,14 @@ def run(self) -> None: self.loop.close() def _create_server(self) -> ApiServer: - """Create the API server with a test store and aggregator controller.""" + """Create the API server with a test store and aggregator flag holder.""" store = make_genesis_store(num_validators=3, observer=True, genesis_time=int(time.time())) - controller = _make_conformance_controller(initial=False) config = ApiServerConfig(host="127.0.0.1", port=self.port) return ApiServer( config=config, store_getter=lambda: store, - aggregator_controller=controller, + aggregator_role_control=_AggregatorRoleStub(), ) def stop(self) -> None: diff --git a/tests/node/api/endpoints/test_aggregator.py b/tests/node/api/endpoints/test_aggregator.py index 6fb8044e9..4cb25808d 100644 --- a/tests/node/api/endpoints/test_aggregator.py +++ b/tests/node/api/endpoints/test_aggregator.py @@ -1,7 +1,7 @@ """ Tests for the admin aggregator endpoint. -The conformance server is started with a controller seeded to disabled, +The conformance server is started with the aggregator role seeded to disabled, so tests exercise both the happy path and error cases. """ @@ -14,7 +14,7 @@ class TestAggregatorStatus: """Tests for GET /lean/v0/admin/aggregator.""" def test_returns_200(self, server_url: str) -> None: - """GET returns 200 when the controller is wired.""" + """GET returns 200 when the sync service is wired.""" response = httpx.get(f"{server_url}/lean/v0/admin/aggregator") assert response.status_code == 200 diff --git a/tests/node/api/endpoints/test_health.py b/tests/node/api/endpoints/test_health.py index ce6a95306..67416e64f 100644 --- a/tests/node/api/endpoints/test_health.py +++ b/tests/node/api/endpoints/test_health.py @@ -2,7 +2,7 @@ import httpx -from lean_spec.node.api.endpoints import health +from lean_spec.node.api.handlers import SERVICE_NAME, STATUS_HEALTHY def get_health(server_url: str) -> httpx.Response: @@ -31,7 +31,7 @@ def test_health_response_structure(server_url: str) -> None: response_body = response.json() assert "status" in response_body - assert response_body["status"] == health.STATUS_HEALTHY + assert response_body["status"] == STATUS_HEALTHY assert "service" in response_body - assert response_body["service"] == health.SERVICE_NAME + assert response_body["service"] == SERVICE_NAME diff --git a/tests/node/api/test_aggregator_controller.py b/tests/node/api/test_aggregator_controller.py deleted file mode 100644 index 4e9487bdb..000000000 --- a/tests/node/api/test_aggregator_controller.py +++ /dev/null @@ -1,98 +0,0 @@ -"""Tests for the aggregator runtime controller.""" - -from __future__ import annotations - -import asyncio - -from consensus_testing import MockEventSource, create_mock_sync_service -from lean_spec.node.api import AggregatorController -from lean_spec.node.networking import NetworkService, PeerId -from lean_spec.node.sync import SyncService - -FORK_DIGEST = "0x00000000" - - -def _make_controller( - peer_id: PeerId, - *, - initial: bool = False, -) -> tuple[AggregatorController, SyncService, NetworkService]: - """Build a controller wired to realistic sync/network services.""" - sync_service = create_mock_sync_service(peer_id) - sync_service.is_aggregator = initial - network_service = NetworkService( - sync_service=sync_service, - event_source=MockEventSource(events=[]), - network_name=FORK_DIGEST, - is_aggregator=initial, - ) - controller = AggregatorController( - sync_service=sync_service, - network_service=network_service, - ) - return controller, sync_service, network_service - - -class TestAggregatorControllerRead: - """Tests for the read path.""" - - def test_is_enabled_reflects_sync_service_flag(self, peer_id: PeerId) -> None: - """is_enabled reads the sync service flag as the source of truth.""" - controller, sync_service, _ = _make_controller(peer_id, initial=False) - assert controller.is_enabled() is False - - sync_service.is_aggregator = True - assert controller.is_enabled() is True - - -class TestAggregatorControllerWrite: - """Tests for the write path.""" - - async def test_set_enabled_activates_role(self, peer_id: PeerId) -> None: - """set_enabled(True) flips the flag on both services.""" - controller, sync_service, network_service = _make_controller(peer_id, initial=False) - - previous = await controller.set_enabled(True) - - assert previous is False - assert controller.is_enabled() is True - assert sync_service.is_aggregator is True - assert network_service.is_aggregator is True - - async def test_set_enabled_deactivates_role(self, peer_id: PeerId) -> None: - """set_enabled(False) flips the flag off on both services.""" - controller, sync_service, network_service = _make_controller(peer_id, initial=True) - - previous = await controller.set_enabled(False) - - assert previous is True - assert controller.is_enabled() is False - assert sync_service.is_aggregator is False - assert network_service.is_aggregator is False - - async def test_set_enabled_idempotent(self, peer_id: PeerId) -> None: - """Setting the same value returns the current value and leaves state intact.""" - controller, sync_service, network_service = _make_controller(peer_id, initial=True) - - previous = await controller.set_enabled(True) - - assert previous is True - assert sync_service.is_aggregator is True - assert network_service.is_aggregator is True - - async def test_sequential_toggles_converge(self, peer_id: PeerId) -> None: - """Sequential toggles each see the prior state and converge to the last value.""" - controller, sync_service, network_service = _make_controller(peer_id, initial=False) - - previous_states = await asyncio.gather( - controller.set_enabled(True), - controller.set_enabled(False), - controller.set_enabled(True), - ) - - # asyncio.gather on a single-threaded event loop preserves order. - # Each toggle sees the previous state correctly. - assert previous_states == [False, True, False] - assert controller.is_enabled() is True - assert sync_service.is_aggregator is True - assert network_service.is_aggregator is True diff --git a/tests/node/api/test_server.py b/tests/node/api/test_server.py index 1f616aef0..1b2ac002a 100644 --- a/tests/node/api/test_server.py +++ b/tests/node/api/test_server.py @@ -16,30 +16,19 @@ import httpx -from lean_spec.node.api import AggregatorController, ApiServer, ApiServerConfig +from lean_spec.node.api import ApiServer, ApiServerConfig from lean_spec.spec.forks.lstar import Store @dataclass(slots=True) -class _AggregatorStub: - """Minimal aggregator role holder for wiring AggregatorController in tests.""" +class _AggregatorRoleStub: + """Minimal stand-in exposing only the aggregator flag the endpoints touch. - is_aggregator: bool = field(default=False) - - -def _make_test_controller(initial: bool = False) -> AggregatorController: + Avoids pulling in the full sync service dependency graph for server-level + tests that only exercise the flag-toggle contract. """ - Build an AggregatorController backed by lightweight stubs. - Avoids pulling in the full SyncService / NetworkService dependency graph - for endpoint-level tests that only exercise the flag-toggle contract. - """ - sync_stub = _AggregatorStub(is_aggregator=initial) - network_stub = _AggregatorStub(is_aggregator=initial) - return AggregatorController( - sync_service=sync_stub, # type: ignore[arg-type] - network_service=network_stub, # type: ignore[arg-type] - ) + is_aggregator: bool = field(default=False) class TestApiServerConfiguration: @@ -192,8 +181,8 @@ async def test_returns_200_with_initialized_store(self, base_store: Store) -> No class TestAggregatorAdminEndpoint: """Tests for the /lean/v0/admin/aggregator endpoint.""" - async def test_status_returns_503_without_controller(self) -> None: - """GET returns 503 when no controller is wired.""" + async def test_status_returns_503_without_aggregator_control(self) -> None: + """GET returns 503 when no aggregator role control is wired.""" config = ApiServerConfig(port=15060) server = ApiServer(config=config) @@ -209,10 +198,10 @@ async def test_status_returns_503_without_controller(self) -> None: await server.aclose() async def test_status_returns_current_role(self) -> None: - """GET returns the current aggregator role from the controller.""" - controller = _make_test_controller(initial=True) + """GET returns the current aggregator role from the sync service flag.""" + aggregator_role_stub = _AggregatorRoleStub(is_aggregator=True) config = ApiServerConfig(port=15061) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) await server.start() @@ -229,9 +218,9 @@ async def test_status_returns_current_role(self) -> None: async def test_toggle_activates_role(self) -> None: """POST with enabled=true activates the aggregator role.""" - controller = _make_test_controller(initial=False) + aggregator_role_stub = _AggregatorRoleStub(is_aggregator=False) config = ApiServerConfig(port=15062) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) await server.start() @@ -244,7 +233,7 @@ async def test_toggle_activates_role(self) -> None: assert response.status_code == 200 assert response.json() == {"is_aggregator": True, "previous": False} - assert controller.is_enabled() is True + assert aggregator_role_stub.is_aggregator is True # A follow-up GET sees the new value. follow_up = await client.get("http://127.0.0.1:15062/lean/v0/admin/aggregator") @@ -255,9 +244,9 @@ async def test_toggle_activates_role(self) -> None: async def test_toggle_deactivates_role(self) -> None: """POST with enabled=false deactivates the aggregator role.""" - controller = _make_test_controller(initial=True) + aggregator_role_stub = _AggregatorRoleStub(is_aggregator=True) config = ApiServerConfig(port=15063) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) await server.start() @@ -270,16 +259,15 @@ async def test_toggle_deactivates_role(self) -> None: assert response.status_code == 200 assert response.json() == {"is_aggregator": False, "previous": True} - assert controller.is_enabled() is False + assert aggregator_role_stub.is_aggregator is False finally: await server.aclose() async def test_toggle_rejects_missing_body(self) -> None: """POST with no body returns 400.""" - controller = _make_test_controller() config = ApiServerConfig(port=15064) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=_AggregatorRoleStub()) await server.start() @@ -298,9 +286,8 @@ async def test_toggle_rejects_missing_body(self) -> None: async def test_toggle_rejects_missing_field(self) -> None: """POST without 'enabled' field returns 400.""" - controller = _make_test_controller() config = ApiServerConfig(port=15065) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=_AggregatorRoleStub()) await server.start() @@ -318,9 +305,9 @@ async def test_toggle_rejects_missing_field(self) -> None: async def test_toggle_rejects_non_boolean(self) -> None: """POST with non-boolean 'enabled' returns 400 and does not change state.""" - controller = _make_test_controller(initial=False) + aggregator_role_stub = _AggregatorRoleStub(is_aggregator=False) config = ApiServerConfig(port=15066) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) await server.start() @@ -332,7 +319,7 @@ async def test_toggle_rejects_non_boolean(self) -> None: ) assert response.status_code == 400 - assert controller.is_enabled() is False + assert aggregator_role_stub.is_aggregator is False finally: await server.aclose() @@ -346,9 +333,9 @@ async def test_sequential_posts_converge(self) -> None: the last-writer-wins assertion becomes flaky on slower runners (observed: Python 3.14 macOS). """ - controller = _make_test_controller(initial=False) + aggregator_role_stub = _AggregatorRoleStub(is_aggregator=False) config = ApiServerConfig(port=15067) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) await server.start() @@ -362,16 +349,15 @@ async def test_sequential_posts_converge(self) -> None: ] assert all(r.status_code == 200 for r in responses) - assert controller.is_enabled() is True + assert aggregator_role_stub.is_aggregator is True finally: await server.aclose() async def test_toggle_rejects_null_body(self) -> None: """POST with JSON null body returns 400.""" - controller = _make_test_controller() config = ApiServerConfig(port=15068) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=_AggregatorRoleStub()) await server.start() @@ -390,9 +376,8 @@ async def test_toggle_rejects_null_body(self) -> None: async def test_toggle_rejects_array_body(self) -> None: """POST with JSON array body returns 400.""" - controller = _make_test_controller() config = ApiServerConfig(port=15069) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=_AggregatorRoleStub()) await server.start() @@ -410,9 +395,9 @@ async def test_toggle_rejects_array_body(self) -> None: async def test_toggle_rejects_integer_enabled(self) -> None: """POST with integer 1 as enabled returns 400 (must be boolean).""" - controller = _make_test_controller(initial=False) + aggregator_role_stub = _AggregatorRoleStub(is_aggregator=False) config = ApiServerConfig(port=15070) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) await server.start() @@ -424,7 +409,7 @@ async def test_toggle_rejects_integer_enabled(self) -> None: ) assert response.status_code == 400 - assert controller.is_enabled() is False + assert aggregator_role_stub.is_aggregator is False finally: await server.aclose() diff --git a/tests/node/networking/service/test_service.py b/tests/node/networking/service/test_service.py index f71ffeac9..f8cd8c387 100644 --- a/tests/node/networking/service/test_service.py +++ b/tests/node/networking/service/test_service.py @@ -490,27 +490,6 @@ def test_default_fork_digest(self, peer_id: PeerId) -> None: ) assert svc.network_name == "0x00000000" - def test_default_is_aggregator(self, peer_id: PeerId) -> None: - """is_aggregator defaults to False.""" - source = MockEventSource(events=[]) - sync_service = create_mock_sync_service(peer_id) - svc = NetworkService( - sync_service=sync_service, - event_source=source, - ) - assert svc.is_aggregator is False - - def test_custom_is_aggregator(self, peer_id: PeerId) -> None: - """Constructor accepts is_aggregator.""" - source = MockEventSource(events=[]) - sync_service = create_mock_sync_service(peer_id) - svc = NetworkService( - sync_service=sync_service, - event_source=source, - is_aggregator=True, - ) - assert svc.is_aggregator is True - # Behavioral routing to the forkchoice store From ff9acfe7524477c3d7b61dc9a703f57c047a676d Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Tue, 16 Jun 2026 18:07:11 +0200 Subject: [PATCH 2/4] refactor(node/api): inline the route table and trim the response base Follow-up simplifications after the API restructure: - Delete routes.py. The Route NamedTuple only mirrored aiohttp's own RouteDef and build_routes had a single caller, so register the routes inline with web.get/web.post in the server start path. - Drop arbitrary_types_allowed from the response base. The SSZ field types carry native pydantic schemas, so the flag earned nothing and falsely implied the fields were opaque. - Note on the JSON response helper why it is built by hand: web.json_response would append a charset the wire contract does not use. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/lean_spec/node/api/responses.py | 4 ++- src/lean_spec/node/api/routes.py | 39 ----------------------------- src/lean_spec/node/api/server.py | 19 ++++++++++---- 3 files changed, 17 insertions(+), 45 deletions(-) delete mode 100644 src/lean_spec/node/api/routes.py diff --git a/src/lean_spec/node/api/responses.py b/src/lean_spec/node/api/responses.py index a182f643a..cf3477e03 100644 --- a/src/lean_spec/node/api/responses.py +++ b/src/lean_spec/node/api/responses.py @@ -10,7 +10,7 @@ class ApiResponseBody(BaseModel): """Base for every JSON response body, keyed snake_case, never camelCase-aliased.""" - model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True) + model_config = ConfigDict(frozen=True) class HealthBody(ApiResponseBody): @@ -63,4 +63,6 @@ class AggregatorToggleBody(ApiResponseBody): def json_response(body: ApiResponseBody) -> web.Response: """Serialize a response model to a JSON HTTP response.""" + # Build the response by hand rather than via web.json_response. + # That helper appends "; charset=utf-8", but the wire contract is the bare media type. return web.Response(body=body.model_dump_json(), content_type="application/json") diff --git a/src/lean_spec/node/api/routes.py b/src/lean_spec/node/api/routes.py deleted file mode 100644 index 7a303fff2..000000000 --- a/src/lean_spec/node/api/routes.py +++ /dev/null @@ -1,39 +0,0 @@ -"""API route definitions.""" - -from __future__ import annotations - -from collections.abc import Awaitable, Callable -from typing import NamedTuple - -from aiohttp import web - -from lean_spec.node.api.handlers import ApiHandlers - -Handler = Callable[[web.Request], Awaitable[web.Response]] -"""Request handler already bound to its dependencies.""" - - -class Route(NamedTuple): - """One API route: its verb, path, and handler.""" - - method: str - """HTTP verb the route responds to.""" - - path: str - """URL path the route is registered under.""" - - handler: Handler - """Coroutine that serves requests to this route.""" - - -def build_routes(handlers: ApiHandlers) -> list[Route]: - """Bind every API route to its handler method.""" - return [ - Route("GET", "/lean/v0/health", handlers.health), - Route("GET", "/lean/v0/states/finalized", handlers.finalized_state), - Route("GET", "/lean/v0/checkpoints/justified", handlers.justified_checkpoint), - Route("GET", "/lean/v0/fork_choice", handlers.fork_choice), - Route("GET", "/metrics", handlers.metrics), - Route("GET", "/lean/v0/admin/aggregator", handlers.aggregator_status), - Route("POST", "/lean/v0/admin/aggregator", handlers.aggregator_toggle), - ] diff --git a/src/lean_spec/node/api/server.py b/src/lean_spec/node/api/server.py index efbd58105..7726e16fe 100644 --- a/src/lean_spec/node/api/server.py +++ b/src/lean_spec/node/api/server.py @@ -11,7 +11,6 @@ from lean_spec.node.api.context import AggregatorRoleControl, ApiContext from lean_spec.node.api.handlers import ApiHandlers -from lean_spec.node.api.routes import build_routes from lean_spec.spec.forks import LstarSpec, Store logger = logging.getLogger(__name__) @@ -70,10 +69,20 @@ async def start(self) -> None: aggregator_role_control=self.aggregator_role_control, ) handlers = ApiHandlers(context) - routes = [ - web.route(route.method, route.path, route.handler) for route in build_routes(handlers) - ] - app.add_routes(routes) + + # The admin routes under /lean/v0/admin are unauthenticated. + # Deployments must restrict access to them at the network layer. + app.add_routes( + [ + web.get("/lean/v0/health", handlers.health), + web.get("/lean/v0/states/finalized", handlers.finalized_state), + web.get("/lean/v0/checkpoints/justified", handlers.justified_checkpoint), + web.get("/lean/v0/fork_choice", handlers.fork_choice), + web.get("/metrics", handlers.metrics), + web.get("/lean/v0/admin/aggregator", handlers.aggregator_status), + web.post("/lean/v0/admin/aggregator", handlers.aggregator_toggle), + ] + ) self._runner = web.AppRunner(app) await self._runner.setup() From 3525259e4115058db99eb2193012f7b73692d23a Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Tue, 16 Jun 2026 18:22:21 +0200 Subject: [PATCH 3/4] fix(node): start the API server once, and fix the conformance test path Lifecycle fix: - The node both awaited api_server.start() directly and scheduled api_server.run(), and run() calls start() again. That bound the port twice and leaked the first runner. Start the API server only through its run() task, consistent with the chain and network services. Conformance CLI fix: - The apitest entrypoint targeted tests/api, which does not exist, so the external-client suite collected nothing. Point it at the conformance tests under tests/node/api/endpoints. Test tidy-ups: - Delete the empty placeholder conftest under tests/node/api. - Trim stale and code-restating docstrings/comments in the API tests. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/consensus_testing/cli/apitest.py | 4 ++-- src/lean_spec/node/node.py | 4 ---- tests/node/api/conftest.py | 9 --------- tests/node/api/endpoints/conftest.py | 4 +--- tests/node/api/endpoints/test_checkpoints.py | 1 - tests/node/api/test_server.py | 20 +++++-------------- tests/node/test_node.py | 11 ++-------- 7 files changed, 10 insertions(+), 43 deletions(-) delete mode 100644 tests/node/api/conftest.py diff --git a/packages/testing/src/consensus_testing/cli/apitest.py b/packages/testing/src/consensus_testing/cli/apitest.py index 77974be9f..ef49089a7 100644 --- a/packages/testing/src/consensus_testing/cli/apitest.py +++ b/packages/testing/src/consensus_testing/cli/apitest.py @@ -40,7 +40,7 @@ def apitest( SERVER_URL is the base URL of the API server (e.g., http://localhost:5052). For testing the local leanSpec implementation, use the local pytest suite under - tests/api, which automatically starts a local server. + tests/node/api/endpoints, which automatically starts a local server. """ config_path = Path(__file__).parent / "pytest_ini_files" / "pytest-apitest.ini" @@ -59,7 +59,7 @@ def apitest( str(config_path), f"--rootdir={project_root}", f"--server-url={server_url}", - "tests/api", + "tests/node/api/endpoints", ] args.extend(pytest_args) diff --git a/src/lean_spec/node/node.py b/src/lean_spec/node/node.py index 6b10db1b7..d54e95195 100644 --- a/src/lean_spec/node/node.py +++ b/src/lean_spec/node/node.py @@ -463,10 +463,6 @@ async def run(self, *, install_signal_handlers: bool = True) -> None: if install_signal_handlers: self._install_signal_handlers() - # Start API server if configured - if self.api_server is not None: - await self.api_server.start() - # Run services concurrently. # # A separate task monitors the shutdown signal. diff --git a/tests/node/api/conftest.py b/tests/node/api/conftest.py deleted file mode 100644 index d93f4db17..000000000 --- a/tests/node/api/conftest.py +++ /dev/null @@ -1,9 +0,0 @@ -""" -Shared pytest fixtures for API server tests. - -Core fixtures inherited from parent conftest files. -""" - -# All fixtures are inherited from tests/conftest.py via pytest discovery. -# This file exists to establish the conftest hierarchy and can be extended -# with API-specific fixtures as needed. diff --git a/tests/node/api/endpoints/conftest.py b/tests/node/api/endpoints/conftest.py index dcecaaaa9..1cf8ded72 100644 --- a/tests/node/api/endpoints/conftest.py +++ b/tests/node/api/endpoints/conftest.py @@ -12,8 +12,8 @@ from consensus_testing import make_genesis_store from lean_spec.node.api import ApiServer, ApiServerConfig -# Default port for auto-started local server DEFAULT_PORT = 15099 +"""Port for the auto-started local server.""" @dataclass(slots=True) @@ -117,10 +117,8 @@ def server_url(request: pytest.FixtureRequest) -> Generator[str, None, None]: external_url = request.config.getoption("--server-url") if external_url: - # Use external server yield external_url else: - # Start local server server_thread = _ServerThread(DEFAULT_PORT) server_thread.start() server_thread.ready.wait(timeout=10.0) diff --git a/tests/node/api/endpoints/test_checkpoints.py b/tests/node/api/endpoints/test_checkpoints.py index 338e7fe51..6c6128d23 100644 --- a/tests/node/api/endpoints/test_checkpoints.py +++ b/tests/node/api/endpoints/test_checkpoints.py @@ -47,5 +47,4 @@ def test_has_root(self, server_url: str) -> None: assert root.startswith("0x"), "Root must have 0x prefix" assert len(root) == 66 - # Should be valid hex int(root, 16) diff --git a/tests/node/api/test_server.py b/tests/node/api/test_server.py index 1b2ac002a..9c7ba3a5b 100644 --- a/tests/node/api/test_server.py +++ b/tests/node/api/test_server.py @@ -1,14 +1,4 @@ -""" -Tests for the API server implementation details. - -API contract tests (status codes, content types, response structure) are in -tests/api/ and also run automatically with `uv run pytest`. - -These tests cover leanSpec-specific implementation details: -- Configuration behavior -- Store integration -- Error handling when store not initialized -""" +"""Tests for the API server implementation: configuration, store wiring, and admin endpoints.""" from __future__ import annotations @@ -328,10 +318,10 @@ async def test_sequential_posts_converge(self) -> None: """ Multiple sequential POSTs converge to the last-writer value. - Posts must be issued one at a time (not via ``asyncio.gather``); - with concurrent requests the server-side arrival order is racy and - the last-writer-wins assertion becomes flaky on slower runners - (observed: Python 3.14 macOS). + Posts must be issued one at a time, never concurrently. + Concurrent requests arrive in a racy order. + The last-writer-wins assertion then goes flaky on slower runners. + Observed on Python 3.14 macOS. """ aggregator_role_stub = _AggregatorRoleStub(is_aggregator=False) config = ApiServerConfig(port=15067) diff --git a/tests/node/test_node.py b/tests/node/test_node.py index 4c4104ab4..d92b99e5f 100644 --- a/tests/node/test_node.py +++ b/tests/node/test_node.py @@ -694,26 +694,19 @@ class TestRunWithOptionalServices: """Tests for run() with optional API and validator services.""" async def test_run_with_api_server(self, node_config: NodeConfig) -> None: - """run() starts and runs the API server task when configured.""" + """run() runs the API server task when configured.""" config = dataclasses.replace( node_config, api_config=ApiServerConfig(host="127.0.0.1", port=0) ) node = Node.from_genesis(config) assert node.api_server is not None - mock_start = AsyncMock() mock_run = AsyncMock() asyncio.get_running_loop().call_later(0.05, node.stop) - with ( - patch.object(type(node.api_server), "start", mock_start), - patch.object(type(node.api_server), "run", mock_run), - ): + with patch.object(type(node.api_server), "run", mock_run): await node.run(install_signal_handlers=False) - # Both start() (called before TaskGroup) and run() (added to TaskGroup) - # must be awaited for the API server to function. - mock_start.assert_awaited_once() mock_run.assert_awaited_once() async def test_run_with_validator_service(self, node_with_validator: Node) -> None: From f626de64413456bbc0dc3b311f5eee250135014b Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Tue, 16 Jun 2026 18:33:21 +0200 Subject: [PATCH 4/4] test(node/api): strengthen assertions, cover edge paths, share the stub - Assert full response bodies instead of piecemeal fields: health, the justified checkpoint (equals the genesis head), and the finalized state (exact genesis slot and validator count). - Assert the exact error reason on every internal negative test, so the three distinct 400 responses are no longer indistinguishable. - Add the two previously untested branches: fork-choice validator_count is null when the head post-state is absent, and the finalized-state endpoint returns 404 when that state is missing. - Move the aggregator role stub into the package conftest and share it, removing the duplicated definition. Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/node/api/conftest.py | 10 +++ tests/node/api/endpoints/conftest.py | 11 +-- tests/node/api/endpoints/test_aggregator.py | 7 +- tests/node/api/endpoints/test_checkpoints.py | 27 +------ tests/node/api/endpoints/test_health.py | 10 +-- tests/node/api/endpoints/test_states.py | 22 ++---- tests/node/api/test_server.py | 80 ++++++++++++++------ 7 files changed, 81 insertions(+), 86 deletions(-) create mode 100644 tests/node/api/conftest.py diff --git a/tests/node/api/conftest.py b/tests/node/api/conftest.py new file mode 100644 index 000000000..73d80e3ac --- /dev/null +++ b/tests/node/api/conftest.py @@ -0,0 +1,10 @@ +"""Shared helpers for the API server tests.""" + +from dataclasses import dataclass + + +@dataclass(slots=True) +class AggregatorRoleStub: + """Minimal stand-in exposing only the aggregator flag the endpoints touch.""" + + is_aggregator: bool = False diff --git a/tests/node/api/endpoints/conftest.py b/tests/node/api/endpoints/conftest.py index 1cf8ded72..33caf08ce 100644 --- a/tests/node/api/endpoints/conftest.py +++ b/tests/node/api/endpoints/conftest.py @@ -3,7 +3,6 @@ import asyncio import threading import time -from dataclasses import dataclass, field from typing import Generator import httpx @@ -11,18 +10,12 @@ from consensus_testing import make_genesis_store from lean_spec.node.api import ApiServer, ApiServerConfig +from tests.node.api.conftest import AggregatorRoleStub DEFAULT_PORT = 15099 """Port for the auto-started local server.""" -@dataclass(slots=True) -class _AggregatorRoleStub: - """Minimal stand-in exposing only the aggregator flag the endpoints touch.""" - - is_aggregator: bool = field(default=False) - - class _ServerThread(threading.Thread): """Thread that runs the API server in its own event loop.""" @@ -61,7 +54,7 @@ def _create_server(self) -> ApiServer: return ApiServer( config=config, store_getter=lambda: store, - aggregator_role_control=_AggregatorRoleStub(), + aggregator_role_control=AggregatorRoleStub(), ) def stop(self) -> None: diff --git a/tests/node/api/endpoints/test_aggregator.py b/tests/node/api/endpoints/test_aggregator.py index 4cb25808d..e6b52bbab 100644 --- a/tests/node/api/endpoints/test_aggregator.py +++ b/tests/node/api/endpoints/test_aggregator.py @@ -1,9 +1,4 @@ -""" -Tests for the admin aggregator endpoint. - -The conformance server is started with the aggregator role seeded to disabled, -so tests exercise both the happy path and error cases. -""" +"""Tests for the admin aggregator endpoint.""" from __future__ import annotations diff --git a/tests/node/api/endpoints/test_checkpoints.py b/tests/node/api/endpoints/test_checkpoints.py index 6c6128d23..a3e626702 100644 --- a/tests/node/api/endpoints/test_checkpoints.py +++ b/tests/node/api/endpoints/test_checkpoints.py @@ -25,26 +25,7 @@ def test_content_type_is_json(self, server_url: str) -> None: content_type = response.headers.get("content-type", "") assert "application/json" in content_type - def test_has_slot(self, server_url: str) -> None: - """Justified checkpoint response has a slot field.""" - response = get_justified_checkpoint(server_url) - response_body = response.json() - - assert "slot" in response_body - assert isinstance(response_body["slot"], int) - assert response_body["slot"] >= 0 - - def test_has_root(self, server_url: str) -> None: - """Justified checkpoint response has a valid root field.""" - response = get_justified_checkpoint(server_url) - response_body = response.json() - - assert "root" in response_body - root = response_body["root"] - - # Root should be a 0x-prefixed hex string (32 bytes = 66 characters with prefix) - assert isinstance(root, str) - assert root.startswith("0x"), "Root must have 0x prefix" - assert len(root) == 66 - - int(root, 16) + def test_justified_checkpoint_is_genesis(self, server_url: str) -> None: + """At genesis the justified checkpoint is the genesis block at slot 0.""" + head = httpx.get(f"{server_url}/lean/v0/fork_choice").json()["head"] + assert get_justified_checkpoint(server_url).json() == {"slot": 0, "root": head} diff --git a/tests/node/api/endpoints/test_health.py b/tests/node/api/endpoints/test_health.py index 67416e64f..5ec9e602c 100644 --- a/tests/node/api/endpoints/test_health.py +++ b/tests/node/api/endpoints/test_health.py @@ -26,12 +26,6 @@ def test_health_content_type_is_json(server_url: str) -> None: def test_health_response_structure(server_url: str) -> None: - """Health endpoint returns expected JSON structure.""" + """Health response is exactly the healthy status and service identifier.""" response = get_health(server_url) - response_body = response.json() - - assert "status" in response_body - assert response_body["status"] == STATUS_HEALTHY - - assert "service" in response_body - assert response_body["service"] == SERVICE_NAME + assert response.json() == {"status": STATUS_HEALTHY, "service": SERVICE_NAME} diff --git a/tests/node/api/endpoints/test_states.py b/tests/node/api/endpoints/test_states.py index 2271e3219..631c059d4 100644 --- a/tests/node/api/endpoints/test_states.py +++ b/tests/node/api/endpoints/test_states.py @@ -27,20 +27,8 @@ def test_content_type_is_octet_stream(self, server_url: str) -> None: content_type = response.headers.get("content-type", "") assert "application/octet-stream" in content_type - def test_ssz_deserializes(self, server_url: str) -> None: - """Finalized state SSZ bytes deserialize to a valid State object.""" - response = get_finalized_state(server_url) - state = State.decode_bytes(response.content) - assert state is not None - - def test_has_valid_slot(self, server_url: str) -> None: - """Finalized state has a non-negative slot.""" - response = get_finalized_state(server_url) - state = State.decode_bytes(response.content) - assert int(state.slot) >= 0 - - def test_has_validators(self, server_url: str) -> None: - """Finalized state has at least one validator.""" - response = get_finalized_state(server_url) - state = State.decode_bytes(response.content) - assert len(state.validators) > 0 + def test_finalized_state_is_genesis(self, server_url: str) -> None: + """Finalized state decodes to the genesis state: slot 0 with three validators.""" + state = State.decode_bytes(get_finalized_state(server_url).content) + assert int(state.slot) == 0 + assert len(state.validators) == 3 diff --git a/tests/node/api/test_server.py b/tests/node/api/test_server.py index 9c7ba3a5b..340abe85a 100644 --- a/tests/node/api/test_server.py +++ b/tests/node/api/test_server.py @@ -2,23 +2,11 @@ from __future__ import annotations -from dataclasses import dataclass, field - import httpx from lean_spec.node.api import ApiServer, ApiServerConfig from lean_spec.spec.forks.lstar import Store - - -@dataclass(slots=True) -class _AggregatorRoleStub: - """Minimal stand-in exposing only the aggregator flag the endpoints touch. - - Avoids pulling in the full sync service dependency graph for server-level - tests that only exercise the flag-toggle contract. - """ - - is_aggregator: bool = field(default=False) +from tests.node.api.conftest import AggregatorRoleStub class TestApiServerConfiguration: @@ -76,6 +64,25 @@ async def test_returns_503_when_store_not_initialized(self) -> None: response = await client.get("http://127.0.0.1:15054/lean/v0/states/finalized") assert response.status_code == 503 + assert response.reason_phrase == "Store not initialized" + + finally: + await server.aclose() + + async def test_returns_404_when_finalized_state_missing(self, base_store: Store) -> None: + """Endpoint returns 404 when the finalized state is absent from the store.""" + store_without_states = base_store.model_copy(update={"states": {}}) + config = ApiServerConfig(port=15072) + server = ApiServer(config=config, store_getter=lambda: store_without_states) + + await server.start() + + try: + async with httpx.AsyncClient() as client: + response = await client.get("http://127.0.0.1:15072/lean/v0/states/finalized") + + assert response.status_code == 404 + assert response.reason_phrase == "Finalized state not available" finally: await server.aclose() @@ -96,6 +103,7 @@ async def test_returns_503_when_store_not_initialized(self) -> None: response = await client.get("http://127.0.0.1:15057/lean/v0/checkpoints/justified") assert response.status_code == 503 + assert response.reason_phrase == "Store not initialized" finally: await server.aclose() @@ -116,6 +124,7 @@ async def test_returns_503_when_store_not_initialized(self) -> None: response = await client.get("http://127.0.0.1:15058/lean/v0/fork_choice") assert response.status_code == 503 + assert response.reason_phrase == "Store not initialized" finally: await server.aclose() @@ -167,6 +176,24 @@ async def test_returns_200_with_initialized_store(self, base_store: Store) -> No finally: await server.aclose() + async def test_validator_count_is_null_when_head_state_missing(self, base_store: Store) -> None: + """Fork choice reports a null validator count when the head post-state is absent.""" + store_without_states = base_store.model_copy(update={"states": {}}) + config = ApiServerConfig(port=15071) + server = ApiServer(config=config, store_getter=lambda: store_without_states) + + await server.start() + + try: + async with httpx.AsyncClient() as client: + response = await client.get("http://127.0.0.1:15071/lean/v0/fork_choice") + + assert response.status_code == 200 + assert response.json()["validator_count"] is None + + finally: + await server.aclose() + class TestAggregatorAdminEndpoint: """Tests for the /lean/v0/admin/aggregator endpoint.""" @@ -183,13 +210,14 @@ async def test_status_returns_503_without_aggregator_control(self) -> None: response = await client.get("http://127.0.0.1:15060/lean/v0/admin/aggregator") assert response.status_code == 503 + assert response.reason_phrase == "Aggregator role control not available" finally: await server.aclose() async def test_status_returns_current_role(self) -> None: """GET returns the current aggregator role from the sync service flag.""" - aggregator_role_stub = _AggregatorRoleStub(is_aggregator=True) + aggregator_role_stub = AggregatorRoleStub(is_aggregator=True) config = ApiServerConfig(port=15061) server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) @@ -208,7 +236,7 @@ async def test_status_returns_current_role(self) -> None: async def test_toggle_activates_role(self) -> None: """POST with enabled=true activates the aggregator role.""" - aggregator_role_stub = _AggregatorRoleStub(is_aggregator=False) + aggregator_role_stub = AggregatorRoleStub(is_aggregator=False) config = ApiServerConfig(port=15062) server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) @@ -234,7 +262,7 @@ async def test_toggle_activates_role(self) -> None: async def test_toggle_deactivates_role(self) -> None: """POST with enabled=false deactivates the aggregator role.""" - aggregator_role_stub = _AggregatorRoleStub(is_aggregator=True) + aggregator_role_stub = AggregatorRoleStub(is_aggregator=True) config = ApiServerConfig(port=15063) server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) @@ -257,7 +285,7 @@ async def test_toggle_deactivates_role(self) -> None: async def test_toggle_rejects_missing_body(self) -> None: """POST with no body returns 400.""" config = ApiServerConfig(port=15064) - server = ApiServer(config=config, aggregator_role_control=_AggregatorRoleStub()) + server = ApiServer(config=config, aggregator_role_control=AggregatorRoleStub()) await server.start() @@ -270,6 +298,7 @@ async def test_toggle_rejects_missing_body(self) -> None: ) assert response.status_code == 400 + assert response.reason_phrase == "Invalid JSON body" finally: await server.aclose() @@ -277,7 +306,7 @@ async def test_toggle_rejects_missing_body(self) -> None: async def test_toggle_rejects_missing_field(self) -> None: """POST without 'enabled' field returns 400.""" config = ApiServerConfig(port=15065) - server = ApiServer(config=config, aggregator_role_control=_AggregatorRoleStub()) + server = ApiServer(config=config, aggregator_role_control=AggregatorRoleStub()) await server.start() @@ -289,13 +318,14 @@ async def test_toggle_rejects_missing_field(self) -> None: ) assert response.status_code == 400 + assert response.reason_phrase == "Missing 'enabled' field in body" finally: await server.aclose() async def test_toggle_rejects_non_boolean(self) -> None: """POST with non-boolean 'enabled' returns 400 and does not change state.""" - aggregator_role_stub = _AggregatorRoleStub(is_aggregator=False) + aggregator_role_stub = AggregatorRoleStub(is_aggregator=False) config = ApiServerConfig(port=15066) server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) @@ -309,6 +339,7 @@ async def test_toggle_rejects_non_boolean(self) -> None: ) assert response.status_code == 400 + assert response.reason_phrase == "'enabled' must be a boolean" assert aggregator_role_stub.is_aggregator is False finally: @@ -323,7 +354,7 @@ async def test_sequential_posts_converge(self) -> None: The last-writer-wins assertion then goes flaky on slower runners. Observed on Python 3.14 macOS. """ - aggregator_role_stub = _AggregatorRoleStub(is_aggregator=False) + aggregator_role_stub = AggregatorRoleStub(is_aggregator=False) config = ApiServerConfig(port=15067) server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) @@ -347,7 +378,7 @@ async def test_sequential_posts_converge(self) -> None: async def test_toggle_rejects_null_body(self) -> None: """POST with JSON null body returns 400.""" config = ApiServerConfig(port=15068) - server = ApiServer(config=config, aggregator_role_control=_AggregatorRoleStub()) + server = ApiServer(config=config, aggregator_role_control=AggregatorRoleStub()) await server.start() @@ -360,6 +391,7 @@ async def test_toggle_rejects_null_body(self) -> None: ) assert response.status_code == 400 + assert response.reason_phrase == "Missing 'enabled' field in body" finally: await server.aclose() @@ -367,7 +399,7 @@ async def test_toggle_rejects_null_body(self) -> None: async def test_toggle_rejects_array_body(self) -> None: """POST with JSON array body returns 400.""" config = ApiServerConfig(port=15069) - server = ApiServer(config=config, aggregator_role_control=_AggregatorRoleStub()) + server = ApiServer(config=config, aggregator_role_control=AggregatorRoleStub()) await server.start() @@ -379,13 +411,14 @@ async def test_toggle_rejects_array_body(self) -> None: ) assert response.status_code == 400 + assert response.reason_phrase == "Missing 'enabled' field in body" finally: await server.aclose() async def test_toggle_rejects_integer_enabled(self) -> None: """POST with integer 1 as enabled returns 400 (must be boolean).""" - aggregator_role_stub = _AggregatorRoleStub(is_aggregator=False) + aggregator_role_stub = AggregatorRoleStub(is_aggregator=False) config = ApiServerConfig(port=15070) server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) @@ -399,6 +432,7 @@ async def test_toggle_rejects_integer_enabled(self) -> None: ) assert response.status_code == 400 + assert response.reason_phrase == "'enabled' must be a boolean" assert aggregator_role_stub.is_aggregator is False finally: