diff --git a/src/lean_spec/node/chain/service.py b/src/lean_spec/node/chain/service.py index d98c94f7..2071b56f 100644 --- a/src/lean_spec/node/chain/service.py +++ b/src/lean_spec/node/chain/service.py @@ -22,7 +22,7 @@ class ChainService: """Sync service whose store we tick.""" clock: SlotClock - """Clock for time calculation.""" + """Source of wall-clock time and interval boundaries.""" spec: LstarSpec = field(default_factory=LstarSpec) """Fork spec driving consensus methods.""" @@ -34,40 +34,27 @@ async def run(self) -> None: """Tick the store forward at each interval boundary, until stopped.""" self._running = True - # Catch up store time to current wall clock. - # - Before genesis this returns nothing; the loop handles the wait. - # - After genesis this keeps attestation validation from rejecting valid votes. + # Catch the store time up to the wall clock before looping. + # Otherwise stale store time makes attestation validation reject valid votes. last_handled_total_interval = await self._initial_tick() while self._running: - # Wait for genesis if we are before it. - # The clock sleeps exactly until genesis when called before it. - if self.clock.current_time() < self.clock.genesis_time: - await self.clock.sleep_until_next_interval() - continue - total_interval = self.clock.total_intervals() - # Already handled this interval: sleep to the next boundary. - already_handled = ( - last_handled_total_interval is not None - and total_interval <= last_handled_total_interval - ) - if already_handled: + # Wait when no new interval is due yet. + # Before genesis the clock sleeps straight through to it. + # A frozen clock past startup reports the same interval, so keep waiting. + if ( + self.clock.current_time() < self.clock.genesis_time + or total_interval <= last_handled_total_interval + ): await self.clock.sleep_until_next_interval() - if not self._running: - break - # Time may not have advanced during the sleep. - # Skip this iteration to avoid ticking the same interval twice. - total_interval = self.clock.total_intervals() - if total_interval <= last_handled_total_interval: - continue - - # Advance the store to the current interval. - # This service never proposes; block production needs validator keys. + continue + + # This service only follows the chain; proposing needs validator keys. new_aggregated_attestations = await self._tick_to(total_interval) - # No publisher is wired in tests or offline runs, so guard on its presence. + # Offline runs and tests wire no publisher, so guard on its presence. publish = self.sync_service.publish_aggregated_attestation if new_aggregated_attestations and publish is not None: for aggregate in new_aggregated_attestations: @@ -85,23 +72,17 @@ async def run(self) -> None: async def _tick_to(self, target_interval: Interval) -> list[SignedAggregatedAttestation]: """ - Advance the store to the target interval, skipping stale work and yielding. - - When the node falls behind by more than one slot, stale intervals are skipped. - Processing every missed interval synchronously blocks the event loop. - That starves gossip and pushes the node further behind. + Advance the store to the target interval, one interval at a time. - Between remaining ticks, yield so gossip messages can be processed. - Update the sync service store after each tick so gossip handlers see current time. - - Returns aggregated attestations produced during the ticks. + Skip stale intervals when far behind so synchronous ticking never starves gossip. + Returns the aggregated attestations produced along the way. """ store = self.sync_service.store all_new_aggregates: list[SignedAggregatedAttestation] = [] # The target comes from the wall clock, which can step backward. # NTP slew, a leap second, or a VM migration can move it before the store time. - # A backward target ticks nothing, so return the empty result without ticking. + # A backward target would tick nothing, so return early. if target_interval <= store.time: return [] @@ -111,13 +92,14 @@ async def _tick_to(self, target_interval: Interval) -> list[SignedAggregatedAtte # That preserves aggregation, safe target, and attestation acceptance. # # Acceptance for the jumped slots waits for the final slot's tick. - # That is safe: acceptance is a monotone pool merge, and the head recomputes from scratch. + # That is safe: acceptance only merges into a monotone pool. + # The head recomputes from scratch, so nothing is lost by waiting. if target_interval - store.time > Interval(INTERVALS_PER_SLOT): store = store.model_copy( update={"time": target_interval - Interval(INTERVALS_PER_SLOT)} ) - # Tick remaining intervals one at a time. + # Tick the remaining intervals one at a time. while store.time < target_interval: store, new_aggregates = self.spec.tick_interval( store, @@ -134,30 +116,19 @@ async def _tick_to(self, target_interval: Interval) -> list[SignedAggregatedAtte return all_new_aggregates - async def _initial_tick(self) -> Interval | None: - """Catch up store time to wall clock at startup.""" - current_time = self.clock.current_time() - - # Only tick once past genesis. - if current_time >= self.clock.genesis_time: - target_interval = self.clock.total_intervals() - - # Reuse the skip-and-yield path for catch-up. - # Discard aggregated attestations from catch-up. - # During initial sync we may be many slots behind. - # Publishing stale aggregations would spam the network. - await self._tick_to(target_interval) - - return target_interval + async def _initial_tick(self) -> Interval: + """Catch up store time to the wall clock at startup.""" + if self.clock.current_time() < self.clock.genesis_time: + return Interval(0) - return None + # Discard the aggregates: at startup we may be many slots behind. + # Publishing those stale aggregations would spam the network. + target_interval = self.clock.total_intervals() + await self._tick_to(target_interval) + return target_interval def stop(self) -> None: - """ - Stop the service. - - The loop exits after its current sleep cycle finishes. - """ + """Stop the service; the loop exits after its current sleep finishes.""" self._running = False @property diff --git a/src/lean_spec/node/sync/service.py b/src/lean_spec/node/sync/service.py index d1b14ea5..b6fe503b 100644 --- a/src/lean_spec/node/sync/service.py +++ b/src/lean_spec/node/sync/service.py @@ -380,6 +380,8 @@ async def on_gossip_block( # Only update our store if the block was actually processed. # # A block may be cached instead of processed if its parent is unknown. + # The processed path never awaits, so a concurrent clock tick cannot + # advance the store between the read above and this write-back. if gossip_block_outcome.processed: block_root = hash_tree_root(block.block) logger.info( diff --git a/tests/node/chain/test_service.py b/tests/node/chain/test_service.py index 7d8cf561..1f8fb181 100644 --- a/tests/node/chain/test_service.py +++ b/tests/node/chain/test_service.py @@ -233,8 +233,8 @@ class TestStartupTick: @pytest.mark.parametrize( ("intervals_elapsed", "expected_interval", "expected_tick_times"), [ - # Before genesis: nothing ticks, the caller waits. - (-0.5, None, []), + # Before genesis: nothing ticks, the anchor interval is reported. + (-0.5, Interval(0), []), # Exactly at genesis: the store already sits at the anchor. (0.0, Interval(0), []), # One slot in: five ticks, no skip. @@ -246,10 +246,10 @@ class TestStartupTick: async def test_catches_store_up_to_wall_clock( self, intervals_elapsed: float, - expected_interval: Interval | None, + expected_interval: Interval, expected_tick_times: list[int], ) -> None: - """Startup ticks the store to the current interval, or reports waiting if pre-genesis.""" + """Startup ticks the store to the current interval and returns it.""" spec = ProbeSpec() now = 1000 + intervals_elapsed * INTERVAL_SECONDS service = make_service(spec, time_fn=lambda: now)