diff --git a/CLAUDE.md b/CLAUDE.md index 5b10c6dd..73dfcc3f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -366,24 +366,33 @@ cargo test -p ethlambda-blockchain --test forkchoice_spectests -- --test-threads finalized boundary, signatures are pruned (`prune_old_block_signatures`) while headers and bodies are kept forever. `get_signed_block` returns `None` for a pruned finalized block +- States are stored as parent-linked diffs (`StateDiffs`, never pruned) plus + full-state snapshots (`States`) written only at 1024-slot anchors (and the + bootstrap). Neither is ever pruned. `get_state` returns an anchor snapshot or + reconstructs by walking diffs back to the nearest anchor; results are memoized + in an in-memory LRU (`STATE_CACHE_CAPACITY`) so recent reads stay hot - `LiveChain` table provides fast `(slot||root) → parent_root` index for fork choice - Storage uses trait-based API: `StorageBackend` → `StorageReadView` (reads) + `StorageWriteBatch` (atomic writes) -### Storage Tables (10) +### Storage Tables (7) + +These are the variants of the `Table` enum (`crates/storage/src/api/tables.rs`). | Table | Key → Value | Purpose | |-------|-------------|---------| | `BlockHeaders` | H256 → BlockHeader | Block headers by root | | `BlockBodies` | H256 → BlockBody | Block bodies (empty for genesis) | -| `BlockSignatures` | H256 → BlockSignatures | Signatures (absent for genesis) | -| `States` | H256 → State | Beacon states by root | -| `LatestKnownAttestations` | u64 → AttestationData | Fork-choice-active attestations | -| `LatestNewAttestations` | u64 → AttestationData | Pending (pre-promotion) attestations | -| `GossipSignatures` | SignatureKey → ValidatorSignature | Individual validator signatures | -| `AggregatedPayloads` | SignatureKey → Vec\ | Aggregated proofs | +| `BlockSignatures` | (slot\|\|root) → BlockSignatures | Type-2 proof blob; keyed slot\|\|root so pruning scans in slot order and stops early; absent for genesis, pruned below finalized | +| `States` | H256 → State | Full-state snapshots; bootstrap + 1024-slot anchors only; never pruned | +| `StateDiffs` | H256 → StateDiff | Parent-linked state diff per non-genesis state; never pruned | | `Metadata` | string → various | Store state (head, config, checkpoints) | | `LiveChain` | (slot\|\|root) → parent\_root | Fast fork choice traversal index | +Attestations and gossip signatures are **not** persisted tables; they live in +in-memory `Store` buffers (`new_payloads`, `known_payloads`, `gossip_signatures`) +and are consumed during the tick pipeline (promotion at intervals 0/4, +aggregation at interval 2). + ### State Root Computation - Always computed via `tree_hash_root()` after full state transition - Must match proposer's pre-computed `block.state_root` diff --git a/Cargo.lock b/Cargo.lock index 764d89b0..5c0ed65f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2170,6 +2170,8 @@ dependencies = [ "leansig", "libssz", "libssz-derive", + "libssz-types", + "lru", "rand 0.10.1", "rocksdb", "tempfile", diff --git a/Cargo.toml b/Cargo.toml index 2b20d590..100b1a4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ vergen-git2 = { version = "9", features = ["rustc"] } rayon = "1.11" rand = "0.10" +lru = "0.16" rocksdb = "0.24" libc = "0.2" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 8f7807d2..2d144dfb 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; use ethlambda_state_transition::{is_proposer, slot_is_justifiable_after}; -use ethlambda_storage::{ForkCheckpoints, Store}; +use ethlambda_storage::{DiffBase, ForkCheckpoints, Store}; use ethlambda_types::{ ShortRoot, attestation::{ @@ -556,6 +556,10 @@ fn on_block_core( let block = signed_block.message.clone(); + // Capture the diff base before the parent is consumed into the post-state + // (avoids cloning the multi-MB historical_block_hashes list). + let diff_base = DiffBase::from_state(block.parent_root, &parent_state); + // Execute state transition function to compute post-block state let state_transition_start = std::time::Instant::now(); let mut post_state = parent_state; @@ -576,9 +580,9 @@ fn on_block_core( store.update_checkpoints(ForkCheckpoints::new(store.head(), Some(justified), None)); } - // Store signed block and state + // Store signed block and state (as a parent-linked diff + snapshot) store.insert_signed_block(block_root, signed_block.clone()); - store.insert_state(block_root, post_state); + store.insert_state_with_diff(block_root, diff_base, post_state); for att in block.body.attestations.iter() { // Count each participating validator as a valid attestation. @@ -1251,7 +1255,13 @@ mod tests { let head_justified = Checkpoint { root: a, slot: 1 }; let mut head_state = State::from_genesis(1000, vec![]); head_state.latest_justified = head_justified; - store.insert_state(b, head_state); + // Persist `b`'s post-state via the diff API, diffed against the genesis + // anchor that already lives in the store. The base must describe the + // parent (genesis) state, not the target; `get_state(b)` then resolves + // via the cache or by replaying this diff onto the genesis snapshot. + let genesis_state = store.get_state(&genesis).expect("genesis state"); + let diff_base = DiffBase::from_state(genesis, &genesis_state); + store.insert_state_with_diff(b, diff_base, head_state); // Store's global justified latched onto a higher, off-head checkpoint, // as it would after a minority fork justified a slot the head never saw. diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index f5b2ca58..7381a53e 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -18,6 +18,9 @@ thiserror.workspace = true libssz.workspace = true libssz-derive.workspace = true +libssz-types.workspace = true + +lru.workspace = true [dev-dependencies] tempfile = "3" diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs index 5884f1f9..dcda1cbf 100644 --- a/crates/storage/src/api/tables.rs +++ b/crates/storage/src/api/tables.rs @@ -11,7 +11,16 @@ pub enum Table { /// All other blocks must have an entry in this table. BlockSignatures, /// State storage: H256 -> State + /// + /// Holds full-state snapshots only: the bootstrap anchor plus one anchor per + /// 1024-slot window. Never pruned. Non-anchor states live in `StateDiffs` and + /// are reconstructed on demand (memoized by an in-memory cache). States, + /// State diffs: H256 -> StateDiff + /// + /// Parent-linked diff written for every non-genesis state. Never pruned, so + /// it preserves full state history. See `get_state` for reconstruction. + StateDiffs, /// Metadata: string keys -> various scalar values Metadata, /// Live chain index: (slot || root) -> parent_root @@ -23,11 +32,12 @@ pub enum Table { } /// All table variants. -pub const ALL_TABLES: [Table; 6] = [ +pub const ALL_TABLES: [Table; 7] = [ Table::BlockHeaders, Table::BlockBodies, Table::BlockSignatures, Table::States, + Table::StateDiffs, Table::Metadata, Table::LiveChain, ]; @@ -40,6 +50,7 @@ impl Table { Table::BlockBodies => "block_bodies", Table::BlockSignatures => "block_signatures", Table::States => "states", + Table::StateDiffs => "state_diffs", Table::Metadata => "metadata", Table::LiveChain => "live_chain", } diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index e278c8fe..58f6119f 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -11,15 +11,11 @@ use std::path::Path; use std::sync::Arc; /// Returns the column family name for a table. +/// +/// Delegates to [`Table::name`] so the CF name and the metrics label share a +/// single source of truth (and a new table only needs one mapping). fn cf_name(table: Table) -> &'static str { - match table { - Table::BlockHeaders => "block_headers", - Table::BlockBodies => "block_bodies", - Table::BlockSignatures => "block_signatures", - Table::States => "states", - Table::Metadata => "metadata", - Table::LiveChain => "live_chain", - } + table.name() } /// RocksDB storage backend. diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index de5f20df..13b6410d 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -1,7 +1,9 @@ mod api; pub mod backend; mod error; +mod state_diff; mod store; pub use api::{ALL_TABLES, StorageBackend, StorageReadView, StorageWriteBatch, Table}; +pub use state_diff::DiffBase; pub use store::{ForkCheckpoints, GetForkchoiceStoreError, MAX_RESUMABLE_DB_STATE_AGE, Store}; diff --git a/crates/storage/src/state_diff.rs b/crates/storage/src/state_diff.rs new file mode 100644 index 00000000..3714b516 --- /dev/null +++ b/crates/storage/src/state_diff.rs @@ -0,0 +1,267 @@ +//! Parent-linked state diffs for diff-layer state storage. +//! +//! A [`StateDiff`] captures the change from a base state (the parent block's +//! post-state) to a target state, storing only what cannot be recovered from a +//! snapshot plus the parent relationship. +//! +//! Field handling: +//! - `config`, `validators`: never change; omitted (taken from the snapshot). +//! - `latest_block_header`: omitted; reconstructed from the `BlockHeaders` table. +//! - `historical_block_hashes`: pure-append in the STF, so only the appended +//! tail (`hbh_appended`) is stored. +//! - everything else: stored verbatim (the justification fields are bounded by +//! the non-finalized window, so they stay small under healthy finality). + +use ethlambda_types::{ + block::BlockHeader, + checkpoint::Checkpoint, + primitives::H256, + state::{ + HISTORICAL_ROOTS_LIMIT, JustificationRoots, JustificationValidators, JustifiedSlots, State, + }, +}; +use libssz_derive::{SszDecode, SszEncode}; +use libssz_types::SszList; + +/// Appended tail of `historical_block_hashes`, bounded by the same limit as the +/// full list. +pub type HistoricalBlockHashesTail = SszList; + +/// Describes the parent state a new state's diff is built against. +/// +/// Captured by the caller before the parent is consumed into the post-state, so +/// the store can build the diff and decide anchoring without re-reading it. +/// Construct via [`DiffBase::from_state`]; fields are crate-internal. +pub struct DiffBase { + /// Block root of the parent state (the diff's `base_root`). + pub(crate) root: H256, + /// Parent state's `historical_block_hashes` length. + pub(crate) hbh_len: usize, + /// Parent state's slot (used for the anchor-boundary check). + pub(crate) slot: u64, +} + +impl DiffBase { + /// Build the diff base from the parent state and its block root. + /// + /// `root` is the parent block root (the child's `parent_root`), passed in + /// since the caller already has it; `hbh_len` and `slot` are read from + /// `state`. Call this before the parent is consumed into the child. + pub fn from_state(root: H256, state: &State) -> Self { + Self { + root, + hbh_len: state.historical_block_hashes.len(), + slot: state.slot, + } + } +} + +/// The change from a base (parent) state to a target state. +/// +/// Reconstruct the target with [`StateDiff`] applied against the nearest +/// ancestor snapshot; see the storage layer's `get_state` for the walk. +#[derive(Debug, Clone, PartialEq, Eq, SszEncode, SszDecode)] +pub struct StateDiff { + /// Block root of the base state this diff is relative to (`block.parent_root`). + pub base_root: H256, + /// Target state's slot. + pub slot: u64, + /// Target state's latest justified checkpoint. + pub latest_justified: Checkpoint, + /// Target state's latest finalized checkpoint. + pub latest_finalized: Checkpoint, + /// Target state's `justified_slots` (stored in full). + pub justified_slots: JustifiedSlots, + /// Target state's `justifications_roots` (stored in full). + pub justifications_roots: JustificationRoots, + /// Target state's `justifications_validators` (stored in full). + pub justifications_validators: JustificationValidators, + /// Elements appended to `historical_block_hashes` relative to the base. + pub hbh_appended: HistoricalBlockHashesTail, +} + +impl StateDiff { + /// Build a diff from a consumed target state against a base identified by its + /// `historical_block_hashes` length. + /// + /// Takes `target` by value so the multi-MB justification fields are moved + /// into the diff rather than cloned. On the block-import path the base state + /// has already been consumed into `target`, so only its length is retained; + /// `base_hbh_len` is that length. + /// + /// # Assumptions about how the base is modified into the target + /// + /// The diff stores only part of `target` and is lossless *only* because the + /// state transition changes the base (parent) state in a restricted way. + /// `reconstruct` depends on each of these; a future STF that broke one would + /// make reconstructed states silently wrong, not just fail: + /// + /// - **`config` and `validators` are unchanged from base to target.** They + /// are not stored in the diff; reconstruction takes them from the nearest + /// ancestor snapshot. (The lean STF never mutates either: `validators` is + /// fixed at genesis and `config` is static.) + /// - **`historical_block_hashes` only grows by appending.** The base's list + /// is a prefix of the target's, so only the appended tail + /// (`target[base_hbh_len..]`) is stored and the earlier entries are never + /// reordered or rewritten. (`process_slots` pushes the parent root and + /// zero-fills skipped slots, leaving the existing prefix intact.) This is + /// why `base_hbh_len` alone is enough to identify the base's contribution. + /// - **`latest_block_header` is not stored here.** It is read back from the + /// `BlockHeaders` table during reconstruction; the persisted post-state + /// caches the real `state_root` there, so the two are byte-identical. + /// + /// All remaining fields (`slot`, both checkpoints, and the three + /// justification fields) are captured verbatim, so the diff makes no + /// assumption about how those change. + /// + /// # Panics + /// + /// Panics if `target.historical_block_hashes` is shorter than `base_hbh_len`, + /// i.e. the append-only assumption above was violated. + pub fn from_base(base_root: H256, base_hbh_len: usize, target: State) -> Self { + let State { + slot, + latest_justified, + latest_finalized, + historical_block_hashes, + justified_slots, + justifications_roots, + justifications_validators, + .. + } = target; + + let hbh = historical_block_hashes.into_inner(); + assert!( + hbh.len() >= base_hbh_len, + "target historical_block_hashes shorter than base: {} < {base_hbh_len}", + hbh.len() + ); + let hbh_appended = HistoricalBlockHashesTail::try_from(hbh[base_hbh_len..].to_vec()) + .expect("appended tail cannot exceed HISTORICAL_ROOTS_LIMIT"); + + Self { + base_root, + slot, + latest_justified, + latest_finalized, + justified_slots, + justifications_roots, + justifications_validators, + hbh_appended, + } + } +} + +/// Rebuild a state from a base snapshot and the diffs leading to the target. +/// +/// `diffs` are ordered from the snapshot's child up to the target (inclusive, +/// non-empty). `latest_block_header` is the target's header (kept in the +/// `BlockHeaders` table rather than the diff). `config`/`validators` come from +/// `snapshot` (they never change), `historical_block_hashes` is replayed from +/// the appended tails, and the remaining fields come from the last diff. +/// +/// # Panics +/// +/// Panics if `diffs` is empty. +pub(crate) fn reconstruct( + snapshot: State, + diffs: &[StateDiff], + latest_block_header: BlockHeader, +) -> State { + let target = diffs + .last() + .expect("reconstruct requires at least one diff"); + + let mut hbh: Vec = snapshot.historical_block_hashes.to_vec(); + for diff in diffs { + hbh.extend_from_slice(&diff.hbh_appended); + } + let historical_block_hashes = hbh + .try_into() + .expect("reconstructed historical_block_hashes within limit"); + + State { + config: snapshot.config, + slot: target.slot, + latest_block_header, + latest_justified: target.latest_justified, + latest_finalized: target.latest_finalized, + historical_block_hashes, + justified_slots: target.justified_slots.clone(), + validators: snapshot.validators, + justifications_roots: target.justifications_roots.clone(), + justifications_validators: target.justifications_validators.clone(), + } +} + +#[cfg(test)] +mod tests { + use ethlambda_types::state::{State, Validator}; + use libssz::{SszDecode, SszEncode}; + + use super::*; + + fn h256(byte: u8) -> H256 { + H256::from([byte; 32]) + } + + /// A minimal genesis-like base state with two validators. + fn base_state() -> State { + let validators = vec![ + Validator { + attestation_pubkey: [1u8; 52], + proposal_pubkey: [2u8; 52], + index: 0, + }, + Validator { + attestation_pubkey: [3u8; 52], + proposal_pubkey: [4u8; 52], + index: 1, + }, + ]; + State::from_genesis(1_000, validators) + } + + #[test] + fn from_base_captures_appended_tail_and_absolute_fields() { + let base = base_state(); + let base_len = base.historical_block_hashes.len(); + + let mut target = base.clone(); + target.slot = 5; + let expected_justified = Checkpoint { + root: h256(7), + slot: 4, + }; + target.latest_justified = expected_justified; + // Append three roots (one real parent + two zero-filled empty slots). + let mut hbh: Vec = base.historical_block_hashes.to_vec(); + hbh.extend([h256(9), H256::ZERO, H256::ZERO]); + target.historical_block_hashes = hbh.try_into().unwrap(); + + let diff = StateDiff::from_base(h256(1), base_len, target); + + assert_eq!(diff.base_root, h256(1)); + assert_eq!(diff.slot, 5); + assert_eq!(diff.latest_justified, expected_justified); + assert_eq!(diff.hbh_appended.len(), 3); + assert_eq!(diff.hbh_appended[0], h256(9)); + assert_eq!(diff.hbh_appended[1], H256::ZERO); + } + + #[test] + fn ssz_roundtrips() { + let base = base_state(); + let base_len = base.historical_block_hashes.len(); + let mut target = base.clone(); + target.slot = 2; + let mut hbh: Vec = base.historical_block_hashes.to_vec(); + hbh.push(h256(9)); + target.historical_block_hashes = hbh.try_into().unwrap(); + + let diff = StateDiff::from_base(h256(1), base_len, target); + let bytes = diff.to_ssz(); + let decoded = StateDiff::from_ssz_bytes(&bytes).expect("decodes"); + assert_eq!(diff, decoded); + } +} diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 0700b02a..32fdecc4 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -1,6 +1,9 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; +use std::num::NonZeroUsize; use std::sync::{Arc, LazyLock, Mutex}; +use lru::LruCache; + use crate::api::{StorageBackend, StorageWriteBatch, Table}; use crate::error::Error; @@ -15,6 +18,8 @@ use ethlambda_types::{ state::{ChainConfig, State, anchor_pair_is_consistent}, }; use libssz::{SszDecode, SszEncode}; + +use crate::state_diff::{DiffBase, StateDiff}; use thiserror::Error; use tracing::{info, warn}; @@ -84,8 +89,21 @@ const KEY_LATEST_JUSTIFIED: &[u8] = b"latest_justified"; /// Key for "latest_finalized" field of the Store. Its value has type [`Checkpoint`] and it's SSZ-encoded. const KEY_LATEST_FINALIZED: &[u8] = b"latest_finalized"; -/// ~3.3 hours of state history at 4-second slots (12000 / 4 = 3000). -const STATES_TO_KEEP: usize = 3_000; +/// Persist a full-state snapshot whenever a block's slot crosses a multiple of +/// this value (relative to its parent's slot). +/// +/// Snapshots are the only entries written to `States` (plus the bootstrap +/// anchor); they are never pruned and bound state-reconstruction diff walks to +/// at most this many steps. ~68 minutes at 4-second slots. +const SNAPSHOT_ANCHOR_INTERVAL: u64 = 1_024; + +/// Number of reconstructed/imported states memoized in memory. +/// +/// States are content-addressed by block root and immutable, so the cache never +/// needs invalidation; it only bounds how many recent states stay hot for reads +/// (e.g. a block's `parent_state` right after import). A miss falls back to a +/// snapshot read or a diff-chain reconstruction. +const STATE_CACHE_CAPACITY: usize = 32; /// Keep block signatures for at least this many slots below the tip, even once /// finalized. Signatures older than this window are pruned only when the window @@ -522,6 +540,15 @@ pub struct Store { known_payloads: Arc>, /// In-memory gossip signatures, consumed at interval 2 aggregation. gossip_signatures: Arc>, + /// LRU memoization of states by block root, shared across `Store` clones. + /// Avoids reconstructing recent states from diffs on every read. + state_cache: Arc>>, +} + +/// Build an empty state cache sized to [`STATE_CACHE_CAPACITY`]. +fn new_state_cache() -> Arc>> { + let capacity = NonZeroUsize::new(STATE_CACHE_CAPACITY).expect("cache capacity is non-zero"); + Arc::new(Mutex::new(LruCache::new(capacity))) } impl Store { @@ -594,6 +621,7 @@ impl Store { gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( GOSSIP_SIGNATURE_CAP, ))), + state_cache: new_state_cache(), }) } @@ -664,7 +692,9 @@ impl Store { .expect("put block body"); } - // State + // State snapshot. The anchor has no parent in the store, so it is + // the base of every diff chain: store it as a full snapshot in + // `States` (never pruned) so reconstruction always terminates here. let state_entries = vec![(anchor_block_root.to_ssz(), anchor_state.to_ssz())]; batch .put_batch(Table::States, state_entries) @@ -691,6 +721,7 @@ impl Store { gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( GOSSIP_SIGNATURE_CAP, ))), + state_cache: new_state_cache(), }) } @@ -814,29 +845,24 @@ impl Store { } } - /// Prune old states and blocks to keep storage bounded. + /// Bound storage by evicting old state snapshots and finalized signatures. + /// + /// State diffs, block headers, and block bodies are retained for the full + /// history; only full-state snapshots outside the hot window (diffs remain) + /// and signatures of finalized blocks are removed. /// /// This is separated from `update_checkpoints` so callers can defer heavy /// pruning until after a batch of blocks has been fully processed. Running - /// this mid-cascade would delete states that pending children still need, + /// this mid-cascade would delete snapshots that pending children still need, /// causing infinite re-processing loops when fallback pruning is active. pub fn prune_old_data(&mut self) { - let protected_roots = [ - self.latest_finalized().root, - self.latest_justified().root, - self.head(), - ]; let finalized_slot = self.latest_finalized().slot; let tip_slot = self .get_block_header(&self.head()) .map_or(finalized_slot, |header| header.slot); - let pruned_states = self.prune_old_states(&protected_roots); let pruned_signatures = self.prune_old_block_signatures(finalized_slot, tip_slot); - if pruned_states > 0 || pruned_signatures > 0 { - info!( - pruned_states, - pruned_signatures, "Pruned old states and block signatures" - ); + if pruned_signatures > 0 { + info!(pruned_signatures, "Pruned old finalized block signatures"); } } @@ -940,55 +966,6 @@ impl Store { pruned_new + pruned_known } - /// Prune old states beyond the retention window. - /// - /// Keeps the most recent `STATES_TO_KEEP` states (by slot), plus any - /// states whose roots appear in `protected_roots` (finalized, justified). - /// - /// Returns the number of states pruned. - pub fn prune_old_states(&mut self, protected_roots: &[H256]) -> usize { - let view = self.backend.begin_read().expect("read view"); - - // Collect (root_bytes, slot) from BlockHeaders to determine state age. - let mut entries: Vec<(Vec, u64)> = view - .prefix_iterator(Table::BlockHeaders, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(key, value)| { - let header = BlockHeader::from_ssz_bytes(&value).expect("valid header"); - (key.to_vec(), header.slot) - }) - .collect(); - drop(view); - - if entries.len() <= STATES_TO_KEEP { - return 0; - } - - // Sort by slot descending (newest first) - entries.sort_unstable_by(|a, b| b.1.cmp(&a.1)); - - let protected: HashSet> = protected_roots.iter().map(|r| r.to_ssz()).collect(); - - // Skip the retention window, collect remaining keys for deletion - let keys_to_delete: Vec> = entries - .into_iter() - .skip(STATES_TO_KEEP) - .filter(|(key, _)| !protected.contains(key)) - .map(|(key, _)| key) - .collect(); - - let count = keys_to_delete.len(); - if count > 0 { - let mut batch = self.backend.begin_write().expect("write batch"); - batch - .delete_batch(Table::States, keys_to_delete) - .expect("delete old states"); - batch.commit().expect("commit"); - } - count - } - /// Prune signatures of old finalized blocks, keeping a recent window. /// /// Signatures within [`SIGNATURE_PRUNING_RANGE`] slots of `tip_slot` are @@ -1039,10 +1016,7 @@ impl Store { /// Get the block header by root. pub fn get_block_header(&self, root: &H256) -> Option { - let view = self.backend.begin_read().expect("read view"); - view.get(Table::BlockHeaders, &root.to_ssz()) - .expect("get") - .map(|bytes| BlockHeader::from_ssz_bytes(&bytes).expect("valid header")) + self.get_ssz(Table::BlockHeaders, root) } // ============ Signed Blocks ============ @@ -1111,12 +1085,13 @@ impl Store { /// or if the signature row is missing for any block other than the /// slot-0 anchor. /// - /// Signatures are absent for genesis-style anchor blocks (no proposer - /// ever signed them). To keep BlocksByRoot symmetric with the - /// fork-choice view for peers, synthesize an empty proof for the slot-0 - /// case only; for any other slot the missing-signature state is treated - /// as storage corruption and surfaces as `None` rather than as a - /// fabricated block. + /// Signatures are absent in two cases: genesis-style anchor blocks (no + /// proposer ever signed them), and finalized blocks whose signatures were + /// pruned by [`prune_old_block_signatures`](Self::prune_old_block_signatures). + /// To keep BlocksByRoot symmetric with the fork-choice view for peers, + /// synthesize an empty proof for the slot-0 anchor only; for any other slot + /// a missing signature surfaces as `None` (a pruned finalized block can no + /// longer be served with its proof) rather than as a fabricated block. pub fn get_signed_block(&self, root: &H256) -> Option { let view = self.backend.begin_read().expect("read view"); let key = root.to_ssz(); @@ -1137,9 +1112,9 @@ impl Store { Some(proof_bytes) => { MultiMessageAggregate::from_ssz_bytes(&proof_bytes).expect("valid block proof") } - // Synthesis only covers the genesis-style anchor (slot 0). Any other - // missing-proof case is a storage corruption that should surface - // as `None` rather than fabricating a block with an empty proof. + // Synthesis only covers the genesis-style anchor (slot 0). For any + // other slot a missing proof (pruned finalized block, or genuine + // corruption) surfaces as `None` rather than a fabricated block. None if header.slot == 0 => MultiMessageAggregate::default(), None => return None, }; @@ -1155,26 +1130,107 @@ impl Store { // ============ States ============ /// Returns the state for the given block root. + /// + /// Fast path: a full snapshot in `States`. Otherwise the state is + /// reconstructed by walking parent-linked `StateDiffs` back to the nearest + /// ancestor snapshot and replaying forward. Returns `None` if the diff chain + /// is broken or the target block header is unavailable. pub fn get_state(&self, root: &H256) -> Option { + // Memoized hot states first (states are immutable per root). + if let Some(state) = self.state_cache.lock().unwrap().get(root) { + return Some(state.clone()); + } + // Anchor snapshot in `States`, otherwise reconstruct from the diff chain. + let state = self + .get_ssz::(Table::States, root) + .or_else(|| self.reconstruct_state(root))?; + self.state_cache.lock().unwrap().put(*root, state.clone()); + Some(state) + } + + /// Read and SSZ-decode a value keyed by block root from `table`. + fn get_ssz(&self, table: Table, root: &H256) -> Option { let view = self.backend.begin_read().expect("read view"); - view.get(Table::States, &root.to_ssz()) + view.get(table, &root.to_ssz()) .expect("get") - .map(|bytes| State::from_ssz_bytes(&bytes).expect("valid state")) + .map(|bytes| T::from_ssz_bytes(&bytes).expect("valid encoding")) } - /// Returns whether a state exists for the given block root. + /// Reconstruct a state from diffs and the nearest ancestor snapshot. + /// + /// Walks `base_root` pointers back until a snapshot is found, fetches the + /// target's block header, and delegates the assembly to + /// [`state_diff::reconstruct`](crate::state_diff::reconstruct). + fn reconstruct_state(&self, root: &H256) -> Option { + // Walk back collecting diffs until we reach a snapshot. + let mut diffs: Vec = Vec::new(); + let mut cursor = *root; + let snapshot = loop { + if let Some(snapshot) = self.get_ssz::(Table::States, &cursor) { + break snapshot; + } + let diff = self.get_ssz::(Table::StateDiffs, &cursor)?; + cursor = diff.base_root; + diffs.push(diff); + }; + + // `diffs` runs target -> snapshot child; reverse to snapshot child -> target. + diffs.reverse(); + + // The latest block header lives in BlockHeaders; the stored state caches + // the real state_root there, so it equals the header byte-for-byte. + let latest_block_header = self.get_block_header(root)?; + + Some(crate::state_diff::reconstruct( + snapshot, + &diffs, + latest_block_header, + )) + } + + /// Returns whether a state is available for the given block root. + /// + /// True if a snapshot exists or the state can be reconstructed from a diff. pub fn has_state(&self, root: &H256) -> bool { let view = self.backend.begin_read().expect("read view"); - view.get(Table::States, &root.to_ssz()) - .expect("get") - .is_some() + let key = root.to_ssz(); + view.get(Table::States, &key).expect("get").is_some() + || view.get(Table::StateDiffs, &key).expect("get").is_some() } - /// Stores a state indexed by block root. - pub fn insert_state(&mut self, root: H256, state: State) { + /// Persist a post-block state as a parent-linked diff, snapshotting at anchors. + /// + /// Every non-genesis state gets a `StateDiffs` entry (never pruned, so the + /// full state history is preserved). A full snapshot is written to `States` + /// only when the block crosses a [`SNAPSHOT_ANCHOR_INTERVAL`] boundary; these + /// anchors are never pruned and bound the reconstruction walk. The state is + /// also inserted into the in-memory cache so the immediate next read (e.g. as + /// a child block's parent state) is hot without reconstruction. + /// + /// `base` describes the parent state the diff is built against (see + /// [`DiffBase`]); its fields are captured before the parent is consumed into + /// `state`. + pub fn insert_state_with_diff(&mut self, root: H256, base: DiffBase, state: State) { + let slot = state.slot; + let is_anchor = slot / SNAPSHOT_ANCHOR_INTERVAL > base.slot / SNAPSHOT_ANCHOR_INTERVAL; + + // Snapshot only at anchors; serialize before `state` is consumed. + let snapshot_bytes = is_anchor.then(|| state.to_ssz()); + // Memoize the post-state for fast reads, then move it into the diff so + // its multi-MB justification fields are not cloned again. + self.state_cache.lock().unwrap().put(root, state.clone()); + let diff_bytes = StateDiff::from_base(base.root, base.hbh_len, state).to_ssz(); + + let key = root.to_ssz(); let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(root.to_ssz(), state.to_ssz())]; - batch.put_batch(Table::States, entries).expect("put state"); + batch + .put_batch(Table::StateDiffs, vec![(key.clone(), diff_bytes)]) + .expect("put state diff"); + if let Some(snapshot_bytes) = snapshot_bytes { + batch + .put_batch(Table::States, vec![(key, snapshot_bytes)]) + .expect("put state snapshot"); + } batch.commit().expect("commit"); } @@ -1487,13 +1543,12 @@ mod tests { batch.commit().expect("commit"); } - /// Insert a dummy state for a given root. - fn insert_state(backend: &dyn StorageBackend, root: H256) { + /// Insert a real full-state snapshot for a given root (seeds a diff-chain base). + fn insert_snapshot(backend: &dyn StorageBackend, root: H256, state: &State) { let mut batch = backend.begin_write().expect("write batch"); - let key = root.to_ssz(); batch - .put_batch(Table::States, vec![(key, vec![0u8; 4])]) - .expect("put state"); + .put_batch(Table::States, vec![(root.to_ssz(), state.to_ssz())]) + .expect("put snapshot"); batch.commit().expect("commit"); } @@ -1538,6 +1593,7 @@ mod tests { gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( GOSSIP_SIGNATURE_CAP, ))), + state_cache: new_state_cache(), } } @@ -1551,6 +1607,7 @@ mod tests { gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( GOSSIP_SIGNATURE_CAP, ))), + state_cache: new_state_cache(), } } } @@ -1621,193 +1678,128 @@ mod tests { assert_eq!(count_entries(backend.as_ref(), Table::BlockSignatures), 10); } - // ============ State Pruning Tests ============ + // ============ State Diff Reconstruction Tests ============ - #[test] - fn prune_old_states_within_retention() { - let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store::test_store_with_backend(backend.clone()); + use ethlambda_types::state::Validator; - // Insert STATES_TO_KEEP headers + states - for i in 0..STATES_TO_KEEP as u64 { - insert_header(backend.as_ref(), root(i), i); - insert_state(backend.as_ref(), root(i)); + /// The header `insert_header` writes for a given slot. + fn header_at(slot: u64) -> BlockHeader { + BlockHeader { + slot, + proposer_index: 0, + parent_root: H256::ZERO, + state_root: H256::ZERO, + body_root: H256::ZERO, } - assert_eq!( - count_entries(backend.as_ref(), Table::States), - STATES_TO_KEEP - ); - - let pruned = store.prune_old_states(&[]); - assert_eq!(pruned, 0); } - #[test] - fn prune_old_states_exceeding_retention() { - let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store::test_store_with_backend(backend.clone()); - - let total = STATES_TO_KEEP + 5; - for i in 0..total as u64 { - insert_header(backend.as_ref(), root(i), i); - insert_state(backend.as_ref(), root(i)); - } - assert_eq!(count_entries(backend.as_ref(), Table::States), total); - - let pruned = store.prune_old_states(&[]); - assert_eq!(pruned, 5); - assert_eq!( - count_entries(backend.as_ref(), Table::States), - STATES_TO_KEEP - ); - - // Oldest states should be gone - for i in 0..5u64 { - assert!(!has_key(backend.as_ref(), Table::States, &root(i))); - } - // Newest states should remain - for i in 5..total as u64 { - assert!(has_key(backend.as_ref(), Table::States, &root(i))); - } + /// A real `State` at `slot` with the given historical_block_hashes and a + /// `latest_block_header` matching what `insert_header` stores. + fn sample_state(slot: u64, hbh: Vec) -> State { + let validators = vec![Validator { + attestation_pubkey: [7u8; 52], + proposal_pubkey: [9u8; 52], + index: 0, + }]; + let mut state = State::from_genesis(1_000, validators); + state.slot = slot; + state.latest_block_header = header_at(slot); + state.historical_block_hashes = hbh.try_into().unwrap(); + state } #[test] - fn prune_old_states_preserves_protected() { + fn get_state_reconstructs_from_diff() { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store::test_store_with_backend(backend.clone()); - let total = STATES_TO_KEEP + 5; - for i in 0..total as u64 { - insert_header(backend.as_ref(), root(i), i); - insert_state(backend.as_ref(), root(i)); - } - - let finalized_root = root(0); - let justified_root = root(2); - let pruned = store.prune_old_states(&[finalized_root, justified_root]); + // Genesis snapshot at slot 0. + let s0 = sample_state(0, vec![]); + let r0 = root(0); + insert_header(backend.as_ref(), r0, 0); + insert_snapshot(backend.as_ref(), r0, &s0); + + // Child at slot 1: appends one historical root, sets a checkpoint. + let r1 = root(1); + let mut s1 = sample_state(1, vec![root(42)]); + s1.latest_justified = Checkpoint { + root: root(7), + slot: 0, + }; + insert_header(backend.as_ref(), r1, 1); + let base = DiffBase::from_state(r0, &s0); + store.insert_state_with_diff(r1, base, s1.clone()); - // 5 would be pruned, but 2 are protected - assert_eq!(pruned, 3); - assert!(has_key(backend.as_ref(), Table::States, &finalized_root)); - assert!(has_key(backend.as_ref(), Table::States, &justified_root)); - } + // Not an anchor, so no snapshot was written; only the diff. + assert!(!has_key(backend.as_ref(), Table::States, &r1)); - // ============ Periodic Pruning Tests ============ + // Hot path: the just-imported state is memoized in the cache. + assert_eq!(store.get_state(&r1).unwrap().to_ssz(), s1.to_ssz()); - /// Set up finalized and justified checkpoints in metadata. - fn set_checkpoints(backend: &dyn StorageBackend, finalized: Checkpoint, justified: Checkpoint) { - let mut batch = backend.begin_write().expect("write batch"); - batch - .put_batch( - Table::Metadata, - vec![ - (KEY_LATEST_FINALIZED.to_vec(), finalized.to_ssz()), - (KEY_LATEST_JUSTIFIED.to_vec(), justified.to_ssz()), - ], - ) - .expect("put checkpoints"); - batch.commit().expect("commit"); + // A cold store (empty cache, shared backend) reconstructs from the diff, + // byte-identically. + let cold = Store::test_store_with_backend(backend.clone()); + let reconstructed = cold.get_state(&r1).expect("reconstructs from diff"); + assert_eq!(reconstructed.to_ssz(), s1.to_ssz()); } #[test] - fn fallback_pruning_removes_old_states_and_blocks() { + fn get_state_reconstructs_across_multiple_diffs() { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store::test_store_with_backend(backend.clone()); - // Use roots that are within the retention window as finalized/justified - let finalized_root = root(0); - let justified_root = root(1); - set_checkpoints( - backend.as_ref(), - Checkpoint { - slot: 0, - root: finalized_root, - }, - Checkpoint { - slot: 1, - root: justified_root, - }, - ); - - // Insert more than STATES_TO_KEEP headers + states. - let total_states = STATES_TO_KEEP + 5; - for i in 0..total_states as u64 { - insert_header(backend.as_ref(), root(i), i); - insert_state(backend.as_ref(), root(i)); - } - - assert_eq!(count_entries(backend.as_ref(), Table::States), total_states); - assert_eq!( - count_entries(backend.as_ref(), Table::BlockHeaders), - total_states - ); - - // Use the last inserted root as head. Calling update_checkpoints with - // head_only triggers the fallback path (finalization doesn't advance). - let head_root = root(total_states as u64 - 1); - store.update_checkpoints(ForkCheckpoints::head_only(head_root)); - - // update_checkpoints no longer prunes states/blocks inline — the caller - // must invoke prune_old_data() separately (after a block cascade completes). - assert_eq!(count_entries(backend.as_ref(), Table::States), total_states); - - store.prune_old_data(); - - // 3005 headers total. Top 3000 by slot are kept in the retention window, - // leaving 5 candidates. 2 are protected (finalized + justified), - // so 3 are pruned → 3005 - 3 = 3002 states remaining. - assert_eq!( - count_entries(backend.as_ref(), Table::States), - STATES_TO_KEEP + 2 - ); - // Finalized and justified states must survive - assert!(has_key(backend.as_ref(), Table::States, &finalized_root)); - assert!(has_key(backend.as_ref(), Table::States, &justified_root)); - - // Headers and bodies are never pruned, so all are retained. - assert_eq!( - count_entries(backend.as_ref(), Table::BlockHeaders), - total_states - ); + // Snapshot s0, then two chained diffs s1 -> s2. + let s0 = sample_state(0, vec![]); + let r0 = root(0); + insert_header(backend.as_ref(), r0, 0); + insert_snapshot(backend.as_ref(), r0, &s0); + + let r1 = root(1); + let s1 = sample_state(1, vec![root(42)]); + insert_header(backend.as_ref(), r1, 1); + let base = DiffBase::from_state(r0, &s0); + store.insert_state_with_diff(r1, base, s1.clone()); + + let r2 = root(2); + let s2 = sample_state(2, vec![root(42), root(43)]); + insert_header(backend.as_ref(), r2, 2); + let base = DiffBase::from_state(r1, &s1); + store.insert_state_with_diff(r2, base, s2.clone()); + + // Neither child is an anchor, so a cold store reconstructs s2 by walking + // the diff chain back to the s0 snapshot. + assert!(!has_key(backend.as_ref(), Table::States, &r1)); + assert!(!has_key(backend.as_ref(), Table::States, &r2)); + let cold = Store::test_store_with_backend(backend.clone()); + let reconstructed = cold.get_state(&r2).expect("reconstructs across diffs"); + assert_eq!(reconstructed.to_ssz(), s2.to_ssz()); } #[test] - fn fallback_pruning_no_op_within_retention() { + fn insert_state_with_diff_snapshots_only_on_boundary_crossing() { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store::test_store_with_backend(backend.clone()); - set_checkpoints( - backend.as_ref(), - Checkpoint { - slot: 0, - root: root(0), - }, - Checkpoint { - slot: 0, - root: root(0), - }, - ); - - // Insert exactly STATES_TO_KEEP entries (no excess) - for i in 0..STATES_TO_KEEP as u64 { - insert_header(backend.as_ref(), root(i), i); - insert_state(backend.as_ref(), root(i)); - } + let s0 = sample_state(SNAPSHOT_ANCHOR_INTERVAL - 1, vec![]); + let r0 = root(0); + insert_header(backend.as_ref(), r0, s0.slot); + insert_snapshot(backend.as_ref(), r0, &s0); - // Use the last inserted root as head - let head_root = root(STATES_TO_KEEP as u64 - 1); - store.update_checkpoints(ForkCheckpoints::head_only(head_root)); - store.prune_old_data(); + // Crossing the interval boundary records an anchor. + let r1 = root(1); + let s1 = sample_state(SNAPSHOT_ANCHOR_INTERVAL, vec![root(42)]); + insert_header(backend.as_ref(), r1, s1.slot); + let base = DiffBase::from_state(r0, &s0); + store.insert_state_with_diff(r1, base, s1.clone()); + assert!(has_key(backend.as_ref(), Table::States, &r1)); - // Nothing should be pruned (within retention window) - assert_eq!( - count_entries(backend.as_ref(), Table::States), - STATES_TO_KEEP - ); - assert_eq!( - count_entries(backend.as_ref(), Table::BlockHeaders), - STATES_TO_KEEP - ); + // A non-crossing child does not. + let r2 = root(2); + let s2 = sample_state(SNAPSHOT_ANCHOR_INTERVAL + 1, vec![root(42), root(43)]); + insert_header(backend.as_ref(), r2, s2.slot); + let base = DiffBase::from_state(r1, &s1); + store.insert_state_with_diff(r2, base, s2.clone()); + assert!(!has_key(backend.as_ref(), Table::States, &r2)); } // ============ PayloadBuffer Tests ============ @@ -2600,6 +2592,17 @@ mod tests { assert!(store.get_signed_block(&root).is_none()); } + /// The bootstrap anchor is stored as a full snapshot in `States`, the base of + /// every diff chain that reconstruction terminates at. + #[test] + fn from_anchor_state_stores_bootstrap_snapshot() { + let backend: Arc = Arc::new(InMemoryBackend::new()); + let store = Store::from_anchor_state(backend.clone(), State::from_genesis(0, vec![])); + + let anchor_root = store.head(); + assert!(has_key(backend.as_ref(), Table::States, &anchor_root)); + } + // ============ from_db_state Tests ============ #[test]