diff --git a/src/evaluate.rs b/src/evaluate.rs index d6947b0c8..56352b056 100644 --- a/src/evaluate.rs +++ b/src/evaluate.rs @@ -2,6 +2,8 @@ //! Works asynchronously when possible use atomicmin::AtomicMin; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; use deflate; use Deadline; use png::PngData; @@ -13,24 +15,40 @@ use png::STD_WINDOW; use rayon::prelude::*; use std::sync::mpsc::*; use std::sync::Arc; -use std::sync::Mutex; use std::thread; +use rayon; + +struct Candidate { + image: PngData, + // compressed size multiplier. Fudge factor to prefer more promising formats. + bias: f32, + // if false, that's baseline file to throw away + is_reduction: bool, + filter: u8, + // first wins tie-breaker + nth: usize, +} /// Collect image versions and pick one that compresses best pub(crate) struct Evaluator { + deadline: Arc, + nth: AtomicUsize, + best_candidate_size: Arc, /// images are sent to the thread for evaluation - eval_send: Option, f32, bool)>>, + eval_send: Option>, // the thread helps evaluate images asynchronously eval_thread: thread::JoinHandle>, } impl Evaluator { pub fn new(deadline: Arc) -> Self { - // queue size ensures we're not using too much memory for pending reductions let (tx, rx) = sync_channel(4); Self { + deadline, + best_candidate_size: Arc::new(AtomicMin::new(None)), + nth: AtomicUsize::new(0), eval_send: Some(tx), - eval_thread: thread::spawn(move || Self::evaluate_images(rx, deadline)), + eval_thread: thread::spawn(move || Self::evaluate_images(rx)), } } @@ -54,60 +72,78 @@ impl Evaluator { } fn try_image_inner(&self, image: Arc, bias: f32, is_reduction: bool) { - self.eval_send.as_ref().expect("not finished yet").send((image, bias, is_reduction)).expect("send") - } - - /// Main loop of evaluation thread - fn evaluate_images(from_channel: Receiver<(Arc, f32, bool)>, deadline: Arc) -> Option { - let best_candidate_size = AtomicMin::new(None); - let best_result: Mutex> = Mutex::new(None); - // ends when sender is dropped - for (nth, (image, bias, is_reduction)) in from_channel.iter().enumerate() { + let nth = self.nth.fetch_add(1, SeqCst); + // These clones are only cheap refcounts + let deadline = self.deadline.clone(); + let best_candidate_size = self.best_candidate_size.clone(); + // sends it off asynchronously for compression, + // but results will be collected via the message queue + let eval_send = self.eval_send.clone(); + rayon::spawn(move || { let filters_iter = STD_FILTERS.par_iter().with_max_len(1); - filters_iter.for_each(|&f| { + // Updating of best result inside the parallel loop would require locks, + // which are dangerous to do in side Rayon's loop. + // Instead, only update (atomic) best size in real time, + // and the best result later without need for locks. + filters_iter.for_each(|&filter| { if deadline.passed() { return; } if let Ok(idat_data) = deflate::deflate( - &image.filter_image(f), + &image.filter_image(filter), STD_COMPRESSION, STD_STRATEGY, STD_WINDOW, &best_candidate_size, &deadline, ) { - let mut res = best_result.lock().unwrap(); - if best_candidate_size.get().map_or(true, |old_best_len| { - let new_len = (idat_data.len() as f64 * bias as f64) as usize; - // a tie-breaker is required to make evaluation deterministic - if let Some(res) = res.as_ref() { - // choose smallest compressed, or if compresses the same, smallest uncompressed, or cheaper filter - let old_img = &res.0.raw; - let new = (new_len, image.data.len(), image.ihdr.bit_depth, f, nth); - let old = (old_best_len, old_img.data.len(), old_img.ihdr.bit_depth, res.1, res.2); - new < old - } else if new_len < old_best_len { - true - } else { - false - } - }) { - best_candidate_size.set_min(idat_data.len()); - *res = if is_reduction { - Some((PngData { - idat_data, - raw: Arc::clone(&image), - }, f, nth)) - } else { - None - }; - } + best_candidate_size.set_min(idat_data.len()); + // the rest is shipped to the evavluation/collection thread + eval_send.as_ref().expect("not finished yet").send(Candidate { + image: PngData { + idat_data, + raw: Arc::clone(&image), + }, + bias, + filter, + is_reduction, + nth, + }).expect("send"); } }); + }); + } + + /// Main loop of evaluation thread + fn evaluate_images(from_channel: Receiver) -> Option { + let mut best_result: Option = None; + // ends when the last sender is dropped + for new in from_channel.iter() { + // a tie-breaker is required to make evaluation deterministic + let is_best = if let Some(ref old) = best_result { + // ordering is important - later file gets to use bias over earlier, but not the other way + // (this way bias=0 replaces, but doesn't forbid later optimizations) + let new_len = (new.image.idat_data.len() as f64 * if new.nth > old.nth {new.bias as f64} else {1.0}) as usize; + let old_len = (old.image.idat_data.len() as f64 * if new.nth < old.nth {old.bias as f64} else {1.0}) as usize; + // choose smallest compressed, or if compresses the same, smallest uncompressed, or cheaper filter + let new = (new_len, new.image.raw.data.len(), new.image.raw.ihdr.bit_depth, new.filter, new.nth); + let old = (old_len, old.image.raw.data.len(), old.image.raw.ihdr.bit_depth, old.filter, old.nth); + // <= instead of < is important, because best_candidate_size has been set already, + // so the current result may be comparing its size with itself + new <= old + } else { + true + }; + if is_best { + best_result = if new.is_reduction { + Some(new) + } else { + None + }; + } } - best_result.into_inner().expect("filters should be done") - .map(|(img, _, _)| img) + best_result.map(|res| res.image) } } diff --git a/src/rayon.rs b/src/rayon.rs index a8e662f81..0a5f1cab1 100644 --- a/src/rayon.rs +++ b/src/rayon.rs @@ -51,3 +51,7 @@ impl ParallelIterator for I { pub fn join(a: impl FnOnce() -> A, b: impl FnOnce() -> B) -> (A, B) { (a(), b()) } + +pub fn spawn(a: impl FnOnce() -> A) -> A { + a() +}