Skip to content

Commit

Permalink
Add custom encoding callback
Browse files Browse the repository at this point in the history
Fixes: #59.
Signed-off-by: Otavio Salvador <otavio@ossystems.com.br>
  • Loading branch information
orannge authored and otavio committed Jul 24, 2022
1 parent 2543215 commit 1b610a3
Show file tree
Hide file tree
Showing 16 changed files with 465 additions and 38 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
## [Unreleased] - ReleaseDate

* Avoid failing uncompressing files in case of ARCHIVE_WARN returns [#85]
* Add `_with_encoding` suffix method. [#59]

[#59]: https://github.com/OSSystems/compress-tools-rs/pull/59
[#85]: https://github.com/OSSystems/compress-tools-rs/issues/85

## [0.12.3] - 2022-06-22
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ tempfile = "3.1"
argh = "0.1"
async-std = { version = "1.6.3", features = ["attributes"] }
tokio = { version = "1.0.0", features = ["fs", "net"] }
encoding = "0.2"

[target.'cfg(target_env = "msvc")'.build-dependencies]
vcpkg = "0.2"
Expand Down
63 changes: 62 additions & 1 deletion src/async_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! Generic async support with which you can use you own thread pool by
//! implementing the [`BlockingExecutor`] trait.

use crate::{Ownership, Result, READER_BUFFER_SIZE};
use crate::{DecodeCallback, Ownership, Result, READER_BUFFER_SIZE};
use async_trait::async_trait;
use futures_channel::mpsc::{channel, Receiver, Sender};
use futures_core::FusedStream;
Expand Down Expand Up @@ -149,6 +149,24 @@ where
join.2
}

/// Async version of
/// [`list_archive_files_with_encoding`](crate::
/// list_archive_files_with_encoding).
pub async fn list_archive_files_with_encoding<B, R>(
blocking_executor: B,
source: R,
decode: DecodeCallback,
) -> Result<Vec<String>>
where
B: BlockingExecutor,
R: AsyncRead + Unpin,
{
wrap_async_read(blocking_executor, source, move |source| {
crate::list_archive_files_with_encoding(source, decode)
})
.await?
}

/// Async version of [`list_archive_files`](crate::list_archive_files).
pub async fn list_archive_files<B, R>(blocking_executor: B, source: R) -> Result<Vec<String>>
where
Expand All @@ -171,6 +189,27 @@ where
.await?
}

/// Async version of
/// [`uncompress_archive_with_encoding`](crate::
/// uncompress_archive_with_encoding).
pub async fn uncompress_archive_with_encoding<B, R>(
blocking_executor: B,
source: R,
dest: &Path,
ownership: Ownership,
decode: DecodeCallback,
) -> Result<()>
where
B: BlockingExecutor,
R: AsyncRead + Unpin,
{
let dest = dest.to_owned();
wrap_async_read(blocking_executor, source, move |source| {
crate::uncompress_archive_with_encoding(source, &dest, ownership, decode)
})
.await?
}

/// Async version of [`uncompress_archive`](crate::uncompress_archive).
pub async fn uncompress_archive<B, R>(
blocking_executor: B,
Expand All @@ -189,6 +228,28 @@ where
.await?
}

/// Async version of
/// [`uncompress_archive_file_with_encoding`](crate::
/// uncompress_archive_file_with_encoding).
pub async fn uncompress_archive_file_with_encoding<B, R, W>(
blocking_executor: B,
source: R,
target: W,
path: &str,
decode: DecodeCallback,
) -> Result<usize>
where
B: BlockingExecutor,
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
{
let path = path.to_owned();
wrap_async_read_and_write(blocking_executor, source, target, move |source, target| {
crate::uncompress_archive_file_with_encoding(source, target, &path, decode)
})
.await?
}

/// Async version of
/// [`uncompress_archive_file`](crate::uncompress_archive_file).
pub async fn uncompress_archive_file<B, R, W>(
Expand Down
7 changes: 6 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::ffi;
use derive_more::{Display, Error, From};
use std::{ffi::CStr, io};
use std::{borrow::Cow, ffi::CStr, io};

pub type Result<T> = std::result::Result<T, Error>;

Expand All @@ -16,6 +16,11 @@ pub enum Error {

Io(io::Error),

Utf(std::str::Utf8Error),

#[display(fmt = "Encoding error: '{}'", _0)]
Encoding(#[error(not(source))] Cow<'static, str>),

#[cfg(feature = "tokio_support")]
JoinError(tokio::task::JoinError),

Expand Down
24 changes: 21 additions & 3 deletions src/ffi/locale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

/// Change from the C to system locale, allowing libarchive to handle filenames
/// in UTF-8. We restrict to change LC_CTYPE only, since libarchive only needs
/// the charset set.
/// the charset set. The timing of locale setting for Unix and Windows is
/// different, handle them separately.
///
/// See on libarchive Website for a more complete description of the issue:
///
/// https://github.com/libarchive/libarchive/issues/587
/// https://github.com/libarchive/libarchive/wiki/Filenames
pub(crate) use inner::UTF8LocaleGuard;
pub(crate) use inner::WindowsUTF8LocaleGuard;

#[cfg(unix)]
mod inner {
Expand All @@ -19,6 +21,8 @@ mod inner {
utf8_locale: libc::locale_t,
}

pub(crate) struct WindowsUTF8LocaleGuard {}

impl UTF8LocaleGuard {
pub(crate) fn new() -> Self {
#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -55,6 +59,12 @@ mod inner {
};
}
}

impl WindowsUTF8LocaleGuard {
pub(crate) fn new() -> Self {
Self {}
}
}
}

#[cfg(windows)]
Expand All @@ -64,12 +74,20 @@ mod inner {
}
const _ENABLE_PER_THREAD_LOCALE: std::os::raw::c_int = 1;

pub(crate) struct UTF8LocaleGuard {
pub(crate) struct UTF8LocaleGuard {}

pub(crate) struct WindowsUTF8LocaleGuard {
save: Option<std::ffi::CString>,
save_thread_config: ::std::os::raw::c_int,
}

impl UTF8LocaleGuard {
pub(crate) fn new() -> Self {
Self {}
}
}

impl WindowsUTF8LocaleGuard {
pub(crate) fn new() -> Self {
let locale = b".UTF-8\0";

Expand Down Expand Up @@ -100,7 +118,7 @@ mod inner {
}
}

impl Drop for UTF8LocaleGuard {
impl Drop for WindowsUTF8LocaleGuard {
fn drop(&mut self) {
if let Some(locale) = &self.save {
unsafe { libc::setlocale(libc::LC_CTYPE, locale.as_ptr()) };
Expand Down
2 changes: 1 addition & 1 deletion src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ mod generated;
mod locale;

pub(crate) use crate::ffi::generated::*;
pub(crate) use locale::UTF8LocaleGuard;
pub(crate) use locale::{UTF8LocaleGuard, WindowsUTF8LocaleGuard};
60 changes: 59 additions & 1 deletion src/futures_support.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Async support with a built-in thread pool.

use crate::{async_support, async_support::BlockingExecutor, Ownership, Result};
use crate::{async_support, async_support::BlockingExecutor, DecodeCallback, Ownership, Result};
use async_trait::async_trait;
use futures_io::{AsyncRead, AsyncWrite};
use std::path::Path;
Expand All @@ -20,6 +20,19 @@ impl BlockingExecutor for FuturesBlockingExecutor {

const FUTURES_BLOCKING_EXECUTOR: FuturesBlockingExecutor = FuturesBlockingExecutor {};

/// Async version of
/// [`list_archive_files_with_encoding`](crate::
/// list_archive_files_with_encoding).
pub async fn list_archive_files_with_encoding<R>(
source: R,
decode: DecodeCallback,
) -> Result<Vec<String>>
where
R: AsyncRead + Unpin,
{
async_support::list_archive_files_with_encoding(FUTURES_BLOCKING_EXECUTOR, source, decode).await
}

/// Async version of [`list_archive_files`](crate::list_archive_files).
pub async fn list_archive_files<R>(source: R) -> Result<Vec<String>>
where
Expand All @@ -37,6 +50,28 @@ where
async_support::uncompress_data(FUTURES_BLOCKING_EXECUTOR, source, target).await
}

/// Async version of
/// [`uncompress_archive_with_encoding`](crate::
/// uncompress_archive_with_encoding).
pub async fn uncompress_archive_with_encoding<R>(
source: R,
dest: &Path,
ownership: Ownership,
decode: DecodeCallback,
) -> Result<()>
where
R: AsyncRead + Unpin,
{
async_support::uncompress_archive_with_encoding(
FUTURES_BLOCKING_EXECUTOR,
source,
dest,
ownership,
decode,
)
.await
}

/// Async version of [`uncompress_archive`](crate::uncompress_archive).
pub async fn uncompress_archive<R>(source: R, dest: &Path, ownership: Ownership) -> Result<()>
where
Expand All @@ -45,6 +80,29 @@ where
async_support::uncompress_archive(FUTURES_BLOCKING_EXECUTOR, source, dest, ownership).await
}

/// Async version of
/// [`uncompress_archive_file_with_encoding`](crate::
/// uncompress_archive_file_with_encoding).
pub async fn uncompress_archive_file_with_encoding<R, W>(
source: R,
target: W,
path: &str,
decode: DecodeCallback,
) -> Result<usize>
where
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
{
async_support::uncompress_archive_file_with_encoding(
FUTURES_BLOCKING_EXECUTOR,
source,
target,
path,
decode,
)
.await
}

/// Async version of
/// [`uncompress_archive_file`](crate::uncompress_archive_file).
pub async fn uncompress_archive_file<R, W>(source: R, target: W, path: &str) -> Result<usize>
Expand Down
57 changes: 51 additions & 6 deletions src/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use std::{

use libc::{c_int, c_void};

use crate::{error::archive_result, ffi, ffi::UTF8LocaleGuard, Error, Result, READER_BUFFER_SIZE};
use crate::{
error::archive_result, ffi, ffi::UTF8LocaleGuard, DecodeCallback, Error, Result,
READER_BUFFER_SIZE,
};

struct HeapReadSeekerPipe<R: Read + Seek> {
reader: R,
Expand Down Expand Up @@ -39,6 +42,7 @@ pub struct ArchiveIterator<R: Read + Seek> {
archive_entry: *mut ffi::archive_entry,
archive_reader: *mut ffi::archive,

decode: DecodeCallback,
in_file: bool,
closed: bool,
error: bool,
Expand Down Expand Up @@ -99,7 +103,8 @@ impl<R: Read + Seek> ArchiveIterator<R> {
///
/// let mut name = String::default();
/// let mut size = 0;
/// let mut iter = ArchiveIterator::from_read(file)?;
/// let decode_utf8 = |bytes: &[u8]| Ok(std::str::from_utf8(bytes)?.to_owned());
/// let mut iter = ArchiveIterator::from_read_with_encoding(file, decode_utf8)?;
///
/// for content in &mut iter {
/// match content {
Expand All @@ -119,7 +124,7 @@ impl<R: Read + Seek> ArchiveIterator<R> {
/// # Ok(())
/// # }
/// ```
pub fn from_read(source: R) -> Result<ArchiveIterator<R>>
pub fn from_read_with_encoding(source: R, decode: DecodeCallback) -> Result<ArchiveIterator<R>>
where
R: Read + Seek + 'static,
{
Expand Down Expand Up @@ -178,6 +183,7 @@ impl<R: Read + Seek> ArchiveIterator<R> {
archive_entry,
archive_reader,

decode,
in_file: false,
closed: false,
error: false,
Expand All @@ -191,6 +197,45 @@ impl<R: Read + Seek> ArchiveIterator<R> {
}
}

/// Iterate over the contents of an archive, streaming the contents of each
/// entry in small chunks.
///
/// ```no_run
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use compress_tools::*;
/// use std::fs::File;
///
/// let file = File::open("tree.tar")?;
///
/// let mut name = String::default();
/// let mut size = 0;
/// let mut iter = ArchiveIterator::from_read(file)?;
///
/// for content in &mut iter {
/// match content {
/// ArchiveContents::StartOfEntry(s) => name = s,
/// ArchiveContents::DataChunk(v) => size += v.len(),
/// ArchiveContents::EndOfEntry => {
/// println!("Entry {} was {} bytes", name, size);
/// size = 0;
/// }
/// ArchiveContents::Err(e) => {
/// Err(e)?;
/// }
/// }
/// }
///
/// iter.close()?;
/// # Ok(())
/// # }
/// ```
pub fn from_read(source: R) -> Result<ArchiveIterator<R>>
where
R: Read + Seek + 'static,
{
ArchiveIterator::from_read_with_encoding(source, crate::decode_utf8)
}

/// Close the iterator, freeing up the associated resources.
///
/// Resources will be freed on drop if this is not called, but any errors
Expand Down Expand Up @@ -222,9 +267,9 @@ impl<R: Read + Seek> ArchiveIterator<R> {
match ffi::archive_read_next_header(self.archive_reader, &mut self.archive_entry) {
ffi::ARCHIVE_EOF => ArchiveContents::EndOfEntry,
ffi::ARCHIVE_OK => {
let file_name = CStr::from_ptr(ffi::archive_entry_pathname(self.archive_entry))
.to_string_lossy()
.into_owned();
let _utf8_guard = ffi::WindowsUTF8LocaleGuard::new();
let cstr = CStr::from_ptr(ffi::archive_entry_pathname(self.archive_entry));
let file_name = (self.decode)(cstr.to_bytes()).unwrap();
ArchiveContents::StartOfEntry(file_name)
}
_ => ArchiveContents::Err(Error::from(self.archive_reader)),
Expand Down
Loading

0 comments on commit 1b610a3

Please sign in to comment.