Add delays to network retries.

ehuss committed Mar 23, 2023
1 parent 61d3819 commit db653f2
Showing 9 changed files with 527 additions and 88 deletions.
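The new src/cargo/util/network/sleep.rs module is among the changed files but is not expanded in this view. Based on how it is used below (SleepTracker::new, push(sleep_ms, item), len, to_retry, time_to_next), a minimal sketch could look like the following. This is an inferred sketch, not the commit's actual implementation; the field names and the Vec-with-linear-scan storage are assumptions (the real module may well use a priority queue).

    use std::time::{Duration, Instant};

    /// Tracks entries that failed and are waiting to be retried later.
    /// Hypothetical sketch; names are inferred from the call sites below.
    pub struct SleepTracker<T> {
        sleepers: Vec<(Instant, T)>,
    }

    impl<T> SleepTracker<T> {
        pub fn new() -> Self {
            Self { sleepers: Vec::new() }
        }

        /// Schedules `data` to be retried after `sleep_ms` milliseconds.
        pub fn push(&mut self, sleep_ms: u64, data: T) {
            let wakeup = Instant::now() + Duration::from_millis(sleep_ms);
            self.sleepers.push((wakeup, data));
        }

        pub fn len(&self) -> usize {
            self.sleepers.len()
        }

        /// Removes and returns all entries whose wakeup time has passed.
        pub fn to_retry(&mut self) -> Vec<T> {
            let now = Instant::now();
            let mut ready = Vec::new();
            let mut i = 0;
            while i < self.sleepers.len() {
                if self.sleepers[i].0 <= now {
                    ready.push(self.sleepers.remove(i).1);
                } else {
                    i += 1;
                }
            }
            ready
        }

        /// Time until the next entry wakes up, if any entries are sleeping.
        pub fn time_to_next(&self) -> Option<Duration> {
            self.sleepers
                .iter()
                .map(|(wakeup, _)| wakeup.saturating_duration_since(Instant::now()))
                .min()
        }
    }
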
1 change: 1 addition & 0 deletions Cargo.toml
@@ -58,6 +58,7 @@ os_info = "3.5.0"
pasetors = { version = "0.6.4", features = ["v3", "paserk", "std", "serde"] }
pathdiff = "0.2"
pretty_env_logger = { version = "0.4", optional = true }
rand = "0.8.5"
rustfix = "0.6.0"
semver = { version = "1.0.3", features = ["serde"] }
serde = { version = "1.0.123", features = ["derive"] }
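The only change to Cargo.toml is the new rand dependency. The diff excerpts below do not show where it is used, but a plausible reason for pulling in a randomness crate alongside retry delays is jitter, so that many clients hitting the same spurious failure do not all retry at the same instant. A hypothetical sketch of such a delay calculation follows; this is an assumption, not code from this commit, and the constants are made up.

    use rand::Rng;

    /// Hypothetical jittered backoff: the base delay grows with each attempt
    /// and a random offset spreads concurrent retries apart.
    fn retry_delay_ms(attempt: u32) -> u64 {
        let base = 500u64 * 2u64.pow(attempt.min(4));
        let jitter: u64 = rand::thread_rng().gen_range(0..1000);
        base + jitter
    }
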
118 changes: 69 additions & 49 deletions src/cargo/core/package.rs
@@ -28,7 +28,8 @@ use crate::ops;
use crate::util::config::PackageCacheLock;
use crate::util::errors::{CargoResult, HttpNotSuccessful};
use crate::util::interning::InternedString;
use crate::util::network::retry::Retry;
use crate::util::network::retry::{Retry, RetryResult};
use crate::util::network::sleep::SleepTracker;
use crate::util::{self, internal, Config, Progress, ProgressStyle};

pub const MANIFEST_PREAMBLE: &str = "\
@@ -319,6 +320,8 @@ pub struct Downloads<'a, 'cfg> {
/// Set of packages currently being downloaded. This should stay in sync
/// with `pending`.
pending_ids: HashSet<PackageId>,
/// Downloads that have failed and are waiting to retry again later.
sleeping: SleepTracker<(Download<'cfg>, Easy)>,
/// The final result of each download. A pair `(token, result)`. This is a
/// temporary holding area, needed because curl can report multiple
/// downloads at once, but the main loop (`wait`) is written to only
@@ -442,6 +445,7 @@ impl<'cfg> PackageSet<'cfg> {
next: 0,
pending: HashMap::new(),
pending_ids: HashSet::new(),
sleeping: SleepTracker::new(),
results: Vec::new(),
progress: RefCell::new(Some(Progress::with_style(
"Downloading",
@@ -800,7 +804,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {

/// Returns the number of crates that are still downloading.
pub fn remaining(&self) -> usize {
self.pending.len()
self.pending.len() + self.sleeping.len()
}

/// Blocks the current thread waiting for a package to finish downloading.
@@ -831,51 +835,52 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
let ret = {
let timed_out = &dl.timed_out;
let url = &dl.url;
dl.retry
.r#try(|| {
if let Err(e) = result {
// If this error is "aborted by callback" then that's
// probably because our progress callback aborted due to
// a timeout. We'll find out by looking at the
// `timed_out` field, looking for a descriptive message.
// If one is found we switch the error code (to ensure
// it's flagged as spurious) and then attach our extra
// information to the error.
if !e.is_aborted_by_callback() {
return Err(e.into());
}
dl.retry.r#try(|| {
if let Err(e) = result {
// If this error is "aborted by callback" then that's
// probably because our progress callback aborted due to
// a timeout. We'll find out by looking at the
// `timed_out` field, looking for a descriptive message.
// If one is found we switch the error code (to ensure
// it's flagged as spurious) and then attach our extra
// information to the error.
if !e.is_aborted_by_callback() {
return Err(e.into());
}

return Err(match timed_out.replace(None) {
Some(msg) => {
let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
let mut err = curl::Error::new(code);
err.set_extra(msg);
err
}
None => e,
return Err(match timed_out.replace(None) {
Some(msg) => {
let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
let mut err = curl::Error::new(code);
err.set_extra(msg);
err
}
.into());
None => e,
}
.into());
}

let code = handle.response_code()?;
if code != 200 && code != 0 {
let url = handle.effective_url()?.unwrap_or(url);
return Err(HttpNotSuccessful {
code,
url: url.to_string(),
body: data,
}
.into());
let code = handle.response_code()?;
if code != 200 && code != 0 {
let url = handle.effective_url()?.unwrap_or(url);
return Err(HttpNotSuccessful {
code,
url: url.to_string(),
body: data,
}
Ok(data)
})
.with_context(|| format!("failed to download from `{}`", dl.url))?
.into());
}
Ok(data)
})
};
match ret {
Some(data) => break (dl, data),
None => {
self.pending_ids.insert(dl.id);
self.enqueue(dl, handle)?
RetryResult::Success(data) => break (dl, data),
RetryResult::Err(e) => {
return Err(e.context(format!("failed to download from `{}`", dl.url)))
}
RetryResult::Retry(sleep) => {
debug!("download retry {} for {sleep}ms", dl.url);
self.sleeping.push(sleep, (dl, handle));
}
}
};
@@ -963,6 +968,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
// actually block waiting for I/O to happen, which we achieve with the
// `wait` method on `multi`.
loop {
self.add_sleepers()?;
let n = tls::set(self, || {
self.set
.multi
@@ -985,17 +991,31 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
if let Some(pair) = results.pop() {
break Ok(pair);
}
assert!(!self.pending.is_empty());
let min_timeout = Duration::new(1, 0);
let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
let timeout = timeout.min(min_timeout);
self.set
.multi
.wait(&mut [], timeout)
.with_context(|| "failed to wait on curl `Multi`")?;
assert_ne!(self.remaining(), 0);
if self.pending.is_empty() {
let delay = self.sleeping.time_to_next().unwrap();
debug!("sleeping main thread for {delay:?}");
std::thread::sleep(delay);
} else {
let min_timeout = Duration::new(1, 0);
let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
let timeout = timeout.min(min_timeout);
self.set
.multi
.wait(&mut [], timeout)
.with_context(|| "failed to wait on curl `Multi`")?;
}
}
}

fn add_sleepers(&mut self) -> CargoResult<()> {
for (dl, handle) in self.sleeping.to_retry() {
self.pending_ids.insert(dl.id);
self.enqueue(dl, handle)?;
}
Ok(())
}

fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
let dl = &self.pending[&token].0;
dl.total.set(total);
@@ -1061,7 +1081,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
return Ok(());
}
}
let pending = self.pending.len();
let pending = self.remaining();
let mut msg = if pending == 1 {
format!("{} crate", pending)
} else {
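The retry module's new RetryResult type is not expanded in this view either. Judging from the match arms above (RetryResult::Success, RetryResult::Err, RetryResult::Retry(sleep) with sleep formatted in milliseconds), it is presumably a three-way result along these lines; this is an inferred sketch, and the variant payloads are assumptions based on how the callers use them.

    /// Inferred shape of the value returned by `Retry::r#try`.
    pub enum RetryResult<T> {
        /// The closure succeeded; hand back its value.
        Success(T),
        /// The closure failed with an error that should not (or can no
        /// longer) be retried.
        Err(anyhow::Error),
        /// The closure failed spuriously; retry after this many milliseconds.
        Retry(u64),
    }
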
69 changes: 49 additions & 20 deletions src/cargo/sources/registry/http_remote.rs
@@ -8,11 +8,12 @@ use crate::sources::registry::download;
use crate::sources::registry::MaybeLock;
use crate::sources::registry::{LoadResponse, RegistryConfig, RegistryData};
use crate::util::errors::{CargoResult, HttpNotSuccessful};
use crate::util::network::retry::Retry;
use crate::util::network::retry::{Retry, RetryResult};
use crate::util::network::sleep::SleepTracker;
use crate::util::{auth, Config, Filesystem, IntoUrl, Progress, ProgressStyle};
use anyhow::Context;
use cargo_util::paths;
use curl::easy::{HttpVersion, List};
use curl::easy::{Easy, HttpVersion, List};
use curl::multi::{EasyHandle, Multi};
use log::{debug, trace, warn};
use std::cell::RefCell;
@@ -103,6 +104,8 @@ struct Downloads<'cfg> {
/// Set of paths currently being downloaded.
/// This should stay in sync with `pending`.
pending_paths: HashSet<PathBuf>,
/// Downloads that have failed and are waiting to retry again later.
sleeping: SleepTracker<(Download<'cfg>, Easy)>,
/// The final result of each download.
results: HashMap<PathBuf, CargoResult<CompletedDownload>>,
/// The next ID to use for creating a token (see `Download::token`).
@@ -184,6 +187,7 @@ impl<'cfg> HttpRegistry<'cfg> {
next: 0,
pending: HashMap::new(),
pending_paths: HashSet::new(),
sleeping: SleepTracker::new(),
results: HashMap::new(),
progress: RefCell::new(Some(Progress::with_style(
"Fetch",
@@ -265,6 +269,7 @@ impl<'cfg> HttpRegistry<'cfg> {
};
for (token, result) in results {
let (mut download, handle) = self.downloads.pending.remove(&token).unwrap();
assert!(self.downloads.pending_paths.remove(&download.path));
let mut handle = self.multi.remove(handle)?;
let data = download.data.take();
let url = self.full_url(&download.path);
@@ -289,21 +294,19 @@
};
Ok((data, code))
}) {
Ok(Some((data, code))) => Ok(CompletedDownload {
RetryResult::Success((data, code)) => Ok(CompletedDownload {
response_code: code,
data,
header_map: download.header_map.take(),
}),
Ok(None) => {
// retry the operation
let handle = self.multi.add(handle)?;
self.downloads.pending.insert(token, (download, handle));
RetryResult::Err(e) => Err(e),
RetryResult::Retry(sleep) => {
debug!("download retry {:?} for {sleep}ms", download.path);
self.downloads.sleeping.push(sleep, (download, handle));
continue;
}
Err(e) => Err(e),
};

assert!(self.downloads.pending_paths.remove(&download.path));
self.downloads.results.insert(download.path, result);
self.downloads.downloads_finished += 1;
}
@@ -395,6 +398,25 @@
))),
}
}

fn add_sleepers(&mut self) -> CargoResult<()> {
for (dl, handle) in self.downloads.sleeping.to_retry() {
let mut handle = self.multi.add(handle)?;
handle.set_token(dl.token)?;
assert!(
self.downloads.pending_paths.insert(dl.path.to_path_buf()),
"path queued for download more than once"
);
assert!(
self.downloads
.pending
.insert(dl.token, (dl, handle))
.is_none(),
"dl token queued more than once"
);
}
Ok(())
}
}

impl<'cfg> RegistryData for HttpRegistry<'cfg> {
@@ -730,6 +752,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {

loop {
self.handle_completed_downloads()?;
self.add_sleepers()?;

let remaining_in_multi = tls::set(&self.downloads, || {
self.multi
@@ -738,19 +761,25 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
})?;
trace!("{} transfers remaining", remaining_in_multi);

if remaining_in_multi == 0 {
if remaining_in_multi + self.downloads.sleeping.len() as u32 == 0 {
return Ok(());
}

// We have no more replies to provide the caller with,
// so we need to wait until cURL has something new for us.
let timeout = self
.multi
.get_timeout()?
.unwrap_or_else(|| Duration::new(1, 0));
self.multi
.wait(&mut [], timeout)
.with_context(|| "failed to wait on curl `Multi`")?;
if self.downloads.pending.is_empty() {
let delay = self.downloads.sleeping.time_to_next().unwrap();
debug!("sleeping main thread for {delay:?}");
std::thread::sleep(delay);
} else {
// We have no more replies to provide the caller with,
// so we need to wait until cURL has something new for us.
let timeout = self
.multi
.get_timeout()?
.unwrap_or_else(|| Duration::new(1, 0));
self.multi
.wait(&mut [], timeout)
.with_context(|| "failed to wait on curl `Multi`")?;
}
}
}
}
@@ -779,7 +808,7 @@ impl<'cfg> Downloads<'cfg> {
&format!(
" {} complete; {} pending",
self.downloads_finished,
self.pending.len()
self.pending.len() + self.sleeping.len()
),
)
}
1 change: 1 addition & 0 deletions src/cargo/util/network/mod.rs
@@ -3,6 +3,7 @@
use std::task::Poll;

pub mod retry;
pub mod sleep;

pub trait PollExt<T> {
fn expect(self, msg: &str) -> T;
(Diffs for the remaining changed files are not shown in this view.)
