From f2e74533210aa8f81e20d93b5b356f3491c5328b Mon Sep 17 00:00:00 2001 From: Desuwa Date: Mon, 26 Jul 2021 21:58:28 -0700 Subject: [PATCH] Add an ArchiveIterator type --- CHANGES.md | 2 + src/iterator.rs | 300 ++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 + tests/integration_test.rs | 88 +++++++++++ 4 files changed, 392 insertions(+) create mode 100644 src/iterator.rs diff --git a/CHANGES.md b/CHANGES.md index 9e56eea..7127913 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,9 +6,11 @@ * Use "lossy" strings for invalid filenames. [#59] * Fix memory leak when dropping locale guard. [#64] +* Add `ArchiveIterator` type. [#65] [#59]: https://github.com/OSSystems/compress-tools-rs/issues/59 [#64]: https://github.com/OSSystems/compress-tools-rs/issues/64 +[#65]: https://github.com/OSSystems/compress-tools-rs/issues/65 ## [0.11.2] - 2021-05-29 diff --git a/src/iterator.rs b/src/iterator.rs new file mode 100644 index 0000000..7ad56d9 --- /dev/null +++ b/src/iterator.rs @@ -0,0 +1,300 @@ +use std::{ + ffi::{CStr, CString}, + io::{Read, Seek, SeekFrom, Write}, + ops::DerefMut, + slice, +}; + +use libc::{c_int, c_void}; + +use crate::{error::archive_result, ffi, ffi::UTF8LocaleGuard, Error, Result, READER_BUFFER_SIZE}; + +struct HeapReadSeekerPipe { + reader: R, + buffer: [u8; READER_BUFFER_SIZE], +} + +/// The contents of an archive, yielded in order from the beginning to the end +/// of the archive. +/// +/// Each entry, file or directory, will have a +/// [`ArchiveContents::StartOfEntry`], zero or more +/// [`ArchiveContents::DataChunk`], and then a corresponding +/// [`ArchiveContents::EndOfEntry`] to mark that the entry has been read to +/// completion. +pub enum ArchiveContents { + /// Marks the start of an entry, either a file or a directory. + StartOfEntry(String), + /// A chunk of uncompressed data from the entry. Entries may have zero or + /// more chunks. + DataChunk(Vec), + /// Marks the end of the entry that was started by the previous + /// StartOfEntry. + EndOfEntry, + Err(Error), +} + +/// An iterator over the contents of an archive. +pub struct ArchiveIterator { + archive_entry: *mut ffi::archive_entry, + archive_reader: *mut ffi::archive, + + in_file: bool, + closed: bool, + error: bool, + + _pipe: Box>, + _utf8_guard: UTF8LocaleGuard, +} + +impl Iterator for ArchiveIterator { + type Item = ArchiveContents; + + fn next(&mut self) -> Option { + if self.error { + return None; + } + + let next = if self.in_file { + unsafe { self.next_data_chunk() } + } else { + unsafe { self.next_header() } + }; + + match &next { + ArchiveContents::StartOfEntry(_) => { + self.in_file = true; + Some(next) + } + ArchiveContents::DataChunk(_) => Some(next), + ArchiveContents::EndOfEntry if self.in_file => { + self.in_file = false; + Some(next) + } + ArchiveContents::EndOfEntry => None, + ArchiveContents::Err(_) => { + self.error = true; + Some(next) + } + } + } +} + +impl Drop for ArchiveIterator { + fn drop(&mut self) { + drop(self.free()); + } +} + +impl ArchiveIterator { + /// Iterate over the contents of an archive, streaming the contents of each + /// entry in small chunks. + /// + /// ```no_run + /// # fn main() -> Result<(), Box> { + /// use compress_tools::*; + /// use std::fs::File; + /// + /// let file = File::open("tree.tar")?; + /// + /// let mut name = String::default(); + /// let mut size = 0; + /// let mut iter = ArchiveIterator::from_read(file)?; + /// + /// for content in &mut iter { + /// match content { + /// ArchiveContents::StartOfEntry(s) => name = s, + /// ArchiveContents::DataChunk(v) => size += v.len(), + /// ArchiveContents::EndOfEntry => { + /// println!("Entry {} was {} bytes", name, size); + /// size = 0; + /// } + /// ArchiveContents::Err(e) => { + /// Err(e)?; + /// } + /// } + /// } + /// + /// iter.close()?; + /// # Ok(()) + /// # } + /// ``` + pub fn from_read(source: R) -> Result> + where + R: Read + Seek + 'static, + { + let utf8_guard = ffi::UTF8LocaleGuard::new(); + let reader = source; + let buffer = [0; READER_BUFFER_SIZE]; + let mut pipe = Box::new(HeapReadSeekerPipe { reader, buffer }); + + unsafe { + let archive_entry: *mut ffi::archive_entry = std::ptr::null_mut(); + let archive_reader = ffi::archive_read_new(); + + let res = (|| { + archive_result( + ffi::archive_read_support_filter_all(archive_reader), + archive_reader, + )?; + + archive_result( + ffi::archive_read_support_format_raw(archive_reader), + archive_reader, + )?; + + archive_result( + ffi::archive_read_set_seek_callback( + archive_reader, + Some(libarchive_heap_seek_callback::), + ), + archive_reader, + )?; + + if archive_reader.is_null() { + return Err(Error::NullArchive); + } + + archive_result( + ffi::archive_read_support_format_all(archive_reader), + archive_reader, + )?; + + archive_result( + ffi::archive_read_open( + archive_reader, + (pipe.deref_mut() as *mut HeapReadSeekerPipe) as *mut c_void, + None, + Some(libarchive_heap_seekableread_callback::), + None, + ), + archive_reader, + )?; + + Ok(()) + })(); + + let iter = ArchiveIterator { + archive_entry, + archive_reader, + + in_file: false, + closed: false, + error: false, + + _pipe: pipe, + _utf8_guard: utf8_guard, + }; + + res?; + Ok(iter) + } + } + + /// Close the iterator, freeing up the associated resources. + /// + /// Resources will be freed on drop if this is not called, but any errors + /// during freeing on drop will be lost. + pub fn close(mut self) -> Result<()> { + self.free() + } + + fn free(&mut self) -> Result<()> { + if self.closed { + return Ok(()); + } + + self.closed = true; + unsafe { + archive_result( + ffi::archive_read_close(self.archive_reader), + self.archive_reader, + )?; + archive_result( + ffi::archive_read_free(self.archive_reader), + self.archive_reader, + )?; + } + Ok(()) + } + + unsafe fn next_header(&mut self) -> ArchiveContents { + match ffi::archive_read_next_header(self.archive_reader, &mut self.archive_entry) { + ffi::ARCHIVE_EOF => ArchiveContents::EndOfEntry, + ffi::ARCHIVE_OK => { + let file_name = CStr::from_ptr(ffi::archive_entry_pathname(self.archive_entry)) + .to_string_lossy() + .into_owned(); + ArchiveContents::StartOfEntry(file_name) + } + _ => ArchiveContents::Err(Error::from(self.archive_reader)), + } + } + + unsafe fn next_data_chunk(&mut self) -> ArchiveContents { + let mut buffer = std::ptr::null(); + let mut offset = 0; + let mut size = 0; + let mut target = Vec::with_capacity(READER_BUFFER_SIZE); + + match ffi::archive_read_data_block(self.archive_reader, &mut buffer, &mut size, &mut offset) + { + ffi::ARCHIVE_EOF => ArchiveContents::EndOfEntry, + ffi::ARCHIVE_OK => { + let content = slice::from_raw_parts(buffer as *const u8, size); + let write = target.write_all(content); + if let Err(e) = write { + ArchiveContents::Err(e.into()) + } else { + ArchiveContents::DataChunk(target) + } + } + _ => ArchiveContents::Err(Error::from(self.archive_reader)), + } + } +} + +unsafe extern "C" fn libarchive_heap_seek_callback( + _: *mut ffi::archive, + client_data: *mut c_void, + offset: ffi::la_int64_t, + whence: c_int, +) -> i64 { + let pipe = (client_data as *mut HeapReadSeekerPipe) + .as_mut() + .unwrap(); + let whence = match whence { + 0 => SeekFrom::Start(offset as u64), + 1 => SeekFrom::Current(offset), + 2 => SeekFrom::End(offset), + _ => return -1, + }; + + match pipe.reader.seek(whence) { + Ok(offset) => offset as i64, + Err(_) => -1, + } +} + +unsafe extern "C" fn libarchive_heap_seekableread_callback( + archive: *mut ffi::archive, + client_data: *mut c_void, + buffer: *mut *const c_void, +) -> ffi::la_ssize_t { + let pipe = (client_data as *mut HeapReadSeekerPipe) + .as_mut() + .unwrap(); + + *buffer = pipe.buffer.as_ptr() as *const c_void; + + match pipe.reader.read(&mut pipe.buffer) { + Ok(size) => size as ffi::la_ssize_t, + Err(e) => { + let description = CString::new(e.to_string()).unwrap(); + + ffi::archive_set_error(archive, e.raw_os_error().unwrap_or(0), description.as_ptr()); + + -1 + } + } +} diff --git a/src/lib.rs b/src/lib.rs index d5553a9..3f1caec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,12 +55,14 @@ mod error; mod ffi; #[cfg(feature = "futures_support")] pub mod futures_support; +mod iterator; #[cfg(feature = "tokio_support")] pub mod tokio_support; use error::archive_result; pub use error::{Error, Result}; use io::{Seek, SeekFrom}; +pub use iterator::{ArchiveContents, ArchiveIterator}; use std::{ ffi::{CStr, CString}, io::{self, Read, Write}, diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 9ece272..4ed0fa6 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -457,3 +457,91 @@ async fn uncompress_truncated_archive_tokio() { Err(Error::Unknown) )); } + +fn collect_iterate_results(source: std::fs::File) -> Vec<(String, usize)> { + let mut results = Vec::new(); + let mut name = String::default(); + let mut size = 0; + + let mut iter = ArchiveIterator::from_read(source).expect("Failed to get the file"); + + for content in &mut iter { + match content { + ArchiveContents::StartOfEntry(file_name) => { + assert!(name.is_empty()); + assert_eq!(size, 0); + name = file_name; + } + ArchiveContents::DataChunk(data) => { + assert!(!name.is_empty()); + size += data.len(); + } + ArchiveContents::EndOfEntry => { + assert!(!name.is_empty()); + results.push((name, size)); + name = String::default(); + size = 0; + } + ArchiveContents::Err(e) => panic!("{:?}", e), + } + } + + assert!(iter.close().is_ok()); + assert!(name.is_empty()); + assert_eq!(size, 0); + + results +} + +#[test] +fn iterate_tar() { + let source = std::fs::File::open("tests/fixtures/tree.tar").unwrap(); + + let contents = collect_iterate_results(source); + + let expected: Vec<(String, usize)> = vec![ + ("tree/", 0), + ("tree/branch1/", 0), + ("tree/branch1/leaf", 12), + ("tree/branch2/", 0), + ("tree/branch2/leaf", 14), + ] + .into_iter() + .map(|(a, b)| (a.into(), b)) + .collect(); + + assert_eq!(contents, expected); +} + +#[test] +fn iterate_7z() { + let source = std::fs::File::open("tests/fixtures/tree.7z").unwrap(); + + let contents = collect_iterate_results(source); + + let expected: Vec<(String, usize)> = vec![ + ("tree/", 0), + ("tree/branch1/", 0), + ("tree/branch2/", 0), + ("tree/branch1/leaf", 12), + ("tree/branch2/leaf", 14), + ] + .into_iter() + .map(|(a, b)| (a.into(), b)) + .collect(); + + assert_eq!(contents, expected); +} + +#[test] +fn iterate_truncated_archive() { + let source = std::fs::File::open("tests/fixtures/truncated.log.gz").unwrap(); + + for content in ArchiveIterator::from_read(source).unwrap() { + if let ArchiveContents::Err(Error::Unknown) = content { + return; + } + } + + panic!("Did not find expected error"); +}