Skip to content

Commit

Permalink
a bit of error context
Browse files Browse the repository at this point in the history
  • Loading branch information
phiresky committed Feb 21, 2023
1 parent 78caa43 commit eef7c2b
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 17 deletions.
48 changes: 48 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ version = "1.0.0-alpha.2"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.32"
anyhow = {version = "1.0.32", features = ["backtrace"]}
async-compression = {version = "0.3.15", features = ["all", "all-algorithms", "tokio"]}
async-stream = "0.3.3"
async-trait = "0.1.64"
Expand Down
14 changes: 6 additions & 8 deletions src/adapters/custom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,13 @@ pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> anyhow:
}
}

fn proc_wait(mut child: Child) -> impl AsyncRead {
fn proc_wait(mut child: Child, context: impl FnOnce() -> String) -> impl AsyncRead {
let s = stream! {
let res = child.wait().await?;
if res.success() {
yield std::io::Result::Ok(Bytes::new());
} else {
yield std::io::Result::Err(to_io_err(
format_err!("subprocess failed: {:?}", res),
));
Err(format_err!("{:?}", res)).with_context(context).map_err(to_io_err)?;
}
};
StreamReader::new(s)
Expand All @@ -164,6 +162,7 @@ pub fn pipe_output(
exe_name: &str,
help: &str,
) -> Result<ReadBox> {
let cmd_log = format!("{:?}", cmd); // todo: perf
let mut cmd = cmd
.stdin(Stdio::piped())
.stdout(Stdio::piped())
Expand All @@ -177,10 +176,9 @@ pub fn pipe_output(
tokio::io::copy(&mut z, &mut stdi).await?;
std::io::Result::Ok(())
});

Ok(Box::pin(
stdo.chain(proc_wait(cmd).chain(join_handle_to_stream(join))),
))
Ok(Box::pin(stdo.chain(
proc_wait(cmd, move || format!("subprocess: {cmd_log}")).chain(join_handle_to_stream(join)),
)))
}

pub struct CustomSpawningFileAdapter {
Expand Down
14 changes: 9 additions & 5 deletions src/adapters/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,13 @@ fn synchronous_dump_sqlite(ai: AdaptInfo, mut s: impl Write) -> Result<()> {
return Ok(());
}
let inp_fname = filepath_hint;

let conn = Connection::open_with_flags(inp_fname, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
let conn = Connection::open_with_flags(&inp_fname, OpenFlags::SQLITE_OPEN_READ_ONLY)
.with_context(|| format!("opening sqlite connection to {}", inp_fname.display()))?;
let tables: Vec<String> = conn
.prepare("select name from sqlite_master where type='table'")?
.query_map([], |r| r.get::<_, String>(0))?
.prepare("select name from sqlite_master where type='table'")
.context("while preparing query")?
.query_map([], |r| r.get::<_, String>(0))
.context("while executing query")?
.filter_map(|e| e.ok())
.collect();
debug!("db has {} tables", tables.len());
Expand Down Expand Up @@ -121,7 +123,9 @@ impl WritingFileAdapter for SqliteAdapter {
oup: Pin<Box<dyn AsyncWrite + Send>>,
) -> Result<()> {
let oup_sync = SyncIoBridge::new(oup);
tokio::task::spawn_blocking(|| synchronous_dump_sqlite(ai, oup_sync)).await??;
tokio::task::spawn_blocking(|| synchronous_dump_sqlite(ai, oup_sync))
.await?
.context("in synchronous sqlite task")?;
Ok(())
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/adapters/writing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::pin::Pin;
use crate::{adapted_iter::one_file, join_handle_to_stream, to_io_err};

use super::{AdaptInfo, FileAdapter, GetMetadata};
use anyhow::Result;
use anyhow::{Context, Result};
use async_trait::async_trait;
use tokio::io::{AsyncReadExt, AsyncWrite};

Expand Down Expand Up @@ -51,6 +51,7 @@ where
a: super::AdaptInfo,
detection_reason: &crate::matching::FileMatcher,
) -> Result<crate::adapted_iter::AdaptedFilesIterBox> {
let name = self.metadata().name.clone();
let (w, r) = tokio::io::duplex(128 * 1024);
let d2 = detection_reason.clone();
let archive_recursion_depth = a.archive_recursion_depth + 1;
Expand All @@ -60,7 +61,10 @@ where
let config = a.config.clone();
let joiner = tokio::spawn(async move {
let x = d2;
T::adapt_write(a, &x, Box::pin(w)).await.map_err(to_io_err)
T::adapt_write(a, &x, Box::pin(w))
.await
.with_context(|| format!("in {}.adapt_write", name))
.map_err(to_io_err)
});

Ok(one_file(AdaptInfo {
Expand Down
2 changes: 1 addition & 1 deletion src/bin/rga-preproc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn main() -> anyhow::Result<()> {
// happens if e.g. ripgrep detects binary data in the pipe so it cancels reading
debug!("output cancelled (broken pipe)");
} else {
Err(e).context("copying adapter output to stdout {}")?;
Err(e).context("copying adapter output to stdout")?;
}
}
debug!("running adapter took {} total", print_dur(start));
Expand Down

0 comments on commit eef7c2b

Please sign in to comment.