Skip to content

Commit

Permalink
client: improve data flow, reduce initial io, guess renamed packages
Browse files Browse the repository at this point in the history
  • Loading branch information
djugei committed May 1, 2024
1 parent 23d865c commit 8c2fc3b
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 50 deletions.
2 changes: 2 additions & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ indicatif-log-bridge = "*"
anyhow = "*"
memmap2 = "*"
bytesize = "1.3.0"
strsim = "0.11"
dialoguer = "*"

[[bin]]
name = "deltaclient"
Expand Down
124 changes: 83 additions & 41 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ use log::{debug, error, info};
use parsing::Package;
use reqwest::Url;
use std::{
collections::HashMap,
fs::OpenOptions,
io::{BufRead, Cursor},
path::{Path, PathBuf},
process::{Command, Stdio},
str::FromStr,
time::Duration,
};
use tokio::{io::AsyncWriteExt, task::JoinSet};
use tokio::task::JoinSet;

static PACMAN_CACHE: &str = "/var/cache/pacman/pkg/";

#[derive(Parser, Debug)]
#[command(version, about)]
Expand Down Expand Up @@ -137,37 +140,48 @@ fn upgrade(
multi: MultiProgress,
) -> anyhow::Result<(u64, u64)> {
let upgrades = Command::new("pacman").args(["-Sup"]).output()?.stdout;
let mut packageversions = build_package_versions().expect("io error on local disk");
let mut lines: Vec<_> = upgrades
.lines()
.map(|l| l.unwrap())
.map(|l| l.expect("pacman abborted output???"))
.filter(|l| !l.starts_with("file"))
.inspect(|l| debug!("{}", l))
.filter_map(|line| {
let (_, filename) = line.rsplit_once('/').unwrap();
let pkg = Package::try_from(filename).unwrap();
let name = pkg.get_name();
if blacklist.iter().any(|e| **e == *name) {
info!("{name} is blacklisted, skipping");
return None;
} else {
debug!("{name} is not blacklisted, continuing");
}
if let Some((oldname, oldpath)) = newest_cached(filename).expect("io error on local disk") {
if let Some((oldpkg, oldpath)) = newest_cached(&packageversions, &pkg.get_name()).or_else(|| {
let (alternative, _) = packageversions
.keys()
.map(|name| (name, strsim::levenshtein(name, pkg.get_name())))
.filter(|(_name, sim)| sim <= &2)
.min()?;
let prompt =
format!("could not find cached package for {name}, {alternative} is similar, use that instead?");
if dialoguer::Confirm::new().with_prompt(prompt).interact().unwrap() {
newest_cached(&mut packageversions, &"lol")
} else {
None
}
}) {
// try to find the decompressed size for better progress monitoring
use memmap2::Mmap;
let orig = std::fs::File::open(oldpath).expect("io error on local disk");
let oldfile = std::fs::File::open(oldpath).expect("io error on local disk");
// safety: i promise to not open the same file as writable at the same time
let orig = unsafe { Mmap::map(&orig).expect("mmap failed") };
let oldfile = unsafe { Mmap::map(&oldfile).expect("mmap failed") };
// 16 megabytes seems like an ok average size
// todo: find the actual average size of a decompressed package
let default_size = 16 * 1024 * 1024;
// due to pacman packages being compressed in streaming mode
// zstd does not have an exact decompressed size and heavily overestimates
let dec_size = zstd::bulk::Decompressor::upper_bound(&orig).unwrap_or_else(|| {
debug!("using default size for {oldname}");
let dec_size = zstd::bulk::Decompressor::upper_bound(&oldfile).unwrap_or_else(|| {
debug!("using default size for {name}");
default_size
});
Some((line, oldname, orig, (dec_size as u64)))
Some((pkg, oldpkg, oldfile, (dec_size as u64)))
} else {
info!("no cached package found, leaving {} for pacman", filename);
None
Expand Down Expand Up @@ -201,7 +215,7 @@ fn upgrade(
total_pg.enable_steady_tick(Duration::from_millis(100));

let mut set = JoinSet::new();
for (line, oldname, orig, mut dec_size) in lines {
for (newpkg, oldpkg, oldfile, mut dec_size) in lines {
let client = client.clone();
let maxpar = maxpar.clone();
let maxpar_req = maxpar_req.clone();
Expand All @@ -211,18 +225,15 @@ fn upgrade(
let delta_cache = delta_cache.clone();
let total_pg = total_pg.clone();
set.spawn(async move {
debug!("spawned task for {}", &line);
let (_, filename) = line.rsplit_once('/').unwrap();
debug!("spawned task for {}", newpkg.get_name());
let filefut = async {
total_pg.inc_length(dec_size);

// delta download
let mut file_name = delta_cache.clone();
file_name.push(filename);
file_name.push(format!("{newpkg}"));

let oldpkg = Package::try_from(oldname.as_str())?;
let newpkg = Package::try_from(filename)?;
let delta = parsing::Delta::try_from((oldpkg, newpkg.clone()))?;
let delta = parsing::Delta::try_from((oldpkg.clone(), newpkg.clone()))?;

let mut delta = delta.to_string();
delta.push_str(".delta");
Expand All @@ -231,7 +242,7 @@ fn upgrade(

let mut deltafile = tokio::fs::File::create(deltafile_name.clone()).await?;

let url = format!("{server}/{oldname}/{filename}");
let url = format!("{server}/{oldpkg}/{newpkg}");

let inflight_guard = maxpar_inflight.acquire().await;
let pg = util::do_download(
Expand Down Expand Up @@ -260,9 +271,9 @@ fn upgrade(
pg.finish_and_clear();
multi.remove(&pg);
tokio::task::spawn_blocking(move || -> Result<_, _> {
let orig = Cursor::new(orig);
let orig = zstd::decode_all(orig).unwrap();
apply_patch(&orig, &deltafile_name, &file_name, p_pg)
let oldfile = Cursor::new(oldfile);
let oldfile = zstd::decode_all(oldfile).unwrap();
apply_patch(&oldfile, &deltafile_name, &file_name, p_pg)
})
.await??;
}
Expand All @@ -276,8 +287,9 @@ fn upgrade(
};

let sigfut = async {
/*
let mut sigfile = delta_cache.clone();
sigfile.push(filename);
sigfile.push(format!("{newpkg}"));
sigfile.as_mut_os_string().push(".sig");
let mut sigfile = tokio::fs::File::create(sigfile).await?;
Expand All @@ -288,6 +300,7 @@ fn upgrade(
while let Some(chunk) = res.chunk().await? {
sigfile.write_all(&chunk).await?
}
*/
Ok::<(), anyhow::Error>(())
};

Expand Down Expand Up @@ -331,26 +344,55 @@ impl<T, E> ResultSwap<T, E> for Result<T, E> {
}
}

type Str = Box<str>;
type PackageVersions = HashMap<Str, Vec<(Str, Str, Str, PathBuf)>>;

/// {package -> [(version, arch, trailer, path)]}
fn build_package_versions() -> std::io::Result<PackageVersions> {
let mut package_versions: PackageVersions = HashMap::new();
for line in std::fs::read_dir(PACMAN_CACHE)? {
let line = line?;
if !line.file_type()?.is_file() {
continue;
}
let filename = line.file_name();
let filename = filename.to_string_lossy();
if !filename.ends_with(".zst") {
continue;
}
let path = line.path().into();
let package = Package::try_from(&*filename).expect("non-pkg zstd file in pacman cache dir?");
let (name, version, arch, trailer) = package.destructure();

match package_versions.entry(name) {
std::collections::hash_map::Entry::Occupied(mut e) => {
e.get_mut().push((version, arch, trailer, path));
}
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(vec![(version, arch, trailer, path)]);
}
}
}
package_versions.shrink_to_fit();

for e in package_versions.values_mut() {
e.sort()
}

Ok(package_versions)
}

/// returns the newest package in /var/cache/pacman/pkg that
/// fits the passed package and is not identical to it in version
fn newest_cached(filename: &str) -> std::io::Result<Option<(String, PathBuf)>> {
let package = Package::try_from(filename).unwrap();
let mut local: Vec<_> = std::fs::read_dir("/var/cache/pacman/pkg/")?
.map(|e| e.unwrap())
.filter(|e| e.file_type().unwrap().is_file())
.map(|e| (e.file_name().into_string().unwrap(), e.path()))
.filter(|e| e.0.ends_with(".zst"))
.filter(|e| {
let p = Package::try_from(e.0.as_str()).unwrap();
p.get_name() == package.get_name()
/// fits the passed package
fn newest_cached(pv: &PackageVersions, package_name: &str) -> Option<(Package, PathBuf)> {
pv.get(package_name)
.map(|e| e.last().expect("each entry has at least one version"))
.map(|(version, arch, trailer, path)| {
(
Package::from_parts((package_name.into(), version.clone(), arch.clone(), trailer.clone())),
path.clone(),
)
})
.collect();
local.sort();
if local.iter().any(|(name, _)| *name == *filename) {
info!("{} already downloaded", filename);
return Ok(None);
}
Ok(local.pop())
}

#[cfg(feature = "diff")]
Expand Down
30 changes: 21 additions & 9 deletions parsing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use thiserror::Error;

type Str = Box<str>;

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
#[derive(Debug, Eq, PartialOrd, Ord, Hash, Clone)]
pub struct Package {
name: Str,
version: Str,
Expand All @@ -17,6 +17,24 @@ impl Package {
pub fn get_version(&self) -> &str {
&self.version
}
// name, version, arch, trailer
pub fn destructure(self) -> (Str, Str, Str, Str) {
(self.name, self.version, self.arch, self.trailer)
}
/// Assembles a package from its `(name, version, arch, trailer)` parts,
/// i.e. the tuple produced by `destructure`.
pub fn from_parts(parts: (Str, Str, Str, Str)) -> Self {
    let (name, version, arch, trailer) = parts;
    Self {
        name,
        version,
        arch,
        trailer,
    }
}
}

/// Packages compare equal when their name, version and architecture match;
/// the `trailer` field is not part of the comparison.
//
// NOTE(review): `Hash` and `Ord` are still derived on `Package` and therefore
// do take `trailer` into account, so two packages that are equal here can
// hash or order differently — confirm no caller relies on those agreeing.
impl PartialEq for Package {
    fn eq(&self, other: &Self) -> bool {
        let lhs = (&self.name, &self.version, &self.arch);
        let rhs = (&other.name, &other.version, &other.arch);
        lhs == rhs
    }
}

#[derive(Error, Debug)]
Expand All @@ -39,9 +57,7 @@ impl<'s> TryFrom<&'s str> for Package {
if value.contains('/') {
return Err(PackageParseError::Invalid);
}
let (left, trailer) = value
.rsplit_once('-')
.ok_or("name, version | arch trailer")?;
let (left, trailer) = value.rsplit_once('-').ok_or("name, version | arch trailer")?;
let mut idx = left.rmatch_indices('-');
let _ = idx.next();
let (idx, _) = idx.next().ok_or("name | version")?;
Expand All @@ -60,11 +76,7 @@ impl<'s> TryFrom<&'s str> for Package {

impl std::fmt::Display for Package {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}-{}-{}.{}",
self.name, self.version, self.arch, self.trailer
)
write!(f, "{}-{}-{}.{}", self.name, self.version, self.arch, self.trailer)
}
}

Expand Down

0 comments on commit 8c2fc3b

Please sign in to comment.