diff --git a/crates/librqbit/src/dht_utils.rs b/crates/librqbit/src/dht_utils.rs index 4c2257f4..90166d46 100644 --- a/crates/librqbit/src/dht_utils.rs +++ b/crates/librqbit/src/dht_utils.rs @@ -4,7 +4,7 @@ use anyhow::Context; use buffers::ByteBufOwned; use futures::{stream::FuturesUnordered, Stream, StreamExt}; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; -use tracing::debug; +use tracing::{debug, error_span, Instrument}; use crate::{ peer_connection::PeerConnectionOptions, peer_info_reader, spawn_utils::BlockingSpawner, @@ -46,6 +46,7 @@ pub async fn read_metainfo_from_peer_receiver + Unp peer_connection_options, BlockingSpawner::new(true), ) + .instrument(error_span!("read_metainfo_from_peer", ?addr)) .await .with_context(|| format!("error reading metainfo from {addr}")); drop(token); diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 2801d3c1..65b4c303 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -19,7 +19,7 @@ use peer_binary_protocol::{ use serde::{Deserialize, Serialize}; use serde_with::serde_as; use tokio::time::timeout; -use tracing::trace; +use tracing::{debug, trace}; use crate::{read_buf::ReadBuf, spawn_utils::BlockingSpawner}; @@ -262,10 +262,12 @@ impl PeerConnection { trace!("sent bitfield"); } + let mut broadcast_closed = false; + loop { let req = loop { break tokio::select! { - r = have_broadcast.recv() => match r { + r = have_broadcast.recv(), if !broadcast_closed => match r { Ok(id) => { if self.handler.should_transmit_have(id) { WriterRequest::Message(MessageOwned::Have(id.get())) @@ -273,7 +275,11 @@ impl PeerConnection { continue } }, - Err(tokio::sync::broadcast::error::RecvError::Closed) => anyhow::bail!("closing writer, broadcast channel closed"), + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + broadcast_closed = true; + debug!("broadcast channel closed, will not poll it anymore"); + continue + }, _ => continue }, r = timeout(keep_alive_interval, outgoing_chan.recv()) => match r { @@ -389,11 +395,11 @@ impl PeerConnection { tokio::select! { r = reader => { - trace!("reader is done, exiting"); + trace!(result=?r, "reader is done, exiting"); r } r = writer => { - trace!("writer is done, exiting"); + trace!(result=?r, "writer is done, exiting"); r } } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index c9ddeae5..242c5139 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1239,7 +1239,7 @@ impl tracker_comms::TorrentStatsProvider for PeerRxTorrentInfo { let mt = match mt { Some(mt) => mt, None => { - warn!(info_hash=?self.info_hash, "can't find torrent in the session"); + trace!(info_hash=?self.info_hash, "can't find torrent in the session, using default stats"); return Default::default(); } };