Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 31 additions & 60 deletions src/lean_spec/node/chain/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class ChainService:
"""Sync service whose store we tick."""

clock: SlotClock
"""Clock for time calculation."""
"""Source of wall-clock time and interval boundaries."""

spec: LstarSpec = field(default_factory=LstarSpec)
"""Fork spec driving consensus methods."""
Expand All @@ -34,40 +34,27 @@ async def run(self) -> None:
"""Tick the store forward at each interval boundary, until stopped."""
self._running = True

# Catch up store time to current wall clock.
# - Before genesis this returns nothing; the loop handles the wait.
# - After genesis this keeps attestation validation from rejecting valid votes.
# Catch the store time up to the wall clock before looping.
# Otherwise stale store time makes attestation validation reject valid votes.
last_handled_total_interval = await self._initial_tick()

while self._running:
# Wait for genesis if we are before it.
# The clock sleeps exactly until genesis when called before it.
if self.clock.current_time() < self.clock.genesis_time:
await self.clock.sleep_until_next_interval()
continue

total_interval = self.clock.total_intervals()

# Already handled this interval: sleep to the next boundary.
already_handled = (
last_handled_total_interval is not None
and total_interval <= last_handled_total_interval
)
if already_handled:
# Wait when no new interval is due yet.
# Before genesis the clock sleeps straight through to it.
# A frozen clock past startup reports the same interval, so keep waiting.
if (
self.clock.current_time() < self.clock.genesis_time
or total_interval <= last_handled_total_interval
):
await self.clock.sleep_until_next_interval()
if not self._running:
break
# Time may not have advanced during the sleep.
# Skip this iteration to avoid ticking the same interval twice.
total_interval = self.clock.total_intervals()
if total_interval <= last_handled_total_interval:
continue

# Advance the store to the current interval.
# This service never proposes; block production needs validator keys.
continue

# This service only follows the chain; proposing needs validator keys.
new_aggregated_attestations = await self._tick_to(total_interval)

# No publisher is wired in tests or offline runs, so guard on its presence.
# Offline runs and tests wire no publisher, so guard on its presence.
publish = self.sync_service.publish_aggregated_attestation
if new_aggregated_attestations and publish is not None:
for aggregate in new_aggregated_attestations:
Expand All @@ -85,23 +72,17 @@ async def run(self) -> None:

async def _tick_to(self, target_interval: Interval) -> list[SignedAggregatedAttestation]:
"""
Advance the store to the target interval, skipping stale work and yielding.

When the node falls behind by more than one slot, stale intervals are skipped.
Processing every missed interval synchronously blocks the event loop.
That starves gossip and pushes the node further behind.
Advance the store to the target interval, one interval at a time.

Between remaining ticks, yield so gossip messages can be processed.
Update the sync service store after each tick so gossip handlers see current time.

Returns aggregated attestations produced during the ticks.
Skip stale intervals when far behind so synchronous ticking never starves gossip.
Returns the aggregated attestations produced along the way.
"""
store = self.sync_service.store
all_new_aggregates: list[SignedAggregatedAttestation] = []

# The target comes from the wall clock, which can step backward.
# NTP slew, a leap second, or a VM migration can move it before the store time.
# A backward target ticks nothing, so return the empty result without ticking.
# A backward target would tick nothing, so return early.
if target_interval <= store.time:
return []

Expand All @@ -111,13 +92,14 @@ async def _tick_to(self, target_interval: Interval) -> list[SignedAggregatedAtte
# That preserves aggregation, safe target, and attestation acceptance.
#
# Acceptance for the jumped slots waits for the final slot's tick.
# That is safe: acceptance is a monotone pool merge, and the head recomputes from scratch.
# That is safe: acceptance only merges into a monotone pool.
# The head recomputes from scratch, so nothing is lost by waiting.
if target_interval - store.time > Interval(INTERVALS_PER_SLOT):
store = store.model_copy(
update={"time": target_interval - Interval(INTERVALS_PER_SLOT)}
)

# Tick remaining intervals one at a time.
# Tick the remaining intervals one at a time.
while store.time < target_interval:
store, new_aggregates = self.spec.tick_interval(
store,
Expand All @@ -134,30 +116,19 @@ async def _tick_to(self, target_interval: Interval) -> list[SignedAggregatedAtte

return all_new_aggregates

async def _initial_tick(self) -> Interval | None:
"""Catch up store time to wall clock at startup."""
current_time = self.clock.current_time()

# Only tick once past genesis.
if current_time >= self.clock.genesis_time:
target_interval = self.clock.total_intervals()

# Reuse the skip-and-yield path for catch-up.
# Discard aggregated attestations from catch-up.
# During initial sync we may be many slots behind.
# Publishing stale aggregations would spam the network.
await self._tick_to(target_interval)

return target_interval
async def _initial_tick(self) -> Interval:
"""Catch up store time to the wall clock at startup."""
if self.clock.current_time() < self.clock.genesis_time:
return Interval(0)

return None
# Discard the aggregates: at startup we may be many slots behind.
# Publishing those stale aggregations would spam the network.
target_interval = self.clock.total_intervals()
await self._tick_to(target_interval)
return target_interval

def stop(self) -> None:
"""
Stop the service.

The loop exits after its current sleep cycle finishes.
"""
"""Stop the service; the loop exits after its current sleep finishes."""
self._running = False

@property
Expand Down
2 changes: 2 additions & 0 deletions src/lean_spec/node/sync/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ async def on_gossip_block(
# Only update our store if the block was actually processed.
#
# A block may be cached instead of processed if its parent is unknown.
# The processed path never awaits, so a concurrent clock tick cannot
# advance the store between the read above and this write-back.
if gossip_block_outcome.processed:
block_root = hash_tree_root(block.block)
logger.info(
Expand Down
8 changes: 4 additions & 4 deletions tests/node/chain/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ class TestStartupTick:
@pytest.mark.parametrize(
("intervals_elapsed", "expected_interval", "expected_tick_times"),
[
# Before genesis: nothing ticks, the caller waits.
(-0.5, None, []),
# Before genesis: nothing ticks, the anchor interval is reported.
(-0.5, Interval(0), []),
# Exactly at genesis: the store already sits at the anchor.
(0.0, Interval(0), []),
# One slot in: five ticks, no skip.
Expand All @@ -246,10 +246,10 @@ class TestStartupTick:
async def test_catches_store_up_to_wall_clock(
self,
intervals_elapsed: float,
expected_interval: Interval | None,
expected_interval: Interval,
expected_tick_times: list[int],
) -> None:
"""Startup ticks the store to the current interval, or reports waiting if pre-genesis."""
"""Startup ticks the store to the current interval and returns it."""
spec = ProbeSpec()
now = 1000 + intervals_elapsed * INTERVAL_SECONDS
service = make_service(spec, time_fn=lambda: now)
Expand Down
Loading