Skip to content

Commit

Permalink
fixed and ran rustfmt
Browse files Browse the repository at this point in the history
  • Loading branch information
djugei committed Mar 13, 2024
1 parent 631c7b8 commit 1af8a4c
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 156 deletions.
233 changes: 111 additions & 122 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ enum Commands {

fn main() {
// set up a logger that does not conflict with progress bars
let logger =
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build();
let logger = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build();
let multi = MultiProgress::new();

indicatif_log_bridge::LogWrapper::new(multi.clone(), logger)
Expand Down Expand Up @@ -104,10 +103,7 @@ fn main() {
panic!("pacman -Su failed, aborting. Fix errors and try again");
}
}
Commands::Download {
server,
delta_cache,
} => {
Commands::Download { server, delta_cache } => {
std::fs::create_dir_all(&delta_cache).unwrap();
upgrade(server, delta_cache, multi).unwrap()
}
Expand Down Expand Up @@ -135,14 +131,17 @@ fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::R
let client = reqwest::Client::new();

let total_pg = ProgressBar::new(0)
.with_style(ProgressStyle::with_template("#### {msg} [{wide_bar}] {bytes}/{total_bytes} elapsed: {elapsed} eta: {eta} ####").unwrap())
.with_message("total progress:")
;
.with_style(
ProgressStyle::with_template(
"#### {msg} [{wide_bar}] {bytes}/{total_bytes} elapsed: {elapsed} eta: {eta} ####",
)
.unwrap(),
)
.with_message("total progress:");
total_pg.enable_steady_tick(Duration::from_millis(100));
let total_pg = multi.add(total_pg);
total_pg.tick();


let mut set = JoinSet::new();
for line in lines {
let client = client.clone();
Expand All @@ -156,119 +155,124 @@ fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::R
debug!("spawned thread for {}", &line);
let (_, filename) = line.rsplit_once('/').unwrap();
let filefut = async {
if let Some((oldname, oldpath)) = newest_cached(filename)? {
// try to find the decompressed size for better progress monitoring
use memmap2::Mmap;
let orig = std::fs::File::open(oldpath)?;
// i promise to not open the same file as writable at the same time
let orig = unsafe { Mmap::map(&orig)?};
// 16 megabytes seems like an ok average size
// todo: find the actual average size of a decompressed package
let default_size = 16*1024*1024;
let dec_size = zstd::bulk::Decompressor::upper_bound(&orig)
.unwrap_or_else(|| {
if let Some((oldname, oldpath)) = newest_cached(filename)? {
// try to find the decompressed size for better progress monitoring
use memmap2::Mmap;
let orig = std::fs::File::open(oldpath)?;
// i promise to not open the same file as writable at the same time
let orig = unsafe { Mmap::map(&orig)? };
// 16 megabytes seems like an ok average size
// todo: find the actual average size of a decompressed package
let default_size = 16 * 1024 * 1024;
let dec_size = zstd::bulk::Decompressor::upper_bound(&orig).unwrap_or_else(|| {
debug!("using default size for {oldname}");
default_size
});
total_pg.inc_length(dec_size.try_into().unwrap());

// delta download
let mut file_name = delta_cache.clone();
file_name.push(filename);
let mut deltafile_name = file_name.clone();
deltafile_name.as_mut_os_string().push(".delta");

let mut deltafile = tokio::fs::File::create(deltafile_name.clone())
.await?;

let style = ProgressStyle::with_template(
"{prefix}{msg} [{wide_bar}] {bytes}/{total_bytes} {binary_bytes_per_sec} {eta}").unwrap()
.progress_chars("█▇▆▅▄▃▂▁ ");

let guard = maxpar_req.acquire().await;
let pg = ProgressBar::new(0)
.with_style(style)
.with_prefix(format!("deltadownload {}", filename))
.with_message(": waiting for server, this may take up to a few minutes")
;
let pg = multi.add(pg);
pg.tick();

let mut url: String = server.into();
url.push('/');
url.push_str(&oldname);
url.push('/');
url.push_str(filename);
let mut delta = {
loop {
// catch both client and server timeouts and simply retry
match client.get(&url).send().await {
Ok(d) => { match d.status() {
StatusCode::REQUEST_TIMEOUT | StatusCode::GATEWAY_TIMEOUT => {debug!("retrying request {}", url)}
status if !status.is_success() => panic!("request failed with{}", status),
_=> break d,
total_pg.inc_length(dec_size.try_into().unwrap());

// delta download
let mut file_name = delta_cache.clone();
file_name.push(filename);
let mut deltafile_name = file_name.clone();
deltafile_name.as_mut_os_string().push(".delta");

let mut deltafile = tokio::fs::File::create(deltafile_name.clone()).await?;

let style = ProgressStyle::with_template(
"{prefix}{msg} [{wide_bar}] {bytes}/{total_bytes} {binary_bytes_per_sec} {eta}",
)
.unwrap()
.progress_chars("█▇▆▅▄▃▂▁ ");

let guard = maxpar_req.acquire().await;
let pg = ProgressBar::new(0)
.with_style(style)
.with_prefix(format!("deltadownload {}", filename))
.with_message(": waiting for server, this may take up to a few minutes");
let pg = multi.add(pg);
pg.tick();

let mut url: String = server.into();
url.push('/');
url.push_str(&oldname);
url.push('/');
url.push_str(filename);
let mut delta = {
loop {
// catch both client and server timeouts and simply retry
match client.get(&url).send().await {
Ok(d) => match d.status() {
StatusCode::REQUEST_TIMEOUT | StatusCode::GATEWAY_TIMEOUT => {
debug!("timeout; retrying {}", url)
}
status if !status.is_success() => panic!("request failed with {}", status),
_ => break d,
},
Err(e) => {
if !e.is_timeout() {
panic!("{}", e);
}
}
}
Err(e) => if !e.is_timeout(){
panic!("{}", e);
},
}
debug!("timeout; retrying {}", &url);
}
};
std::mem::drop(guard);
pg.set_length(delta.content_length().unwrap_or(0));
pg.set_message("");
pg.tick();

// acquire guard after sending request but before using the body
// so the deltas can get generated on the server as parallel as possible
// but the download does not get fragmented/overwhelmed
let guard = maxpar.acquire().await;
let mut tries = 0;
}
};
std::mem::drop(guard);
pg.set_length(delta.content_length().unwrap_or(0));
pg.set_message("");
pg.tick();

// acquire guard after sending request but before using the body
// so the deltas can get generated on the server as parallel as possible
// but the download does not get fragmented/overwhelmed
let guard = maxpar.acquire().await;
let mut tries = 0;
'retry: loop {
loop {
match delta.chunk().await {
Ok(Some(chunk)) => {
let len = chunk.len();
deltafile.write_all(&chunk).await?;
pg.inc(len.try_into().unwrap());
},
}
Ok(None) => break 'retry,
Err(e) => {
if tries <3 {
if tries < 3 {
deltafile.seek(std::io::SeekFrom::Start(0)).await?;
pg.set_position(0);
debug!("download failed {tries}/3");
tries +=1;
tries += 1;
continue 'retry;
} else {
anyhow::bail!(e);
}
},
}
}
}
}
drop(guard);
info!(
"downloaded {} in {} seconds",
deltafile_name.display(),
pg.elapsed().as_secs_f64()
);

{
let p_pg = ProgressBar::new(0);
let p_pg = multi.insert_after(&pg, p_pg);
pg.finish_and_clear();
multi.remove(&pg);
tokio::task::spawn_blocking(move || -> Result<_, _> {
let orig = Cursor::new(orig);
let orig = zstd::decode_all(orig).unwrap();
apply_patch(&orig, &deltafile_name, &file_name, p_pg)
})
.await??;
}
total_pg.inc(dec_size.try_into().unwrap());
} else {
info!("no cached package found, leaving {} for pacman", filename);
}
drop(guard);
info!("downloaded {} in {} seconds", deltafile_name.display(), pg.elapsed().as_secs_f64());

{
let p_pg = ProgressBar::new(0);
let p_pg = multi.insert_after(&pg, p_pg);
pg.finish_and_clear();
multi.remove(&pg);
tokio::task::spawn_blocking(move || -> Result<_,_> {
let orig = Cursor::new(orig);
let orig = zstd::decode_all(orig).unwrap();
apply_patch(&orig, &deltafile_name, &file_name, p_pg)
})
.await??;
}
total_pg.inc(dec_size.try_into().unwrap());
} else {
info!("no cached package found, leaving {} for pacman", filename);
}
Ok::<(),anyhow::Error>(())
Ok::<(), anyhow::Error>(())
};

let sigfut = async {
Expand All @@ -284,21 +288,20 @@ fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::R
while let Some(chunk) = res.chunk().await? {
sigfile.write_all(&chunk).await?
}
Ok::<(),anyhow::Error>(())
Ok::<(), anyhow::Error>(())
};

let (f, s) = tokio::join!(filefut, sigfut);
f.context("creating delat file failed")?;
s.context("creating signature file failed")?;
Ok::<(),anyhow::Error>(())
Ok::<(), anyhow::Error>(())
});
}

while let Some(res) = set.join_next().await {
if let Err(e) = res {
error!("{}", e);
error!("if the error is temporary, you can try running the command again");

}
}
});
Expand Down Expand Up @@ -334,11 +337,7 @@ fn gen_delta(orig: &Path, new: &Path, patch: &Path) -> Result<(), std::io::Error
let new = OpenOptions::new().read(true).open(new).unwrap();
let mut new = zstd::Decoder::new(new)?;

let patch = OpenOptions::new()
.write(true)
.create(true)
.open(patch)
.unwrap();
let patch = OpenOptions::new().write(true).create(true).open(patch).unwrap();
let mut patch = zstd::Encoder::new(patch, 22)?;

ddelta::generate_chunked(&mut orig, &mut new, &mut patch, None, |_| {}).unwrap();
Expand All @@ -347,23 +346,13 @@ fn gen_delta(orig: &Path, new: &Path, patch: &Path) -> Result<(), std::io::Error
Ok(())
}

fn apply_patch(
orig: &[u8],
patch: &Path,
new: &Path,
pb: ProgressBar,
) -> Result<(), std::io::Error> {
let style = ProgressStyle::with_template(
"{prefix} {wide_bar} {bytes}/{total_bytes} {binary_bytes_per_sec} {eta}",
)
.unwrap()
.progress_chars("█▇▆▅▄▃▂▁ ");
fn apply_patch(orig: &[u8], patch: &Path, new: &Path, pb: ProgressBar) -> Result<(), std::io::Error> {
let style = ProgressStyle::with_template("{prefix} {wide_bar} {bytes}/{total_bytes} {binary_bytes_per_sec} {eta}")
.unwrap()
.progress_chars("█▇▆▅▄▃▂▁ ");

pb.set_style(style);
pb.set_prefix(format!(
"patching {}",
new.file_name().unwrap().to_string_lossy()
));
pb.set_prefix(format!("patching {}", new.file_name().unwrap().to_string_lossy()));

pb.set_length(orig.len().try_into().unwrap());
let orig = Cursor::new(orig);
Expand Down
1 change: 1 addition & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
max_width = 120
Loading

0 comments on commit 1af8a4c

Please sign in to comment.