From 28cec062fd76d120f271d1569b4a3c94350be6af Mon Sep 17 00:00:00 2001 From: benthecarman Date: Fri, 22 May 2026 22:20:53 -0500 Subject: [PATCH 1/2] Bound streaming gRPC frame sizes Reject oversized server-streaming gRPC messages as soon as their frame header is available. Also reject compressed stream frames consistently and avoid unchecked frame length arithmetic before slicing buffers. --- ldk-server-client/src/client.rs | 80 +++++++++++++++++++++++++++++++-- 1 file changed, 76 insertions(+), 4 deletions(-) diff --git a/ldk-server-client/src/client.rs b/ldk-server-client/src/client.rs index 4c08d738..6dbdf9d3 100644 --- a/ldk-server-client/src/client.rs +++ b/ldk-server-client/src/client.rs @@ -71,6 +71,12 @@ use crate::error::LdkServerErrorCode::{ type StreamingClient = HyperClient, HyperBody>; +const GRPC_FRAME_HEADER_LEN: usize = 5; + +// Applies to each server-streaming gRPC message. Graph RPCs use the unary client path and are not +// constrained by this limit. +const MAX_GRPC_STREAM_MESSAGE_LEN: usize = 4 * 1024 * 1024; + /// Client to access a hosted instance of LDK Server via gRPC. /// /// The client requires the server's TLS certificate to be provided for verification. @@ -568,19 +574,43 @@ impl GrpcStream { pub async fn next_message(&mut self) -> Option> { loop { // Try to decode a complete gRPC frame from the buffer - if self.buf.len() >= 5 { + if self.buf.len() >= GRPC_FRAME_HEADER_LEN { + if self.buf[0] != 0 { + return Some(Err(LdkServerError::new( + InternalError, + "gRPC stream compression is not supported", + ))); + } let msg_len = u32::from_be_bytes([self.buf[1], self.buf[2], self.buf[3], self.buf[4]]) as usize; - if self.buf.len() >= 5 + msg_len { - let proto_bytes = &self.buf[5..5 + msg_len]; + if msg_len > MAX_GRPC_STREAM_MESSAGE_LEN { + return Some(Err(LdkServerError::new( + InternalError, + format!( + "gRPC stream message exceeds maximum size of {} bytes", + MAX_GRPC_STREAM_MESSAGE_LEN + ), + ))); + } + let frame_len = match GRPC_FRAME_HEADER_LEN.checked_add(msg_len) { + Some(frame_len) => frame_len, + None => { + return Some(Err(LdkServerError::new( + InternalError, + "gRPC stream frame length overflow", + ))); + }, + }; + if self.buf.len() >= frame_len { + let proto_bytes = &self.buf[GRPC_FRAME_HEADER_LEN..frame_len]; let result = M::decode(proto_bytes).map_err(|e| { LdkServerError::new( InternalError, format!("Failed to decode gRPC stream message: {}", e), ) }); - self.buf.drain(..5 + msg_len); + self.buf.drain(..frame_len); return Some(result); } } @@ -713,6 +743,48 @@ mod tests { assert!(stream.next_message().await.is_none()); } + #[tokio::test] + async fn test_event_stream_rejects_oversized_frame_header() { + let (mut sender, body) = Body::channel(); + sender.send_data(vec![0u8, 0xff, 0xff, 0xff, 0xff].into()).await.unwrap(); + drop(sender); + + let mut stream: EventStream = GrpcStream { + body, + buf: Vec::new(), + trailers_checked: false, + _marker: std::marker::PhantomData, + }; + + let result = stream.next_message().await.unwrap().unwrap_err(); + assert_eq!(result.error_code, InternalError); + assert_eq!( + result.message, + format!( + "gRPC stream message exceeds maximum size of {} bytes", + MAX_GRPC_STREAM_MESSAGE_LEN + ) + ); + } + + #[tokio::test] + async fn test_event_stream_rejects_compressed_frame() { + let (mut sender, body) = Body::channel(); + sender.send_data(vec![1u8, 0, 0, 0, 0].into()).await.unwrap(); + drop(sender); + + let mut stream: EventStream = GrpcStream { + body, + buf: Vec::new(), + trailers_checked: false, + _marker: std::marker::PhantomData, + }; + + let result = stream.next_message().await.unwrap().unwrap_err(); + assert_eq!(result.error_code, InternalError); + assert_eq!(result.message, "gRPC stream compression is not supported"); + } + #[test] fn test_grpc_code_to_error_all_known_codes() { let cases = [ From 1ce6b292c2987d2cffd0078c2939a2e9b1111530 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Fri, 22 May 2026 22:29:45 -0500 Subject: [PATCH 2/2] Bound unary gRPC response reads Read unary client responses incrementally and reject bodies above the configured limit before protobuf decoding. Preallocate only after a valid Content-Length has been checked. --- ldk-server-client/src/client.rs | 64 ++++++++++++++++++++++++++++++--- 1 file changed, 60 insertions(+), 4 deletions(-) diff --git a/ldk-server-client/src/client.rs b/ldk-server-client/src/client.rs index 6dbdf9d3..3e9f53e1 100644 --- a/ldk-server-client/src/client.rs +++ b/ldk-server-client/src/client.rs @@ -73,6 +73,10 @@ type StreamingClient = HyperClient, const GRPC_FRAME_HEADER_LEN: usize = 5; +// Applies to complete unary gRPC responses. The server applies the same cap to unary request +// bodies before protobuf decoding. +const MAX_GRPC_UNARY_RESPONSE_LEN: usize = 10 * 1024 * 1024; + // Applies to each server-streaming gRPC message. Graph RPCs use the unary client path and are not // constrained by this limit. const MAX_GRPC_STREAM_MESSAGE_LEN: usize = 4 * 1024 * 1024; @@ -456,10 +460,7 @@ impl LdkServerClient { return Err(error); } - // Read the response body - let payload = response.bytes().await.map_err(|e| { - LdkServerError::new(InternalError, format!("Failed to read response body: {}", e)) - })?; + let payload = read_grpc_unary_response_body(response).await?; let proto_bytes = decode_grpc_body(&payload) .map_err(|e| LdkServerError::new(InternalError, e.message))?; @@ -514,6 +515,42 @@ impl LdkServerClient { } } +async fn read_grpc_unary_response_body( + mut response: reqwest::Response, +) -> Result, LdkServerError> { + let capacity = if let Some(content_length) = response.content_length() { + check_grpc_unary_response_len(content_length)?; + content_length as usize + } else { + 0 + }; + + let mut payload = Vec::with_capacity(capacity); + while let Some(chunk) = response.chunk().await.map_err(|e| { + LdkServerError::new(InternalError, format!("Failed to read response body: {}", e)) + })? { + let len = payload.len().checked_add(chunk.len()).ok_or_else(|| { + LdkServerError::new(InternalError, "gRPC unary response body length overflow") + })?; + check_grpc_unary_response_len(len as u64)?; + payload.extend_from_slice(&chunk); + } + Ok(payload) +} + +fn check_grpc_unary_response_len(len: u64) -> Result<(), LdkServerError> { + if len > MAX_GRPC_UNARY_RESPONSE_LEN as u64 { + return Err(LdkServerError::new( + InternalError, + format!( + "gRPC unary response exceeds maximum size of {} bytes", + MAX_GRPC_UNARY_RESPONSE_LEN + ), + )); + } + Ok(()) +} + /// Map a gRPC status code to an LdkServerError. fn grpc_code_to_error(code: u32, message: String) -> LdkServerError { match code { @@ -721,6 +758,25 @@ mod tests { assert_eq!(err.message, "gRPC stream became unavailable: server shutting down"); } + #[test] + fn test_grpc_unary_response_len_allows_limit() { + assert!(check_grpc_unary_response_len(MAX_GRPC_UNARY_RESPONSE_LEN as u64).is_ok()); + } + + #[test] + fn test_grpc_unary_response_len_rejects_oversized() { + let err = + check_grpc_unary_response_len(MAX_GRPC_UNARY_RESPONSE_LEN as u64 + 1).unwrap_err(); + assert_eq!(err.error_code, InternalError); + assert_eq!( + err.message, + format!( + "gRPC unary response exceeds maximum size of {} bytes", + MAX_GRPC_UNARY_RESPONSE_LEN + ) + ); + } + #[tokio::test] async fn test_event_stream_surfaces_terminal_grpc_status() { let (mut sender, body) = Body::channel();