diff --git a/CLAUDE.md b/CLAUDE.md index 8807e15e..5b10c6dd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -362,7 +362,10 @@ cargo test -p ethlambda-blockchain --test forkchoice_spectests -- --test-threads - Blocks are split into three tables: `BlockHeaders`, `BlockBodies`, `BlockSignatures` - Genesis/anchor blocks have empty bodies (detected via `EMPTY_BODY_ROOT`) — no entry in `BlockBodies` - Genesis block has no signatures — no entry in `BlockSignatures` -- All other blocks must have entries in all three tables +- Non-genesis blocks keep a `BlockSignatures` entry while within `SIGNATURE_PRUNING_RANGE` + slots of the tip; older signatures are pruned (`prune_old_block_signatures`) only when that + cutoff is at or below the finalized slot. Headers and bodies are kept forever. + `get_signed_block` returns `None` for a pruned finalized block - `LiveChain` table provides fast `(slot||root) → parent_root` index for fork choice - Storage uses trait-based API: `StorageBackend` → `StorageReadView` (reads) + `StorageWriteBatch` (atomic writes) diff --git a/crates/storage/src/backend/in_memory.rs b/crates/storage/src/backend/in_memory.rs index 00b9da33..b8af55d1 100644 --- a/crates/storage/src/backend/in_memory.rs +++ b/crates/storage/src/backend/in_memory.rs @@ -78,12 +78,20 @@ impl StorageReadView for InMemoryReadView<'_> { prefix: &[u8], ) -> Result + '_>, Error> { let table_data = self.guard.get(&table).expect("table exists"); - let prefix_owned = prefix.to_vec(); - let iter = table_data + // Collect and sort by key so iteration order matches the RocksDB backend + // (lexicographic). Callers rely on this for early-stop range scans over + // slot||root keys (e.g. signature/live-chain pruning). 
+ let mut items: Vec<(Vec, Vec)> = table_data .iter() - .filter(move |(k, _)| k.starts_with(&prefix_owned)) - .map(|(k, v)| Ok((k.clone().into_boxed_slice(), v.clone().into_boxed_slice()))); + .filter(|(k, _)| k.starts_with(prefix)) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + items.sort_by(|a, b| a.0.cmp(&b.0)); + + let iter = items + .into_iter() + .map(|(k, v)| Ok((k.into_boxed_slice(), v.into_boxed_slice()))); Ok(Box::new(iter)) } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 3aea2a18..44e61d6a 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -84,20 +84,18 @@ const KEY_LATEST_JUSTIFIED: &[u8] = b"latest_justified"; /// Key for "latest_finalized" field of the Store. Its value has type [`Checkpoint`] and it's SSZ-encoded. const KEY_LATEST_FINALIZED: &[u8] = b"latest_finalized"; -/// ~1 day of block history at 4-second slots (86400 / 4 = 21600). -const BLOCKS_TO_KEEP: usize = 21_600; - /// ~3.3 hours of state history at 4-second slots (12000 / 4 = 3000). const STATES_TO_KEEP: usize = 3_000; +/// Keep block signatures for at least this many slots below the tip, even once +/// finalized. Signatures older than this window are pruned only when the window +/// lies entirely within finalized history; see [`Store::prune_old_block_signatures`]. +/// ~1 day at 4-second slots. +const SIGNATURE_PRUNING_RANGE: u64 = 21_600; + /// ~30 minutes of resume window at 4-second slots (1800 / 4 = 450). pub const MAX_RESUMABLE_DB_STATE_AGE: u64 = 450; -const _: () = assert!( - BLOCKS_TO_KEEP >= STATES_TO_KEEP, - "BLOCKS_TO_KEEP must be >= STATES_TO_KEEP" -); - /// Hard cap for the known aggregated payload buffer (number of distinct attestation messages). /// With 1 attestation/slot, this holds ~500 messages (~33 min at 4s/slot). const AGGREGATED_PAYLOAD_CAP: usize = 512; @@ -463,14 +461,14 @@ impl GossipSignatureBuffer { /// Encode a LiveChain key (slot, root) to bytes. 
/// Layout: slot (8 bytes big-endian) || root (32 bytes) /// Big-endian ensures lexicographic ordering matches numeric ordering. -fn encode_live_chain_key(slot: u64, root: &H256) -> Vec { +fn encode_slot_root_key(slot: u64, root: &H256) -> Vec { let mut result = slot.to_be_bytes().to_vec(); result.extend_from_slice(&root.0); result } -/// Decode a LiveChain key from bytes. -fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) { +/// Decode a slot||root key (LiveChain / BlockSignatures) from bytes. +fn decode_slot_root_key(bytes: &[u8]) -> (u64, H256) { let slot = u64::from_be_bytes(bytes[..8].try_into().expect("valid slot bytes")); let root = H256::from_slice(&bytes[8..]); (slot, root) @@ -648,7 +646,7 @@ impl Store { // Live chain index let index_entries = vec![( - encode_live_chain_key(anchor_state.latest_block_header.slot, &anchor_block_root), + encode_slot_root_key(anchor_state.latest_block_header.slot, &anchor_block_root), anchor_state.latest_block_header.parent_root.to_ssz(), )]; batch @@ -802,10 +800,17 @@ impl Store { self.latest_justified().root, self.head(), ]; + let finalized_slot = self.latest_finalized().slot; + let tip_slot = self + .get_block_header(&self.head()) + .map_or(finalized_slot, |header| header.slot); let pruned_states = self.prune_old_states(&protected_roots); - let pruned_blocks = self.prune_old_blocks(&protected_roots); - if pruned_states > 0 || pruned_blocks > 0 { - info!(pruned_states, pruned_blocks, "Pruned old states and blocks"); + let pruned_signatures = self.prune_old_block_signatures(finalized_slot, tip_slot); + if pruned_states > 0 || pruned_signatures > 0 { + info!( + pruned_states, + pruned_signatures, "Pruned old states and block signatures" + ); } } @@ -821,7 +826,7 @@ impl Store { .expect("iterator") .filter_map(|res| res.ok()) .map(|(k, v)| { - let (slot, root) = decode_live_chain_key(&k); + let (slot, root) = decode_slot_root_key(&k); let parent_root = H256::from_ssz_bytes(&v).expect("valid parent_root"); (root, (slot, 
parent_root)) }) @@ -834,7 +839,7 @@ impl Store { view.prefix_iterator(Table::LiveChain, &[]) .expect("iterator") .filter_map(Result::ok) - .map(|(key, _)| decode_live_chain_key(&key).0) + .map(|(key, _)| decode_slot_root_key(&key).0) .max() } @@ -847,7 +852,7 @@ impl Store { .expect("iterator") .filter_map(|res| res.ok()) .map(|(k, _)| { - let (_, root) = decode_live_chain_key(&k); + let (_, root) = decode_slot_root_key(&k); root }) .collect() @@ -869,7 +874,7 @@ impl Store { .expect("iterator") .filter_map(|res| res.ok()) .take_while(|(k, _)| { - let (slot, _) = decode_live_chain_key(k); + let (slot, _) = decode_slot_root_key(k); slot < finalized_slot }) .map(|(k, _)| k.to_vec()) @@ -958,55 +963,49 @@ impl Store { count } - /// Prune old blocks beyond the retention window. + /// Prune signatures of old finalized blocks, keeping a recent window. + /// + /// Signatures within [`SIGNATURE_PRUNING_RANGE`] slots of `tip_slot` are + /// always kept, as are all signatures of non-finalized blocks. Concretely, + /// with `cutoff = tip_slot - SIGNATURE_PRUNING_RANGE`: /// - /// Keeps the most recent `BLOCKS_TO_KEEP` blocks (by slot), plus any - /// blocks whose roots appear in `protected_roots` (finalized, justified). - /// Deletes from `BlockHeaders`, `BlockBodies`, and `BlockSignatures`. + /// - if `cutoff <= finalized_slot` (healthy finality): delete signatures for + /// `slot < cutoff` (entirely within finalized history); + /// - otherwise (the non-finalized range exceeds the window): prune nothing, + /// since pruning up to `cutoff` would touch non-finalized blocks. /// - /// Returns the number of blocks pruned. - pub fn prune_old_blocks(&mut self, protected_roots: &[H256]) -> usize { + /// Headers and bodies are always retained. Finalized blocks can never be + /// reverted, so their signatures are not needed for fork choice, re-org + /// safety, or re-aggregation once outside the window. + /// + /// Returns the number of signatures pruned. 
+ pub fn prune_old_block_signatures(&mut self, finalized_slot: u64, tip_slot: u64) -> usize { + let cutoff = tip_slot.saturating_sub(SIGNATURE_PRUNING_RANGE); + // Only prune when the whole window is finalized; never touch + // non-finalized signatures. + if cutoff > finalized_slot { + return 0; + } + let view = self.backend.begin_read().expect("read view"); - let mut entries: Vec<(Vec, u64)> = view - .prefix_iterator(Table::BlockHeaders, &[]) + // Keys are slot||root in big-endian slot order, so iteration ascends by + // slot: take entries below the cutoff and stop at the first one past it. + let keys_to_delete: Vec> = view + .prefix_iterator(Table::BlockSignatures, &[]) .expect("iterator") .filter_map(|res| res.ok()) - .map(|(key, value)| { - let header = BlockHeader::from_ssz_bytes(&value).expect("valid header"); - (key.to_vec(), header.slot) - }) + .map(|(key, _)| key.to_vec()) + .take_while(|key| decode_slot_root_key(key).0 < cutoff) .collect(); drop(view); - if entries.len() <= BLOCKS_TO_KEEP { - return 0; - } - - // Sort by slot descending (newest first) - entries.sort_unstable_by(|a, b| b.1.cmp(&a.1)); - - let protected: HashSet> = protected_roots.iter().map(|r| r.to_ssz()).collect(); - - let keys_to_delete: Vec> = entries - .into_iter() - .skip(BLOCKS_TO_KEEP) - .filter(|(key, _)| !protected.contains(key)) - .map(|(key, _)| key) - .collect(); - let count = keys_to_delete.len(); if count > 0 { let mut batch = self.backend.begin_write().expect("write batch"); - batch - .delete_batch(Table::BlockHeaders, keys_to_delete.clone()) - .expect("delete old block headers"); - batch - .delete_batch(Table::BlockBodies, keys_to_delete.clone()) - .expect("delete old block bodies"); batch .delete_batch(Table::BlockSignatures, keys_to_delete) - .expect("delete old block signatures"); + .expect("delete finalized block signatures"); batch.commit().expect("commit"); } count @@ -1049,7 +1048,7 @@ impl Store { let block = write_signed_block(batch.as_mut(), &root, signed_block); 
let index_entries = vec![( - encode_live_chain_key(block.slot, &root), + encode_slot_root_key(block.slot, &root), block.parent_root.to_ssz(), )]; batch @@ -1107,7 +1106,8 @@ impl Store { BlockBody::from_ssz_bytes(&body_bytes).expect("valid body") }; - let proof = match view.get(Table::BlockSignatures, &key).expect("get") { + let sig_key = encode_slot_root_key(header.slot, root); + let proof = match view.get(Table::BlockSignatures, &sig_key).expect("get") { Some(proof_bytes) => { MultiMessageAggregate::from_ssz_bytes(&proof_bytes).expect("valid block proof") } @@ -1406,9 +1406,10 @@ fn write_signed_block( .expect("put block body"); } - // Store the merged Type-2 proof blob. Table name kept for the column-family - // migration cost; renaming to `BlockProof` is a follow-up. - let proof_entries = vec![(root_bytes, proof.to_ssz())]; + // Store the merged Type-2 proof blob, keyed by slot||root so signature + // pruning can scan in slot order and stop early. Table name kept for the + // column-family migration cost; renaming to `BlockProof` is a follow-up. + let proof_entries = vec![(encode_slot_root_key(header.slot, root), proof.to_ssz())]; batch .put_batch(Table::BlockSignatures, proof_entries) .expect("put block proof"); @@ -1439,7 +1440,10 @@ mod tests { .put_batch(Table::BlockBodies, vec![(key.clone(), vec![0u8; 4])]) .expect("put body"); batch - .put_batch(Table::BlockSignatures, vec![(key, vec![0u8; 4])]) + .put_batch( + Table::BlockSignatures, + vec![(encode_slot_root_key(slot, &root), vec![0u8; 4])], + ) .expect("put sigs"); batch.commit().expect("commit"); } @@ -1469,6 +1473,14 @@ mod tests { view.get(table, &root.to_ssz()).expect("get").is_some() } + /// Check whether a block signature exists for a (slot, root) pair. 
+ fn has_signature(backend: &dyn StorageBackend, slot: u64, root: &H256) -> bool { + let view = backend.begin_read().expect("read view"); + view.get(Table::BlockSignatures, &encode_slot_root_key(slot, root)) + .expect("get") + .is_some() + } + /// Generate a deterministic H256 root from an index. fn root(index: u64) -> H256 { let mut bytes = [0u8; 32]; @@ -1504,103 +1516,70 @@ mod tests { } } - // ============ Block Pruning Tests ============ + // ============ Block Signature Pruning Tests ============ #[test] - fn prune_old_blocks_within_retention() { + fn prune_signatures_keeps_recent_window_when_finality_healthy() { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store::test_store_with_backend(backend.clone()); - // Insert exactly BLOCKS_TO_KEEP blocks - for i in 0..BLOCKS_TO_KEEP as u64 { + // Blocks at slots 0..12, each with header + body + signature. + for i in 0..13u64 { insert_header(backend.as_ref(), root(i), i); } - assert_eq!( - count_entries(backend.as_ref(), Table::BlockHeaders), - BLOCKS_TO_KEEP - ); - let pruned = store.prune_old_blocks(&[]); - assert_eq!(pruned, 0); - assert_eq!( - count_entries(backend.as_ref(), Table::BlockHeaders), - BLOCKS_TO_KEEP - ); + // Healthy finality: non-finalized gap (5) < SIGNATURE_PRUNING_RANGE. + // tip = range + 10, finalized = range + 5, so cutoff = tip - range = 10. + let tip_slot = SIGNATURE_PRUNING_RANGE + 10; + let finalized_slot = SIGNATURE_PRUNING_RANGE + 5; + let pruned = store.prune_old_block_signatures(finalized_slot, tip_slot); + + // cutoff = 10: slots 0..9 pruned, slots 10..12 kept (within the window). + assert_eq!(pruned, 10); + for i in 0..10u64 { + assert!(!has_signature(backend.as_ref(), i, &root(i))); + } + for i in 10..13u64 { + assert!(has_signature(backend.as_ref(), i, &root(i))); + } + + // Headers and bodies are always retained for the whole history. 
+ assert_eq!(count_entries(backend.as_ref(), Table::BlockHeaders), 13); + assert_eq!(count_entries(backend.as_ref(), Table::BlockBodies), 13); } #[test] - fn prune_old_blocks_exceeding_retention() { + fn prune_signatures_noop_when_non_finalized_range_exceeds_window() { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store::test_store_with_backend(backend.clone()); - let total = BLOCKS_TO_KEEP + 10; - for i in 0..total as u64 { + for i in 0..10u64 { insert_header(backend.as_ref(), root(i), i); } - assert_eq!(count_entries(backend.as_ref(), Table::BlockHeaders), total); - - let pruned = store.prune_old_blocks(&[]); - assert_eq!(pruned, 10); - assert_eq!( - count_entries(backend.as_ref(), Table::BlockHeaders), - BLOCKS_TO_KEEP - ); - assert_eq!( - count_entries(backend.as_ref(), Table::BlockBodies), - BLOCKS_TO_KEEP - ); - assert_eq!( - count_entries(backend.as_ref(), Table::BlockSignatures), - BLOCKS_TO_KEEP - ); - // Oldest blocks (slots 0..10) should be gone - for i in 0..10u64 { - assert!(!has_key(backend.as_ref(), Table::BlockHeaders, &root(i))); - } - // Newest blocks should still exist - for i in 10..total as u64 { - assert!(has_key(backend.as_ref(), Table::BlockHeaders, &root(i))); - } + // Deep non-finality: gap (tip - finalized) > SIGNATURE_PRUNING_RANGE, so + // cutoff = tip - range > finalized → prune nothing. 
+ let tip_slot = SIGNATURE_PRUNING_RANGE + 100; + let finalized_slot = 5; + let pruned = store.prune_old_block_signatures(finalized_slot, tip_slot); + assert_eq!(pruned, 0); + assert_eq!(count_entries(backend.as_ref(), Table::BlockSignatures), 10); } #[test] - fn prune_old_blocks_preserves_protected() { + fn prune_signatures_noop_when_tip_within_window() { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store::test_store_with_backend(backend.clone()); - let total = BLOCKS_TO_KEEP + 10; - for i in 0..total as u64 { + for i in 0..10u64 { insert_header(backend.as_ref(), root(i), i); } - // Protect the two oldest blocks (slots 0 and 1) - let finalized_root = root(0); - let justified_root = root(1); - let pruned = store.prune_old_blocks(&[finalized_root, justified_root]); - - // 10 would be pruned, but 2 are protected - assert_eq!(pruned, 8); - assert!(has_key( - backend.as_ref(), - Table::BlockHeaders, - &finalized_root - )); - assert!(has_key( - backend.as_ref(), - Table::BlockHeaders, - &justified_root - )); - assert!(has_key( - backend.as_ref(), - Table::BlockBodies, - &finalized_root - )); - assert!(has_key( - backend.as_ref(), - Table::BlockSignatures, - &finalized_root - )); + // Early chain: tip < SIGNATURE_PRUNING_RANGE → cutoff saturates to 0, + // so nothing is old enough to prune even though slots are finalized. + let pruned = store.prune_old_block_signatures(9, 9); + assert_eq!(pruned, 0); + assert_eq!(count_entries(backend.as_ref(), Table::BlockSignatures), 10); } // ============ State Pruning Tests ============ @@ -1711,7 +1690,7 @@ mod tests { }, ); - // Insert more than STATES_TO_KEEP headers + states, but fewer than BLOCKS_TO_KEEP + // Insert more than STATES_TO_KEEP headers + states. 
let total_states = STATES_TO_KEEP + 5; for i in 0..total_states as u64 { insert_header(backend.as_ref(), root(i), i); @@ -1746,7 +1725,7 @@ mod tests { assert!(has_key(backend.as_ref(), Table::States, &finalized_root)); assert!(has_key(backend.as_ref(), Table::States, &justified_root)); - // Blocks: total_states < BLOCKS_TO_KEEP, so no blocks should be pruned + // Headers and bodies are never pruned, so all are retained. assert_eq!( count_entries(backend.as_ref(), Table::BlockHeaders), total_states