diff --git a/packages/testing/src/consensus_testing/cli/apitest.py b/packages/testing/src/consensus_testing/cli/apitest.py index 77974be9f..ef49089a7 100644 --- a/packages/testing/src/consensus_testing/cli/apitest.py +++ b/packages/testing/src/consensus_testing/cli/apitest.py @@ -40,7 +40,7 @@ def apitest( SERVER_URL is the base URL of the API server (e.g., http://localhost:5052). For testing the local leanSpec implementation, use the local pytest suite under - tests/api, which automatically starts a local server. + tests/node/api/endpoints, which automatically starts a local server. """ config_path = Path(__file__).parent / "pytest_ini_files" / "pytest-apitest.ini" @@ -59,7 +59,7 @@ def apitest( str(config_path), f"--rootdir={project_root}", f"--server-url={server_url}", - "tests/api", + "tests/node/api/endpoints", ] args.extend(pytest_args) diff --git a/src/lean_spec/node/api/__init__.py b/src/lean_spec/node/api/__init__.py index 26db171d6..883800152 100644 --- a/src/lean_spec/node/api/__init__.py +++ b/src/lean_spec/node/api/__init__.py @@ -1,10 +1,8 @@ """API server module for various API endpoints.""" -from lean_spec.node.api.aggregator_controller import AggregatorController from lean_spec.node.api.server import ApiServer, ApiServerConfig __all__ = [ - "AggregatorController", "ApiServer", "ApiServerConfig", ] diff --git a/src/lean_spec/node/api/aggregator_controller.py b/src/lean_spec/node/api/aggregator_controller.py deleted file mode 100644 index dbba621af..000000000 --- a/src/lean_spec/node/api/aggregator_controller.py +++ /dev/null @@ -1,71 +0,0 @@ -""" -Runtime controller for the node's aggregator role. - -Exposes get/set operations over the shared is_aggregator flag so the admin -API can rotate aggregator duties across nodes without restarting. - -Toggles are serialized under an asyncio lock so concurrent admin requests -cannot leave the sync and network services disagreeing on the current role. -""" - -from __future__ import annotations - -import asyncio -import logging -from dataclasses import dataclass, field - -from lean_spec.node.networking import NetworkService -from lean_spec.node.sync import SyncService - -logger = logging.getLogger(__name__) - - -@dataclass(slots=True) -class AggregatorController: - """ - Runtime control over the node's aggregator role. - - Operators toggle the flag to rotate aggregation duties across nodes when - an active aggregator becomes unhealthy, without restarting the node. - - The spec-level semantics are unchanged: the sync service reads - is_aggregator on each gossip event and each tick, so flipping the flag - takes effect from the next event or tick onward. - """ - - sync_service: SyncService - """Sync service whose flag drives gossip-side aggregator behavior.""" - - network_service: NetworkService - """Network service whose flag mirrors the sync service for consistency.""" - - _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False, repr=False) - """Serializes concurrent toggles from API handlers.""" - - def is_enabled(self) -> bool: - """Return whether the node is currently acting as aggregator.""" - return self.sync_service.is_aggregator - - async def set_enabled(self, enabled: bool) -> bool: - """ - Update the aggregator role and return the previous value. - - The sync and network services are updated together under a lock so - both views remain consistent from any observer's perspective. - - Args: - enabled: Desired aggregator state. - - Returns: - Aggregator state prior to the update. - """ - async with self._lock: - previous = self.sync_service.is_aggregator - self.sync_service.is_aggregator = enabled - self.network_service.is_aggregator = enabled - if previous != enabled: - logger.info( - "Aggregator role %s via admin API", - "activated" if enabled else "deactivated", - ) - return previous diff --git a/src/lean_spec/node/api/context.py b/src/lean_spec/node/api/context.py new file mode 100644 index 000000000..c341a620b --- /dev/null +++ b/src/lean_spec/node/api/context.py @@ -0,0 +1,48 @@ +"""Typed dependencies the HTTP API handlers receive.""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from typing import Protocol + +from aiohttp import web + +from lean_spec.spec.forks import LstarSpec, Store + + +class AggregatorRoleControl(Protocol): + """The slice of the sync service the admin endpoints read and write.""" + + is_aggregator: bool + + +@dataclass(frozen=True, slots=True) +class ApiContext: + """Dependencies shared across handlers, resolved once at startup.""" + + spec: LstarSpec + """Fork spec driving consensus computations such as fork-choice weights.""" + + store_getter: Callable[[], Store | None] | None + """Callable returning the live store, or None before the store exists.""" + + aggregator_role_control: AggregatorRoleControl | None + """Holder of the aggregator flag, or None when aggregator control is unwired.""" + + def require_store(self) -> Store: + """ + Return the live store, or raise 503 when the node has no store yet. + + The store is a frozen snapshot, so all reads in one handler stay consistent. + """ + store = self.store_getter() if self.store_getter else None + if store is None: + raise web.HTTPServiceUnavailable(reason="Store not initialized") + return store + + def require_aggregator_role_control(self) -> AggregatorRoleControl: + """Return the aggregator role control, or raise 503 when it is unwired.""" + if self.aggregator_role_control is None: + raise web.HTTPServiceUnavailable(reason="Aggregator role control not available") + return self.aggregator_role_control diff --git a/src/lean_spec/node/api/endpoints/__init__.py b/src/lean_spec/node/api/endpoints/__init__.py deleted file mode 100644 index 78ef5b6ae..000000000 --- a/src/lean_spec/node/api/endpoints/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""API endpoint specifications.""" diff --git a/src/lean_spec/node/api/endpoints/aggregator.py b/src/lean_spec/node/api/endpoints/aggregator.py deleted file mode 100644 index 69f2a9580..000000000 --- a/src/lean_spec/node/api/endpoints/aggregator.py +++ /dev/null @@ -1,78 +0,0 @@ -"""Admin endpoints for toggling the aggregator role at runtime.""" - -from __future__ import annotations - -import json -import logging - -from aiohttp import web - -logger = logging.getLogger(__name__) - - -async def handle_status(request: web.Request) -> web.Response: - """ - Handle aggregator status request. - - Returns whether the node is currently acting as an aggregator. - - Response: JSON object with fields: - - is_aggregator (bool): Whether the node is currently acting as aggregator. - - Status Codes: - 200 OK: Status returned successfully. - 503 Service Unavailable: Aggregator controller not wired. - """ - controller = request.app.get("aggregator_controller") - if controller is None: - raise web.HTTPServiceUnavailable(reason="Aggregator controller not available") - - return web.Response( - body=json.dumps({"is_aggregator": controller.is_enabled()}), - content_type="application/json", - ) - - -async def handle_toggle(request: web.Request) -> web.Response: - """ - Handle aggregator toggle request. - - Activates or deactivates the aggregator role at runtime so operators can - rotate aggregator duties across nodes without restarting. - - Request body: JSON object with fields: - - enabled (bool): Desired aggregator state. - - Response: JSON object with fields: - - is_aggregator (bool): Aggregator state after the update. - - previous (bool): Aggregator state before the update. - - Status Codes: - 200 OK: Role updated successfully. - 400 Bad Request: Body missing, malformed, or with wrong field types. - 503 Service Unavailable: Aggregator controller not wired. - """ - controller = request.app.get("aggregator_controller") - if controller is None: - raise web.HTTPServiceUnavailable(reason="Aggregator controller not available") - - try: - request_body = await request.json() - except json.JSONDecodeError as exception: - raise web.HTTPBadRequest(reason="Invalid JSON body") from exception - - if not isinstance(request_body, dict) or "enabled" not in request_body: - raise web.HTTPBadRequest(reason="Missing 'enabled' field in body") - - enabled = request_body["enabled"] - # Explicit bool check rejects ints like 0/1, which JSON does not distinguish - # from booleans in loose parsers but Python does. - if not isinstance(enabled, bool): - raise web.HTTPBadRequest(reason="'enabled' must be a boolean") - - previous_aggregator_state = await controller.set_enabled(enabled) - - return web.Response( - body=json.dumps({"is_aggregator": enabled, "previous": previous_aggregator_state}), - content_type="application/json", - ) diff --git a/src/lean_spec/node/api/endpoints/checkpoints.py b/src/lean_spec/node/api/endpoints/checkpoints.py deleted file mode 100644 index 371c86c8a..000000000 --- a/src/lean_spec/node/api/endpoints/checkpoints.py +++ /dev/null @@ -1,40 +0,0 @@ -"""Checkpoints endpoint handlers.""" - -from __future__ import annotations - -import json - -from aiohttp import web - - -async def handle_justified(request: web.Request) -> web.Response: - """ - Handle justified checkpoint request. - - Returns the latest justified checkpoint for monitoring consensus progress. - - Response: JSON object with fields: - - slot (integer): The slot number of the justified checkpoint. - - root (string): The block root as 0x-prefixed hex string (66 characters total). - - Status Codes: - 200 OK: Checkpoint returned successfully. - 503 Service Unavailable: Store not initialized. - """ - store_getter = request.app.get("store_getter") - store = store_getter() if store_getter else None - - if store is None: - raise web.HTTPServiceUnavailable(reason="Store not initialized") - - justified = store.latest_justified - - return web.Response( - body=json.dumps( - { - "slot": justified.slot, - "root": "0x" + justified.root.hex(), - } - ), - content_type="application/json", - ) diff --git a/src/lean_spec/node/api/endpoints/fork_choice.py b/src/lean_spec/node/api/endpoints/fork_choice.py deleted file mode 100644 index 1867c2d18..000000000 --- a/src/lean_spec/node/api/endpoints/fork_choice.py +++ /dev/null @@ -1,74 +0,0 @@ -"""Fork choice endpoint handler.""" - -from __future__ import annotations - -import json - -from aiohttp import web - - -async def handle(request: web.Request) -> web.Response: - """ - Handle fork choice tree request. - - Returns the fork choice tree snapshot: blocks with weights, head, - checkpoints, safe target, and validator count. - - Response: JSON object with fields: - - nodes (array): Blocks in the tree, each with root, slot, parent_root, - proposer_index, and weight. - - head (string): Current head block root as 0x-prefixed hex. - - justified (object): Latest justified checkpoint (slot, root). - - finalized (object): Latest finalized checkpoint (slot, root). - - safe_target (string): Safe target block root as 0x-prefixed hex. - - validator_count (integer): Number of validators in head state. - - Status Codes: - 200 OK: Fork choice tree returned successfully. - 503 Service Unavailable: Store not initialized. - """ - store_getter = request.app.get("store_getter") - store = store_getter() if store_getter else None - - if store is None: - raise web.HTTPServiceUnavailable(reason="Store not initialized") - - finalized_slot = store.latest_finalized.slot - weights = request.app["spec"].compute_block_weights(store) - - nodes = [] - for root, block in store.blocks.items(): - if block.slot < finalized_slot: - continue - nodes.append( - { - "root": "0x" + root.hex(), - "slot": int(block.slot), - "parent_root": "0x" + block.parent_root.hex(), - "proposer_index": int(block.proposer_index), - "weight": weights.get(root, 0), - } - ) - - head_state = store.states.get(store.head) - validator_count = len(head_state.validators) if head_state is not None else 0 - - response = { - "nodes": nodes, - "head": "0x" + store.head.hex(), - "justified": { - "slot": int(store.latest_justified.slot), - "root": "0x" + store.latest_justified.root.hex(), - }, - "finalized": { - "slot": int(store.latest_finalized.slot), - "root": "0x" + store.latest_finalized.root.hex(), - }, - "safe_target": "0x" + store.safe_target.hex(), - "validator_count": validator_count, - } - - return web.Response( - body=json.dumps(response), - content_type="application/json", - ) diff --git a/src/lean_spec/node/api/endpoints/health.py b/src/lean_spec/node/api/endpoints/health.py deleted file mode 100644 index cd6206fb1..000000000 --- a/src/lean_spec/node/api/endpoints/health.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Health endpoint specification and handler.""" - -from __future__ import annotations - -import json -from typing import Final - -from aiohttp import web - -STATUS_HEALTHY: Final = "healthy" -"""Fixed healthy status returned by the health endpoint.""" - -SERVICE_NAME: Final = "lean-rpc-api" -"""Fixed service identifier returned by the health endpoint.""" - - -async def handle(_request: web.Request) -> web.Response: - """ - Handle health check request. - - Returns server health status to indicate the service is operational. - - Response: JSON object with fields: - - status (string): Always healthy when the endpoint is reachable. - - service (string): Fixed identifier "lean-rpc-api". - - Status Codes: - 200 OK: Server is running. - """ - return web.Response( - body=json.dumps({"status": STATUS_HEALTHY, "service": SERVICE_NAME}), - content_type="application/json", - ) diff --git a/src/lean_spec/node/api/endpoints/metrics.py b/src/lean_spec/node/api/endpoints/metrics.py deleted file mode 100644 index bc1dee81e..000000000 --- a/src/lean_spec/node/api/endpoints/metrics.py +++ /dev/null @@ -1,27 +0,0 @@ -"""Metrics endpoint (Prometheus exposition).""" - -from __future__ import annotations - -from aiohttp import web - -from lean_spec.node.metrics.registry import get_metrics_output - -CONTENT_TYPE = "text/plain; version=0.0.4" -"""Prometheus text exposition media type, without the charset parameter.""" - -CHARSET = "utf-8" -"""Body encoding, passed separately from the media type. - -aiohttp rejects a charset inside the content_type argument. -It must travel through the dedicated charset parameter instead. -""" - - -async def handle(_request: web.Request) -> web.Response: - """ - Handle Prometheus metrics scrape request. - - Returns metrics in Prometheus text exposition format. - """ - body = get_metrics_output() - return web.Response(body=body, content_type=CONTENT_TYPE, charset=CHARSET) diff --git a/src/lean_spec/node/api/endpoints/states.py b/src/lean_spec/node/api/endpoints/states.py deleted file mode 100644 index 555244685..000000000 --- a/src/lean_spec/node/api/endpoints/states.py +++ /dev/null @@ -1,46 +0,0 @@ -"""States endpoint handlers.""" - -from __future__ import annotations - -import asyncio -import logging - -from aiohttp import web - -logger = logging.getLogger(__name__) - - -async def handle_finalized(request: web.Request) -> web.Response: - """ - Handle finalized state request. - - Returns the finalized beacon state as raw SSZ bytes (not snappy compressed). - - Response: SSZ-encoded State (binary, application/octet-stream) - - Status Codes: - 200 OK: State returned successfully. - 404 Not Found: Finalized state not available in store. - 503 Service Unavailable: Store not initialized. - """ - store_getter = request.app.get("store_getter") - store = store_getter() if store_getter else None - - if store is None: - raise web.HTTPServiceUnavailable(reason="Store not initialized") - - finalized = store.latest_finalized - - if finalized.root not in store.states: - raise web.HTTPNotFound(reason="Finalized state not available") - - state = store.states[finalized.root] - - # Implementation detail: offload CPU-intensive encoding to thread pool - try: - ssz_bytes = await asyncio.to_thread(state.encode_bytes) - except Exception as exception: - logger.error("Failed to encode state: %s", exception) - raise web.HTTPInternalServerError(reason="Encoding failed") from exception - - return web.Response(body=ssz_bytes, content_type="application/octet-stream") diff --git a/src/lean_spec/node/api/handlers.py b/src/lean_spec/node/api/handlers.py new file mode 100644 index 000000000..f3c709a5c --- /dev/null +++ b/src/lean_spec/node/api/handlers.py @@ -0,0 +1,171 @@ +"""HTTP handlers exposing the node's API over aiohttp.""" + +from __future__ import annotations + +import asyncio +import json +import logging +from dataclasses import dataclass +from typing import Final + +from aiohttp import web + +from lean_spec.node.api.context import ApiContext +from lean_spec.node.api.responses import ( + AggregatorStatusBody, + AggregatorToggleBody, + CheckpointBody, + ForkChoiceBody, + ForkChoiceNode, + HealthBody, + json_response, +) +from lean_spec.node.metrics.registry import get_metrics_output + +logger = logging.getLogger(__name__) + +STATUS_HEALTHY: Final = "healthy" +"""Fixed healthy status returned by the health endpoint.""" + +SERVICE_NAME: Final = "lean-rpc-api" +"""Fixed service identifier returned by the health endpoint.""" + +METRICS_CONTENT_TYPE: Final = "text/plain; version=0.0.4" +"""Prometheus text exposition media type, without the charset parameter.""" + +METRICS_CHARSET: Final = "utf-8" +""" +Body encoding, passed separately from the media type. + +aiohttp rejects a charset inside the content_type argument. + +It must travel through the dedicated charset parameter instead. +""" + + +@dataclass(frozen=True, slots=True) +class ApiHandlers: + """The aiohttp request handlers, bound to the dependencies they serve.""" + + context: ApiContext + """Dependency bundle resolved once at server startup.""" + + async def health(self, request: web.Request) -> web.Response: + """Report that the service is reachable.""" + return json_response(HealthBody(status=STATUS_HEALTHY, service=SERVICE_NAME)) + + async def metrics(self, request: web.Request) -> web.Response: + """Return node metrics in Prometheus text exposition format.""" + return web.Response( + body=get_metrics_output(), + content_type=METRICS_CONTENT_TYPE, + charset=METRICS_CHARSET, + ) + + async def justified_checkpoint(self, request: web.Request) -> web.Response: + """Return the latest justified checkpoint.""" + store = self.context.require_store() + return json_response( + CheckpointBody(slot=store.latest_justified.slot, root=store.latest_justified.root) + ) + + async def fork_choice(self, request: web.Request) -> web.Response: + """ + Return a snapshot of the fork-choice tree. + + Weights count only the votes currently driving the head. + They exclude attestations seen but not yet counted. + """ + store = self.context.require_store() + weights = self.context.spec.compute_block_weights(store) + + nodes = [ + ForkChoiceNode( + root=root, + slot=block.slot, + parent_root=block.parent_root, + proposer_index=block.proposer_index, + weight=weights.get(root, 0), + ) + for root, block in store.blocks.items() + if block.slot >= store.latest_finalized.slot + ] + + # Report a missing head state as null, not zero. + # Zero would read as "no validators" rather than "state unavailable". + head_state = store.states.get(store.head) + validator_count = len(head_state.validators) if head_state is not None else None + + return json_response( + ForkChoiceBody( + nodes=nodes, + head=store.head, + justified=CheckpointBody( + slot=store.latest_justified.slot, root=store.latest_justified.root + ), + finalized=CheckpointBody( + slot=store.latest_finalized.slot, root=store.latest_finalized.root + ), + safe_target=store.safe_target, + validator_count=validator_count, + ) + ) + + async def finalized_state(self, request: web.Request) -> web.Response: + """Return the finalized beacon state as SSZ bytes.""" + store = self.context.require_store() + + if store.latest_finalized.root not in store.states: + raise web.HTTPNotFound(reason="Finalized state not available") + + state = store.states[store.latest_finalized.root] + + # Encoding a full state is CPU-heavy, so run it off the event loop. + try: + ssz_bytes = await asyncio.to_thread(state.encode_bytes) + except Exception as exception: + logger.error("Failed to encode state: %s", exception) + raise web.HTTPInternalServerError(reason="Encoding failed") from exception + + return web.Response(body=ssz_bytes, content_type="application/octet-stream") + + async def aggregator_status(self, request: web.Request) -> web.Response: + """Report whether the node is acting as an aggregator.""" + aggregator_role_control = self.context.require_aggregator_role_control() + return json_response( + AggregatorStatusBody(is_aggregator=aggregator_role_control.is_aggregator) + ) + + async def aggregator_toggle(self, request: web.Request) -> web.Response: + """ + Set the aggregator role at runtime and report the previous value. + + Raises: + HTTPBadRequest: Body missing, malformed, or 'enabled' not a boolean. + """ + aggregator_role_control = self.context.require_aggregator_role_control() + + try: + request_body = await request.json() + except json.JSONDecodeError as exception: + raise web.HTTPBadRequest(reason="Invalid JSON body") from exception + + if not isinstance(request_body, dict) or "enabled" not in request_body: + raise web.HTTPBadRequest(reason="Missing 'enabled' field in body") + + enabled = request_body["enabled"] + # Reject ints like 0 and 1, which loose JSON parsers blur with booleans but Python does not. + if not isinstance(enabled, bool): + raise web.HTTPBadRequest(reason="'enabled' must be a boolean") + + previous_aggregator_state = aggregator_role_control.is_aggregator + aggregator_role_control.is_aggregator = enabled + if previous_aggregator_state != enabled: + logger.info( + "Aggregator role %s via admin API", + "activated" if enabled else "deactivated", + ) + + return json_response( + AggregatorToggleBody(is_aggregator=enabled, previous=previous_aggregator_state) + ) diff --git a/src/lean_spec/node/api/responses.py b/src/lean_spec/node/api/responses.py new file mode 100644 index 000000000..cf3477e03 --- /dev/null +++ b/src/lean_spec/node/api/responses.py @@ -0,0 +1,68 @@ +"""Typed response bodies for the HTTP API wire contract.""" + +from aiohttp import web +from pydantic import BaseModel, ConfigDict + +from lean_spec.spec.forks.lstar.containers import Slot +from lean_spec.spec.ssz import Bytes32 + + +class ApiResponseBody(BaseModel): + """Base for every JSON response body, keyed snake_case, never camelCase-aliased.""" + + model_config = ConfigDict(frozen=True) + + +class HealthBody(ApiResponseBody): + """Liveness probe response.""" + + status: str + service: str + + +class CheckpointBody(ApiResponseBody): + """A checkpoint on the wire: its slot and block root.""" + + slot: Slot + root: Bytes32 + + +class ForkChoiceNode(ApiResponseBody): + """One block in the fork-choice tree view.""" + + root: Bytes32 + slot: Slot + parent_root: Bytes32 + proposer_index: int + weight: int + + +class ForkChoiceBody(ApiResponseBody): + """A snapshot of the fork-choice tree.""" + + nodes: list[ForkChoiceNode] + head: Bytes32 + justified: CheckpointBody + finalized: CheckpointBody + safe_target: Bytes32 + validator_count: int | None + + +class AggregatorStatusBody(ApiResponseBody): + """The node's current aggregator role.""" + + is_aggregator: bool + + +class AggregatorToggleBody(ApiResponseBody): + """The result of a runtime aggregator-role change.""" + + is_aggregator: bool + previous: bool + + +def json_response(body: ApiResponseBody) -> web.Response: + """Serialize a response model to a JSON HTTP response.""" + # Build the response by hand rather than via web.json_response. + # That helper appends "; charset=utf-8", but the wire contract is the bare media type. + return web.Response(body=body.model_dump_json(), content_type="application/json") diff --git a/src/lean_spec/node/api/routes.py b/src/lean_spec/node/api/routes.py deleted file mode 100644 index b7199cb39..000000000 --- a/src/lean_spec/node/api/routes.py +++ /dev/null @@ -1,48 +0,0 @@ -"""API route definitions.""" - -from __future__ import annotations - -from collections.abc import Awaitable, Callable -from typing import NamedTuple - -from aiohttp import web - -from lean_spec.node.api.endpoints import ( - aggregator, - checkpoints, - fork_choice, - health, - metrics, - states, -) - -Handler = Callable[[web.Request], Awaitable[web.Response]] -"""Type alias for aiohttp request handlers.""" - - -class Route(NamedTuple): - """One API route: its verb, path, handler, and access tier.""" - - method: str - """HTTP verb the route responds to.""" - - path: str - """URL path the route is registered under.""" - - handler: Handler - """Coroutine that serves requests to this route.""" - - is_admin: bool - """True for privileged admin routes, False for public read-only routes.""" - - -ROUTES: list[Route] = [ - Route("GET", "/lean/v0/health", health.handle, is_admin=False), - Route("GET", "/lean/v0/states/finalized", states.handle_finalized, is_admin=False), - Route("GET", "/lean/v0/checkpoints/justified", checkpoints.handle_justified, is_admin=False), - Route("GET", "/lean/v0/fork_choice", fork_choice.handle, is_admin=False), - Route("GET", "/metrics", metrics.handle, is_admin=False), - Route("GET", "/lean/v0/admin/aggregator", aggregator.handle_status, is_admin=True), - Route("POST", "/lean/v0/admin/aggregator", aggregator.handle_toggle, is_admin=True), -] -"""Every API route, public and admin alike, tagged by access tier.""" diff --git a/src/lean_spec/node/api/server.py b/src/lean_spec/node/api/server.py index 123fc435f..7726e16fe 100644 --- a/src/lean_spec/node/api/server.py +++ b/src/lean_spec/node/api/server.py @@ -1,9 +1,4 @@ -""" -API server implementation using aiohttp. - -Provides the HTTP server that serves routes defined in routes.py. -See endpoints/ for endpoint specifications and handlers. -""" +"""API server implementation using aiohttp.""" from __future__ import annotations @@ -14,25 +9,16 @@ from aiohttp import web -from lean_spec.node.api.aggregator_controller import AggregatorController -from lean_spec.node.api.routes import ROUTES +from lean_spec.node.api.context import AggregatorRoleControl, ApiContext +from lean_spec.node.api.handlers import ApiHandlers from lean_spec.spec.forks import LstarSpec, Store logger = logging.getLogger(__name__) -# The following classes are implementation details. -# Other implementations may structure their code differently. - - @dataclass(frozen=True, slots=True) class ApiServerConfig: - """ - Configuration for the API server. - - Implementation-specific. Other implementations may use different - configuration patterns (env vars, config files, CLI args, etc.). - """ + """Configuration for the API server.""" host: str = "0.0.0.0" """Host address to bind to.""" @@ -43,16 +29,7 @@ class ApiServerConfig: @dataclass(slots=True) class ApiServer: - """ - HTTP API server using aiohttp. - - Implementation-specific. This class handles: - - Server lifecycle (start, stop, run) - - Route registration - - Store access via callable getter - - Other implementations may use different frameworks or patterns. - """ + """HTTP API server using aiohttp.""" config: ApiServerConfig """Server configuration.""" @@ -63,13 +40,8 @@ class ApiServer: store_getter: Callable[[], Store | None] | None = None """Callable that returns the current Store instance.""" - aggregator_controller: AggregatorController | None = None - """ - Optional controller for toggling the aggregator role at runtime. - - When present, the admin aggregator endpoints can query and mutate the - node's aggregator flag. When absent, those endpoints return 503. - """ + aggregator_role_control: AggregatorRoleControl | None = None + """Optional runtime accessor for the node's aggregator role.""" _runner: web.AppRunner | None = field(default=None, init=False) """aiohttp application runner.""" @@ -89,19 +61,28 @@ async def start(self) -> None: """Start the API server in the background.""" app = web.Application() - # Store the store_getter in app for handlers that need store access - app["store_getter"] = self.store_getter - - # Expose the fork spec for handlers that drive consensus computations. - app["spec"] = self.spec - - # Expose the aggregator controller to admin endpoints. - # Absence is fine; endpoints return 503 when unset. - app["aggregator_controller"] = self.aggregator_controller - - # Register every route from the unified table, keyed by its verb. - routes = [web.route(route.method, route.path, route.handler) for route in ROUTES] - app.add_routes(routes) + # Resolve the shared dependencies once and bind them to the handlers. + # Every route then serves through a handler method that reads them. + context = ApiContext( + spec=self.spec, + store_getter=self.store_getter, + aggregator_role_control=self.aggregator_role_control, + ) + handlers = ApiHandlers(context) + + # The admin routes under /lean/v0/admin are unauthenticated. + # Deployments must restrict access to them at the network layer. + app.add_routes( + [ + web.get("/lean/v0/health", handlers.health), + web.get("/lean/v0/states/finalized", handlers.finalized_state), + web.get("/lean/v0/checkpoints/justified", handlers.justified_checkpoint), + web.get("/lean/v0/fork_choice", handlers.fork_choice), + web.get("/metrics", handlers.metrics), + web.get("/lean/v0/admin/aggregator", handlers.aggregator_status), + web.post("/lean/v0/admin/aggregator", handlers.aggregator_toggle), + ] + ) self._runner = web.AppRunner(app) await self._runner.setup() @@ -112,11 +93,7 @@ async def start(self) -> None: logger.info("API server listening on %s:%d", self.config.host, self.config.port) async def run(self) -> None: - """ - Run the API server until shutdown. - - Blocks until stop() is called. - """ + """Run the API server until it is asked to stop.""" await self.start() await self._stop_event.wait() diff --git a/src/lean_spec/node/networking/service/service.py b/src/lean_spec/node/networking/service/service.py index 63bb52990..178662a22 100644 --- a/src/lean_spec/node/networking/service/service.py +++ b/src/lean_spec/node/networking/service/service.py @@ -76,9 +76,6 @@ class NetworkService: network_name: str = field(default="0x00000000") """Network name for gossip topics (4-byte hex string).""" - is_aggregator: bool = field(default=False) - """Whether this node functions as an aggregator.""" - _running: bool = field(default=False, repr=False) """Whether the event loop is running.""" diff --git a/src/lean_spec/node/node.py b/src/lean_spec/node/node.py index 0d210a31a..d54e95195 100644 --- a/src/lean_spec/node/node.py +++ b/src/lean_spec/node/node.py @@ -19,7 +19,7 @@ from pathlib import Path from typing import Final -from lean_spec.node.api import AggregatorController, ApiServer, ApiServerConfig +from lean_spec.node.api import ApiServer, ApiServerConfig from lean_spec.node.chain import SlotClock from lean_spec.node.chain.service import ChainService from lean_spec.node.metrics import registry as metrics @@ -128,10 +128,10 @@ class NodeConfig: When False (default): - The node runs in standard validator or passive mode - Seeds the initial value of the live aggregator flag on SyncService and - NetworkService. The flag can be toggled at runtime via the admin API - (see AggregatorController). Runtime toggles do not persist across - restarts and do not update the local ENR or subnet subscriptions. + Seeds the initial value of the live aggregator flag on the sync service. + The flag can be toggled at runtime via the admin API. + Runtime toggles do not persist across restarts. + They do not update the local ENR or subnet subscriptions. """ anchor_store: Store | None = field(default=None) @@ -299,7 +299,6 @@ def from_genesis(cls, config: NodeConfig) -> Node: sync_service=sync_service, event_source=config.event_source, network_name=config.network_name, - is_aggregator=config.is_aggregator, ) # Wire up aggregated attestation publishing. @@ -311,18 +310,14 @@ def from_genesis(cls, config: NodeConfig) -> Node: # Create API server if configured api_server: ApiServer | None = None if config.api_config is not None: - # Controller lets the admin API rotate the aggregator role at - # runtime when another aggregator becomes unhealthy. - aggregator_controller = AggregatorController( - sync_service=sync_service, - network_service=network_service, - ) - # Store getter captures sync_service to get the live store + # The admin API reads and mutates the sync service aggregator flag, + # letting operators rotate the role at runtime without a restart. + # Store getter captures sync_service to get the live store. api_server = ApiServer( config=config.api_config, spec=fork, store_getter=lambda: sync_service.store, - aggregator_controller=aggregator_controller, + aggregator_role_control=sync_service, ) # Create validator service if registry provided. @@ -468,10 +463,6 @@ async def run(self, *, install_signal_handlers: bool = True) -> None: if install_signal_handlers: self._install_signal_handlers() - # Start API server if configured - if self.api_server is not None: - await self.api_server.start() - # Run services concurrently. # # A separate task monitors the shutdown signal. diff --git a/tests/node/api/conftest.py b/tests/node/api/conftest.py index d93f4db17..73d80e3ac 100644 --- a/tests/node/api/conftest.py +++ b/tests/node/api/conftest.py @@ -1,9 +1,10 @@ -""" -Shared pytest fixtures for API server tests. +"""Shared helpers for the API server tests.""" -Core fixtures inherited from parent conftest files. -""" +from dataclasses import dataclass -# All fixtures are inherited from tests/conftest.py via pytest discovery. -# This file exists to establish the conftest hierarchy and can be extended -# with API-specific fixtures as needed. + +@dataclass(slots=True) +class AggregatorRoleStub: + """Minimal stand-in exposing only the aggregator flag the endpoints touch.""" + + is_aggregator: bool = False diff --git a/tests/node/api/endpoints/conftest.py b/tests/node/api/endpoints/conftest.py index d834914ae..33caf08ce 100644 --- a/tests/node/api/endpoints/conftest.py +++ b/tests/node/api/endpoints/conftest.py @@ -3,34 +3,17 @@ import asyncio import threading import time -from dataclasses import dataclass, field from typing import Generator import httpx import pytest from consensus_testing import make_genesis_store -from lean_spec.node.api import AggregatorController, ApiServer, ApiServerConfig +from lean_spec.node.api import ApiServer, ApiServerConfig +from tests.node.api.conftest import AggregatorRoleStub -# Default port for auto-started local server DEFAULT_PORT = 15099 - - -@dataclass(slots=True) -class _AggregatorStub: - """Minimal stub exposing only the is_aggregator flag.""" - - is_aggregator: bool = field(default=False) - - -def _make_conformance_controller(initial: bool = False) -> AggregatorController: - """Build an AggregatorController backed by lightweight stubs.""" - sync_stub = _AggregatorStub(is_aggregator=initial) - network_stub = _AggregatorStub(is_aggregator=initial) - return AggregatorController( - sync_service=sync_stub, # type: ignore[arg-type] - network_service=network_stub, # type: ignore[arg-type] - ) +"""Port for the auto-started local server.""" class _ServerThread(threading.Thread): @@ -64,15 +47,14 @@ def run(self) -> None: self.loop.close() def _create_server(self) -> ApiServer: - """Create the API server with a test store and aggregator controller.""" + """Create the API server with a test store and aggregator flag holder.""" store = make_genesis_store(num_validators=3, observer=True, genesis_time=int(time.time())) - controller = _make_conformance_controller(initial=False) config = ApiServerConfig(host="127.0.0.1", port=self.port) return ApiServer( config=config, store_getter=lambda: store, - aggregator_controller=controller, + aggregator_role_control=AggregatorRoleStub(), ) def stop(self) -> None: @@ -128,10 +110,8 @@ def server_url(request: pytest.FixtureRequest) -> Generator[str, None, None]: external_url = request.config.getoption("--server-url") if external_url: - # Use external server yield external_url else: - # Start local server server_thread = _ServerThread(DEFAULT_PORT) server_thread.start() server_thread.ready.wait(timeout=10.0) diff --git a/tests/node/api/endpoints/test_aggregator.py b/tests/node/api/endpoints/test_aggregator.py index 6fb8044e9..e6b52bbab 100644 --- a/tests/node/api/endpoints/test_aggregator.py +++ b/tests/node/api/endpoints/test_aggregator.py @@ -1,9 +1,4 @@ -""" -Tests for the admin aggregator endpoint. - -The conformance server is started with a controller seeded to disabled, -so tests exercise both the happy path and error cases. -""" +"""Tests for the admin aggregator endpoint.""" from __future__ import annotations @@ -14,7 +9,7 @@ class TestAggregatorStatus: """Tests for GET /lean/v0/admin/aggregator.""" def test_returns_200(self, server_url: str) -> None: - """GET returns 200 when the controller is wired.""" + """GET returns 200 when the sync service is wired.""" response = httpx.get(f"{server_url}/lean/v0/admin/aggregator") assert response.status_code == 200 diff --git a/tests/node/api/endpoints/test_checkpoints.py b/tests/node/api/endpoints/test_checkpoints.py index 338e7fe51..a3e626702 100644 --- a/tests/node/api/endpoints/test_checkpoints.py +++ b/tests/node/api/endpoints/test_checkpoints.py @@ -25,27 +25,7 @@ def test_content_type_is_json(self, server_url: str) -> None: content_type = response.headers.get("content-type", "") assert "application/json" in content_type - def test_has_slot(self, server_url: str) -> None: - """Justified checkpoint response has a slot field.""" - response = get_justified_checkpoint(server_url) - response_body = response.json() - - assert "slot" in response_body - assert isinstance(response_body["slot"], int) - assert response_body["slot"] >= 0 - - def test_has_root(self, server_url: str) -> None: - """Justified checkpoint response has a valid root field.""" - response = get_justified_checkpoint(server_url) - response_body = response.json() - - assert "root" in response_body - root = response_body["root"] - - # Root should be a 0x-prefixed hex string (32 bytes = 66 characters with prefix) - assert isinstance(root, str) - assert root.startswith("0x"), "Root must have 0x prefix" - assert len(root) == 66 - - # Should be valid hex - int(root, 16) + def test_justified_checkpoint_is_genesis(self, server_url: str) -> None: + """At genesis the justified checkpoint is the genesis block at slot 0.""" + head = httpx.get(f"{server_url}/lean/v0/fork_choice").json()["head"] + assert get_justified_checkpoint(server_url).json() == {"slot": 0, "root": head} diff --git a/tests/node/api/endpoints/test_health.py b/tests/node/api/endpoints/test_health.py index ce6a95306..5ec9e602c 100644 --- a/tests/node/api/endpoints/test_health.py +++ b/tests/node/api/endpoints/test_health.py @@ -2,7 +2,7 @@ import httpx -from lean_spec.node.api.endpoints import health +from lean_spec.node.api.handlers import SERVICE_NAME, STATUS_HEALTHY def get_health(server_url: str) -> httpx.Response: @@ -26,12 +26,6 @@ def test_health_content_type_is_json(server_url: str) -> None: def test_health_response_structure(server_url: str) -> None: - """Health endpoint returns expected JSON structure.""" + """Health response is exactly the healthy status and service identifier.""" response = get_health(server_url) - response_body = response.json() - - assert "status" in response_body - assert response_body["status"] == health.STATUS_HEALTHY - - assert "service" in response_body - assert response_body["service"] == health.SERVICE_NAME + assert response.json() == {"status": STATUS_HEALTHY, "service": SERVICE_NAME} diff --git a/tests/node/api/endpoints/test_states.py b/tests/node/api/endpoints/test_states.py index 2271e3219..631c059d4 100644 --- a/tests/node/api/endpoints/test_states.py +++ b/tests/node/api/endpoints/test_states.py @@ -27,20 +27,8 @@ def test_content_type_is_octet_stream(self, server_url: str) -> None: content_type = response.headers.get("content-type", "") assert "application/octet-stream" in content_type - def test_ssz_deserializes(self, server_url: str) -> None: - """Finalized state SSZ bytes deserialize to a valid State object.""" - response = get_finalized_state(server_url) - state = State.decode_bytes(response.content) - assert state is not None - - def test_has_valid_slot(self, server_url: str) -> None: - """Finalized state has a non-negative slot.""" - response = get_finalized_state(server_url) - state = State.decode_bytes(response.content) - assert int(state.slot) >= 0 - - def test_has_validators(self, server_url: str) -> None: - """Finalized state has at least one validator.""" - response = get_finalized_state(server_url) - state = State.decode_bytes(response.content) - assert len(state.validators) > 0 + def test_finalized_state_is_genesis(self, server_url: str) -> None: + """Finalized state decodes to the genesis state: slot 0 with three validators.""" + state = State.decode_bytes(get_finalized_state(server_url).content) + assert int(state.slot) == 0 + assert len(state.validators) == 3 diff --git a/tests/node/api/test_aggregator_controller.py b/tests/node/api/test_aggregator_controller.py deleted file mode 100644 index 4e9487bdb..000000000 --- a/tests/node/api/test_aggregator_controller.py +++ /dev/null @@ -1,98 +0,0 @@ -"""Tests for the aggregator runtime controller.""" - -from __future__ import annotations - -import asyncio - -from consensus_testing import MockEventSource, create_mock_sync_service -from lean_spec.node.api import AggregatorController -from lean_spec.node.networking import NetworkService, PeerId -from lean_spec.node.sync import SyncService - -FORK_DIGEST = "0x00000000" - - -def _make_controller( - peer_id: PeerId, - *, - initial: bool = False, -) -> tuple[AggregatorController, SyncService, NetworkService]: - """Build a controller wired to realistic sync/network services.""" - sync_service = create_mock_sync_service(peer_id) - sync_service.is_aggregator = initial - network_service = NetworkService( - sync_service=sync_service, - event_source=MockEventSource(events=[]), - network_name=FORK_DIGEST, - is_aggregator=initial, - ) - controller = AggregatorController( - sync_service=sync_service, - network_service=network_service, - ) - return controller, sync_service, network_service - - -class TestAggregatorControllerRead: - """Tests for the read path.""" - - def test_is_enabled_reflects_sync_service_flag(self, peer_id: PeerId) -> None: - """is_enabled reads the sync service flag as the source of truth.""" - controller, sync_service, _ = _make_controller(peer_id, initial=False) - assert controller.is_enabled() is False - - sync_service.is_aggregator = True - assert controller.is_enabled() is True - - -class TestAggregatorControllerWrite: - """Tests for the write path.""" - - async def test_set_enabled_activates_role(self, peer_id: PeerId) -> None: - """set_enabled(True) flips the flag on both services.""" - controller, sync_service, network_service = _make_controller(peer_id, initial=False) - - previous = await controller.set_enabled(True) - - assert previous is False - assert controller.is_enabled() is True - assert sync_service.is_aggregator is True - assert network_service.is_aggregator is True - - async def test_set_enabled_deactivates_role(self, peer_id: PeerId) -> None: - """set_enabled(False) flips the flag off on both services.""" - controller, sync_service, network_service = _make_controller(peer_id, initial=True) - - previous = await controller.set_enabled(False) - - assert previous is True - assert controller.is_enabled() is False - assert sync_service.is_aggregator is False - assert network_service.is_aggregator is False - - async def test_set_enabled_idempotent(self, peer_id: PeerId) -> None: - """Setting the same value returns the current value and leaves state intact.""" - controller, sync_service, network_service = _make_controller(peer_id, initial=True) - - previous = await controller.set_enabled(True) - - assert previous is True - assert sync_service.is_aggregator is True - assert network_service.is_aggregator is True - - async def test_sequential_toggles_converge(self, peer_id: PeerId) -> None: - """Sequential toggles each see the prior state and converge to the last value.""" - controller, sync_service, network_service = _make_controller(peer_id, initial=False) - - previous_states = await asyncio.gather( - controller.set_enabled(True), - controller.set_enabled(False), - controller.set_enabled(True), - ) - - # asyncio.gather on a single-threaded event loop preserves order. - # Each toggle sees the previous state correctly. - assert previous_states == [False, True, False] - assert controller.is_enabled() is True - assert sync_service.is_aggregator is True - assert network_service.is_aggregator is True diff --git a/tests/node/api/test_server.py b/tests/node/api/test_server.py index 1f616aef0..340abe85a 100644 --- a/tests/node/api/test_server.py +++ b/tests/node/api/test_server.py @@ -1,45 +1,12 @@ -""" -Tests for the API server implementation details. - -API contract tests (status codes, content types, response structure) are in -tests/api/ and also run automatically with `uv run pytest`. - -These tests cover leanSpec-specific implementation details: -- Configuration behavior -- Store integration -- Error handling when store not initialized -""" +"""Tests for the API server implementation: configuration, store wiring, and admin endpoints.""" from __future__ import annotations -from dataclasses import dataclass, field - import httpx -from lean_spec.node.api import AggregatorController, ApiServer, ApiServerConfig +from lean_spec.node.api import ApiServer, ApiServerConfig from lean_spec.spec.forks.lstar import Store - - -@dataclass(slots=True) -class _AggregatorStub: - """Minimal aggregator role holder for wiring AggregatorController in tests.""" - - is_aggregator: bool = field(default=False) - - -def _make_test_controller(initial: bool = False) -> AggregatorController: - """ - Build an AggregatorController backed by lightweight stubs. - - Avoids pulling in the full SyncService / NetworkService dependency graph - for endpoint-level tests that only exercise the flag-toggle contract. - """ - sync_stub = _AggregatorStub(is_aggregator=initial) - network_stub = _AggregatorStub(is_aggregator=initial) - return AggregatorController( - sync_service=sync_stub, # type: ignore[arg-type] - network_service=network_stub, # type: ignore[arg-type] - ) +from tests.node.api.conftest import AggregatorRoleStub class TestApiServerConfiguration: @@ -97,6 +64,25 @@ async def test_returns_503_when_store_not_initialized(self) -> None: response = await client.get("http://127.0.0.1:15054/lean/v0/states/finalized") assert response.status_code == 503 + assert response.reason_phrase == "Store not initialized" + + finally: + await server.aclose() + + async def test_returns_404_when_finalized_state_missing(self, base_store: Store) -> None: + """Endpoint returns 404 when the finalized state is absent from the store.""" + store_without_states = base_store.model_copy(update={"states": {}}) + config = ApiServerConfig(port=15072) + server = ApiServer(config=config, store_getter=lambda: store_without_states) + + await server.start() + + try: + async with httpx.AsyncClient() as client: + response = await client.get("http://127.0.0.1:15072/lean/v0/states/finalized") + + assert response.status_code == 404 + assert response.reason_phrase == "Finalized state not available" finally: await server.aclose() @@ -117,6 +103,7 @@ async def test_returns_503_when_store_not_initialized(self) -> None: response = await client.get("http://127.0.0.1:15057/lean/v0/checkpoints/justified") assert response.status_code == 503 + assert response.reason_phrase == "Store not initialized" finally: await server.aclose() @@ -137,6 +124,7 @@ async def test_returns_503_when_store_not_initialized(self) -> None: response = await client.get("http://127.0.0.1:15058/lean/v0/fork_choice") assert response.status_code == 503 + assert response.reason_phrase == "Store not initialized" finally: await server.aclose() @@ -188,12 +176,30 @@ async def test_returns_200_with_initialized_store(self, base_store: Store) -> No finally: await server.aclose() + async def test_validator_count_is_null_when_head_state_missing(self, base_store: Store) -> None: + """Fork choice reports a null validator count when the head post-state is absent.""" + store_without_states = base_store.model_copy(update={"states": {}}) + config = ApiServerConfig(port=15071) + server = ApiServer(config=config, store_getter=lambda: store_without_states) + + await server.start() + + try: + async with httpx.AsyncClient() as client: + response = await client.get("http://127.0.0.1:15071/lean/v0/fork_choice") + + assert response.status_code == 200 + assert response.json()["validator_count"] is None + + finally: + await server.aclose() + class TestAggregatorAdminEndpoint: """Tests for the /lean/v0/admin/aggregator endpoint.""" - async def test_status_returns_503_without_controller(self) -> None: - """GET returns 503 when no controller is wired.""" + async def test_status_returns_503_without_aggregator_control(self) -> None: + """GET returns 503 when no aggregator role control is wired.""" config = ApiServerConfig(port=15060) server = ApiServer(config=config) @@ -204,15 +210,16 @@ async def test_status_returns_503_without_controller(self) -> None: response = await client.get("http://127.0.0.1:15060/lean/v0/admin/aggregator") assert response.status_code == 503 + assert response.reason_phrase == "Aggregator role control not available" finally: await server.aclose() async def test_status_returns_current_role(self) -> None: - """GET returns the current aggregator role from the controller.""" - controller = _make_test_controller(initial=True) + """GET returns the current aggregator role from the sync service flag.""" + aggregator_role_stub = AggregatorRoleStub(is_aggregator=True) config = ApiServerConfig(port=15061) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) await server.start() @@ -229,9 +236,9 @@ async def test_status_returns_current_role(self) -> None: async def test_toggle_activates_role(self) -> None: """POST with enabled=true activates the aggregator role.""" - controller = _make_test_controller(initial=False) + aggregator_role_stub = AggregatorRoleStub(is_aggregator=False) config = ApiServerConfig(port=15062) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) await server.start() @@ -244,7 +251,7 @@ async def test_toggle_activates_role(self) -> None: assert response.status_code == 200 assert response.json() == {"is_aggregator": True, "previous": False} - assert controller.is_enabled() is True + assert aggregator_role_stub.is_aggregator is True # A follow-up GET sees the new value. follow_up = await client.get("http://127.0.0.1:15062/lean/v0/admin/aggregator") @@ -255,9 +262,9 @@ async def test_toggle_activates_role(self) -> None: async def test_toggle_deactivates_role(self) -> None: """POST with enabled=false deactivates the aggregator role.""" - controller = _make_test_controller(initial=True) + aggregator_role_stub = AggregatorRoleStub(is_aggregator=True) config = ApiServerConfig(port=15063) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) await server.start() @@ -270,16 +277,15 @@ async def test_toggle_deactivates_role(self) -> None: assert response.status_code == 200 assert response.json() == {"is_aggregator": False, "previous": True} - assert controller.is_enabled() is False + assert aggregator_role_stub.is_aggregator is False finally: await server.aclose() async def test_toggle_rejects_missing_body(self) -> None: """POST with no body returns 400.""" - controller = _make_test_controller() config = ApiServerConfig(port=15064) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=AggregatorRoleStub()) await server.start() @@ -292,15 +298,15 @@ async def test_toggle_rejects_missing_body(self) -> None: ) assert response.status_code == 400 + assert response.reason_phrase == "Invalid JSON body" finally: await server.aclose() async def test_toggle_rejects_missing_field(self) -> None: """POST without 'enabled' field returns 400.""" - controller = _make_test_controller() config = ApiServerConfig(port=15065) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=AggregatorRoleStub()) await server.start() @@ -312,15 +318,16 @@ async def test_toggle_rejects_missing_field(self) -> None: ) assert response.status_code == 400 + assert response.reason_phrase == "Missing 'enabled' field in body" finally: await server.aclose() async def test_toggle_rejects_non_boolean(self) -> None: """POST with non-boolean 'enabled' returns 400 and does not change state.""" - controller = _make_test_controller(initial=False) + aggregator_role_stub = AggregatorRoleStub(is_aggregator=False) config = ApiServerConfig(port=15066) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) await server.start() @@ -332,7 +339,8 @@ async def test_toggle_rejects_non_boolean(self) -> None: ) assert response.status_code == 400 - assert controller.is_enabled() is False + assert response.reason_phrase == "'enabled' must be a boolean" + assert aggregator_role_stub.is_aggregator is False finally: await server.aclose() @@ -341,14 +349,14 @@ async def test_sequential_posts_converge(self) -> None: """ Multiple sequential POSTs converge to the last-writer value. - Posts must be issued one at a time (not via ``asyncio.gather``); - with concurrent requests the server-side arrival order is racy and - the last-writer-wins assertion becomes flaky on slower runners - (observed: Python 3.14 macOS). + Posts must be issued one at a time, never concurrently. + Concurrent requests arrive in a racy order. + The last-writer-wins assertion then goes flaky on slower runners. + Observed on Python 3.14 macOS. """ - controller = _make_test_controller(initial=False) + aggregator_role_stub = AggregatorRoleStub(is_aggregator=False) config = ApiServerConfig(port=15067) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) await server.start() @@ -362,16 +370,15 @@ async def test_sequential_posts_converge(self) -> None: ] assert all(r.status_code == 200 for r in responses) - assert controller.is_enabled() is True + assert aggregator_role_stub.is_aggregator is True finally: await server.aclose() async def test_toggle_rejects_null_body(self) -> None: """POST with JSON null body returns 400.""" - controller = _make_test_controller() config = ApiServerConfig(port=15068) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=AggregatorRoleStub()) await server.start() @@ -384,15 +391,15 @@ async def test_toggle_rejects_null_body(self) -> None: ) assert response.status_code == 400 + assert response.reason_phrase == "Missing 'enabled' field in body" finally: await server.aclose() async def test_toggle_rejects_array_body(self) -> None: """POST with JSON array body returns 400.""" - controller = _make_test_controller() config = ApiServerConfig(port=15069) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=AggregatorRoleStub()) await server.start() @@ -404,15 +411,16 @@ async def test_toggle_rejects_array_body(self) -> None: ) assert response.status_code == 400 + assert response.reason_phrase == "Missing 'enabled' field in body" finally: await server.aclose() async def test_toggle_rejects_integer_enabled(self) -> None: """POST with integer 1 as enabled returns 400 (must be boolean).""" - controller = _make_test_controller(initial=False) + aggregator_role_stub = AggregatorRoleStub(is_aggregator=False) config = ApiServerConfig(port=15070) - server = ApiServer(config=config, aggregator_controller=controller) + server = ApiServer(config=config, aggregator_role_control=aggregator_role_stub) await server.start() @@ -424,7 +432,8 @@ async def test_toggle_rejects_integer_enabled(self) -> None: ) assert response.status_code == 400 - assert controller.is_enabled() is False + assert response.reason_phrase == "'enabled' must be a boolean" + assert aggregator_role_stub.is_aggregator is False finally: await server.aclose() diff --git a/tests/node/networking/service/test_service.py b/tests/node/networking/service/test_service.py index f71ffeac9..f8cd8c387 100644 --- a/tests/node/networking/service/test_service.py +++ b/tests/node/networking/service/test_service.py @@ -490,27 +490,6 @@ def test_default_fork_digest(self, peer_id: PeerId) -> None: ) assert svc.network_name == "0x00000000" - def test_default_is_aggregator(self, peer_id: PeerId) -> None: - """is_aggregator defaults to False.""" - source = MockEventSource(events=[]) - sync_service = create_mock_sync_service(peer_id) - svc = NetworkService( - sync_service=sync_service, - event_source=source, - ) - assert svc.is_aggregator is False - - def test_custom_is_aggregator(self, peer_id: PeerId) -> None: - """Constructor accepts is_aggregator.""" - source = MockEventSource(events=[]) - sync_service = create_mock_sync_service(peer_id) - svc = NetworkService( - sync_service=sync_service, - event_source=source, - is_aggregator=True, - ) - assert svc.is_aggregator is True - # Behavioral routing to the forkchoice store diff --git a/tests/node/test_node.py b/tests/node/test_node.py index 4c4104ab4..d92b99e5f 100644 --- a/tests/node/test_node.py +++ b/tests/node/test_node.py @@ -694,26 +694,19 @@ class TestRunWithOptionalServices: """Tests for run() with optional API and validator services.""" async def test_run_with_api_server(self, node_config: NodeConfig) -> None: - """run() starts and runs the API server task when configured.""" + """run() runs the API server task when configured.""" config = dataclasses.replace( node_config, api_config=ApiServerConfig(host="127.0.0.1", port=0) ) node = Node.from_genesis(config) assert node.api_server is not None - mock_start = AsyncMock() mock_run = AsyncMock() asyncio.get_running_loop().call_later(0.05, node.stop) - with ( - patch.object(type(node.api_server), "start", mock_start), - patch.object(type(node.api_server), "run", mock_run), - ): + with patch.object(type(node.api_server), "run", mock_run): await node.run(install_signal_handlers=False) - # Both start() (called before TaskGroup) and run() (added to TaskGroup) - # must be awaited for the API server to function. - mock_start.assert_awaited_once() mock_run.assert_awaited_once() async def test_run_with_validator_service(self, node_with_validator: Node) -> None: