
Commit

Revert "fix: potential fix for the docker leaking the errors and such (
Browse files Browse the repository at this point in the history
…#1496)"

This reverts commit f234f89.
dr-bonez authored Jun 7, 2022
1 parent f234f89 commit 8216c05
Showing 3 changed files with 49 additions and 238 deletions.
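
In substance, the revert removes the async-stream dependency and the streaming readers introduced by #1496 (buf_reader_to_lines, max_buffer, and the RingVec line buffer feeding NonDetachingJoinHandle tasks), and restores the earlier pattern of calling wait_with_output() on the child and working with the fully collected stdout/stderr. The sketch below contrasts the two shapes in isolation; it uses tokio's process API directly, the function names are made up for illustration, and it is not code from this repository.

// Sketch only: the two ways of consuming a child's output that this commit
// toggles between. Assumes tokio with the "process" and "io-util" features.
use std::collections::VecDeque;
use std::process::Stdio;

use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;

// Restored approach: let tokio buffer the whole stdout/stderr and hand it back
// once the child exits (what output()/wait_with_output() does).
async fn run_collected() -> std::io::Result<(Option<i32>, String, String)> {
    let out = Command::new("echo").arg("hello").output().await?;
    Ok((
        out.status.code(),
        String::from_utf8_lossy(&out.stdout).into_owned(),
        String::from_utf8_lossy(&out.stderr).into_owned(),
    ))
}

// Reverted approach: take the stdout pipe and read it line by line, keeping
// only the most recent `limit` lines so a chatty container cannot grow the
// buffer without bound.
async fn run_streaming(limit: usize) -> std::io::Result<Vec<String>> {
    let mut child = Command::new("echo")
        .arg("hello")
        .stdout(Stdio::piped())
        .spawn()?;
    let stdout = child.stdout.take().expect("stdout was piped");
    let mut lines = BufReader::new(stdout).lines();
    let mut kept: VecDeque<String> = VecDeque::with_capacity(limit);
    while let Some(line) = lines.next_line().await? {
        if kept.len() == limit {
            kept.pop_front(); // evict the oldest line, like the RingVec in the diff
        }
        kept.push_back(line);
    }
    child.wait().await?; // reap the child after its output is drained
    Ok(kept.into_iter().collect())
}

Either function can be driven from a #[tokio::main] entry point; the trade-off being toggled across these two commits is unbounded memory growth in the collected form versus the extra plumbing of the streaming form.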
22 changes: 0 additions & 22 deletions backend/Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion backend/Cargo.toml
@@ -48,7 +48,6 @@ unstable = ["patch-db/unstable"]
[dependencies]
aes = { version = "0.7.5", features = ["ctr"] }
async-trait = "0.1.51"
async-stream = "0.3.3"
avahi-sys = { git = "https://github.com/Start9Labs/avahi-sys", version = "0.10.0", branch = "feature/dynamic-linking", features = [
"dynamic",
], optional = true }
264 changes: 49 additions & 215 deletions backend/src/procedure/docker.rs
@@ -5,19 +5,12 @@ use std::net::Ipv4Addr;
use std::path::PathBuf;
use std::time::Duration;

use async_stream::stream;
use bollard::container::RemoveContainerOptions;
use color_eyre::eyre::eyre;
use color_eyre::Report;
use futures::future::Either as EitherFuture;
use futures::TryStreamExt;
use helpers::NonDetachingJoinHandle;
use nix::sys::signal;
use nix::unistd::Pid;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::VecDeque;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
use tracing::instrument;

use crate::context::RpcContext;
@@ -177,63 +170,14 @@ impl DockerProcedure {
Done(T),
TimedOut,
}

let io_format = self.io_format;
let output = BufReader::new(
handle
.stdout
.take()
.ok_or_else(|| eyre!("Can't takeout stout"))
.with_kind(crate::ErrorKind::Docker)?,
);
let output = NonDetachingJoinHandle::from(tokio::spawn(async move {
if let Some(format) = io_format {
let buffer = max_buffer(output, None).await?;
return Ok::<Value, Error>(match format.from_reader(&*buffer) {
Ok(a) => a,
Err(e) => {
tracing::warn!(
"Failed to deserialize stdout from {}: {}, falling back to UTF-8 string.",
format,
e
);
String::from_utf8(buffer)
.with_kind(crate::ErrorKind::Deserialization)?
.into()
}
});
}

let lines = buf_reader_to_lines(output, 1000).await?;
if lines.is_empty() {
return Ok(Value::Null);
}

let joined_output = lines.join("\n");
Ok(joined_output.into())
}));
let err_output = BufReader::new(
handle
.stderr
.take()
.ok_or_else(|| eyre!("Can't takeout std err"))
.with_kind(crate::ErrorKind::Docker)?,
);

let err_output = NonDetachingJoinHandle::from(tokio::spawn(async move {
let lines = buf_reader_to_lines(err_output, 1000).await?;
let joined_output = lines.join("\n");
Ok::<_, Error>(joined_output)
}));

let res = tokio::select! {
res = handle.wait() => Race::Done(res.with_kind(crate::ErrorKind::Docker)?),
res = handle.wait_with_output() => Race::Done(res.with_kind(crate::ErrorKind::Docker)?),
res = timeout_fut => {
res?;
Race::TimedOut
},
};
let exit_status = match res {
let res = match res {
Race::Done(x) => x,
Race::TimedOut => {
if let Some(id) = id {
@@ -243,19 +187,32 @@ impl DockerProcedure {
return Ok(Err((143, "Timed out. Retrying soon...".to_owned())));
}
};
Ok(
if exit_status.success() || exit_status.code() == Some(143) {
Ok(
serde_json::from_value(output.await.with_kind(crate::ErrorKind::Unknown)??)
.with_kind(crate::ErrorKind::Deserialization)?,
)
Ok(if res.status.success() || res.status.code() == Some(143) {
Ok(if let Some(format) = self.io_format {
match format.from_slice(&res.stdout) {
Ok(a) => a,
Err(e) => {
tracing::warn!(
"Failed to deserialize stdout from {}: {}, falling back to UTF-8 string.",
format,
e
);
serde_json::from_value(String::from_utf8(res.stdout)?.into())
.with_kind(crate::ErrorKind::Deserialization)?
}
}
} else if res.stdout.is_empty() {
serde_json::from_value(Value::Null).with_kind(crate::ErrorKind::Deserialization)?
} else {
Err((
exit_status.code().unwrap_or_default(),
err_output.await.with_kind(crate::ErrorKind::Unknown)??,
))
},
)
serde_json::from_value(String::from_utf8(res.stdout)?.into())
.with_kind(crate::ErrorKind::Deserialization)?
})
} else {
Err((
res.status.code().unwrap_or_default(),
String::from_utf8(res.stderr)?,
))
})
}

#[instrument(skip(ctx, input))]
@@ -290,69 +247,36 @@ impl DockerProcedure {
.await
.with_kind(crate::ErrorKind::Docker)?;
}

let err_output = BufReader::new(
handle
.stderr
.take()
.ok_or_else(|| eyre!("Can't takeout std err"))
.with_kind(crate::ErrorKind::Docker)?,
);
let err_output = NonDetachingJoinHandle::from(tokio::spawn(async move {
let lines = buf_reader_to_lines(err_output, 1000).await?;
let joined_output = lines.join("\n");
Ok::<_, Error>(joined_output)
}));

let io_format = self.io_format;
let output = BufReader::new(
handle
.stdout
.take()
.ok_or_else(|| eyre!("Can't takeout stout"))
.with_kind(crate::ErrorKind::Docker)?,
);
let output = NonDetachingJoinHandle::from(tokio::spawn(async move {
if let Some(format) = io_format {
let buffer = max_buffer(output, None).await?;
return Ok::<Value, Error>(match format.from_reader(&*buffer) {
let res = handle
.wait_with_output()
.await
.with_kind(crate::ErrorKind::Docker)?;
Ok(if res.status.success() || res.status.code() == Some(143) {
Ok(if let Some(format) = &self.io_format {
match format.from_slice(&res.stdout) {
Ok(a) => a,
Err(e) => {
tracing::warn!(
"Failed to deserialize stdout from {}: {}, falling back to UTF-8 string.",
format,
e
);
String::from_utf8(buffer)
serde_json::from_value(String::from_utf8(res.stdout)?.into())
.with_kind(crate::ErrorKind::Deserialization)?
.into()
}
});
}

let lines = buf_reader_to_lines(output, 1000).await?;
if lines.is_empty() {
return Ok(Value::Null);
}

let joined_output = lines.join("\n");
Ok(joined_output.into())
}));

let exit_status = handle.wait().await.with_kind(crate::ErrorKind::Docker)?;
Ok(
if exit_status.success() || exit_status.code() == Some(143) {
Ok(
serde_json::from_value(output.await.with_kind(crate::ErrorKind::Unknown)??)
.with_kind(crate::ErrorKind::Deserialization)?,
)
}
} else if res.stdout.is_empty() {
serde_json::from_value(Value::Null).with_kind(crate::ErrorKind::Deserialization)?
} else {
Err((
exit_status.code().unwrap_or_default(),
err_output.await.with_kind(crate::ErrorKind::Unknown)??,
))
},
)
serde_json::from_value(String::from_utf8(res.stdout)?.into())
.with_kind(crate::ErrorKind::Deserialization)?
})
} else {
Err((
res.status.code().unwrap_or_default(),
String::from_utf8(res.stderr)?,
))
})
}

pub fn container_name(pkg_id: &PackageId, name: Option<&str>) -> String {
@@ -364,9 +288,9 @@ impl DockerProcedure {
}

pub fn uncontainer_name(name: &str) -> Option<(PackageId<&str>, Option<&str>)> {
let (pre_tld, _) = name.split_once('.')?;
let (pre_tld, _) = name.split_once(".")?;
if pre_tld.contains('_') {
let (pkg, name) = name.split_once('_')?;
let (pkg, name) = name.split_once("_")?;
Some((Id::try_from(pkg).ok()?.into(), Some(name)))
} else {
Some((Id::try_from(pre_tld).ok()?.into(), None))
@@ -432,93 +356,3 @@ impl DockerProcedure {
res
}
}

struct RingVec<T> {
value: VecDeque<T>,
capacity: usize,
}
impl<T> RingVec<T> {
fn new(capacity: usize) -> Self {
RingVec {
value: VecDeque::with_capacity(capacity),
capacity,
}
}
fn push(&mut self, item: T) -> Option<T> {
let popped_item = if self.value.len() == self.capacity {
self.value.pop_front()
} else {
None
};
self.value.push_back(item);
popped_item
}
}

async fn buf_reader_to_lines(
reader: impl AsyncBufRead + Unpin,
limit: impl Into<Option<usize>>,
) -> Result<Vec<String>, Error> {
let lines = stream! {
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
yield Ok::<_, Report>(line);
}
};
let output: RingVec<String> = lines
.try_fold(
RingVec::new(limit.into().unwrap_or(1000)),
|mut acc, line| async move {
acc.push(line);
Ok(acc)
},
)
.await
.with_kind(crate::ErrorKind::Unknown)?;
let output: Vec<String> = output.value.into_iter().collect();
Ok(output)
}

async fn max_buffer(
reader: impl AsyncBufRead + Unpin,
max_items: impl Into<Option<usize>>,
) -> Result<Vec<u8>, Error> {
let mut buffer = Vec::new();

let mut lines = reader.lines();
let max_items = max_items.into().unwrap_or(10_000_000);
while let Some(line) = lines.next_line().await? {
let mut line = line.into_bytes();
buffer.append(&mut line);
if buffer.len() >= max_items {
return Err(Error::new(
color_eyre::eyre::eyre!("Reading the buffer exceeding limits of {}", max_items),
crate::ErrorKind::Unknown,
));
}
}
Ok(buffer)
}

#[cfg(test)]
mod tests {
use super::*;
/// Note, this size doesn't mean the vec will match. The vec will go to the next size, 0 -> 7 = 7 and so forth 7-15 = 15
/// Just how the vec with capacity works.
const CAPACITY_IN: usize = 7;
#[test]
fn default_capacity_is_set() {
let ring: RingVec<usize> = RingVec::new(CAPACITY_IN);
assert_eq!(CAPACITY_IN, ring.value.capacity());
assert_eq!(0, ring.value.len());
}
#[test]
fn capacity_can_not_be_exceeded() {
let mut ring = RingVec::new(CAPACITY_IN);
for i in 1..100usize {
ring.push(i);
}
assert_eq!(CAPACITY_IN, ring.value.capacity());
assert_eq!(CAPACITY_IN, ring.value.len());
}
}
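
For context on the helpers deleted above: RingVec is a fixed-capacity FIFO over a VecDeque, where pushing onto a full buffer evicts and returns the oldest element; that is how buf_reader_to_lines keeps only the last N lines of output. A small standalone sketch of that eviction behavior follows (the type is restated here only so the example compiles on its own; the assertions illustrate the semantics shown in the diff and are not tests from the repository).

use std::collections::VecDeque;

struct RingVec<T> {
    value: VecDeque<T>,
    capacity: usize,
}

impl<T> RingVec<T> {
    fn new(capacity: usize) -> Self {
        RingVec { value: VecDeque::with_capacity(capacity), capacity }
    }
    // Returns the evicted element when the buffer was already full.
    fn push(&mut self, item: T) -> Option<T> {
        let popped = if self.value.len() == self.capacity {
            self.value.pop_front()
        } else {
            None
        };
        self.value.push_back(item);
        popped
    }
}

fn main() {
    let mut ring = RingVec::new(3);
    assert_eq!(ring.push("a"), None);
    assert_eq!(ring.push("b"), None);
    assert_eq!(ring.push("c"), None);
    assert_eq!(ring.push("d"), Some("a")); // oldest entry drops out once full
    assert_eq!(ring.value.iter().copied().collect::<Vec<_>>(), vec!["b", "c", "d"]);
}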

