diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 58b44472..515bf71e 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -40,8 +40,9 @@ use crate::{ publish_attestation, publish_block, }, req_resp::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, MAX_COMPRESSED_PAYLOAD_SIZE, Request, - STATUS_PROTOCOL_V1, build_status, fetch_block_from_peer, + BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, + MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status, + fetch_block_from_peer, }, swarm_adapter::SwarmHandle, }; @@ -154,6 +155,10 @@ pub fn build_swarm( StreamProtocol::new(BLOCKS_BY_ROOT_PROTOCOL_V1), request_response::ProtocolSupport::Full, ), + ( + StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1), + request_response::ProtocolSupport::Full, + ), ], Default::default(), ); diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index cbd30a70..7805cf6d 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -7,8 +7,8 @@ use tracing::{debug, trace, warn}; use super::{ encoding::{MAX_PAYLOAD_SIZE, decode_payload, write_payload}, messages::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, ErrorMessage, Request, Response, ResponseCode, ResponsePayload, - STATUS_PROTOCOL_V1, Status, + BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, ErrorMessage, Request, Response, + ResponseCode, ResponsePayload, STATUS_PROTOCOL_V1, Status, }, }; @@ -21,6 +21,7 @@ fn protocol_label(protocol: &str) -> &'static str { match protocol { STATUS_PROTOCOL_V1 => "status", BLOCKS_BY_ROOT_PROTOCOL_V1 => "blocks_by_root", + BLOCKS_BY_RANGE_PROTOCOL_V1 => "blocks_by_range", _ => "unknown", } } @@ -59,6 +60,12 @@ impl libp2p::request_response::Codec for Codec { })?; Ok(Request::BlocksByRoot(request)) } + BLOCKS_BY_RANGE_PROTOCOL_V1 => { + let request = SszDecode::from_ssz_bytes(&payload).map_err(|err| { + io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) + })?; + Ok(Request::BlocksByRange(request)) + } _ => Err(io::Error::new( io::ErrorKind::InvalidData, format!("unknown protocol: {}", protocol.as_ref()), @@ -77,7 +84,9 @@ impl libp2p::request_response::Codec for Codec { let label = protocol_label(protocol.as_ref()); match protocol.as_ref() { STATUS_PROTOCOL_V1 => decode_status_response(io, label).await, - BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io, label).await, + BLOCKS_BY_ROOT_PROTOCOL_V1 | BLOCKS_BY_RANGE_PROTOCOL_V1 => { + decode_blocks_response(io, label).await + } _ => Err(io::Error::new( io::ErrorKind::InvalidData, format!("unknown protocol: {}", protocol.as_ref()), @@ -99,6 +108,7 @@ impl libp2p::request_response::Codec for Codec { let encoded = match req { Request::Status(status) => status.to_ssz(), Request::BlocksByRoot(request) => request.to_ssz(), + Request::BlocksByRange(request) => request.to_ssz(), }; let compressed_size = write_payload(io, &encoded).await?; @@ -132,7 +142,7 @@ impl libp2p::request_response::Codec for Codec { ); Ok(()) } - ResponsePayload::BlocksByRoot(blocks) => { + ResponsePayload::Blocks(blocks) => { // Write each block as a separate chunk. // Encode first, then check size before writing the SUCCESS // code byte. This avoids corrupting the stream if a block @@ -143,7 +153,7 @@ impl libp2p::request_response::Codec for Codec { if encoded.len() > MAX_PAYLOAD_SIZE - 1024 { warn!( size = encoded.len(), - "Skipping oversized block in BlocksByRoot response" + "Skipping oversized block in block response" ); continue; } @@ -230,7 +240,7 @@ where Ok(Response::success(ResponsePayload::Status(status))) } -/// Decodes a BlocksByRoot protocol response from a multi-chunk response stream. +/// Decodes a block protocol response from a multi-chunk response stream. /// /// Reads chunks until EOF, collecting successfully decoded blocks. Each chunk has /// its own response code - chunks with error codes are logged and skipped rather @@ -253,7 +263,7 @@ where /// /// Note: Error chunks from the peer (non-SUCCESS response codes) do not cause this /// function to return `Err` - they are logged and skipped. -async fn decode_blocks_by_root_response(io: &mut T, protocol_label: &str) -> io::Result +async fn decode_blocks_response(io: &mut T, protocol_label: &str) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -291,5 +301,5 @@ where blocks.push(block); } - Ok(Response::success(ResponsePayload::BlocksByRoot(blocks))) + Ok(Response::success(ResponsePayload::Blocks(blocks))) } diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 21bbbd10..d7d44614 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use ethlambda_storage::Store; use libp2p::{PeerId, request_response}; @@ -12,7 +12,9 @@ use ethlambda_types::primitives::HashTreeRoot as _; use ethlambda_types::{block::SignedBlock, primitives::H256}; use super::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload, Status, + BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS, + Request, Response, ResponsePayload, Status, + messages::{ResponseCode, error_message}, }; use crate::{ BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest, @@ -42,6 +44,13 @@ pub async fn handle_req_resp_message( ); handle_blocks_by_root_request(server, request, channel, peer).await; } + Request::BlocksByRange(request) => { + info!( + kind = "blocks_by_range_request", + peer_count, "P2P message received" + ); + handle_blocks_by_range_request(server, request, channel, peer).await; + } } } request_response::Message::Response { @@ -55,11 +64,8 @@ pub async fn handle_req_resp_message( info!(kind = "status_response", peer_count, "P2P message received"); handle_status_response(status, peer).await; } - ResponsePayload::BlocksByRoot(blocks) => { - info!( - kind = "blocks_by_root_response", - peer_count, "P2P message received" - ); + ResponsePayload::Blocks(blocks) => { + info!(kind = "blocks_response", peer_count, "P2P message received"); handle_blocks_by_root_response(server, blocks, peer, request_id, ctx) .await; } @@ -136,10 +142,99 @@ async fn handle_blocks_by_root_request( let found = blocks.len(); info!(%peer, num_roots, found, "Responding to BlocksByRoot request"); - let response = Response::success(ResponsePayload::BlocksByRoot(blocks)); + let response = Response::success(ResponsePayload::Blocks(blocks)); + server.swarm_handle.send_response(channel, response); +} + +async fn handle_blocks_by_range_request( + server: &mut P2PServer, + request: BlocksByRangeRequest, + channel: request_response::ResponseChannel, + peer: PeerId, +) { + info!( + %peer, + start_slot = request.start_slot, + count = request.count, + step = request.step, + "Received BlocksByRange request" + ); + + if request.step == 0 || request.count == 0 || request.count > MAX_REQUEST_BLOCKS { + let response = Response::error( + ResponseCode::INVALID_REQUEST, + error_message("invalid BlocksByRange request"), + ); + server.swarm_handle.send_response(channel, response); + return; + } + + let blocks = canonical_blocks_by_range( + &server.store, + request.start_slot, + request.count, + request.step, + ); + + info!( + %peer, + start_slot = request.start_slot, + count = request.count, + step = request.step, + found = blocks.len(), + "Responding to BlocksByRange request" + ); + + let response = Response::success(ResponsePayload::Blocks(blocks)); server.swarm_handle.send_response(channel, response); } +fn canonical_blocks_by_range( + store: &Store, + start_slot: u64, + count: u64, + step: u64, +) -> Vec { + if count == 0 { + return Vec::new(); + } + + let Some(end_slot) = count + .checked_sub(1) + .and_then(|value| value.checked_mul(step)) + .and_then(|last_offset| start_slot.checked_add(last_offset)) + else { + return Vec::new(); + }; + + let mut roots_by_slot = HashMap::new(); + let mut current_root = store.head(); + + while !current_root.is_zero() { + let Some(header) = store.get_block_header(¤t_root) else { + break; + }; + + if header.slot < start_slot { + break; + } + + if header.slot <= end_slot && (header.slot - start_slot) % step == 0 { + roots_by_slot.insert(header.slot, current_root); + } + + current_root = header.parent_root; + } + + (0..count) + .filter_map(|index| { + let slot = start_slot.checked_add(index.checked_mul(step)?)?; + let root = roots_by_slot.get(&slot)?; + store.get_signed_block(root) + }) + .collect() +} + async fn handle_blocks_by_root_response( server: &mut P2PServer, blocks: Vec, @@ -313,3 +408,67 @@ async fn handle_fetch_failure( send_after(backoff, ctx.clone(), p2p_protocol::RetryBlockFetch { root }); } + +#[cfg(test)] +mod tests { + use super::*; + use ethlambda_storage::{ForkCheckpoints, backend::InMemoryBackend}; + use ethlambda_types::{ + attestation::XmssSignature, + block::{Block, BlockBody, BlockSignatures}, + signature::SIGNATURE_SIZE, + state::State, + }; + use libssz_types::SszList; + use std::sync::Arc; + + fn signed_block(slot: u64, parent_root: H256) -> SignedBlock { + SignedBlock { + message: Block { + slot, + proposer_index: 0, + parent_root, + state_root: H256::ZERO, + body: BlockBody::default(), + }, + signature: BlockSignatures { + attestation_signatures: SszList::new(), + proposer_signature: XmssSignature::try_from(vec![0u8; SIGNATURE_SIZE]).unwrap(), + }, + } + } + + #[test] + fn blocks_by_range_returns_canonical_blocks_in_requested_order() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store::from_anchor_state(backend, State::from_genesis(0, vec![])); + + let block_1 = signed_block(1, store.head()); + let root_1 = block_1.message.hash_tree_root(); + store.insert_signed_block(root_1, block_1); + + let block_2 = signed_block(2, root_1); + let root_2 = block_2.message.hash_tree_root(); + store.insert_signed_block(root_2, block_2); + + let side_block_3 = signed_block(3, root_1); + let side_root_3 = side_block_3.message.hash_tree_root(); + store.insert_signed_block(side_root_3, side_block_3); + + let block_4 = signed_block(4, root_2); + let root_4 = block_4.message.hash_tree_root(); + store.insert_signed_block(root_4, block_4); + store.update_checkpoints(ForkCheckpoints::head_only(root_4)); + + let blocks = canonical_blocks_by_range(&store, 1, 4, 1); + let slots: Vec<_> = blocks.iter().map(|block| block.message.slot).collect(); + let roots: Vec<_> = blocks + .iter() + .map(|block| block.message.hash_tree_root()) + .collect(); + + assert_eq!(slots, vec![1, 2, 4]); + assert_eq!(roots, vec![root_1, root_2, root_4]); + assert!(!roots.contains(&side_root_3)); + } +} diff --git a/crates/net/p2p/src/req_resp/messages.rs b/crates/net/p2p/src/req_resp/messages.rs index d90b6c91..f64bce7b 100644 --- a/crates/net/p2p/src/req_resp/messages.rs +++ b/crates/net/p2p/src/req_resp/messages.rs @@ -4,11 +4,14 @@ use libssz_types::SszList; pub const STATUS_PROTOCOL_V1: &str = "/leanconsensus/req/status/1/ssz_snappy"; pub const BLOCKS_BY_ROOT_PROTOCOL_V1: &str = "/leanconsensus/req/blocks_by_root/1/ssz_snappy"; +pub const BLOCKS_BY_RANGE_PROTOCOL_V1: &str = "/leanconsensus/req/blocks_by_range/1/ssz_snappy"; +pub const MAX_REQUEST_BLOCKS: u64 = 1024; // Maximum number of blocks in a single request (1024). #[derive(Debug, Clone)] pub enum Request { Status(Status), BlocksByRoot(BlocksByRootRequest), + BlocksByRange(BlocksByRangeRequest), } #[derive(Debug, Clone)] @@ -88,7 +91,7 @@ impl std::fmt::Debug for ResponseCode { #[allow(clippy::large_enum_variant)] pub enum ResponsePayload { Status(Status), - BlocksByRoot(Vec), + Blocks(Vec), } #[derive(Debug, Clone, SszEncode, SszDecode)] @@ -106,8 +109,6 @@ pub type ErrorMessage = SszList; /// Helper to create an ErrorMessage from a string. /// Debug builds panic if message exceeds 256 bytes (programming error). /// Release builds truncate to 256 bytes. -#[expect(dead_code)] -// TODO: map errors to req/resp error messages pub fn error_message(msg: impl AsRef) -> ErrorMessage { let bytes = msg.as_ref().as_bytes(); debug_assert!( @@ -130,3 +131,10 @@ pub fn error_message(msg: impl AsRef) -> ErrorMessage { pub struct BlocksByRootRequest { pub roots: RequestedBlockRoots, } + +#[derive(Debug, Clone, SszEncode, SszDecode)] +pub struct BlocksByRangeRequest { + pub start_slot: u64, + pub count: u64, + pub step: u64, +} diff --git a/crates/net/p2p/src/req_resp/mod.rs b/crates/net/p2p/src/req_resp/mod.rs index c550ba37..11acb79f 100644 --- a/crates/net/p2p/src/req_resp/mod.rs +++ b/crates/net/p2p/src/req_resp/mod.rs @@ -7,6 +7,7 @@ pub use codec::Codec; pub use encoding::{MAX_COMPRESSED_PAYLOAD_SIZE, MAX_PAYLOAD_SIZE}; pub use handlers::{build_status, fetch_block_from_peer, handle_req_resp_message}; pub use messages::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, RequestedBlockRoots, Response, + BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, + BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, RequestedBlockRoots, Response, ResponsePayload, STATUS_PROTOCOL_V1, Status, };