Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 17 additions & 3 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,20 @@ async fn main() -> eyre::Result<()> {
// and the API server (which exposes GET/POST admin endpoints).
let aggregator = AggregatorController::new(options.is_aggregator);

// Chain-event broadcast channel: the blockchain actor is the sole sender;
// each SSE client (`GET /lean/v0/events`) subscribes its own receiver. The
// initial receiver is dropped — subscribers attach on demand and a fully
// unsubscribed channel just drops events.
let (chain_events, _) =
tokio::sync::broadcast::channel(ethlambda_blockchain::CHAIN_EVENT_CHANNEL_CAPACITY);

let blockchain = BlockChain::spawn(
store.clone(),
validator_keys,
aggregator.clone(),
attestation_committee_count,
!options.disable_duty_sync_gate,
chain_events.clone(),
);

// Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the
Expand Down Expand Up @@ -333,9 +341,15 @@ async fn main() -> eyre::Result<()> {
let rpc_shutdown = shutdown_token.clone();

let rpc_handle = tokio::spawn(async move {
let _ = ethlambda_rpc::start_rpc_server(rpc_config, store, aggregator, rpc_shutdown)
.await
.inspect_err(|err| error!(%err, "RPC server failed"));
let _ = ethlambda_rpc::start_rpc_server(
rpc_config,
store,
aggregator,
chain_events,
rpc_shutdown,
)
.await
.inspect_err(|err| error!(%err, "RPC server failed"));
});

info!("Node initialized");
Expand Down
2 changes: 1 addition & 1 deletion crates/blockchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ tokio-util = { version = "0.7", default-features = false }
rayon.workspace = true
thiserror.workspace = true
tracing.workspace = true
serde.workspace = true

hex.workspace = true

[dev-dependencies]
ethlambda-test-fixtures.workspace = true
serde = { workspace = true }
serde_json = { workspace = true }
hex = { workspace = true }
libssz.workspace = true
Expand Down
56 changes: 56 additions & 0 deletions crates/blockchain/src/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//! Chain events emitted by the [`crate::BlockChainServer`] actor and streamed
//! to RPC clients over Server-Sent Events (`GET /lean/v0/events`).
//!
//! The flow is strictly one-directional: the actor (the sole writer) publishes
//! events on a [`broadcast`] channel, and the read-only RPC handler subscribes.
//! RPC never writes back into the actor.

use ethlambda_types::primitives::H256;
use serde::Serialize;
use tokio::sync::broadcast;

/// A consensus event broadcast to SSE subscribers.
///
/// Serialized with an external `event`/`data` tag so the JSON payload mirrors
/// the SSE framing (`event: head\ndata: {...}`).
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "event", content = "data", rename_all = "snake_case")]
pub enum ChainEvent {
/// Fork choice selected a new head.
Head {
slot: u64,
root: H256,
parent_root: H256,
},
/// A block was imported into the store.
Block { slot: u64, root: H256 },
/// The finalized checkpoint advanced.
FinalizedCheckpoint { slot: u64, root: H256 },
}

/// Sender half of the chain-event broadcast channel, owned by the actor.
pub type ChainEventTx = broadcast::Sender<ChainEvent>;

/// Capacity chosen so a briefly-stalled SSE client is dropped (lagged) rather
/// than back-pressuring the actor. Lagged clients re-sync via backfill.
pub const CHAIN_EVENT_CHANNEL_CAPACITY: usize = 256;

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn channel_delivers_head_event() {
let (tx, mut rx) = broadcast::channel::<ChainEvent>(CHAIN_EVENT_CHANNEL_CAPACITY);
tx.send(ChainEvent::Head {
slot: 7,
root: H256::ZERO,
parent_root: H256::ZERO,
})
.unwrap();
match rx.recv().await.unwrap() {
ChainEvent::Head { slot, .. } => assert_eq!(slot, 7),
other => panic!("unexpected: {other:?}"),
}
}
}
19 changes: 17 additions & 2 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ use tracing::{debug, error, info, trace, warn};

use crate::store::StoreError;

pub use events::{CHAIN_EVENT_CHANNEL_CAPACITY, ChainEvent, ChainEventTx};

pub mod aggregation;
pub mod block_builder;
pub(crate) mod coverage;
pub mod events;
pub(crate) mod fork_choice_tree;
pub mod key_manager;
pub mod metrics;
Expand Down Expand Up @@ -82,6 +85,7 @@ impl BlockChain {
aggregator: AggregatorController,
attestation_committee_count: u64,
gate_duties: bool,
chain_events: ChainEventTx,
) -> BlockChain {
metrics::set_is_aggregator(aggregator.is_enabled());
metrics::set_node_sync_status(metrics::SyncStatus::Idle);
Expand All @@ -108,6 +112,7 @@ impl BlockChain {
attestation_committee_count,
pre_merge_coverage: None,
sync_status: SyncStatusTracker::new(gate_duties),
chain_events,
}
.start();
let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time))
Expand Down Expand Up @@ -177,6 +182,11 @@ pub struct BlockChainServer {
/// validator duties while syncing, unless that gating was disabled at
/// startup via `--disable-duty-sync-gate` (then it is metric-only).
sync_status: SyncStatusTracker,

/// Broadcast sender for chain events streamed to SSE subscribers
/// (`GET /lean/v0/events`). The actor is the sole publisher; the RPC
/// handler only subscribes, preserving the one-directional write flow.
chain_events: ChainEventTx,
}

impl BlockChainServer {
Expand Down Expand Up @@ -258,7 +268,12 @@ impl BlockChainServer {
let is_proposer = scheduled_proposer.is_some();

// Tick the store first - this accepts attestations at interval 0 if we have a proposal
store::on_tick(&mut self.store, timestamp_ms, is_proposer);
store::on_tick(
&mut self.store,
timestamp_ms,
is_proposer,
Some(&self.chain_events),
);

// ==== interval 0 ====

Expand Down Expand Up @@ -593,7 +608,7 @@ impl BlockChainServer {

/// Run block import and refresh metrics.
fn process_block(&mut self, signed_block: SignedBlock) -> Result<(), StoreError> {
store::on_block(&mut self.store, signed_block)?;
store::on_block(&mut self.store, signed_block, Some(&self.chain_events))?;
metrics::update_head_slot(self.store.head_slot());
metrics::update_latest_justified_slot(self.store.latest_justified().slot);
metrics::update_latest_finalized_slot(self.store.latest_finalized().slot);
Expand Down
81 changes: 68 additions & 13 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,33 @@ use crate::{
GOSSIP_DISPARITY_INTERVALS, INTERVALS_PER_SLOT, MAX_ATTESTATIONS_DATA,
MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT,
block_builder::{PostBlockCheckpoints, build_block},
events::{ChainEvent, ChainEventTx},
metrics,
};

const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3;

/// Accept new aggregated payloads, promoting them to known for fork choice.
fn accept_new_attestations(store: &mut Store, log_tree: bool) {
fn accept_new_attestations(store: &mut Store, log_tree: bool, events: Option<&ChainEventTx>) {
store.promote_new_aggregated_payloads();
metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count());
metrics::update_latest_known_aggregated_payloads(store.known_aggregated_payloads_count());
update_head(store, log_tree);
update_head(store, log_tree, events);
}

/// Update the head based on the fork choice rule.
///
/// When `log_tree` is true, also computes block weights and logs an ASCII
/// fork choice tree to the terminal.
pub fn update_head(store: &mut Store, log_tree: bool) {
///
/// When `events` is `Some`, emits a [`ChainEvent::Head`] whenever the head
/// changes and a [`ChainEvent::FinalizedCheckpoint`] whenever finalization
/// advances. Send errors (no subscribers) are ignored.
pub fn update_head(store: &mut Store, log_tree: bool, events: Option<&ChainEventTx>) {
let blocks = store.get_live_chain();
let attestations = store.extract_latest_known_attestations();
let old_head = store.head();
let old_finalized = store.latest_finalized();
let (new_head, weights) = ethlambda_fork_choice::compute_lmd_ghost_head(
store.latest_justified().root,
&blocks,
Expand All @@ -60,6 +66,31 @@ pub fn update_head(store: &mut Store, log_tree: bool) {
.filter(|finalized| store.get_block_header(&finalized.root).is_some());
store.update_checkpoints(ForkCheckpoints::new(new_head, None, finalized));

if let Some(events) = events {
// Emit the new head whenever fork choice moved it. Read the header once
// and reuse it for slot and parent_root so they stay consistent.
if old_head != new_head {
if let Some(new_header) = store.get_block_header(&new_head) {
let _ = events.send(ChainEvent::Head {
slot: new_header.slot,
root: new_head,
parent_root: new_header.parent_root,
});
} else {
tracing::warn!("head header missing while emitting Head event; skipping");
}
}

// Emit a finalized-checkpoint event only when finalization advanced.
let new_finalized = store.latest_finalized();
if new_finalized.slot > old_finalized.slot || new_finalized.root != old_finalized.root {
let _ = events.send(ChainEvent::FinalizedCheckpoint {
slot: new_finalized.slot,
root: new_finalized.root,
});
}
}

if old_head != new_head {
let old_slot = store
.get_block_header(&old_head)
Expand Down Expand Up @@ -254,7 +285,12 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<()
/// 800ms interval. Slot and interval-within-slot are derived as:
/// slot = store.time() / INTERVALS_PER_SLOT
/// interval = store.time() % INTERVALS_PER_SLOT
pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) {
pub fn on_tick(
store: &mut Store,
timestamp_ms: u64,
has_proposal: bool,
events: Option<&ChainEventTx>,
) {
// Convert UNIX timestamp (ms) to interval count since genesis
let genesis_time_ms = store.config().genesis_time * 1000;
let time_delta_ms = timestamp_ms.saturating_sub(genesis_time_ms);
Expand Down Expand Up @@ -287,7 +323,7 @@ pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) {
0 => {
// Start of slot - process attestations if proposal exists
if should_signal_proposal {
accept_new_attestations(store, false);
accept_new_attestations(store, false, events);
}
}
1 => {
Expand All @@ -302,7 +338,7 @@ pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) {
}
4 => {
// End of slot - accept accumulated attestations and log tree
accept_new_attestations(store, true);
accept_new_attestations(store, true, events);
}
_ => unreachable!("slots only have 5 intervals"),
}
Expand Down Expand Up @@ -481,8 +517,12 @@ fn on_gossip_aggregated_attestation_core(
///
/// This is the safe default: it always verifies cryptographic signatures
/// and stores them for future block building. Use this for all production paths.
pub fn on_block(store: &mut Store, signed_block: SignedBlock) -> Result<(), StoreError> {
on_block_core(store, signed_block, true)
pub fn on_block(
store: &mut Store,
signed_block: SignedBlock,
events: Option<&ChainEventTx>,
) -> Result<(), StoreError> {
on_block_core(store, signed_block, true, events)
}

/// Process a new block without signature verification.
Expand All @@ -493,7 +533,7 @@ pub fn on_block_without_verification(
store: &mut Store,
signed_block: SignedBlock,
) -> Result<(), StoreError> {
on_block_core(store, signed_block, false)
on_block_core(store, signed_block, false, None)
}

/// Core block processing logic.
Expand All @@ -504,6 +544,7 @@ fn on_block_core(
store: &mut Store,
signed_block: SignedBlock,
verify: bool,
events: Option<&ChainEventTx>,
) -> Result<(), StoreError> {
let _timing = metrics::time_fork_choice_block_processing();
let block_start = std::time::Instant::now();
Expand Down Expand Up @@ -586,8 +627,17 @@ fn on_block_core(
metrics::inc_attestations_valid(count);
}

// Emit the imported block before fork choice runs, so subscribers see the
// `block` event ahead of any `head` move it triggers.
if let Some(events) = events {
let _ = events.send(ChainEvent::Block {
slot,
root: block_root,
});
}

// Update forkchoice head based on new block and attestations
update_head(store, false);
update_head(store, false, events);

let block_total = block_start.elapsed();
info!(
Expand Down Expand Up @@ -758,11 +808,16 @@ fn get_proposal_head(store: &mut Store, slot: u64) -> H256 {
// Calculate time corresponding to this slot
let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT;

// Advance time to current slot (ticking intervals)
on_tick(store, slot_time_ms, true);
// Advance time to current slot (ticking intervals).
//
// No event sender here: this is the proposer's pre-build catch-up, and the
// block it produces is imported via `on_block` (which emits the resulting
// `Block`/`Head`). Emitting from here would surface a head move before the
// block exists.
on_tick(store, slot_time_ms, true, None);

// Process any pending attestations before proposal
accept_new_attestations(store, false);
accept_new_attestations(store, false, None);

store.head()
}
Expand Down
6 changes: 3 additions & 3 deletions crates/blockchain/tests/forkchoice_spectests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> {
if step.tick_to_slot {
let block_time_ms =
genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT;
store::on_tick(&mut store, block_time_ms, true);
store::on_tick(&mut store, block_time_ms, true, None);
}
let result = store::on_block_without_verification(&mut store, signed_block);
let import_ok = result.is_ok();
Expand Down Expand Up @@ -137,7 +137,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> {
// on_block already ran the head update before these votes
// existed; recompute so the head reflects the block's own
// attestations, matching the proposer-view store.
store::update_head(&mut store, false);
store::update_head(&mut store, false, None);
}
}
"tick" => {
Expand All @@ -152,7 +152,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> {
(None, None) => panic!("tick step missing both time and interval"),
};
let has_proposal = step.has_proposal.unwrap_or(false);
store::on_tick(&mut store, timestamp_ms, has_proposal);
store::on_tick(&mut store, timestamp_ms, has_proposal, None);
}
"attestation" => {
let att_data = step
Expand Down
Loading
Loading