Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 18 additions & 21 deletions src/lean_spec/node/anchor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
Two sources land on the same return shape:

- Genesis: synthesise the store from the genesis validator set.
- Checkpoint: fetch a finalized state from a peer and build the store from it.
- Checkpoint: fetch a finalized block and state from a peer and build the store.

Once the store exists the protocol cannot tell the two sources apart.
"""
Expand All @@ -19,14 +19,12 @@
from lean_spec.node.networking.reqresp.message import Status
from lean_spec.node.sync.checkpoint_sync import (
CheckpointSyncError,
fetch_finalized_block,
fetch_finalized_state,
verify_checkpoint_state,
)
from lean_spec.spec.crypto.merkleization import hash_tree_root
from lean_spec.spec.forks import (
AggregatedAttestations,
Block,
BlockBody,
Checkpoint,
ForkProtocol,
Slot,
Expand Down Expand Up @@ -81,10 +79,12 @@ async def from_checkpoint(
validator_index: ValidatorIndex | None,
) -> Anchor:
"""
Build an anchor by fetching a finalized state from a peer.
Build an anchor by fetching a finalized block and state from a peer.

The fetched state replaces the genesis validator set.
Deposits and exits since genesis are already baked into it.
The fetched block anchors the store at the same finalized root the
network agrees on; a source that cannot serve it cannot be used.

Args:
url: HTTP endpoint of the node serving the checkpoint state.
Expand All @@ -96,6 +96,10 @@ async def from_checkpoint(
CheckpointSyncError: For every failure mode covering transport,
structural verification, and genesis-time mismatch.
"""
# The block comes first: it is small, so an incapable source fails
# fast before the multi-megabyte state download starts.
signed_block = await fetch_finalized_block(url)

state = await fetch_finalized_state(url, fork.state_class)

# Catches a corrupt download before it contaminates the forkchoice store.
Expand All @@ -110,24 +114,17 @@ async def from_checkpoint(
f"local={genesis.genesis_time}"
)

# Reconstruct the anchor block from the header embedded in the state.
# A header stored before its post-state root carries a zero placeholder;
# in that case we recompute the root from the state itself.
# Fork choice only needs identity and lineage, so the body is left empty.
header = state.latest_block_header
state_root = (
header.state_root if header.state_root != Bytes32.zero() else hash_tree_root(state)
)
anchor_block = Block(
slot=header.slot,
proposer_index=header.proposer_index,
parent_root=header.parent_root,
state_root=state_root,
body=BlockBody(attestations=AggregatedAttestations(data=[])),
)
# Both fetches read the snapshot at the finalized root.
# A pairing mismatch means finalization advanced between the two
# requests; refetching is the fix.
if signed_block.block.state_root != hash_tree_root(state):
raise CheckpointSyncError(
"anchor block / state mismatch; "
"source advanced finalization between requests, retry"
)

# The protocol return type is structural, but only one concrete store ships.
store = cast(Store, fork.create_store(state, anchor_block, validator_index))
store = cast(Store, fork.create_store(state, signed_block.block, validator_index))

return cls(
validators=state.validators,
Expand Down
17 changes: 16 additions & 1 deletion src/lean_spec/node/api/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

from aiohttp import web

from lean_spec.spec.forks import LstarSpec, Store
from lean_spec.spec.forks import LstarSpec, SignedBlock, Store
from lean_spec.spec.ssz import Bytes32


class AggregatorRoleControl(Protocol):
Expand All @@ -30,6 +31,14 @@ class ApiContext:
aggregator_role_control: AggregatorRoleControl | None
"""Holder of the aggregator flag, or None when aggregator control is unwired."""

signed_block_getter: Callable[[Bytes32], SignedBlock | None] | None
"""
Callable returning the signed block for a block root, or None when unwired.

The fork-choice store retains only unsigned blocks.
Serving the checkpoint-sync anchor block needs a separate signed-block source.
"""

def require_store(self) -> Store:
"""
Return the live store, or raise 503 when the node has no store yet.
Expand All @@ -46,3 +55,9 @@ def require_aggregator_role_control(self) -> AggregatorRoleControl:
if self.aggregator_role_control is None:
raise web.HTTPServiceUnavailable(reason="Aggregator role control not available")
return self.aggregator_role_control

def require_signed_block_getter(self) -> Callable[[Bytes32], SignedBlock | None]:
"""Return the signed-block source, or raise 503 when it is unwired."""
if self.signed_block_getter is None:
raise web.HTTPServiceUnavailable(reason="Signed block source not configured")
return self.signed_block_getter
32 changes: 32 additions & 0 deletions src/lean_spec/node/api/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,38 @@ async def finalized_state(self, request: web.Request) -> web.Response:

return web.Response(body=ssz_bytes, content_type="application/octet-stream")

async def finalized_block(self, request: web.Request) -> web.Response:
"""
Return the finalized signed block as SSZ bytes.

Together with the finalized state endpoint, this gives a
checkpoint-syncing peer the (state, signed block) anchor pair.
External consumers bootstrap their fork-choice store from this pair.

The fork-choice store holds only unsigned blocks.
Serving a signed block therefore needs a separate signed-block source.
Nodes without such a source answer with 503.

Raises:
HTTPNotFound: The source has no block for the finalized root.
HTTPInternalServerError: Encoding the signed block failed.
"""
store = self.context.require_store()
signed_block_getter = self.context.require_signed_block_getter()

signed_block = signed_block_getter(store.latest_finalized.root)
if signed_block is None:
raise web.HTTPNotFound(reason="Finalized signed block not available")

# Encoding a full block is CPU-heavy, so run it off the event loop.
try:
ssz_bytes = await asyncio.to_thread(signed_block.encode_bytes)
except Exception as exception:
logger.error("Failed to encode signed block: %s", exception)
raise web.HTTPInternalServerError(reason="Encoding failed") from exception

return web.Response(body=ssz_bytes, content_type="application/octet-stream")

async def aggregator_status(self, request: web.Request) -> web.Response:
"""Report whether the node is acting as an aggregator."""
aggregator_role_control = self.context.require_aggregator_role_control()
Expand Down
15 changes: 14 additions & 1 deletion src/lean_spec/node/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

from lean_spec.node.api.context import AggregatorRoleControl, ApiContext
from lean_spec.node.api.handlers import ApiHandlers
from lean_spec.spec.forks import LstarSpec, Store
from lean_spec.spec.forks import LstarSpec, SignedBlock, Store
from lean_spec.spec.ssz import Bytes32

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,6 +41,16 @@ class ApiServer:
store_getter: Callable[[], Store | None] | None = None
"""Callable that returns the current Store instance."""

signed_block_getter: Callable[[Bytes32], SignedBlock | None] | None = None
"""
Optional callable returning the signed block for a block root.

The fork-choice store retains only unsigned blocks, so serving the
checkpoint-sync anchor block needs a separate signed-block source.
The embedding node injects one here.
When absent, the finalized block endpoint returns 503.
"""

aggregator_role_control: AggregatorRoleControl | None = None
"""Optional runtime accessor for the node's aggregator role."""

Expand Down Expand Up @@ -67,6 +78,7 @@ async def start(self) -> None:
spec=self.spec,
store_getter=self.store_getter,
aggregator_role_control=self.aggregator_role_control,
signed_block_getter=self.signed_block_getter,
)
handlers = ApiHandlers(context)

Expand All @@ -76,6 +88,7 @@ async def start(self) -> None:
[
web.get("/lean/v0/health", handlers.health),
web.get("/lean/v0/states/finalized", handlers.finalized_state),
web.get("/lean/v0/blocks/finalized", handlers.finalized_block),
web.get("/lean/v0/checkpoints/justified", handlers.justified_checkpoint),
web.get("/lean/v0/fork_choice", handlers.fork_choice),
web.get("/metrics", handlers.metrics),
Expand Down
19 changes: 18 additions & 1 deletion src/lean_spec/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
Validators,
)
from lean_spec.spec.forks.lstar.config import ATTESTATION_COMMITTEE_COUNT
from lean_spec.spec.ssz import Bytes32, Uint64
from lean_spec.spec.forks.lstar.containers import MultiMessageAggregate
from lean_spec.spec.ssz import ByteList512KiB, Bytes32, Uint64

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -310,13 +311,29 @@ def from_genesis(cls, config: NodeConfig) -> Node:
# Create API server if configured
api_server: ApiServer | None = None
if config.api_config is not None:

def signed_block_for_root(block_root: Bytes32) -> SignedBlock | None:
# The store and database retain only unsigned blocks.
# Wrap the looked-up block in an empty proof to serve the
# checkpoint-sync anchor pair.
# The receiving peer pairs the block with the finalized state
# and never verifies this proof, so an empty one suffices.
block = sync_service.store.blocks.get(block_root)
if block is None:
return None
return SignedBlock(
block=block,
proof=MultiMessageAggregate(proof=ByteList512KiB(data=b"")),
)

# The admin API reads and mutates the sync service aggregator flag,
# letting operators rotate the role at runtime without a restart.
# Store getter captures sync_service to get the live store.
api_server = ApiServer(
config=config.api_config,
spec=fork,
store_getter=lambda: sync_service.store,
signed_block_getter=signed_block_for_root,
aggregator_role_control=sync_service,
)

Expand Down
59 changes: 58 additions & 1 deletion src/lean_spec/node/sync/checkpoint_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import httpx

from lean_spec.spec.forks import VALIDATOR_REGISTRY_LIMIT, State
from lean_spec.spec.forks import VALIDATOR_REGISTRY_LIMIT, SignedBlock, State

logger = logging.getLogger(__name__)

Expand All @@ -21,6 +21,13 @@
FINALIZED_STATE_ENDPOINT: Final = "/lean/v0/states/finalized"
"""Beacon API path for the finalized state."""

FINALIZED_BLOCK_ENDPOINT: Final = "/lean/v0/blocks/finalized"
"""API endpoint for fetching the signed block matching the finalized state.

Together with the state, this forms the anchor pair for store creation.
Checkpoint sync requires the source to serve it.
"""


class CheckpointSyncError(Exception):
"""
Expand Down Expand Up @@ -79,6 +86,56 @@ async def fetch_finalized_state(url: str, state_class: type[State]) -> State:
return state


async def fetch_finalized_block(url: str) -> SignedBlock:
"""
Fetch the signed block matching the finalized state via checkpoint sync.

The returned block carries the real body, so its hash tree root equals
the finalized root the rest of the network agrees on.
The caller must verify the block's state root against the fetched state
before pairing them into a store.

Args:
url: Base URL of the node API (e.g., "http://localhost:5052").

Returns:
The finalized signed block.

Raises:
CheckpointSyncError: If the request fails or block bytes are invalid.
"""
base_url = url.rstrip("/")
full_url = f"{base_url}{FINALIZED_BLOCK_ENDPOINT}"

logger.info("Fetching finalized signed block from %s", full_url)

headers = {"Accept": "application/octet-stream"}

try:
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
response = await client.get(full_url, headers=headers)
response.raise_for_status()

ssz_data = response.content
logger.info("Downloaded %d bytes of SSZ signed block data", len(ssz_data))

signed_block = SignedBlock.decode_bytes(ssz_data)
logger.info("Deserialized signed block at slot %s", signed_block.block.slot)

return signed_block

except httpx.RequestError as exception:
raise CheckpointSyncError(
f"Network error while connecting to {exception.request.url}: {exception}"
) from exception
except httpx.HTTPStatusError as exception:
raise CheckpointSyncError(
f"HTTP error {exception.response.status_code}: {exception.response.text[:200]}"
) from exception
except Exception as exception:
raise CheckpointSyncError(f"Failed to fetch signed block: {exception}") from exception


def verify_checkpoint_state(state: State) -> bool:
"""
Check structural invariants on a downloaded checkpoint state.
Expand Down
16 changes: 16 additions & 0 deletions tests/node/api/endpoints/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@

from consensus_testing import make_genesis_store
from lean_spec.node.api import ApiServer, ApiServerConfig
from lean_spec.spec.forks import SignedBlock
from lean_spec.spec.forks.lstar.containers import MultiMessageAggregate
from lean_spec.spec.ssz import ByteList512KiB, Bytes32
from tests.node.api.conftest import AggregatorRoleStub

DEFAULT_PORT = 15099
Expand Down Expand Up @@ -50,10 +53,23 @@ def _create_server(self) -> ApiServer:
"""Create the API server with a test store and aggregator flag holder."""
store = make_genesis_store(num_validators=3, observer=True, genesis_time=int(time.time()))

def signed_block_for(root: Bytes32) -> SignedBlock | None:
# The store retains only unsigned blocks.
# Wrap the anchor block with an empty proof, like a node
# serving a genesis anchor that no proposer ever signed.
block = store.blocks.get(root)
if block is None:
return None
return SignedBlock(
block=block,
proof=MultiMessageAggregate(proof=ByteList512KiB(data=b"")),
)

config = ApiServerConfig(host="127.0.0.1", port=self.port)
return ApiServer(
config=config,
store_getter=lambda: store,
signed_block_getter=signed_block_for,
aggregator_role_control=AggregatorRoleStub(),
)

Expand Down
Loading
Loading