diff --git a/CLAUDE.md b/CLAUDE.md index 5b10c6dd..10f535d1 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -47,17 +47,17 @@ crates/ ### Tick-Based Validator Duties (4-second slots, 5 intervals per slot) ``` -Interval 0: Block proposal → accept attestations if proposal exists +Interval 0: Block published (at the slot boundary). The build+publish code path is merged into the previous slot's interval 4 (see below) and aligned to publish here; no attestation acceptance happens at interval 0. Interval 1: Attestation production (all validators, including proposer) Interval 2: Aggregation (aggregators create proofs from gossip signatures) Interval 3: Safe target update (fork choice) -Interval 4: Accept accumulated attestations +Interval 4: Accept accumulated attestations; build the NEXT slot's block and publish it aligned to that slot's interval 0 (build and publish merged into this tick) ``` ### Attestation Pipeline ``` Gossip → Signature verification → new_attestations (pending) - ↓ (intervals 0/4) + ↓ (interval 4) promote → known_attestations (fork choice active) ↓ Fork choice head update diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 4360377d..2a2178d3 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -252,24 +252,24 @@ impl BlockChainServer { self.pre_merge_coverage = Some(snapshot); } - let scheduled_proposer = (interval == 0 && slot > 0) + // Whether one of our validators proposes this slot. Drives the store's + // interval-0 attestation acceptance. + let is_proposer = (interval == 0 && slot > 0) .then(|| self.get_our_proposer(slot)) - .flatten(); - let is_proposer = scheduled_proposer.is_some(); + .flatten() + .is_some(); // Tick the store first - this accepts attestations at interval 0 if we have a proposal store::on_tick(&mut self.store, timestamp_ms, is_proposer); // ==== interval 0 ==== - // Now build and publish the block (after attestations have been accepted) - if let Some(validator_id) = scheduled_proposer { - if self.sync_status.duties_allowed() { - self.propose_block(slot, validator_id); - } else { - info!(%slot, %validator_id, "Skipping block proposal while syncing"); - } - } + // No actor work at interval 0. The block is published here conceptually + // (at the slot boundary), but the build+publish code path runs at + // interval 4 of the previous slot — where it also advances the store to + // this slot's interval 0 before building (see `propose_block`). The real + // interval-0 tick is then skipped by the idempotency guard above, since + // the store clock is already here. // ==== interval 1 ==== @@ -316,7 +316,22 @@ impl BlockChainServer { // ==== interval 4 ==== - // Handled by the pre-tick snapshot above. + // Build and publish the NEXT slot's block here, one interval early, so + // the heavy leanVM work happens during this otherwise-idle interval. + // `propose_block` blocks the actor for the build and aligns publication + // to the slot boundary. Doing the whole proposal here — rather than + // stashing it for the interval-0 tick — keeps it robust: `handle_tick` + // skips the interval-0 tick whenever this build overruns its interval. + if interval == 4 { + let next_slot = slot + 1; + let next_proposer = self + .get_our_proposer(next_slot) + .filter(|_| self.sync_status.duties_allowed()); + + if let Some(validator_id) = next_proposer { + self.propose_block(next_slot, validator_id).await; + } + } // Update safe target slot metric (updated by store.on_tick at interval 3) metrics::update_safe_target_slot(self.store.safe_target_slot()); @@ -383,78 +398,45 @@ impl BlockChainServer { }); } - /// Returns the validator ID if any of our validators is the proposer for this slot. - fn get_our_proposer(&self, slot: u64) -> Option { - let head_state = self.store.head_state(); - let num_validators = head_state.validators.len() as u64; - - self.key_manager - .validator_ids() - .into_iter() - .find(|&vid| is_proposer(vid, slot, num_validators)) - } - - fn produce_attestations(&mut self, slot: u64, is_aggregator: bool) { - let _timing = metrics::time_attestations_production(); - - // Produce attestation data once for all validators - let attestation_data = store::produce_attestation_data(&self.store, slot); - - // For each registered validator, produce and publish attestation - for validator_id in self.key_manager.validator_ids() { - // Sign the attestation - let Ok(signature) = self - .key_manager - .sign_attestation(validator_id, &attestation_data) - .inspect_err( - |err| error!(%slot, %validator_id, %err, "Failed to sign attestation"), - ) - else { - continue; - }; - - // Create signed attestation - let signed_attestation = SignedAttestation { - validator_id, - data: attestation_data.clone(), - signature, - }; - - // Self-deliver: store our own attestation locally for aggregation. - // Gossipsub does not deliver messages back to the sender, so without - // this the aggregator never sees its own validator's signature in - // gossip_signatures and it is excluded from aggregated proofs. - if is_aggregator { - let _ = store::on_gossip_attestation(&mut self.store, &signed_attestation, true) - .inspect_err(|err| { - warn!(%slot, %validator_id, %err, "Self-delivery of attestation failed") - }); - } - - // Publish to gossip network - if let Some(ref p2p) = self.p2p { - let _ = p2p.publish_attestation(signed_attestation).inspect_err( - |err| error!(%slot, %validator_id, %err, "Failed to publish attestation"), - ); - info!(%slot, %validator_id, "Published attestation"); - } - } - } - - /// Build and publish a block for the given slot and validator. - fn propose_block(&mut self, slot: u64, validator_id: u64) { + /// Build the target slot's block and publish it, one interval early. + /// + /// Runs at the previous slot's interval 4, blocking the actor for the build + /// (the expensive part is the leanVM Type-1 → Type-2 merge). It first + /// advances the store to the target slot's interval 0 (accepting + /// attestations) so the block is built on exactly the interval-0 state a + /// non-prebuilding proposer would see, then builds and publishes — aligned + /// to the slot boundary: if the build finishes before the slot opens we wait + /// out the remainder so the block is not published early; if it overran (the + /// common case under load) we publish at once. The whole proposal is + /// self-contained here, so it never depends on the interval-0 tick — which + /// `handle_tick` skips whenever this build overruns its interval. + async fn propose_block(&mut self, slot: u64, validator_id: u64) { info!(%slot, %validator_id, "We are the proposer for this slot"); + let genesis_time_ms = self.store.config().genesis_time * 1000; + let slot_start_ms = genesis_time_ms + slot * MILLISECONDS_PER_SLOT; + + // Advance the store to this slot's interval 0 — one interval ahead of the + // interval-4 tick we are running in — accepting attestations exactly as + // the real interval-0 tick would, so the block is built on the interval-0 + // state rather than the previous slot's end state. Building early is safe + // because we publish below (nothing is stashed for a later tick), and the + // real interval-0 tick is then skipped by the idempotency guard in + // `on_tick`, since the store clock is already here. + store::on_tick(&mut self.store, slot_start_ms, true); + let parent_root = self.store.head(); + + // Build the block on the interval-0 head. let _timing = metrics::time_block_building(); - - // Build the block with attestation signatures - let Ok((block, type_one_proofs, _post_checkpoints)) = - store::produce_block_with_signatures(&mut self.store, slot, validator_id) - .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) - else { - metrics::inc_block_building_failures(); - return; - }; + let (block, type_one_proofs, _post_checkpoints) = + match store::produce_block_on_head(&mut self.store, slot, validator_id, parent_root) { + Ok(built) => built, + Err(err) => { + error!(%slot, %validator_id, %err, "Failed to build block"); + metrics::inc_block_building_failures(); + return; + } + }; coverage::emit_proposal_coverage( &self.store, @@ -462,7 +444,7 @@ impl BlockChainServer { block.body.attestations.iter(), ); - // Sign the block root with the proposal key + // Sign the block root with the proposal key. let block_root = block.hash_tree_root(); let Ok(proposer_signature) = self .key_manager @@ -473,9 +455,8 @@ impl BlockChainServer { return; }; - // Assemble SignedBlock: wrap the proposer's raw XMSS signature into a - // singleton Type-1 SNARK, then merge it with every attestation Type-1 - // into the block's single Type-2 proof. + // Wrap the proposer's raw XMSS signature into a singleton Type-1 SNARK, + // then merge it with every attestation Type-1 into the single Type-2. let head_state = self.store.head_state(); let validators = &head_state.validators; let Some(proposer_validator) = validators.get(validator_id as usize) else { @@ -565,23 +546,102 @@ impl BlockChainServer { return; } }; - // `type_one_proofs` is no longer needed past this point. - drop(type_one_proofs); let signed_block = SignedBlock { message: block, proof, }; - // Process the block locally before publishing + // Stop timing here: the build is done, and the alignment wait below must + // not count toward the block-building metric. + drop(_timing); + + let now_ms = unix_now_ms(); + + // Align publication to the slot boundary. If the build finished before + // the slot opened, wait out the remainder so the block is not published + // early; if it overran, publish immediately. + if now_ms < genesis_time_ms + slot * crate::MILLISECONDS_PER_SLOT { + let wait_ms = slot_start_ms.saturating_sub(now_ms); + tokio::time::sleep(Duration::from_millis(wait_ms)).await; + } + + self.process_and_publish_block(slot, validator_id, signed_block); + } + + /// Returns the validator ID if any of our validators is the proposer for this slot. + fn get_our_proposer(&self, slot: u64) -> Option { + let head_state = self.store.head_state(); + let num_validators = head_state.validators.len() as u64; + + self.key_manager + .validator_ids() + .into_iter() + .find(|&vid| is_proposer(vid, slot, num_validators)) + } + + fn produce_attestations(&mut self, slot: u64, is_aggregator: bool) { + let _timing = metrics::time_attestations_production(); + + // Produce attestation data once for all validators + let attestation_data = store::produce_attestation_data(&self.store, slot); + + // For each registered validator, produce and publish attestation + for validator_id in self.key_manager.validator_ids() { + // Sign the attestation + let Ok(signature) = self + .key_manager + .sign_attestation(validator_id, &attestation_data) + .inspect_err( + |err| error!(%slot, %validator_id, %err, "Failed to sign attestation"), + ) + else { + continue; + }; + + // Create signed attestation + let signed_attestation = SignedAttestation { + validator_id, + data: attestation_data.clone(), + signature, + }; + + // Self-deliver: store our own attestation locally for aggregation. + // Gossipsub does not deliver messages back to the sender, so without + // this the aggregator never sees its own validator's signature in + // gossip_signatures and it is excluded from aggregated proofs. + if is_aggregator { + let _ = store::on_gossip_attestation(&mut self.store, &signed_attestation, true) + .inspect_err(|err| { + warn!(%slot, %validator_id, %err, "Self-delivery of attestation failed") + }); + } + + // Publish to gossip network + if let Some(ref p2p) = self.p2p { + let _ = p2p.publish_attestation(signed_attestation).inspect_err( + |err| error!(%slot, %validator_id, %err, "Failed to publish attestation"), + ); + info!(%slot, %validator_id, "Published attestation"); + } + } + } + + /// Import a freshly built block locally, then publish it to gossip. On + /// import failure, logs and counts it, and returns without publishing. + fn process_and_publish_block( + &mut self, + slot: u64, + validator_id: u64, + signed_block: SignedBlock, + ) { if let Err(err) = self.process_block(signed_block.clone()) { error!(%slot, %validator_id, %err, "Failed to process built block"); metrics::inc_block_building_failures(); return; - }; + } metrics::inc_block_building_success(); - // Publish to gossip network if let Some(ref p2p) = self.p2p { let _ = p2p .publish_block(signed_block) diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 8f7807d2..69953138 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -18,7 +18,7 @@ use tracing::{info, trace, warn}; use crate::{ GOSSIP_DISPARITY_INTERVALS, INTERVALS_PER_SLOT, MAX_ATTESTATIONS_DATA, - MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT, + MILLISECONDS_PER_INTERVAL, block_builder::{PostBlockCheckpoints, build_block}, metrics, }; @@ -750,41 +750,25 @@ pub fn produce_attestation_data(store: &Store, slot: u64) -> AttestationData { } } -/// Get the head for block proposal at the given slot. +/// Produce a block and per-aggregated-attestation signature payloads on top of +/// `head_root`, without moving the store clock. /// -/// Ensures store is up-to-date and processes any pending attestations -/// before returning the canonical head. -fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { - // Calculate time corresponding to this slot - let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; - - // Advance time to current slot (ticking intervals) - on_tick(store, slot_time_ms, true); - - // Process any pending attestations before proposal - accept_new_attestations(store, false); - - store.head() -} - -/// Produce a block and per-aggregated-attestation signature payloads for the target slot. -/// -/// Returns the finalized block and attestation signature payloads aligned -/// with `block.body.attestations`. -pub fn produce_block_with_signatures( +/// Returns the block and attestation signature payloads aligned with +/// `block.body.attestations`. The proposer resolves `head_root` from +/// [`Store::head`] at the previous slot's interval 4 (read-only); the build +/// must not tick the store, which would advance the clock an interval early. +pub(crate) fn produce_block_on_head( store: &mut Store, slot: u64, validator_index: u64, + head_root: H256, ) -> Result<(Block, Vec, PostBlockCheckpoints), StoreError> { - // Get parent block and state to build upon - let head_root = get_proposal_head(store, slot); let head_state = store .get_state(&head_root) .ok_or(StoreError::MissingParentState { parent_root: head_root, slot, - })? - .clone(); + })?; // Validate proposer authorization for this slot let num_validators = head_state.validators.len() as u64; diff --git a/docs/infographics/ethlambda_architecture.html b/docs/infographics/ethlambda_architecture.html index 466483ae..c05b58f9 100644 --- a/docs/infographics/ethlambda_architecture.html +++ b/docs/infographics/ethlambda_architecture.html @@ -751,7 +751,7 @@

ethlambda

'Two-phase attestation pipeline: new -> known', 'on_tick(): advance slot, promote attestations at intervals 0/3', 'on_block(): state transition + fork choice + storage persist', - 'produce_block_with_signatures(): block building for proposers', + 'produce_block_on_head(): block building for proposers', 'justified_slots: relative indexing from finalized_slot', 'Fallback pruning for stalled finalization', ],