src/electrum/discovery.rs (189 changes: 175 additions & 14 deletions)
@@ -124,7 +124,7 @@ impl DiscoveryManager {
pub fn add_server_request(&self, added_by: IpAddr, features: ServerFeatures) -> Result<()> {
self.verify_compatibility(&features)?;

let mut queue = self.queue.write().unwrap();
let mut queue = self.queue.write().unwrap_or_else(|e| e.into_inner());
ensure!(queue.len() < MAX_QUEUE_SIZE, "queue size exceeded");

// TODO optimize
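
// Illustrative sketch, not from the diff: the unwrap_or_else(|e| e.into_inner())
// idiom used throughout this change, in isolation with stand-in types. An RwLock
// is "poisoned" when a thread panics while holding its guard; a plain unwrap()
// then panics in every later caller, whereas PoisonError::into_inner() returns
// the guard so callers can continue with the (possibly half-updated) data.
use std::sync::RwLock;

fn queue_len(queue: &RwLock<Vec<u32>>) -> usize {
    // Recovers the lock even if a previous holder panicked mid-update.
    queue.read().unwrap_or_else(|poisoned| poisoned.into_inner()).len()
}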
@@ -205,7 +205,7 @@ impl DiscoveryManager {
/// before being removed due to unavailability.
pub fn add_default_server(&self, hostname: Hostname, services: Vec<Service>) -> Result<()> {
let addr = ServerAddr::resolve(&hostname)?;
let mut queue = self.queue.write().unwrap();
let mut queue = self.queue.write().unwrap_or_else(|e| e.into_inner());
queue.extend(
services
.into_iter()
@@ -219,7 +219,7 @@ impl DiscoveryManager {
// XXX return a random sample instead of everything?
self.healthy
.read()
.unwrap()
.unwrap_or_else(|e| e.into_inner())
.iter()
.map(|(addr, server)| {
ServerEntry(addr.clone(), server.hostname.clone(), server.feature_strs())
@@ -234,14 +234,19 @@
/// Run the next health check in the queue (a single one)
fn run_health_check(&self) -> Result<()> {
// abort if there are no entries in the queue, or it's still too early for the next one up
if self.queue.read().unwrap().peek().map_or(true, |next| {
if self.queue.read().unwrap_or_else(|e| e.into_inner()).peek().map_or(true, |next| {
next.last_check
.map_or(false, |t| t.elapsed() < HEALTH_CHECK_FREQ)
}) {
return Ok(());
}

let mut job = self.queue.write().unwrap().pop().unwrap();
// Only spawn_jobs_thread calls pop(), so this is the sole consumer; the
// let-else guards against the (currently unreachable) race where the queue
// drains between the peek above and this pop.
let Some(mut job) = self.queue.write().unwrap_or_else(|e| e.into_inner()).pop() else {
return Ok(());
};
debug!("processing {:?}", job);

let was_healthy = job.is_healthy();
@@ -259,7 +264,7 @@
job.last_healthy = job.last_check;
job.consecutive_failures = 0;
// schedule the next health check
self.queue.write().unwrap().push(job);
self.queue.write().unwrap_or_else(|e| e.into_inner()).push(job);

Ok(())
}
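
// Illustrative sketch, not from the diff: the let-else guard above reduced to its
// essentials, with stand-in types. pop() on an empty BinaryHeap returns None, so
// the old pop().unwrap() becomes an early return instead of a panic.
use std::collections::BinaryHeap;
use std::sync::RwLock;

fn pop_next_job(queue: &RwLock<BinaryHeap<u32>>) -> Option<u32> {
    let Some(job) = queue.write().unwrap_or_else(|e| e.into_inner()).pop() else {
        return None; // queue drained between peek and pop: nothing to do
    };
    Some(job)
}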
@@ -275,7 +280,7 @@
job.consecutive_failures += 1;

if job.should_retry() {
self.queue.write().unwrap().push(job);
self.queue.write().unwrap_or_else(|e| e.into_inner()).push(job);
} else {
debug!("giving up on {:?}", job);
}
@@ -288,7 +293,7 @@
/// Upsert the server/service into the healthy set
fn save_healthy_service(&self, job: &HealthCheck, features: ServerFeatures) {
let addr = job.addr.clone();
let mut healthy = self.healthy.write().unwrap();
debug!(
"saving healthy service hostname='{}' addr='{}' service={:?}",
job.hostname, addr, job.service
);
let mut healthy = self.healthy.write().unwrap_or_else(|e| e.into_inner());
healthy
.entry(addr)
.or_insert_with(|| Server::new(job.hostname.clone(), features))
@@ -299,16 +308,29 @@
/// Remove the service, and remove the server entirely if it has no other remaining healthy services
fn remove_unhealthy_service(&self, job: &HealthCheck) {
let addr = job.addr.clone();
let mut healthy = self.healthy.write().unwrap();
debug!(
"removing unhealthy service hostname='{}' addr='{}' service={:?}",
job.hostname, addr, job.service
);
let mut healthy = self.healthy.write().unwrap_or_else(|e| e.into_inner());
if let Entry::Occupied(mut entry) = healthy.entry(addr) {
let server = entry.get_mut();
assert!(server.services.remove(&job.service));
if !server.services.remove(&job.service) {
warn!(
"service={:?} hostname='{}' missing from healthy set, corrupted state",
job.service,
job.hostname
);
}
if server.services.is_empty() {
entry.remove_entry();
}
} else {
// FIXME This was an unreachable but it was reached.
log::warn!("missing expected server, corrupted state");
warn!(
"hostname='{}' addr='{}' missing from healthy map, corrupted state",
job.hostname,
job.addr
);
}
}
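
// Illustrative sketch, not from the diff: the Entry-based removal above in
// miniature, with stand-in types. entry() claims the map slot once, so the
// membership check, the service removal, and the conditional removal of the
// whole server all happen without looking up the key again.
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

fn drop_service(servers: &mut HashMap<String, HashSet<u16>>, host: &str, port: u16) {
    if let Entry::Occupied(mut server) = servers.entry(host.to_string()) {
        server.get_mut().remove(&port);
        if server.get().is_empty() {
            server.remove_entry(); // last service gone: drop the server itself
        }
    }
}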

@@ -376,9 +398,28 @@ impl DiscoveryManager {
}

pub fn spawn_jobs_thread(manager: Arc<DiscoveryManager>) {
// Two-layer panic hardening:
// 1. catch_unwind prevents a panic in any single iteration from killing this thread.
// 2. All RwLock accesses use unwrap_or_else(|e| e.into_inner()) so that if a panic
//    fires while a lock is held (poisoning it), the next iteration recovers the inner
//    value and continues rather than dying on the poisoned lock. The recovered state
//    may be partially modified, but for best-effort peer discovery that is acceptable.
// The layers only work together: without (1) the thread dies on the first panic;
// without (2) the next iteration dies on the lock that panic left poisoned.
spawn_thread("discovery-jobs", move || loop {
if let Err(e) = manager.run_health_check() {
debug!("health check failed: {:?}", e);
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
if let Err(e) = manager.run_health_check() {
debug!("health check failed: {:?}", e);
}
}));
if let Err(panic) = result {
let msg = panic
.downcast_ref::<&str>()
.copied()
.or_else(|| panic.downcast_ref::<String>().map(String::as_str))
.unwrap_or("unknown payload");
error!("discovery-jobs panicked reason='{}', continuing after delay", msg);
}
// XXX use a dynamic JOB_INTERVAL, adjusted according to the queue size and HEALTH_CHECK_FREQ?
thread::sleep(JOB_INTERVAL);
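
// Illustrative sketch, not from the diff: the two hardening layers working
// together, with stand-in names. The closure can panic while holding the write
// lock, poisoning it; catch_unwind keeps the calling loop alive, and the next
// call recovers the poisoned lock instead of unwinding again on PoisonError.
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::RwLock;

fn resilient_tick(counter: &RwLock<u64>) {
    let _ = catch_unwind(AssertUnwindSafe(|| {
        let mut n = counter.write().unwrap_or_else(|e| e.into_inner());
        *n += 1;
        if *n % 1000 == 0 {
            panic!("injected failure while holding the lock");
        }
    }));
}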
@@ -528,7 +569,127 @@ mod tests {

const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::new(1, 4);

fn test_features() -> ServerFeatures {
ServerFeatures {
hosts: serde_json::from_str("{}").unwrap(),
server_version: "test 1.0".into(),
genesis_hash: genesis_hash(Network::Testnet),
protocol_min: PROTOCOL_VERSION,
protocol_max: PROTOCOL_VERSION,
hash_function: "sha256".into(),
pruning: None,
}
}

// Construct a DiscoveryManager with no queue entries and no DNS lookups.
fn test_manager() -> DiscoveryManager {
DiscoveryManager {
our_addrs: HashSet::new(),
our_version: PROTOCOL_VERSION,
our_features: test_features(),
announce: false,
tor_proxy: None,
healthy: Default::default(),
queue: Default::default(),
}
}

fn make_job(ip: &str, service: Service) -> HealthCheck {
HealthCheck {
addr: ServerAddr::Clearnet(ip.parse().unwrap()),
hostname: "peer.example".into(),
service,
is_default: false,
added_by: None,
last_check: None,
last_healthy: None,
consecutive_failures: 0,
}
}

fn insert_healthy(manager: &DiscoveryManager, ip: &str, services: Vec<Service>) {
let addr = ServerAddr::Clearnet(ip.parse().unwrap());
let mut healthy = manager.healthy.write().unwrap();
let server = healthy
.entry(addr)
.or_insert_with(|| Server::new("peer.example".into(), test_features()));
for svc in services {
server.services.insert(svc);
}
}

// The old assert!(server.services.remove(&job.service)) panicked when the service
// was not present in the healthy set. Verify it no longer panics.
#[test]
fn remove_unhealthy_service_missing_service_does_not_panic() {
let manager = test_manager();
insert_healthy(&manager, "1.2.3.4", vec![Service::Tcp(50001)]);
let job = make_job("1.2.3.4", Service::Ssl(50002));
manager.remove_unhealthy_service(&job);
let healthy = manager.healthy.read().unwrap();
let addr = ServerAddr::Clearnet("1.2.3.4".parse().unwrap());
assert!(healthy[&addr].services.contains(&Service::Tcp(50001)));
}

// The else branch replaces what the old code flagged with a FIXME: an
// "unreachable" case that was in fact reached in production. Verify it does not panic.
#[test]
fn remove_unhealthy_service_missing_addr_does_not_panic() {
let manager = test_manager();
let job = make_job("1.2.3.4", Service::Tcp(50001));
manager.remove_unhealthy_service(&job);
}

#[test]
fn remove_unhealthy_service_removes_server_when_last_service_gone() {
let manager = test_manager();
insert_healthy(&manager, "1.2.3.4", vec![Service::Tcp(50001)]);
let job = make_job("1.2.3.4", Service::Tcp(50001));
manager.remove_unhealthy_service(&job);
let healthy = manager.healthy.read().unwrap();
assert!(!healthy.contains_key(&ServerAddr::Clearnet("1.2.3.4".parse().unwrap())));
}

#[test]
fn remove_unhealthy_service_keeps_server_when_other_services_remain() {
let manager = test_manager();
insert_healthy(&manager, "1.2.3.4", vec![Service::Tcp(50001), Service::Ssl(50002)]);
let job = make_job("1.2.3.4", Service::Tcp(50001));
manager.remove_unhealthy_service(&job);
let healthy = manager.healthy.read().unwrap();
let addr = ServerAddr::Clearnet("1.2.3.4".parse().unwrap());
assert!(healthy.contains_key(&addr));
assert!(healthy[&addr].services.contains(&Service::Ssl(50002)));
assert!(!healthy[&addr].services.contains(&Service::Tcp(50001)));
}

#[test]
fn poisoned_healthy_lock_does_not_panic_on_get_servers() {
let manager = Arc::new(test_manager());
let m = Arc::clone(&manager);
let _ = std::thread::spawn(move || {
let _guard = m.healthy.write().unwrap();
panic!("intentional lock poison");
})
.join();
let servers = manager.get_servers();
assert!(servers.is_empty());
}

#[test]
fn poisoned_queue_lock_does_not_panic_on_run_health_check() {
let manager = Arc::new(test_manager());
let m = Arc::clone(&manager);
let _ = std::thread::spawn(move || {
let _guard = m.queue.write().unwrap();
panic!("intentional lock poison");
})
.join();
assert!(manager.run_health_check().is_ok());
}

#[test]
#[ignore = "makes live network connections to testnet Electrum servers"]
fn test() -> Result<()> {
stderrlog::new().verbosity(4).init().unwrap();
