diff --git a/Cargo.lock b/Cargo.lock index 764d89b0..888814ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2134,6 +2134,7 @@ dependencies = [ "ethlambda-storage", "ethlambda-test-fixtures", "ethlambda-types", + "futures-core", "hex", "http-body-util", "jemalloc_pprof", @@ -2141,6 +2142,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-stream", "tokio-util", "tower", "tracing", diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index b26481f2..e9f466a1 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -288,12 +288,20 @@ async fn main() -> eyre::Result<()> { // and the API server (which exposes GET/POST admin endpoints). let aggregator = AggregatorController::new(options.is_aggregator); + // Chain-event broadcast channel: the blockchain actor is the sole sender; + // each SSE client (`GET /lean/v0/events`) subscribes its own receiver. The + // initial receiver is dropped — subscribers attach on demand and a fully + // unsubscribed channel just drops events. + let (chain_events, _) = + tokio::sync::broadcast::channel(ethlambda_blockchain::CHAIN_EVENT_CHANNEL_CAPACITY); + let blockchain = BlockChain::spawn( store.clone(), validator_keys, aggregator.clone(), attestation_committee_count, !options.disable_duty_sync_gate, + chain_events.clone(), ); // Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the @@ -333,9 +341,15 @@ async fn main() -> eyre::Result<()> { let rpc_shutdown = shutdown_token.clone(); let rpc_handle = tokio::spawn(async move { - let _ = ethlambda_rpc::start_rpc_server(rpc_config, store, aggregator, rpc_shutdown) - .await - .inspect_err(|err| error!(%err, "RPC server failed")); + let _ = ethlambda_rpc::start_rpc_server( + rpc_config, + store, + aggregator, + chain_events, + rpc_shutdown, + ) + .await + .inspect_err(|err| error!(%err, "RPC server failed")); }); info!("Node initialized"); diff --git a/crates/blockchain/Cargo.toml b/crates/blockchain/Cargo.toml index f25234cd..a08262eb 100644 --- a/crates/blockchain/Cargo.toml +++ b/crates/blockchain/Cargo.toml @@ -29,12 +29,12 @@ tokio-util = { version = "0.7", default-features = false } rayon.workspace = true thiserror.workspace = true tracing.workspace = true +serde.workspace = true hex.workspace = true [dev-dependencies] ethlambda-test-fixtures.workspace = true -serde = { workspace = true } serde_json = { workspace = true } hex = { workspace = true } libssz.workspace = true diff --git a/crates/blockchain/src/events.rs b/crates/blockchain/src/events.rs new file mode 100644 index 00000000..0281c4d8 --- /dev/null +++ b/crates/blockchain/src/events.rs @@ -0,0 +1,56 @@ +//! Chain events emitted by the [`crate::BlockChainServer`] actor and streamed +//! to RPC clients over Server-Sent Events (`GET /lean/v0/events`). +//! +//! The flow is strictly one-directional: the actor (the sole writer) publishes +//! events on a [`broadcast`] channel, and the read-only RPC handler subscribes. +//! RPC never writes back into the actor. + +use ethlambda_types::primitives::H256; +use serde::Serialize; +use tokio::sync::broadcast; + +/// A consensus event broadcast to SSE subscribers. +/// +/// Serialized with an external `event`/`data` tag so the JSON payload mirrors +/// the SSE framing (`event: head\ndata: {...}`). +#[derive(Clone, Debug, Serialize)] +#[serde(tag = "event", content = "data", rename_all = "snake_case")] +pub enum ChainEvent { + /// Fork choice selected a new head. + Head { + slot: u64, + root: H256, + parent_root: H256, + }, + /// A block was imported into the store. + Block { slot: u64, root: H256 }, + /// The finalized checkpoint advanced. + FinalizedCheckpoint { slot: u64, root: H256 }, +} + +/// Sender half of the chain-event broadcast channel, owned by the actor. +pub type ChainEventTx = broadcast::Sender; + +/// Capacity chosen so a briefly-stalled SSE client is dropped (lagged) rather +/// than back-pressuring the actor. Lagged clients re-sync via backfill. +pub const CHAIN_EVENT_CHANNEL_CAPACITY: usize = 256; + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn channel_delivers_head_event() { + let (tx, mut rx) = broadcast::channel::(CHAIN_EVENT_CHANNEL_CAPACITY); + tx.send(ChainEvent::Head { + slot: 7, + root: H256::ZERO, + parent_root: H256::ZERO, + }) + .unwrap(); + match rx.recv().await.unwrap() { + ChainEvent::Head { slot, .. } => assert_eq!(slot, 7), + other => panic!("unexpected: {other:?}"), + } + } +} diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 4360377d..c6c6345b 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -28,9 +28,12 @@ use tracing::{debug, error, info, trace, warn}; use crate::store::StoreError; +pub use events::{CHAIN_EVENT_CHANNEL_CAPACITY, ChainEvent, ChainEventTx}; + pub mod aggregation; pub mod block_builder; pub(crate) mod coverage; +pub mod events; pub(crate) mod fork_choice_tree; pub mod key_manager; pub mod metrics; @@ -82,6 +85,7 @@ impl BlockChain { aggregator: AggregatorController, attestation_committee_count: u64, gate_duties: bool, + chain_events: ChainEventTx, ) -> BlockChain { metrics::set_is_aggregator(aggregator.is_enabled()); metrics::set_node_sync_status(metrics::SyncStatus::Idle); @@ -108,6 +112,7 @@ impl BlockChain { attestation_committee_count, pre_merge_coverage: None, sync_status: SyncStatusTracker::new(gate_duties), + chain_events, } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -177,6 +182,11 @@ pub struct BlockChainServer { /// validator duties while syncing, unless that gating was disabled at /// startup via `--disable-duty-sync-gate` (then it is metric-only). sync_status: SyncStatusTracker, + + /// Broadcast sender for chain events streamed to SSE subscribers + /// (`GET /lean/v0/events`). The actor is the sole publisher; the RPC + /// handler only subscribes, preserving the one-directional write flow. + chain_events: ChainEventTx, } impl BlockChainServer { @@ -258,7 +268,12 @@ impl BlockChainServer { let is_proposer = scheduled_proposer.is_some(); // Tick the store first - this accepts attestations at interval 0 if we have a proposal - store::on_tick(&mut self.store, timestamp_ms, is_proposer); + store::on_tick( + &mut self.store, + timestamp_ms, + is_proposer, + Some(&self.chain_events), + ); // ==== interval 0 ==== @@ -593,7 +608,7 @@ impl BlockChainServer { /// Run block import and refresh metrics. fn process_block(&mut self, signed_block: SignedBlock) -> Result<(), StoreError> { - store::on_block(&mut self.store, signed_block)?; + store::on_block(&mut self.store, signed_block, Some(&self.chain_events))?; metrics::update_head_slot(self.store.head_slot()); metrics::update_latest_justified_slot(self.store.latest_justified().slot); metrics::update_latest_finalized_slot(self.store.latest_finalized().slot); diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 8f7807d2..92e4d242 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -20,27 +20,33 @@ use crate::{ GOSSIP_DISPARITY_INTERVALS, INTERVALS_PER_SLOT, MAX_ATTESTATIONS_DATA, MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT, block_builder::{PostBlockCheckpoints, build_block}, + events::{ChainEvent, ChainEventTx}, metrics, }; const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; /// Accept new aggregated payloads, promoting them to known for fork choice. -fn accept_new_attestations(store: &mut Store, log_tree: bool) { +fn accept_new_attestations(store: &mut Store, log_tree: bool, events: Option<&ChainEventTx>) { store.promote_new_aggregated_payloads(); metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count()); metrics::update_latest_known_aggregated_payloads(store.known_aggregated_payloads_count()); - update_head(store, log_tree); + update_head(store, log_tree, events); } /// Update the head based on the fork choice rule. /// /// When `log_tree` is true, also computes block weights and logs an ASCII /// fork choice tree to the terminal. -pub fn update_head(store: &mut Store, log_tree: bool) { +/// +/// When `events` is `Some`, emits a [`ChainEvent::Head`] whenever the head +/// changes and a [`ChainEvent::FinalizedCheckpoint`] whenever finalization +/// advances. Send errors (no subscribers) are ignored. +pub fn update_head(store: &mut Store, log_tree: bool, events: Option<&ChainEventTx>) { let blocks = store.get_live_chain(); let attestations = store.extract_latest_known_attestations(); let old_head = store.head(); + let old_finalized = store.latest_finalized(); let (new_head, weights) = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, &blocks, @@ -60,6 +66,31 @@ pub fn update_head(store: &mut Store, log_tree: bool) { .filter(|finalized| store.get_block_header(&finalized.root).is_some()); store.update_checkpoints(ForkCheckpoints::new(new_head, None, finalized)); + if let Some(events) = events { + // Emit the new head whenever fork choice moved it. Read the header once + // and reuse it for slot and parent_root so they stay consistent. + if old_head != new_head { + if let Some(new_header) = store.get_block_header(&new_head) { + let _ = events.send(ChainEvent::Head { + slot: new_header.slot, + root: new_head, + parent_root: new_header.parent_root, + }); + } else { + tracing::warn!("head header missing while emitting Head event; skipping"); + } + } + + // Emit a finalized-checkpoint event only when finalization advanced. + let new_finalized = store.latest_finalized(); + if new_finalized.slot > old_finalized.slot || new_finalized.root != old_finalized.root { + let _ = events.send(ChainEvent::FinalizedCheckpoint { + slot: new_finalized.slot, + root: new_finalized.root, + }); + } + } + if old_head != new_head { let old_slot = store .get_block_header(&old_head) @@ -254,7 +285,12 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() /// 800ms interval. Slot and interval-within-slot are derived as: /// slot = store.time() / INTERVALS_PER_SLOT /// interval = store.time() % INTERVALS_PER_SLOT -pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) { +pub fn on_tick( + store: &mut Store, + timestamp_ms: u64, + has_proposal: bool, + events: Option<&ChainEventTx>, +) { // Convert UNIX timestamp (ms) to interval count since genesis let genesis_time_ms = store.config().genesis_time * 1000; let time_delta_ms = timestamp_ms.saturating_sub(genesis_time_ms); @@ -287,7 +323,7 @@ pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) { 0 => { // Start of slot - process attestations if proposal exists if should_signal_proposal { - accept_new_attestations(store, false); + accept_new_attestations(store, false, events); } } 1 => { @@ -302,7 +338,7 @@ pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) { } 4 => { // End of slot - accept accumulated attestations and log tree - accept_new_attestations(store, true); + accept_new_attestations(store, true, events); } _ => unreachable!("slots only have 5 intervals"), } @@ -481,8 +517,12 @@ fn on_gossip_aggregated_attestation_core( /// /// This is the safe default: it always verifies cryptographic signatures /// and stores them for future block building. Use this for all production paths. -pub fn on_block(store: &mut Store, signed_block: SignedBlock) -> Result<(), StoreError> { - on_block_core(store, signed_block, true) +pub fn on_block( + store: &mut Store, + signed_block: SignedBlock, + events: Option<&ChainEventTx>, +) -> Result<(), StoreError> { + on_block_core(store, signed_block, true, events) } /// Process a new block without signature verification. @@ -493,7 +533,7 @@ pub fn on_block_without_verification( store: &mut Store, signed_block: SignedBlock, ) -> Result<(), StoreError> { - on_block_core(store, signed_block, false) + on_block_core(store, signed_block, false, None) } /// Core block processing logic. @@ -504,6 +544,7 @@ fn on_block_core( store: &mut Store, signed_block: SignedBlock, verify: bool, + events: Option<&ChainEventTx>, ) -> Result<(), StoreError> { let _timing = metrics::time_fork_choice_block_processing(); let block_start = std::time::Instant::now(); @@ -586,8 +627,17 @@ fn on_block_core( metrics::inc_attestations_valid(count); } + // Emit the imported block before fork choice runs, so subscribers see the + // `block` event ahead of any `head` move it triggers. + if let Some(events) = events { + let _ = events.send(ChainEvent::Block { + slot, + root: block_root, + }); + } + // Update forkchoice head based on new block and attestations - update_head(store, false); + update_head(store, false, events); let block_total = block_start.elapsed(); info!( @@ -758,11 +808,16 @@ fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { // Calculate time corresponding to this slot let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; - // Advance time to current slot (ticking intervals) - on_tick(store, slot_time_ms, true); + // Advance time to current slot (ticking intervals). + // + // No event sender here: this is the proposer's pre-build catch-up, and the + // block it produces is imported via `on_block` (which emits the resulting + // `Block`/`Head`). Emitting from here would surface a head move before the + // block exists. + on_tick(store, slot_time_ms, true, None); // Process any pending attestations before proposal - accept_new_attestations(store, false); + accept_new_attestations(store, false, None); store.head() } diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index 1169dbd1..99a70d40 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -103,7 +103,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { if step.tick_to_slot { let block_time_ms = genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT; - store::on_tick(&mut store, block_time_ms, true); + store::on_tick(&mut store, block_time_ms, true, None); } let result = store::on_block_without_verification(&mut store, signed_block); let import_ok = result.is_ok(); @@ -137,7 +137,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { // on_block already ran the head update before these votes // existed; recompute so the head reflects the block's own // attestations, matching the proposer-view store. - store::update_head(&mut store, false); + store::update_head(&mut store, false, None); } } "tick" => { @@ -152,7 +152,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { (None, None) => panic!("tick step missing both time and interval"), }; let has_proposal = step.has_proposal.unwrap_or(false); - store::on_tick(&mut store, timestamp_ms, has_proposal); + store::on_tick(&mut store, timestamp_ms, has_proposal, None); } "attestation" => { let att_data = step diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index a11a58cc..aba2adc4 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -61,10 +61,10 @@ fn run(path: &Path) -> datatest_stable::Result<()> { // Advance time to the block's slot let block_time_ms = genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT; - store::on_tick(&mut st, block_time_ms, true); + store::on_tick(&mut st, block_time_ms, true, None); // Process the block (this includes signature verification) - let result = store::on_block(&mut st, signed_block); + let result = store::on_block(&mut st, signed_block, None); // Step 3: Check that it succeeded or failed as expected match (result.is_ok(), test.expect_exception.as_ref()) { diff --git a/crates/net/rpc/Cargo.toml b/crates/net/rpc/Cargo.toml index bebd3218..cee21b9d 100644 --- a/crates/net/rpc/Cargo.toml +++ b/crates/net/rpc/Cargo.toml @@ -26,6 +26,8 @@ serde_json.workspace = true hex.workspace = true tracing.workspace = true jemalloc_pprof.workspace = true +tokio-stream = { version = "0.1", features = ["sync"] } +futures-core = "0.3" [dev-dependencies] ethlambda-types.workspace = true diff --git a/crates/net/rpc/src/admin.rs b/crates/net/rpc/src/admin.rs index 43804d02..679a636d 100644 --- a/crates/net/rpc/src/admin.rs +++ b/crates/net/rpc/src/admin.rs @@ -12,10 +12,12 @@ //! the role (hot-standby model). See leanSpec PR #636 for the full rationale. use axum::{ - Extension, Json, + Extension, Json, Router, http::StatusCode, response::{IntoResponse, Response}, + routing::get, }; +use ethlambda_storage::Store; use ethlambda_types::aggregator::AggregatorController; use serde::Serialize; use serde_json::Value; @@ -23,6 +25,13 @@ use tracing::info; use crate::json_response; +pub(crate) fn routes() -> Router { + Router::new().route( + "/lean/v0/admin/aggregator", + get(get_aggregator).post(post_aggregator), + ) +} + #[derive(Serialize)] struct StatusResponse { is_aggregator: bool, @@ -44,7 +53,9 @@ struct ToggleResponse { /// `Extension` would cause axum to short-circuit with a 500 when the /// extension is missing, whereas `Option` yields `None` and lets us return /// a clean 503 with a useful message. -pub async fn get_aggregator(controller: Option>) -> Response { +pub(crate) async fn get_aggregator( + controller: Option>, +) -> Response { match controller { Some(Extension(controller)) => json_response(StatusResponse { is_aggregator: controller.is_enabled(), @@ -62,7 +73,7 @@ pub async fn get_aggregator(controller: Option>) /// `Extension` would cause axum to short-circuit with a 500 when the /// extension is missing, whereas `Option` yields `None` and lets us return /// a clean 503 with a useful message. -pub async fn post_aggregator( +pub(crate) async fn post_aggregator( controller: Option>, body: Option>, ) -> Response { diff --git a/crates/net/rpc/src/blocks.rs b/crates/net/rpc/src/blocks.rs index edfa1737..8110b99b 100644 --- a/crates/net/rpc/src/blocks.rs +++ b/crates/net/rpc/src/blocks.rs @@ -1,7 +1,9 @@ use axum::{ + Router, extract::{Path, State}, http::StatusCode, response::IntoResponse, + routing::get, }; use ethlambda_storage::Store; use ethlambda_types::primitives::H256; @@ -9,10 +11,16 @@ use serde_json::json; use crate::json_response; +pub(crate) fn routes() -> Router { + Router::new() + .route("/lean/v0/blocks/{block_id}", get(get_block)) + .route("/lean/v0/blocks/{block_id}/header", get(get_block_header)) +} + /// `GET /lean/v0/blocks/:block_id` — returns the block as JSON. /// /// `block_id` can be a `0x`-prefixed 32-byte hex root or a decimal slot. -pub async fn get_block( +pub(crate) async fn get_block( Path(block_id): Path, State(store): State, ) -> impl IntoResponse { @@ -28,7 +36,7 @@ pub async fn get_block( } /// `GET /lean/v0/blocks/:block_id/header` — returns the block header as JSON. -pub async fn get_block_header( +pub(crate) async fn get_block_header( Path(block_id): Path, State(store): State, ) -> impl IntoResponse { diff --git a/crates/net/rpc/src/core.rs b/crates/net/rpc/src/core.rs new file mode 100644 index 00000000..4ccd1f9a --- /dev/null +++ b/crates/net/rpc/src/core.rs @@ -0,0 +1,73 @@ +use axum::{ + Json, Router, + http::{HeaderValue, header}, + response::IntoResponse, + routing::get, +}; +use ethlambda_storage::Store; +use ethlambda_types::primitives::H256; +use libssz::SszEncode; + +pub(crate) fn routes() -> Router { + Router::new() + .route("/lean/v0/health", get(crate::metrics::get_health)) + .route("/lean/v0/states/finalized", get(get_latest_finalized_state)) + .route("/lean/v0/blocks/finalized", get(get_latest_finalized_block)) + .route( + "/lean/v0/checkpoints/justified", + get(get_latest_justified_state), + ) +} + +pub(crate) async fn get_latest_finalized_state( + axum::extract::State(store): axum::extract::State, +) -> impl IntoResponse { + let finalized = store.latest_finalized(); + let mut state = store + .get_state(&finalized.root) + .expect("finalized state exists"); + + // Zero state_root to match the canonical post-state representation. + // The spec's state_transition sets state_root to zero during process_block_header, + // and only fills it in lazily at the next slot's process_slots. + // Serving the canonical form ensures checkpoint sync interoperability. + state.latest_block_header.state_root = H256::ZERO; + + ssz_response(state.to_ssz()) +} + +pub(crate) async fn get_latest_finalized_block( + axum::extract::State(store): axum::extract::State, +) -> impl IntoResponse { + let finalized = store.latest_finalized(); + // Returns 404 for genesis since it doesn't have a valid signature + match store.get_signed_block(&finalized.root) { + Some(block) => ssz_response(block.to_ssz()), + None => axum::http::StatusCode::NOT_FOUND.into_response(), + } +} + +pub(crate) async fn get_latest_justified_state( + axum::extract::State(store): axum::extract::State, +) -> impl IntoResponse { + let checkpoint = store.latest_justified(); + json_response(checkpoint) +} + +pub(crate) fn json_response(value: T) -> axum::response::Response { + let mut response = Json(value).into_response(); + response.headers_mut().insert( + header::CONTENT_TYPE, + HeaderValue::from_static(crate::JSON_CONTENT_TYPE), + ); + response +} + +fn ssz_response(bytes: Vec) -> axum::response::Response { + let mut response = bytes.into_response(); + response.headers_mut().insert( + header::CONTENT_TYPE, + HeaderValue::from_static(crate::SSZ_CONTENT_TYPE), + ); + response +} diff --git a/crates/net/rpc/src/events.rs b/crates/net/rpc/src/events.rs new file mode 100644 index 00000000..4966c0d1 --- /dev/null +++ b/crates/net/rpc/src/events.rs @@ -0,0 +1,100 @@ +//! `GET /lean/v0/events` — Server-Sent Events stream of chain events. +//! +//! The [`ethlambda_blockchain::BlockChainServer`] actor publishes +//! [`ChainEvent`]s on a broadcast channel; this read-only handler subscribes a +//! new receiver per connection and forwards each event as an SSE message. The +//! flow is strictly one-directional (actor → broadcast → SSE), so RPC never +//! writes into the actor. + +use std::convert::Infallible; + +use axum::{ + Extension, Router, + response::{Sse, sse::Event}, + routing::get, +}; +use ethlambda_blockchain::ChainEvent; +use ethlambda_storage::Store; +use futures_core::Stream; +use tokio::sync::broadcast; +use tokio_stream::{ + StreamExt, + wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}, +}; + +async fn get_events( + Extension(tx): Extension>, +) -> Sse>> { + let stream = BroadcastStream::new(tx.subscribe()).filter_map(|res| { + // A slow client falls behind and the broadcast channel overwrites + // events it never read. Surface that rather than silently dropping. + let ev = match res { + Ok(ev) => ev, + Err(BroadcastStreamRecvError::Lagged(skipped)) => { + tracing::debug!(skipped, "SSE client lagged; dropped chain events"); + return None; + } + }; + let name = match &ev { + ChainEvent::Head { .. } => "head", + ChainEvent::Block { .. } => "block", + ChainEvent::FinalizedCheckpoint { .. } => "finalized_checkpoint", + }; + Some(Ok(Event::default() + .event(name) + .json_data(ev) + .inspect_err(|err| tracing::warn!(%err, "failed to serialize SSE chain event")) + .ok()?)) + }); + Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default()) +} + +pub(crate) fn routes() -> Router { + Router::new().route("/lean/v0/events", get(get_events)) +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::{body::Body, http::Request}; + use ethlambda_storage::{Store, backend::InMemoryBackend}; + use std::sync::Arc; + use tower::ServiceExt; + + use crate::test_utils::create_test_state; + + #[tokio::test] + async fn events_streams_head() { + let (tx, _) = broadcast::channel::(16); + let store = Store::from_anchor_state(Arc::new(InMemoryBackend::new()), create_test_state()); + let app = crate::build_api_router(store).layer(Extension(tx.clone())); + + // Issue the request first so the handler subscribes its receiver before + // we publish — `broadcast::send` errors if there are no live receivers. + let resp = app + .oneshot( + Request::builder() + .uri("/lean/v0/events") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), axum::http::StatusCode::OK); + + tx.send(ChainEvent::Head { + slot: 3, + root: Default::default(), + parent_root: Default::default(), + }) + .unwrap(); + + let mut body = resp.into_body().into_data_stream(); + let chunk = tokio_stream::StreamExt::next(&mut body) + .await + .unwrap() + .unwrap(); + let text = String::from_utf8_lossy(&chunk); + assert!(text.contains("event:head") || text.contains("event: head")); + } +} diff --git a/crates/net/rpc/src/fork_choice.rs b/crates/net/rpc/src/fork_choice.rs index 75fb2702..19f368d4 100644 --- a/crates/net/rpc/src/fork_choice.rs +++ b/crates/net/rpc/src/fork_choice.rs @@ -1,10 +1,16 @@ -use axum::{http::HeaderValue, http::header, response::IntoResponse}; +use axum::{Router, http::HeaderValue, http::header, response::IntoResponse, routing::get}; use ethlambda_storage::Store; use ethlambda_types::{checkpoint::Checkpoint, primitives::H256}; use serde::Serialize; use crate::json_response; +pub(crate) fn routes() -> Router { + Router::new() + .route("/lean/v0/fork_choice", get(get_fork_choice)) + .route("/lean/v0/fork_choice/ui", get(get_fork_choice_ui)) +} + const HTML_CONTENT_TYPE: &str = "text/html; charset=utf-8"; const FORK_CHOICE_HTML: &str = include_str!("../static/fork_choice.html"); @@ -27,7 +33,7 @@ pub struct ForkChoiceNode { weight: u64, } -pub async fn get_fork_choice( +pub(crate) async fn get_fork_choice( axum::extract::State(store): axum::extract::State, ) -> impl IntoResponse { let blocks = store.get_live_chain(); @@ -75,7 +81,7 @@ pub async fn get_fork_choice( json_response(response) } -pub async fn get_fork_choice_ui() -> impl IntoResponse { +pub(crate) async fn get_fork_choice_ui() -> impl IntoResponse { let mut response = FORK_CHOICE_HTML.into_response(); response.headers_mut().insert( header::CONTENT_TYPE, @@ -87,7 +93,7 @@ pub async fn get_fork_choice_ui() -> impl IntoResponse { #[cfg(test)] mod tests { use super::*; - use axum::{Router, body::Body, http::Request, http::StatusCode, routing::get}; + use axum::{Router, body::Body, http::Request, http::StatusCode}; use ethlambda_storage::{Store, backend::InMemoryBackend}; use http_body_util::BodyExt; use std::sync::Arc; @@ -96,10 +102,7 @@ mod tests { use crate::test_utils::create_test_state; fn build_test_router(store: Store) -> Router { - Router::new() - .route("/lean/v0/fork_choice", get(get_fork_choice)) - .route("/lean/v0/fork_choice/ui", get(get_fork_choice_ui)) - .with_state(store) + routes().with_state(store) } #[tokio::test] diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index 9906bfb9..02cab1b7 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -1,15 +1,9 @@ use std::net::{IpAddr, SocketAddr}; -use axum::{ - Extension, Json, Router, - http::{HeaderValue, StatusCode, header}, - response::IntoResponse, - routing::get, -}; +use axum::{Extension, Router}; +use ethlambda_blockchain::ChainEventTx; use ethlambda_storage::Store; use ethlambda_types::aggregator::AggregatorController; -use ethlambda_types::primitives::H256; -use libssz::SszEncode; use tokio_util::sync::CancellationToken; pub(crate) const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8"; @@ -17,11 +11,15 @@ pub(crate) const SSZ_CONTENT_TYPE: &str = "application/octet-stream"; mod admin; mod blocks; +mod core; +mod events; mod fork_choice; mod heap_profiling; pub mod metrics; pub mod test_driver; +pub(crate) use core::json_response; + #[derive(Debug, Clone)] pub struct RpcConfig { pub http_address: IpAddr, @@ -55,9 +53,12 @@ pub async fn start_rpc_server( config: RpcConfig, store: Store, aggregator: AggregatorController, + chain_events: ChainEventTx, shutdown: CancellationToken, ) -> Result<(), std::io::Error> { - let api_router = build_api_router(store).layer(Extension(aggregator)); + let api_router = build_api_router(store) + .layer(Extension(aggregator)) + .layer(Extension(chain_events)); let metrics_router = metrics::start_prometheus_metrics_api(); let debug_router = build_debug_router(); @@ -100,32 +101,17 @@ pub async fn start_rpc_server( /// know about it and admin handlers extract it independently. fn build_api_router(store: Store) -> Router { Router::new() - .route("/lean/v0/health", get(metrics::get_health)) - .route("/lean/v0/states/finalized", get(get_latest_finalized_state)) - .route("/lean/v0/blocks/finalized", get(get_latest_finalized_block)) - .route( - "/lean/v0/checkpoints/justified", - get(get_latest_justified_state), - ) - .route("/lean/v0/fork_choice", get(fork_choice::get_fork_choice)) - .route( - "/lean/v0/fork_choice/ui", - get(fork_choice::get_fork_choice_ui), - ) - .route("/lean/v0/blocks/{block_id}", get(blocks::get_block)) - .route( - "/lean/v0/blocks/{block_id}/header", - get(blocks::get_block_header), - ) - .route( - "/lean/v0/admin/aggregator", - get(admin::get_aggregator).post(admin::post_aggregator), - ) + .merge(core::routes()) + .merge(blocks::routes()) + .merge(events::routes()) + .merge(fork_choice::routes()) + .merge(admin::routes()) .with_state(store) } /// Build the debug router for profiling endpoints. fn build_debug_router() -> Router { + use axum::routing::get; Router::new() .route("/debug/pprof/allocs", get(heap_profiling::handle_get_heap)) .route( @@ -134,59 +120,6 @@ fn build_debug_router() -> Router { ) } -async fn get_latest_finalized_state( - axum::extract::State(store): axum::extract::State, -) -> impl IntoResponse { - let finalized = store.latest_finalized(); - let mut state = store - .get_state(&finalized.root) - .expect("finalized state exists"); - - // Zero state_root to match the canonical post-state representation. - // The spec's state_transition sets state_root to zero during process_block_header, - // and only fills it in lazily at the next slot's process_slots. - // Serving the canonical form ensures checkpoint sync interoperability. - state.latest_block_header.state_root = H256::ZERO; - - ssz_response(state.to_ssz()) -} - -async fn get_latest_finalized_block( - axum::extract::State(store): axum::extract::State, -) -> impl IntoResponse { - let finalized = store.latest_finalized(); - // Returns 404 for genesis since it doesn't have a valid signature - match store.get_signed_block(&finalized.root) { - Some(block) => ssz_response(block.to_ssz()), - None => StatusCode::NOT_FOUND.into_response(), - } -} - -async fn get_latest_justified_state( - axum::extract::State(store): axum::extract::State, -) -> impl IntoResponse { - let checkpoint = store.latest_justified(); - json_response(checkpoint) -} - -fn json_response(value: T) -> axum::response::Response { - let mut response = Json(value).into_response(); - response.headers_mut().insert( - header::CONTENT_TYPE, - HeaderValue::from_static(JSON_CONTENT_TYPE), - ); - response -} - -fn ssz_response(bytes: Vec) -> axum::response::Response { - let mut response = bytes.into_response(); - response.headers_mut().insert( - header::CONTENT_TYPE, - HeaderValue::from_static(SSZ_CONTENT_TYPE), - ); - response -} - #[cfg(test)] pub(crate) mod test_utils { use ethlambda_storage::{StorageBackend, Table}; @@ -267,7 +200,7 @@ pub(crate) mod test_utils { #[cfg(test)] mod tests { use super::*; - use axum::{body::Body, http::Request}; + use axum::{body::Body, http::Request, http::StatusCode, http::header}; use ethlambda_storage::{ForkCheckpoints, Store, backend::InMemoryBackend}; use http_body_util::BodyExt; use serde_json::json; diff --git a/crates/net/rpc/src/test_driver.rs b/crates/net/rpc/src/test_driver.rs index c963d1ff..ec7d9047 100644 --- a/crates/net/rpc/src/test_driver.rs +++ b/crates/net/rpc/src/test_driver.rs @@ -347,7 +347,12 @@ fn apply_step(store: &mut Store, step: ForkChoiceStep) -> Result<(), String> { } (None, None) => return Err("tick step missing time and interval".to_string()), }; - store::on_tick(store, timestamp_ms, step.has_proposal.unwrap_or(false)); + store::on_tick( + store, + timestamp_ms, + step.has_proposal.unwrap_or(false), + None, + ); Ok(()) } "block" => { @@ -361,7 +366,7 @@ fn apply_step(store: &mut Store, step: ForkChoiceStep) -> Result<(), String> { if step.tick_to_slot { let block_time_ms = store.config().genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT; - store::on_tick(store, block_time_ms, true); + store::on_tick(store, block_time_ms, true, None); } store::on_block_without_verification(store, signed_block).map_err(|e| e.to_string()) }