feat(p2p): experimental ethp2p erasure-coded broadcast (off by default)#466
feat(p2p): experimental ethp2p erasure-coded broadcast (off by default)#466pablodeymo wants to merge 1 commit into
Conversation
Add an experimental, feature-gated ("ethp2p", off by default) path that
also broadcasts gossip through ethp2p-rs's Reed-Solomon broadcast engine
over a parallel QUIC network, alongside libp2p gossipsub (unchanged).
ethlambda <-> ethlambda only; coexists with gossipsub so mixed-client
devnets keep working.
- crates/net/p2p/src/ethp2p: the Ethp2pBroadcast adapter over ethp2p-rs's
QuicNet transport + broadcast Engine, peer-id derivation
(sha256(secp256k1 pubkey)[..8]), the engine task, and delivery dispatch.
- publish_block/attestation/aggregated tee the same snappy-compressed SSZ
to the engine, keyed by sha256(ssz); reconstructed messages are decoded
and fed into the same blockchain.new_* handlers gossipsub uses (block
import and attestation handling are idempotent, so dual delivery is safe).
- peers are derived from the static bootnodes (ethp2p QUIC port = gossipsub
port + 1); the engine runs in a background task that forwards
reconstructed messages into the P2P actor.
Validated by compilation (feature on and off), clippy -D warnings, fmt, and
an isolated QUIC round-trip test. End-to-end (devnet) validation is a
follow-up. ethp2p-rs (internal repo) is pinned to rev ba3ed8b; building the
feature needs git access to it plus protoc on the build host.
🤖 Kimi Code ReviewReview: Experimental ethp2p broadcast feature (PR #466) This PR introduces an experimental, feature-gated ( 1. Critical: Port arithmetic overflow (Potential runtime misconfiguration)File: Using Recommendation: Use checked arithmetic or explicit bounds checking: let ethp2p_port = bootnode.quic_port.checked_add(1)
.expect("ethp2p port overflow: gossip port must be < 65535");2. Consensus safety: Idempotency assumptionFile: The comment states "Dual delivery is safe: block import and attestation handling are idempotent." This is a critical invariant. If the same attestation arrives via both gossipsub and ethp2p and the import logic is not strictly idempotent (e.g., double-counting in aggregation bits or reward accounting), this could cause consensus failures. Recommendation: Add a debug assertion or metric to detect duplicate deliveries in the blockchain adapter, validating the idempotency claim during testing. 3. Performance: Unnecessary payload cloningFile: The code clones the Recommendation: Since 4. Error handling: Silent dropping of delivered messagesFile: In Recommendation: Consider incrementing a metric (e.g., 5. Code quality: Peer ID derivationFile: bytes.copy_from_slice(&digest[..8]);This panics if Recommendation: Use explicit conversion for clarity: u64::from_be_bytes(digest[..8].try_into().expect("sha256 output length"))6. Code quality: Hex encoding inefficiencyFile: digest.iter().map(|b| format!("{b:02x}")).collect()This allocates a String per byte and is O(n²) due to repeated concatenation. Recommendation: Use 7. Build/Dependency concernsFile: The feature depends on an internal git repo ( Recommendation: Document the 8. Minor: Static channel stringsFile: Channel IDs ("block", "aggregation", "attestation") are hardcoded strings. If these drift from the gossipsub topic names, messages will be delivered to the wrong handlers. Recommendation: Define these as constants shared with the gossipsub module or derive them from the topic generators to ensure they remain synchronized. SummaryThe implementation is sound for an experimental feature. Item 1 (port overflow) should be fixed before any production-adjacent deployment. Item 2 (idempotency) should be verified with a test or assertion. The remaining items are optimizations or hardening suggestions. The feature gating is correct, memory safety is maintained, and the integration with the actor model is idiomatic. Automated review by Kimi (Moonshot AI) · kimi-k2.5 · custom prompt |
🤖 Codex Code ReviewFindings
I did not run a full build in this environment: Automated review by OpenAI Codex · gpt-5.4 · custom prompt |
Greptile SummaryThis PR adds an experimental, feature-gated (
Confidence Score: 4/5Safe to merge for the default (feature-off) build; the ethp2p feature path has a few rough edges worth addressing before enabling in a real devnet. The gossipsub path is byte-for-byte unchanged and the feature gate is consistent throughout. The three issues found — silent port wraparound via wrapping_add(1), double RS-encoding per publish, and the unguarded spin risk on persistent run_one_step errors — all live exclusively inside the ethp2p feature path that is off by default. crates/net/p2p/src/ethp2p/mod.rs (double RS encode, select-loop spin risk) and crates/net/p2p/src/lib.rs (port wraparound in both the local bind address and bootnode peer addresses).
|
| Filename | Overview |
|---|---|
| crates/net/p2p/src/ethp2p/mod.rs | New experimental module wiring ethp2p-rs Reed-Solomon broadcast; contains double RS-encode in publish_bytes and a select! loop with cancel-safety and spin-on-error concerns. |
| crates/net/p2p/src/lib.rs | Wires the ethp2p engine into the P2P actor; uses wrapping_add(1) for QUIC port derivation, which silently wraps to 0 at port 65535. |
| crates/net/p2p/src/gossipsub/handler.rs | Adds tee of snappy-compressed SSZ to ethp2p engine via cfg-gated blocks in publish_block/attestation/aggregated_attestation; gossipsub path is byte-for-byte unchanged. |
| crates/net/p2p/Cargo.toml | Adds optional ethp2p feature with git-pinned ethp2p-broadcast/transport deps; prost version-matched to ethp2p-broadcast to avoid duplication; no changes to default build. |
| Cargo.lock | Lockfile updated with new ethp2p-rs crates and transitive deps (quinn, rcgen, rustls, reed-solomon-erasure, prost 0.13.5); existing deps disambiguated with version suffixes. |
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant App as Application
participant GH as gossipsub/handler.rs
participant P2PS as P2PServer (actor)
participant ETX as ethp2p_tx (UnboundedSender)
participant ET as run_engine_task
participant ENG as Ethp2pBroadcast (Engine)
participant QUIC as QUIC Network
participant ACT as P2PServer actor (delivery)
participant BC as Blockchain
App->>P2PS: PublishBlock / PublishAttestation / PublishAggregatedAttestation
P2PS->>GH: publish_block / publish_attestation / publish_aggregated_attestation
GH->>GH: SSZ encode + snappy compress
GH-->>QUIC: swarm_handle.publish() (gossipsub, unchanged)
GH->>ETX: "send(PublishCmd{channel, message_id, payload})"
ETX-->>ET: publish_rx.recv()
ET->>ENG: publish_bytes(channel, msg_id, payload)
ENG->>ENG: rs_encode (preamble) + RsStrategy::new_origin (encode again)
ENG->>QUIC: broadcast RS shards via QUIC
QUIC-->>ENG: inbound RS shards from peers
ET->>ENG: run_one_step() [drives engine]
ENG-->>ET: DeliveredMessage (on delivery_rx)
ET->>ACT: actor.send(WrappedEthp2pDelivery)
ACT->>ACT: dispatch_delivered: decompress + SSZ decode
ACT->>BC: blockchain.new_block / new_attestation / new_aggregated_attestation (idempotent)
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant App as Application
participant GH as gossipsub/handler.rs
participant P2PS as P2PServer (actor)
participant ETX as ethp2p_tx (UnboundedSender)
participant ET as run_engine_task
participant ENG as Ethp2pBroadcast (Engine)
participant QUIC as QUIC Network
participant ACT as P2PServer actor (delivery)
participant BC as Blockchain
App->>P2PS: PublishBlock / PublishAttestation / PublishAggregatedAttestation
P2PS->>GH: publish_block / publish_attestation / publish_aggregated_attestation
GH->>GH: SSZ encode + snappy compress
GH-->>QUIC: swarm_handle.publish() (gossipsub, unchanged)
GH->>ETX: "send(PublishCmd{channel, message_id, payload})"
ETX-->>ET: publish_rx.recv()
ET->>ENG: publish_bytes(channel, msg_id, payload)
ENG->>ENG: rs_encode (preamble) + RsStrategy::new_origin (encode again)
ENG->>QUIC: broadcast RS shards via QUIC
QUIC-->>ENG: inbound RS shards from peers
ET->>ENG: run_one_step() [drives engine]
ENG-->>ET: DeliveredMessage (on delivery_rx)
ET->>ACT: actor.send(WrappedEthp2pDelivery)
ACT->>ACT: dispatch_delivered: decompress + SSZ decode
ACT->>BC: blockchain.new_block / new_attestation / new_aggregated_attestation (idempotent)
Comments Outside Diff (2)
-
crates/net/p2p/src/lib.rs, line 1287-1290 (link)Silent port-zero wraparound for QUIC bind and dial addresses
Both the local bind address and each bootnode's dial address use
wrapping_add(1)on au16port. If a gossipsub port is configured as65535, the ethp2p port becomes0. For the local bind (SocketAddr::new(ip, 0)), the OS interprets port 0 as "assign an ephemeral port", so the node silently listens on an unknown, unannounced port. For bootnode dial addresses, port 0 as a destination will fail to connect. Both failures are silent — no error is returned frombuild_swarm. Achecked_add(1)with an explicit panic or propagated error would make misconfiguration immediately visible rather than degrading quietly.Prompt To Fix With AI
This is a comment left during a code review. Path: crates/net/p2p/src/lib.rs Line: 1287-1290 Comment: **Silent port-zero wraparound for QUIC bind and dial addresses** Both the local bind address and each bootnode's dial address use `wrapping_add(1)` on a `u16` port. If a gossipsub port is configured as `65535`, the ethp2p port becomes `0`. For the local bind (`SocketAddr::new(ip, 0)`), the OS interprets port 0 as "assign an ephemeral port", so the node silently listens on an unknown, unannounced port. For bootnode dial addresses, port 0 as a destination will fail to connect. Both failures are silent — no error is returned from `build_swarm`. A `checked_add(1)` with an explicit panic or propagated error would make misconfiguration immediately visible rather than degrading quietly. How can I resolve this? If you propose a fix, please make it concise.
-
crates/net/p2p/src/ethp2p/mod.rs, line 989-1031 (link)Two concerns in the
select!loop: cancel safety and persistent error spinFirst,
tokio::select!cancels the non-selected futures on each iteration. Ifrun_one_stepis not cancel-safe — for example, if it reads a QUIC chunk and then yields before integrating it into the engine state — cancellation when aPublishCmdor a delivered message arrives could silently drop in-flight chunks and corrupt RS reassembly.tokio::select!documentation requires callers to verify cancel-safety; this should be confirmed against theethp2p-rsengine's guarantee.Second, if
run_one_stepreturns errors synchronously (e.g., after the QUIC transport is permanently broken), the loop will spin at full CPU speed while printingwarn!on every iteration, because there is nobreakor backoff path for repeated errors — only thepublish_rxanddelivered_rxNonebranches exit the loop. Adding a consecutive-error counter with a break or an exponential back-off would avoid CPU saturation on transport failure.Prompt To Fix With AI
This is a comment left during a code review. Path: crates/net/p2p/src/ethp2p/mod.rs Line: 989-1031 Comment: **Two concerns in the `select!` loop: cancel safety and persistent error spin** First, `tokio::select!` cancels the non-selected futures on each iteration. If `run_one_step` is not cancel-safe — for example, if it reads a QUIC chunk and then yields before integrating it into the engine state — cancellation when a `PublishCmd` or a delivered message arrives could silently drop in-flight chunks and corrupt RS reassembly. `tokio::select!` documentation requires callers to verify cancel-safety; this should be confirmed against the `ethp2p-rs` engine's guarantee. Second, if `run_one_step` returns errors synchronously (e.g., after the QUIC transport is permanently broken), the loop will spin at full CPU speed while printing `warn!` on every iteration, because there is no `break` or backoff path for repeated errors — only the `publish_rx` and `delivered_rx` `None` branches exit the loop. Adding a consecutive-error counter with a break or an exponential back-off would avoid CPU saturation on transport failure. How can I resolve this? If you propose a fix, please make it concise.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 3
crates/net/p2p/src/lib.rs:1287-1290
**Silent port-zero wraparound for QUIC bind and dial addresses**
Both the local bind address and each bootnode's dial address use `wrapping_add(1)` on a `u16` port. If a gossipsub port is configured as `65535`, the ethp2p port becomes `0`. For the local bind (`SocketAddr::new(ip, 0)`), the OS interprets port 0 as "assign an ephemeral port", so the node silently listens on an unknown, unannounced port. For bootnode dial addresses, port 0 as a destination will fail to connect. Both failures are silent — no error is returned from `build_swarm`. A `checked_add(1)` with an explicit panic or propagated error would make misconfiguration immediately visible rather than degrading quietly.
### Issue 2 of 3
crates/net/p2p/src/ethp2p/mod.rs:188-199
**Payload is Reed-Solomon encoded twice per `publish_bytes` call**
`rs_encode(payload, &self.config)` encodes the full payload into preamble + shards, but only `preamble` is used — `_shards` is dropped. Then `RsStrategy::new_origin(payload, self.config)` is called on the same `payload`, which internally performs the same RS encoding again to produce the shards it will actually broadcast. For a 64 KiB block this doubles the encoding CPU cost per publish. Consider whether `ethp2p-rs` exposes a constructor that accepts an already-encoded `(preamble, shards)` pair so the work can be shared, or whether `new_origin` can return the preamble directly.
### Issue 3 of 3
crates/net/p2p/src/ethp2p/mod.rs:989-1031
**Two concerns in the `select!` loop: cancel safety and persistent error spin**
First, `tokio::select!` cancels the non-selected futures on each iteration. If `run_one_step` is not cancel-safe — for example, if it reads a QUIC chunk and then yields before integrating it into the engine state — cancellation when a `PublishCmd` or a delivered message arrives could silently drop in-flight chunks and corrupt RS reassembly. `tokio::select!` documentation requires callers to verify cancel-safety; this should be confirmed against the `ethp2p-rs` engine's guarantee.
Second, if `run_one_step` returns errors synchronously (e.g., after the QUIC transport is permanently broken), the loop will spin at full CPU speed while printing `warn!` on every iteration, because there is no `break` or backoff path for repeated errors — only the `publish_rx` and `delivered_rx` `None` branches exit the loop. Adding a consecutive-error counter with a break or an exponential back-off would avoid CPU saturation on transport failure.
Reviews (1): Last reviewed commit: "feat(p2p): experimental ethp2p erasure-c..." | Re-trigger Greptile
| let (preamble, _shards) = | ||
| rs_encode(payload, &self.config).map_err(|e| Ethp2pError::Encode(format!("{e:?}")))?; | ||
| let mut preamble_bytes = Vec::with_capacity(preamble.encoded_len()); | ||
| preamble | ||
| .encode(&mut preamble_bytes) | ||
| .map_err(|e| Ethp2pError::Encode(e.to_string()))?; | ||
| let strategy = RsStrategy::new_origin(payload, self.config) | ||
| .map_err(|e| Ethp2pError::Encode(format!("{e:?}")))?; | ||
| self.engine | ||
| .publish( | ||
| &channel.to_string(), | ||
| message_id.to_string(), |
There was a problem hiding this comment.
Payload is Reed-Solomon encoded twice per
publish_bytes call
rs_encode(payload, &self.config) encodes the full payload into preamble + shards, but only preamble is used — _shards is dropped. Then RsStrategy::new_origin(payload, self.config) is called on the same payload, which internally performs the same RS encoding again to produce the shards it will actually broadcast. For a 64 KiB block this doubles the encoding CPU cost per publish. Consider whether ethp2p-rs exposes a constructor that accepts an already-encoded (preamble, shards) pair so the work can be shared, or whether new_origin can return the preamble directly.
Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/net/p2p/src/ethp2p/mod.rs
Line: 188-199
Comment:
**Payload is Reed-Solomon encoded twice per `publish_bytes` call**
`rs_encode(payload, &self.config)` encodes the full payload into preamble + shards, but only `preamble` is used — `_shards` is dropped. Then `RsStrategy::new_origin(payload, self.config)` is called on the same `payload`, which internally performs the same RS encoding again to produce the shards it will actually broadcast. For a 64 KiB block this doubles the encoding CPU cost per publish. Consider whether `ethp2p-rs` exposes a constructor that accepts an already-encoded `(preamble, shards)` pair so the work can be shared, or whether `new_origin` can return the preamble directly.
How can I resolve this? If you propose a fix, please make it concise.
🤖 Claude Code ReviewNow I have enough data to synthesize. Let me produce the final review. Review: feat(p2p): experimental ethp2p erasure-coded broadcast (off by default)This PR adds a feature-gated ( Four findings survive verification:
When
Both the local bind address and each peer's dial address are derived as
All three publish tee paths do: let _ = tx.send(crate::ethp2p::PublishCmd { ... });
CLAUDE.md (project root) is explicit: "AVOID: Using tx.send(crate::ethp2p::PublishCmd { ... })
.inspect_err(|_| warn!("ethp2p: publish channel closed, engine stopped"));Minor — info!(
local_peer = params.local_peer,
peers = params.peers.len(),
...CLAUDE.md quotes let local_peer = params.local_peer;
let peer_count = params.peers.len();
info!(local_peer, peer_count, bind = %params.bind_addr, "ethp2p broadcast engine started");Automated review by Claude (Anthropic) · sonnet · custom prompt |
🗒️ Description / Motivation
Adds an experimental, feature-gated path that also broadcasts gossip through ethp2p-rs's Reed–Solomon erasure-coded broadcast engine over a parallel QUIC network, alongside libp2p gossipsub. It's a step toward evaluating erasure-coded broadcast for block/data propagation in ethlambda.
ethp2pcargo feature, off by default — the normal build/CI is unaffected.Follows the phased plan in
ethp2p-rs/docs/ethlambda-integration-plan.md(Phases 1+2).What Changed
crates/net/p2p/src/ethp2p/(new):Ethp2pBroadcastadapter over ethp2p-rs'sQuicNettransport + broadcastEngine;derive_peer_id(sha256(secp256k1 pubkey)[..8]); the engine task (run_engine_task);dispatch_delivered; channel constants +message_id(hex(sha256(ssz))).crates/net/p2p/src/lib.rs:WrappedEthp2pDeliveryactor message +Handler;P2PServer.ethp2p_publish;build_swarmderives the ethp2p peer mesh from the static bootnodes (ethp2p QUIC port = gossipsub port + 1);P2P::spawnspawns the engine task.crates/net/p2p/src/gossipsub/handler.rs:publish_block/publish_attestation/publish_aggregated_attestationeach tee the same snappy-compressed SSZ to the engine.crates/net/p2p/Cargo.toml/Cargo.lock: theethp2pfeature + optional git deps (ethp2p-broadcast,ethp2p-transport,prost), pinned to ethp2p-rsba3ed8b.Correctness / Behavior Guarantees
#[cfg(feature = "ethp2p")]; gossipsub is byte-for-byte unchanged.blockchain.new_block/new_attestation/new_aggregated_attestationhandlers gossipsub uses, and block import (on_block_core:if store.has_state(&block_root) { return Ok(()) }) and attestation handling are idempotent.Tests Added / Run
ethp2p::tests): peer-id derivation stability + a 64 KiB block-sized round trip over real QUIC.cargo test -p ethlambda-p2p --features ethp2p— green.make fmt,make lint, and build all pass feature-on and feature-off.Related Issues / PRs
ethp2p-rs/docs/ethlambda-integration-plan.md).✅ Verification Checklist
make fmt— cleanmake lint(clippy with-D warnings) — clean (feature on and off)cargo build/test -p ethlambda-p2p— passing (feature on and off)Notes for reviewers
ethp2p-rsis an internal repo, so building with theethp2pfeature needs git credentials for it (CI already setsCARGO_NET_GIT_FETCH_WITH_CLI=true) plusprotocon the build host. The default feature-off build needs neither. Reviewer focus: the feature-gating hygiene and the publish-tee / receive-inject symmetry.