Skip to content
4 changes: 3 additions & 1 deletion crates/blockchain/src/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,12 @@ pub fn aggregate_job(job: AggregationJob) -> Option<AggregatedGroupOutput> {
participants.dedup();

let aggregation_bits = aggregation_bits_from_validator_indices(&participants);
let proof = AggregatedSignatureProof::new(aggregation_bits, proof_data);
metrics::observe_aggregated_proof_size(proof.proof_data.len());

Some(AggregatedGroupOutput {
hashed: job.hashed,
proof: AggregatedSignatureProof::new(aggregation_bits, proof_data),
proof,
participants,
keys_to_delete: job.keys_to_delete,
})
Expand Down
25 changes: 25 additions & 0 deletions crates/blockchain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,25 @@ static LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS: std::sync::L
.unwrap()
});

static LEAN_AGGREGATED_PROOF_SIZE_BYTES: std::sync::LazyLock<Histogram> =
std::sync::LazyLock::new(|| {
register_histogram!(
"lean_aggregated_proof_size_bytes",
"Bytes size of an aggregated signature proof's proof_data field",
vec![
1024.0,
4096.0,
16384.0,
65536.0,
131_072.0,
262_144.0,
524_288.0,
1_048_576.0
]
)
.unwrap()
});

static LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS: std::sync::LazyLock<Histogram> =
std::sync::LazyLock::new(|| {
register_histogram!(
Expand Down Expand Up @@ -396,6 +415,7 @@ pub fn init() {
std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_BUILDING_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_AGGREGATED_PROOF_SIZE_BYTES);
std::sync::LazyLock::force(&LEAN_FORK_CHOICE_REORG_DEPTH);
// Block production
std::sync::LazyLock::force(&LEAN_BLOCK_AGGREGATED_PAYLOADS);
Expand Down Expand Up @@ -530,6 +550,11 @@ pub fn time_pq_sig_aggregated_signatures_verification() -> TimingGuard {
TimingGuard::new(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS)
}

/// Observe the size of an aggregated signature proof's `proof_data` bytes.
pub fn observe_aggregated_proof_size(bytes: usize) {
LEAN_AGGREGATED_PROOF_SIZE_BYTES.observe(bytes as f64);
}

/// Observe committee-signature aggregation duration. Measured in the
/// off-thread worker and reported back via an `AggregationDone` message, so a
/// drop-guard that crosses the thread boundary is not appropriate here.
Expand Down
7 changes: 4 additions & 3 deletions crates/common/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ pub mod timing;

// Re-export prometheus types and macros we use
pub use prometheus::{
Encoder, Error as PrometheusError, Histogram, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
TextEncoder, core::Collector, gather, register_histogram, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec,
Encoder, Error as PrometheusError, Histogram, HistogramVec, IntCounter, IntCounterVec,
IntGauge, IntGaugeVec, TextEncoder, core::Collector, gather, register_histogram,
register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge,
register_int_gauge_vec,
};

// Re-export commonly used items
Expand Down
15 changes: 12 additions & 3 deletions crates/net/p2p/src/gossipsub/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
match topic_kind {
Some(BLOCK_TOPIC_KIND) => {
info!(kind = "block", peer_count, "P2P message received");
let compressed_len = message.data.len();
let Ok(uncompressed_data) = decompress_message(&message.data)
.inspect_err(|err| error!(%err, "Failed to decompress gossipped block"))
else {
return;
};
metrics::observe_gossip_block_size(uncompressed_data.len());
metrics::observe_gossip_block_size(uncompressed_data.len(), compressed_len);

let Ok(signed_block) = SignedBlock::from_ssz_bytes(&uncompressed_data)
.inspect_err(|err| error!(?err, "Failed to decode gossipped block"))
Expand Down Expand Up @@ -64,12 +65,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
}
Some(AGGREGATION_TOPIC_KIND) => {
info!(kind = "aggregation", peer_count, "P2P message received");
let compressed_len = message.data.len();
let Ok(uncompressed_data) = decompress_message(&message.data)
.inspect_err(|err| error!(%err, "Failed to decompress gossipped aggregation"))
else {
return;
};
metrics::observe_gossip_aggregation_size(uncompressed_data.len());
metrics::observe_gossip_aggregation_size(uncompressed_data.len(), compressed_len);

let Ok(aggregation) = SignedAggregatedAttestation::from_ssz_bytes(&uncompressed_data)
.inspect_err(|err| error!(?err, "Failed to decode gossipped aggregation"))
Expand All @@ -95,12 +97,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
}
Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => {
info!(kind = "attestation", peer_count, "P2P message received");
let compressed_len = message.data.len();
let Ok(uncompressed_data) = decompress_message(&message.data)
.inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation"))
else {
return;
};
metrics::observe_gossip_attestation_size(uncompressed_data.len());
metrics::observe_gossip_attestation_size(uncompressed_data.len(), compressed_len);

let Ok(signed_attestation) = SignedAttestation::from_ssz_bytes(&uncompressed_data)
.inspect_err(|err| error!(?err, "Failed to decode gossipped attestation"))
Expand Down Expand Up @@ -142,6 +145,8 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte
// Compress with raw snappy
let compressed = compress_message(&ssz_bytes);

metrics::observe_gossip_attestation_size(ssz_bytes.len(), compressed.len());

// Look up subscribed topic or construct on-the-fly for gossipsub fanout
let topic = server
.attestation_topics
Expand Down Expand Up @@ -175,6 +180,8 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlock) {
// Compress with raw snappy
let compressed = compress_message(&ssz_bytes);

metrics::observe_gossip_block_size(ssz_bytes.len(), compressed.len());

// Publish to gossipsub
server
.swarm_handle
Expand All @@ -201,6 +208,8 @@ pub async fn publish_aggregated_attestation(
// Compress with raw snappy
let compressed = compress_message(&ssz_bytes);

metrics::observe_gossip_aggregation_size(ssz_bytes.len(), compressed.len());

// Publish to the aggregation topic
server
.swarm_handle
Expand Down
112 changes: 97 additions & 15 deletions crates/net/p2p/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,16 @@ static LEAN_PEER_DISCONNECTION_EVENTS_TOTAL: LazyLock<IntCounterVec> = LazyLock:
});

// --- Gossip Message Size Histograms ---
//
// `compression` label values:
// - `"raw"`: size of SSZ-encoded payload before snappy compression
// - `"snappy"`: size of the on-wire snappy-compressed payload

static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram!(
static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec!(
"lean_gossip_block_size_bytes",
"Bytes size of a gossip block message",
&["compression"],
vec![
10_000.0,
50_000.0,
Expand All @@ -97,19 +102,21 @@ static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
.unwrap()
});

static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram!(
static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec!(
"lean_gossip_attestation_size_bytes",
"Bytes size of a gossip attestation message",
&["compression"],
vec![512.0, 1024.0, 2048.0, 4096.0, 8192.0, 16384.0]
)
.unwrap()
});

static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram!(
static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec!(
"lean_gossip_aggregation_size_bytes",
"Bytes size of a gossip aggregated attestation message",
&["compression"],
vec![
1024.0,
4096.0,
Expand All @@ -124,19 +131,94 @@ static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock<Histogram> = LazyLock::new(|
.unwrap()
});

/// Observe the size of a gossip block message.
pub fn observe_gossip_block_size(bytes: usize) {
LEAN_GOSSIP_BLOCK_SIZE_BYTES.observe(bytes as f64);
/// Observe the size of a gossip block message, recording both the raw SSZ
/// size and the snappy-compressed on-wire size.
pub fn observe_gossip_block_size(raw: usize, snappy: usize) {
LEAN_GOSSIP_BLOCK_SIZE_BYTES
.with_label_values(&["raw"])
.observe(raw as f64);
LEAN_GOSSIP_BLOCK_SIZE_BYTES
.with_label_values(&["snappy"])
.observe(snappy as f64);
}

/// Observe the size of a gossip attestation message.
pub fn observe_gossip_attestation_size(bytes: usize) {
LEAN_GOSSIP_ATTESTATION_SIZE_BYTES.observe(bytes as f64);
/// Observe the size of a gossip attestation message, recording both the raw
/// SSZ size and the snappy-compressed on-wire size.
pub fn observe_gossip_attestation_size(raw: usize, snappy: usize) {
LEAN_GOSSIP_ATTESTATION_SIZE_BYTES
.with_label_values(&["raw"])
.observe(raw as f64);
LEAN_GOSSIP_ATTESTATION_SIZE_BYTES
.with_label_values(&["snappy"])
.observe(snappy as f64);
}

/// Observe the size of a gossip aggregated attestation message.
pub fn observe_gossip_aggregation_size(bytes: usize) {
LEAN_GOSSIP_AGGREGATION_SIZE_BYTES.observe(bytes as f64);
/// Observe the size of a gossip aggregated attestation message, recording both
/// the raw SSZ size and the snappy-compressed on-wire size.
pub fn observe_gossip_aggregation_size(raw: usize, snappy: usize) {
LEAN_GOSSIP_AGGREGATION_SIZE_BYTES
.with_label_values(&["raw"])
.observe(raw as f64);
LEAN_GOSSIP_AGGREGATION_SIZE_BYTES
.with_label_values(&["snappy"])
.observe(snappy as f64);
}

// --- Req/Resp Message Size Histograms ---
//
// `protocol` label: `"status"` or `"blocks_by_root"`.
// `compression` label: `"raw"` (SSZ) or `"snappy"` (on-wire, varint-prefixed
// snappy frame bytes only — the response-code byte is not included).

static LEAN_REQRESP_REQUEST_SIZE_BYTES: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec!(
"lean_reqresp_request_size_bytes",
"Bytes size of a req/resp request",
&["protocol", "compression"],
vec![64.0, 128.0, 256.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0]
)
.unwrap()
});

static LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec!(
"lean_reqresp_response_chunk_size_bytes",
"Bytes size of a single req/resp response chunk",
&["protocol", "compression"],
vec![
128.0,
1024.0,
10_000.0,
100_000.0,
500_000.0,
1_000_000.0,
5_000_000.0,
10_000_000.0
]
)
.unwrap()
});

/// Observe the size of a req/resp request, recording both the raw SSZ size
/// and the snappy-compressed on-wire size.
pub fn observe_reqresp_request_size(protocol: &str, raw: usize, snappy: usize) {
LEAN_REQRESP_REQUEST_SIZE_BYTES
.with_label_values(&[protocol, "raw"])
.observe(raw as f64);
LEAN_REQRESP_REQUEST_SIZE_BYTES
.with_label_values(&[protocol, "snappy"])
.observe(snappy as f64);
}

/// Observe the size of a single req/resp response chunk, recording both the
/// raw SSZ size and the snappy-compressed on-wire size.
pub fn observe_reqresp_response_chunk_size(protocol: &str, raw: usize, snappy: usize) {
LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES
.with_label_values(&[protocol, "raw"])
.observe(raw as f64);
LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES
.with_label_values(&[protocol, "snappy"])
.observe(snappy as f64);
}

/// Set the attestation committee subnet gauge.
Expand Down
Loading
Loading