diff --git a/src/lean_spec/node/anchor.py b/src/lean_spec/node/anchor.py index e36b31704..60e5be073 100644 --- a/src/lean_spec/node/anchor.py +++ b/src/lean_spec/node/anchor.py @@ -5,7 +5,7 @@ Two sources land on the same return shape: - Genesis: synthesise the store from the genesis validator set. -- Checkpoint: fetch a finalized state from a peer and build the store from it. +- Checkpoint: fetch a finalized block and state from a peer and build the store. Once the store exists the protocol cannot tell the two sources apart. """ @@ -19,14 +19,12 @@ from lean_spec.node.networking.reqresp.message import Status from lean_spec.node.sync.checkpoint_sync import ( CheckpointSyncError, + fetch_finalized_block, fetch_finalized_state, verify_checkpoint_state, ) from lean_spec.spec.crypto.merkleization import hash_tree_root from lean_spec.spec.forks import ( - AggregatedAttestations, - Block, - BlockBody, Checkpoint, ForkProtocol, Slot, @@ -81,10 +79,12 @@ async def from_checkpoint( validator_index: ValidatorIndex | None, ) -> Anchor: """ - Build an anchor by fetching a finalized state from a peer. + Build an anchor by fetching a finalized block and state from a peer. The fetched state replaces the genesis validator set. Deposits and exits since genesis are already baked into it. + The fetched block anchors the store at the same finalized root the + network agrees on; a source that cannot serve it cannot be used. Args: url: HTTP endpoint of the node serving the checkpoint state. @@ -96,6 +96,10 @@ async def from_checkpoint( CheckpointSyncError: For every failure mode covering transport, structural verification, and genesis-time mismatch. """ + # The block comes first: it is small, so an incapable source fails + # fast before the multi-megabyte state download starts. + signed_block = await fetch_finalized_block(url) + state = await fetch_finalized_state(url, fork.state_class) # Catches a corrupt download before it contaminates the forkchoice store. @@ -110,24 +114,17 @@ async def from_checkpoint( f"local={genesis.genesis_time}" ) - # Reconstruct the anchor block from the header embedded in the state. - # A header stored before its post-state root carries a zero placeholder; - # in that case we recompute the root from the state itself. - # Fork choice only needs identity and lineage, so the body is left empty. - header = state.latest_block_header - state_root = ( - header.state_root if header.state_root != Bytes32.zero() else hash_tree_root(state) - ) - anchor_block = Block( - slot=header.slot, - proposer_index=header.proposer_index, - parent_root=header.parent_root, - state_root=state_root, - body=BlockBody(attestations=AggregatedAttestations(data=[])), - ) + # Both fetches read the snapshot at the finalized root. + # A pairing mismatch means finalization advanced between the two + # requests; refetching is the fix. + if signed_block.block.state_root != hash_tree_root(state): + raise CheckpointSyncError( + "anchor block / state mismatch; " + "source advanced finalization between requests, retry" + ) # The protocol return type is structural, but only one concrete store ships. - store = cast(Store, fork.create_store(state, anchor_block, validator_index)) + store = cast(Store, fork.create_store(state, signed_block.block, validator_index)) return cls( validators=state.validators, diff --git a/src/lean_spec/node/api/context.py b/src/lean_spec/node/api/context.py index c341a620b..9fa0c8321 100644 --- a/src/lean_spec/node/api/context.py +++ b/src/lean_spec/node/api/context.py @@ -8,7 +8,8 @@ from aiohttp import web -from lean_spec.spec.forks import LstarSpec, Store +from lean_spec.spec.forks import LstarSpec, SignedBlock, Store +from lean_spec.spec.ssz import Bytes32 class AggregatorRoleControl(Protocol): @@ -30,6 +31,14 @@ class ApiContext: aggregator_role_control: AggregatorRoleControl | None """Holder of the aggregator flag, or None when aggregator control is unwired.""" + signed_block_getter: Callable[[Bytes32], SignedBlock | None] | None + """ + Callable returning the signed block for a block root, or None when unwired. + + The fork-choice store retains only unsigned blocks. + Serving the checkpoint-sync anchor block needs a separate signed-block source. + """ + def require_store(self) -> Store: """ Return the live store, or raise 503 when the node has no store yet. @@ -46,3 +55,9 @@ def require_aggregator_role_control(self) -> AggregatorRoleControl: if self.aggregator_role_control is None: raise web.HTTPServiceUnavailable(reason="Aggregator role control not available") return self.aggregator_role_control + + def require_signed_block_getter(self) -> Callable[[Bytes32], SignedBlock | None]: + """Return the signed-block source, or raise 503 when it is unwired.""" + if self.signed_block_getter is None: + raise web.HTTPServiceUnavailable(reason="Signed block source not configured") + return self.signed_block_getter diff --git a/src/lean_spec/node/api/handlers.py b/src/lean_spec/node/api/handlers.py index f3c709a5c..43542e428 100644 --- a/src/lean_spec/node/api/handlers.py +++ b/src/lean_spec/node/api/handlers.py @@ -129,6 +129,38 @@ async def finalized_state(self, request: web.Request) -> web.Response: return web.Response(body=ssz_bytes, content_type="application/octet-stream") + async def finalized_block(self, request: web.Request) -> web.Response: + """ + Return the finalized signed block as SSZ bytes. + + Together with the finalized state endpoint, this gives a + checkpoint-syncing peer the (state, signed block) anchor pair. + External consumers bootstrap their fork-choice store from this pair. + + The fork-choice store holds only unsigned blocks. + Serving a signed block therefore needs a separate signed-block source. + Nodes without such a source answer with 503. + + Raises: + HTTPNotFound: The source has no block for the finalized root. + HTTPInternalServerError: Encoding the signed block failed. + """ + store = self.context.require_store() + signed_block_getter = self.context.require_signed_block_getter() + + signed_block = signed_block_getter(store.latest_finalized.root) + if signed_block is None: + raise web.HTTPNotFound(reason="Finalized signed block not available") + + # Encoding a full block is CPU-heavy, so run it off the event loop. + try: + ssz_bytes = await asyncio.to_thread(signed_block.encode_bytes) + except Exception as exception: + logger.error("Failed to encode signed block: %s", exception) + raise web.HTTPInternalServerError(reason="Encoding failed") from exception + + return web.Response(body=ssz_bytes, content_type="application/octet-stream") + async def aggregator_status(self, request: web.Request) -> web.Response: """Report whether the node is acting as an aggregator.""" aggregator_role_control = self.context.require_aggregator_role_control() diff --git a/src/lean_spec/node/api/server.py b/src/lean_spec/node/api/server.py index 7726e16fe..3693595bc 100644 --- a/src/lean_spec/node/api/server.py +++ b/src/lean_spec/node/api/server.py @@ -11,7 +11,8 @@ from lean_spec.node.api.context import AggregatorRoleControl, ApiContext from lean_spec.node.api.handlers import ApiHandlers -from lean_spec.spec.forks import LstarSpec, Store +from lean_spec.spec.forks import LstarSpec, SignedBlock, Store +from lean_spec.spec.ssz import Bytes32 logger = logging.getLogger(__name__) @@ -40,6 +41,16 @@ class ApiServer: store_getter: Callable[[], Store | None] | None = None """Callable that returns the current Store instance.""" + signed_block_getter: Callable[[Bytes32], SignedBlock | None] | None = None + """ + Optional callable returning the signed block for a block root. + + The fork-choice store retains only unsigned blocks, so serving the + checkpoint-sync anchor block needs a separate signed-block source. + The embedding node injects one here. + When absent, the finalized block endpoint returns 503. + """ + aggregator_role_control: AggregatorRoleControl | None = None """Optional runtime accessor for the node's aggregator role.""" @@ -67,6 +78,7 @@ async def start(self) -> None: spec=self.spec, store_getter=self.store_getter, aggregator_role_control=self.aggregator_role_control, + signed_block_getter=self.signed_block_getter, ) handlers = ApiHandlers(context) @@ -76,6 +88,7 @@ async def start(self) -> None: [ web.get("/lean/v0/health", handlers.health), web.get("/lean/v0/states/finalized", handlers.finalized_state), + web.get("/lean/v0/blocks/finalized", handlers.finalized_block), web.get("/lean/v0/checkpoints/justified", handlers.justified_checkpoint), web.get("/lean/v0/fork_choice", handlers.fork_choice), web.get("/metrics", handlers.metrics), diff --git a/src/lean_spec/node/node.py b/src/lean_spec/node/node.py index d54e95195..9d23a3012 100644 --- a/src/lean_spec/node/node.py +++ b/src/lean_spec/node/node.py @@ -44,7 +44,8 @@ Validators, ) from lean_spec.spec.forks.lstar.config import ATTESTATION_COMMITTEE_COUNT -from lean_spec.spec.ssz import Bytes32, Uint64 +from lean_spec.spec.forks.lstar.containers import MultiMessageAggregate +from lean_spec.spec.ssz import ByteList512KiB, Bytes32, Uint64 logger = logging.getLogger(__name__) @@ -310,6 +311,21 @@ def from_genesis(cls, config: NodeConfig) -> Node: # Create API server if configured api_server: ApiServer | None = None if config.api_config is not None: + + def signed_block_for_root(block_root: Bytes32) -> SignedBlock | None: + # The store and database retain only unsigned blocks. + # Wrap the looked-up block in an empty proof to serve the + # checkpoint-sync anchor pair. + # The receiving peer pairs the block with the finalized state + # and never verifies this proof, so an empty one suffices. + block = sync_service.store.blocks.get(block_root) + if block is None: + return None + return SignedBlock( + block=block, + proof=MultiMessageAggregate(proof=ByteList512KiB(data=b"")), + ) + # The admin API reads and mutates the sync service aggregator flag, # letting operators rotate the role at runtime without a restart. # Store getter captures sync_service to get the live store. @@ -317,6 +333,7 @@ def from_genesis(cls, config: NodeConfig) -> Node: config=config.api_config, spec=fork, store_getter=lambda: sync_service.store, + signed_block_getter=signed_block_for_root, aggregator_role_control=sync_service, ) diff --git a/src/lean_spec/node/sync/checkpoint_sync.py b/src/lean_spec/node/sync/checkpoint_sync.py index 9207f45b9..a52a8cb7d 100644 --- a/src/lean_spec/node/sync/checkpoint_sync.py +++ b/src/lean_spec/node/sync/checkpoint_sync.py @@ -7,7 +7,7 @@ import httpx -from lean_spec.spec.forks import VALIDATOR_REGISTRY_LIMIT, State +from lean_spec.spec.forks import VALIDATOR_REGISTRY_LIMIT, SignedBlock, State logger = logging.getLogger(__name__) @@ -21,6 +21,13 @@ FINALIZED_STATE_ENDPOINT: Final = "/lean/v0/states/finalized" """Beacon API path for the finalized state.""" +FINALIZED_BLOCK_ENDPOINT: Final = "/lean/v0/blocks/finalized" +"""API endpoint for fetching the signed block matching the finalized state. + +Together with the state, this forms the anchor pair for store creation. +Checkpoint sync requires the source to serve it. +""" + class CheckpointSyncError(Exception): """ @@ -79,6 +86,56 @@ async def fetch_finalized_state(url: str, state_class: type[State]) -> State: return state +async def fetch_finalized_block(url: str) -> SignedBlock: + """ + Fetch the signed block matching the finalized state via checkpoint sync. + + The returned block carries the real body, so its hash tree root equals + the finalized root the rest of the network agrees on. + The caller must verify the block's state root against the fetched state + before pairing them into a store. + + Args: + url: Base URL of the node API (e.g., "http://localhost:5052"). + + Returns: + The finalized signed block. + + Raises: + CheckpointSyncError: If the request fails or block bytes are invalid. + """ + base_url = url.rstrip("/") + full_url = f"{base_url}{FINALIZED_BLOCK_ENDPOINT}" + + logger.info("Fetching finalized signed block from %s", full_url) + + headers = {"Accept": "application/octet-stream"} + + try: + async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: + response = await client.get(full_url, headers=headers) + response.raise_for_status() + + ssz_data = response.content + logger.info("Downloaded %d bytes of SSZ signed block data", len(ssz_data)) + + signed_block = SignedBlock.decode_bytes(ssz_data) + logger.info("Deserialized signed block at slot %s", signed_block.block.slot) + + return signed_block + + except httpx.RequestError as exception: + raise CheckpointSyncError( + f"Network error while connecting to {exception.request.url}: {exception}" + ) from exception + except httpx.HTTPStatusError as exception: + raise CheckpointSyncError( + f"HTTP error {exception.response.status_code}: {exception.response.text[:200]}" + ) from exception + except Exception as exception: + raise CheckpointSyncError(f"Failed to fetch signed block: {exception}") from exception + + def verify_checkpoint_state(state: State) -> bool: """ Check structural invariants on a downloaded checkpoint state. diff --git a/tests/node/api/endpoints/conftest.py b/tests/node/api/endpoints/conftest.py index 33caf08ce..f9c38dad3 100644 --- a/tests/node/api/endpoints/conftest.py +++ b/tests/node/api/endpoints/conftest.py @@ -10,6 +10,9 @@ from consensus_testing import make_genesis_store from lean_spec.node.api import ApiServer, ApiServerConfig +from lean_spec.spec.forks import SignedBlock +from lean_spec.spec.forks.lstar.containers import MultiMessageAggregate +from lean_spec.spec.ssz import ByteList512KiB, Bytes32 from tests.node.api.conftest import AggregatorRoleStub DEFAULT_PORT = 15099 @@ -50,10 +53,23 @@ def _create_server(self) -> ApiServer: """Create the API server with a test store and aggregator flag holder.""" store = make_genesis_store(num_validators=3, observer=True, genesis_time=int(time.time())) + def signed_block_for(root: Bytes32) -> SignedBlock | None: + # The store retains only unsigned blocks. + # Wrap the anchor block with an empty proof, like a node + # serving a genesis anchor that no proposer ever signed. + block = store.blocks.get(root) + if block is None: + return None + return SignedBlock( + block=block, + proof=MultiMessageAggregate(proof=ByteList512KiB(data=b"")), + ) + config = ApiServerConfig(host="127.0.0.1", port=self.port) return ApiServer( config=config, store_getter=lambda: store, + signed_block_getter=signed_block_for, aggregator_role_control=AggregatorRoleStub(), ) diff --git a/tests/node/api/endpoints/test_blocks.py b/tests/node/api/endpoints/test_blocks.py new file mode 100644 index 000000000..e0f4d4146 --- /dev/null +++ b/tests/node/api/endpoints/test_blocks.py @@ -0,0 +1,54 @@ +"""Tests for the blocks endpoints.""" + +import httpx + +from lean_spec.spec.crypto.merkleization import hash_tree_root +from lean_spec.spec.forks import SignedBlock +from lean_spec.spec.forks.lstar import State + + +def get_finalized_block(server_url: str) -> httpx.Response: + """Fetch the finalized signed block from the server.""" + return httpx.get( + f"{server_url}/lean/v0/blocks/finalized", + headers={"Accept": "application/octet-stream"}, + ) + + +class TestFinalizedBlock: + """Tests for the /lean/v0/blocks/finalized endpoint.""" + + def test_returns_200(self, server_url: str) -> None: + """Finalized block endpoint returns 200 status code.""" + response = get_finalized_block(server_url) + assert response.status_code == 200 + + def test_content_type_is_octet_stream(self, server_url: str) -> None: + """Finalized block endpoint returns octet-stream content type.""" + response = get_finalized_block(server_url) + content_type = response.headers.get("content-type", "") + assert "application/octet-stream" in content_type + + def test_ssz_deserializes(self, server_url: str) -> None: + """Finalized block SSZ bytes deserialize to a valid SignedBlock object.""" + response = get_finalized_block(server_url) + signed_block = SignedBlock.decode_bytes(response.content) + assert signed_block is not None + + def test_state_root_matches_finalized_state(self, server_url: str) -> None: + """ + Returned block's state root equals the finalized state's hash tree root. + + Store creation from a checkpoint asserts exactly this. + If it fails, the (state, signed block) pair cannot bootstrap a store. + """ + block_response = get_finalized_block(server_url) + signed_block = SignedBlock.decode_bytes(block_response.content) + + state_response = httpx.get( + f"{server_url}/lean/v0/states/finalized", + headers={"Accept": "application/octet-stream"}, + ) + state = State.decode_bytes(state_response.content) + + assert signed_block.block.state_root == hash_tree_root(state) diff --git a/tests/node/api/test_server.py b/tests/node/api/test_server.py index 340abe85a..b6c800c55 100644 --- a/tests/node/api/test_server.py +++ b/tests/node/api/test_server.py @@ -2,10 +2,16 @@ from __future__ import annotations +from collections.abc import Callable + import httpx from lean_spec.node.api import ApiServer, ApiServerConfig +from lean_spec.spec.crypto.merkleization import hash_tree_root +from lean_spec.spec.forks import SignedBlock from lean_spec.spec.forks.lstar import Store +from lean_spec.spec.forks.lstar.containers import MultiMessageAggregate +from lean_spec.spec.ssz import ByteList512KiB, Bytes32 from tests.node.api.conftest import AggregatorRoleStub @@ -88,6 +94,102 @@ async def test_returns_404_when_finalized_state_missing(self, base_store: Store) await server.aclose() +class TestFinalizedBlockEndpoint: + """Tests for the /lean/v0/blocks/finalized endpoint.""" + + @staticmethod + def _signed_block_getter_for(store: Store) -> Callable[[Bytes32], SignedBlock | None]: + """Build a signed-block lookup wrapping the store's unsigned blocks.""" + + def signed_block_for(root: Bytes32) -> SignedBlock | None: + block = store.blocks.get(root) + if block is None: + return None + return SignedBlock( + block=block, + proof=MultiMessageAggregate(proof=ByteList512KiB(data=b"")), + ) + + return signed_block_for + + async def test_returns_503_when_store_not_initialized(self) -> None: + """Endpoint returns 503 Service Unavailable when store is not set.""" + config = ApiServerConfig(port=15071) + server = ApiServer(config=config) + + await server.start() + + try: + async with httpx.AsyncClient() as client: + response = await client.get("http://127.0.0.1:15071/lean/v0/blocks/finalized") + + assert response.status_code == 503 + + finally: + await server.aclose() + + async def test_returns_503_without_signed_block_source(self, base_store: Store) -> None: + """Endpoint returns 503 when no signed-block source is configured.""" + config = ApiServerConfig(port=15075) + server = ApiServer(config=config, store_getter=lambda: base_store) + + await server.start() + + try: + async with httpx.AsyncClient() as client: + response = await client.get("http://127.0.0.1:15075/lean/v0/blocks/finalized") + + assert response.status_code == 503 + + finally: + await server.aclose() + + async def test_returns_404_when_block_unavailable(self, base_store: Store) -> None: + """Endpoint returns 404 when the source has no block for the finalized root.""" + config = ApiServerConfig(port=15073) + server = ApiServer( + config=config, + store_getter=lambda: base_store, + signed_block_getter=lambda root: None, + ) + + await server.start() + + try: + async with httpx.AsyncClient() as client: + response = await client.get("http://127.0.0.1:15073/lean/v0/blocks/finalized") + + assert response.status_code == 404 + + finally: + await server.aclose() + + async def test_returns_finalized_anchor_block(self, base_store: Store) -> None: + """Endpoint serves the signed block matching the finalized checkpoint root.""" + config = ApiServerConfig(port=15074) + server = ApiServer( + config=config, + store_getter=lambda: base_store, + signed_block_getter=self._signed_block_getter_for(base_store), + ) + + await server.start() + + try: + async with httpx.AsyncClient() as client: + response = await client.get("http://127.0.0.1:15074/lean/v0/blocks/finalized") + + assert response.status_code == 200 + assert "application/octet-stream" in response.headers["content-type"] + + signed_block = SignedBlock.decode_bytes(response.content) + + assert hash_tree_root(signed_block.block) == base_store.latest_finalized.root + + finally: + await server.aclose() + + class TestJustifiedCheckpointEndpoint: """Tests for the /lean/v0/checkpoints/justified endpoint error handling.""" diff --git a/tests/node/sync/test_checkpoint_sync.py b/tests/node/sync/test_checkpoint_sync.py index 5896ffc48..184c123bd 100644 --- a/tests/node/sync/test_checkpoint_sync.py +++ b/tests/node/sync/test_checkpoint_sync.py @@ -9,14 +9,18 @@ from lean_spec.node.api import ApiServer, ApiServerConfig from lean_spec.node.sync.checkpoint_sync import ( + FINALIZED_BLOCK_ENDPOINT, FINALIZED_STATE_ENDPOINT, CheckpointSyncError, + fetch_finalized_block, fetch_finalized_state, verify_checkpoint_state, ) -from lean_spec.spec.forks import VALIDATOR_REGISTRY_LIMIT, Slot +from lean_spec.spec.crypto.merkleization import hash_tree_root +from lean_spec.spec.forks import VALIDATOR_REGISTRY_LIMIT, SignedBlock, Slot from lean_spec.spec.forks.lstar import State, Store -from lean_spec.spec.forks.lstar.containers import Validators +from lean_spec.spec.forks.lstar.containers import MultiMessageAggregate, Validators +from lean_spec.spec.ssz import ByteList512KiB, Bytes32 class _MockTransport(httpx.AsyncBaseTransport): @@ -196,6 +200,79 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: assert captured == [f"http://example.com{FINALIZED_STATE_ENDPOINT}"] +class TestFetchFinalizedBlock: + """ + Tests for error handling when fetching the finalized block over HTTP. + + Mirrors the state-fetch error tests: failures are injected through the + transport so the real httpx client and error wrapping run unchanged. + """ + + async def test_network_error_raises_checkpoint_sync_error(self) -> None: + """TCP-level failure surfaces as CheckpointSyncError with the URL.""" + transport = _MockTransport( + exc=httpx.RequestError( + "connection refused", + request=httpx.Request("GET", f"http://example.com{FINALIZED_BLOCK_ENDPOINT}"), + ) + ) + + with ( + patch( + "lean_spec.node.sync.checkpoint_sync.httpx.AsyncClient", + return_value=httpx.AsyncClient(transport=transport), + ), + pytest.raises(CheckpointSyncError) as exception_info, + ): + await fetch_finalized_block("http://example.com") + assert str(exception_info.value) == ( + "Network error while connecting to " + "http://example.com/lean/v0/blocks/finalized: connection refused" + ) + + @pytest.mark.parametrize( + ("status_code", "status_text"), + [ + (404, "Not Found"), + (503, "Service Unavailable"), + ], + ) + async def test_http_error_response_raises_checkpoint_sync_error( + self, status_code: int, status_text: str + ) -> None: + """ + Non-success HTTP status surfaces as CheckpointSyncError with the code. + + Covers missing endpoints (404) and sources without a signed-block + source (503), the two cases the anchor builder falls back on. + """ + transport = _MockTransport(status=status_code, content=status_text.encode()) + + with ( + patch( + "lean_spec.node.sync.checkpoint_sync.httpx.AsyncClient", + return_value=httpx.AsyncClient(transport=transport), + ), + pytest.raises(CheckpointSyncError) as exception_info, + ): + await fetch_finalized_block("http://example.com") + assert str(exception_info.value) == f"HTTP error {status_code}: {status_text}" + + async def test_corrupt_ssz_raises_checkpoint_sync_error(self) -> None: + """Corrupt response body surfaces as CheckpointSyncError.""" + transport = _MockTransport(content=b"\xff\xfe corrupt") + + with ( + patch( + "lean_spec.node.sync.checkpoint_sync.httpx.AsyncClient", + return_value=httpx.AsyncClient(transport=transport), + ), + pytest.raises(CheckpointSyncError) as exception_info, + ): + await fetch_finalized_block("http://example.com") + assert str(exception_info.value).startswith("Failed to fetch signed block: ") + + class TestCheckpointSyncClientServerIntegration: """Integration tests for checkpoint sync client fetching from server.""" @@ -217,3 +294,46 @@ async def test_client_fetches_and_deserializes_state(self, base_store: Store) -> finally: await server.aclose() + + async def test_client_fetches_and_deserializes_block(self, base_store: Store) -> None: + """Client fetches the finalized block the server's signed-block source provides.""" + + def signed_block_for(root: Bytes32) -> SignedBlock | None: + block = base_store.blocks.get(root) + if block is None: + return None + return SignedBlock( + block=block, + proof=MultiMessageAggregate(proof=ByteList512KiB(data=b"")), + ) + + config = ApiServerConfig(port=15075) + server = ApiServer( + config=config, + store_getter=lambda: base_store, + signed_block_getter=signed_block_for, + ) + + await server.start() + + try: + signed_block = await fetch_finalized_block("http://127.0.0.1:15075") + + assert hash_tree_root(signed_block.block) == base_store.latest_finalized.root + + finally: + await server.aclose() + + async def test_block_fetch_raises_when_source_unconfigured(self, base_store: Store) -> None: + """A server without a signed-block source yields the 503 error path.""" + config = ApiServerConfig(port=15076) + server = ApiServer(config=config, store_getter=lambda: base_store) + + await server.start() + + try: + with pytest.raises(CheckpointSyncError, match="HTTP error 503"): + await fetch_finalized_block("http://127.0.0.1:15076") + + finally: + await server.aclose() diff --git a/tests/node/test_anchor.py b/tests/node/test_anchor.py index 0979ee86f..64ca5fb6a 100644 --- a/tests/node/test_anchor.py +++ b/tests/node/test_anchor.py @@ -6,13 +6,24 @@ import pytest -from consensus_testing import make_genesis_state +from consensus_testing import make_genesis_state, reconstruct_block_from_header from lean_spec.node.anchor import Anchor from lean_spec.node.genesis import GenesisConfig from lean_spec.node.sync.checkpoint_sync import CheckpointSyncError -from lean_spec.spec.forks import Slot +from lean_spec.spec.crypto.merkleization import hash_tree_root +from lean_spec.spec.forks import SignedBlock, Slot +from lean_spec.spec.forks.lstar import State +from lean_spec.spec.forks.lstar.containers import MultiMessageAggregate from lean_spec.spec.forks.lstar.spec import LstarSpec -from lean_spec.spec.ssz import Bytes32 +from lean_spec.spec.ssz import ByteList512KiB, Bytes32 + + +def _signed_genesis_block(state: State) -> SignedBlock: + """Wrap the genesis block matching a state with an empty proof.""" + return SignedBlock( + block=reconstruct_block_from_header(state), + proof=MultiMessageAggregate(proof=ByteList512KiB(data=b"")), + ) class TestAnchorFromGenesis: @@ -43,6 +54,11 @@ async def test_genesis_time_mismatch_raises(self) -> None: ) with ( + patch( + "lean_spec.node.anchor.fetch_finalized_block", + new_callable=AsyncMock, + return_value=_signed_genesis_block(checkpoint_state), + ), patch( "lean_spec.node.anchor.fetch_finalized_state", new_callable=AsyncMock, @@ -66,6 +82,11 @@ async def test_verification_failure_raises(self) -> None: ) with ( + patch( + "lean_spec.node.anchor.fetch_finalized_block", + new_callable=AsyncMock, + return_value=_signed_genesis_block(checkpoint_state), + ), patch( "lean_spec.node.anchor.fetch_finalized_state", new_callable=AsyncMock, @@ -85,13 +106,41 @@ async def test_verification_failure_raises(self) -> None: ) assert str(exception_info.value) == ("checkpoint state failed structural verification") - async def test_network_error_propagates(self) -> None: - """Network errors surface as CheckpointSyncError.""" + async def test_block_fetch_failure_propagates(self) -> None: + """A source that cannot serve the finalized block aborts checkpoint sync.""" local_genesis = GenesisConfig.model_validate( {"GENESIS_TIME": 1000, "GENESIS_VALIDATORS": []} ) with ( + patch( + "lean_spec.node.anchor.fetch_finalized_block", + new_callable=AsyncMock, + side_effect=CheckpointSyncError("HTTP error 503: no signed block source"), + ), + pytest.raises(CheckpointSyncError) as exception_info, + ): + await Anchor.from_checkpoint( + url="http://localhost:5052", + genesis=local_genesis, + fork=LstarSpec(), + validator_index=None, + ) + assert str(exception_info.value) == "HTTP error 503: no signed block source" + + async def test_state_fetch_failure_propagates(self) -> None: + """Network errors on the state fetch surface as CheckpointSyncError.""" + checkpoint_state = make_genesis_state(num_validators=3, genesis_time=1000) + local_genesis = GenesisConfig.model_validate( + {"GENESIS_TIME": 1000, "GENESIS_VALIDATORS": []} + ) + + with ( + patch( + "lean_spec.node.anchor.fetch_finalized_block", + new_callable=AsyncMock, + return_value=_signed_genesis_block(checkpoint_state), + ), patch( "lean_spec.node.anchor.fetch_finalized_state", new_callable=AsyncMock, @@ -111,14 +160,22 @@ async def test_success_builds_store_and_status(self) -> None: """Successful checkpoint sync produces a populated anchor.""" genesis_time = 1000 checkpoint_state = make_genesis_state(num_validators=3, genesis_time=genesis_time) + signed_block = _signed_genesis_block(checkpoint_state) local_genesis = GenesisConfig.model_validate( {"GENESIS_TIME": genesis_time, "GENESIS_VALIDATORS": []} ) - with patch( - "lean_spec.node.anchor.fetch_finalized_state", - new_callable=AsyncMock, - return_value=checkpoint_state, + with ( + patch( + "lean_spec.node.anchor.fetch_finalized_state", + new_callable=AsyncMock, + return_value=checkpoint_state, + ), + patch( + "lean_spec.node.anchor.fetch_finalized_block", + new_callable=AsyncMock, + return_value=signed_block, + ), ): anchor = await Anchor.from_checkpoint( url="http://localhost:5052", @@ -130,3 +187,36 @@ async def test_success_builds_store_and_status(self) -> None: assert anchor.store is not None assert anchor.validators == checkpoint_state.validators assert anchor.initial_status.finalized == anchor.store.latest_finalized + # The anchor is keyed by the fetched block's root, so the store's + # finalized checkpoint matches the root the network agrees on. + assert anchor.store.latest_finalized.root == hash_tree_root(signed_block.block) + + async def test_block_state_pairing_mismatch_raises(self) -> None: + """A block not matching the fetched state raises instead of falling back.""" + genesis_time = 1000 + checkpoint_state = make_genesis_state(num_validators=3, genesis_time=genesis_time) + other_state = make_genesis_state(num_validators=4, genesis_time=genesis_time) + mismatched_block = _signed_genesis_block(other_state) + local_genesis = GenesisConfig.model_validate( + {"GENESIS_TIME": genesis_time, "GENESIS_VALIDATORS": []} + ) + + with ( + patch( + "lean_spec.node.anchor.fetch_finalized_state", + new_callable=AsyncMock, + return_value=checkpoint_state, + ), + patch( + "lean_spec.node.anchor.fetch_finalized_block", + new_callable=AsyncMock, + return_value=mismatched_block, + ), + pytest.raises(CheckpointSyncError, match="mismatch"), + ): + await Anchor.from_checkpoint( + url="http://localhost:5052", + genesis=local_genesis, + fork=LstarSpec(), + validator_index=None, + ) diff --git a/tests/node/test_node.py b/tests/node/test_node.py index d92b99e5f..29c8cbfc1 100644 --- a/tests/node/test_node.py +++ b/tests/node/test_node.py @@ -40,10 +40,12 @@ JustificationRoots, JustificationValidators, JustifiedSlots, + MultiMessageAggregate, + SignedBlock, Validators, ) from lean_spec.spec.forks.lstar.spec import LstarSpec -from lean_spec.spec.ssz import Bytes32, Uint64 +from lean_spec.spec.ssz import ByteList512KiB, Bytes32, Uint64 GENESIS_TIME = Uint64(1704067200) @@ -295,6 +297,37 @@ def test_api_server_none_when_no_config(self, node_config: NodeConfig) -> None: node = Node.from_genesis(node_config) assert node.api_server is None + def test_api_server_serves_finalized_signed_block(self, node_config: NodeConfig) -> None: + """The wired signed-block source returns the finalized block in an empty proof.""" + config = dataclasses.replace( + node_config, api_config=ApiServerConfig(host="127.0.0.1", port=5052) + ) + node = Node.from_genesis(config) + assert node.api_server is not None + assert node.api_server.signed_block_getter is not None + + store = node.api_server.store + assert store is not None + finalized_root = store.latest_finalized.root + + assert node.api_server.signed_block_getter(finalized_root) == SignedBlock( + block=store.blocks[finalized_root], + proof=MultiMessageAggregate(proof=ByteList512KiB(data=b"")), + ) + + def test_api_server_signed_block_source_returns_none_for_unknown_root( + self, node_config: NodeConfig + ) -> None: + """The wired signed-block source returns None for a root the store does not hold.""" + config = dataclasses.replace( + node_config, api_config=ApiServerConfig(host="127.0.0.1", port=5052) + ) + node = Node.from_genesis(config) + assert node.api_server is not None + assert node.api_server.signed_block_getter is not None + + assert node.api_server.signed_block_getter(Bytes32.zero()) is None + def test_validator_service_created_when_registry_provided( self, node_with_validator: Node ) -> None: