This repository was archived by the owner on Feb 28, 2023. It is now read-only.

rouille -> warp #131

Merged
merged 25 commits into from Jan 10, 2022
Changes from all commits
1,599 changes: 669 additions & 930 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions Cargo.toml
@@ -67,7 +67,7 @@ percent-encoding = { version = "2", optional = true }
rand = "0.8"
redis = { version = "0.21", optional = true, default-features = false, features = ["aio", "tokio-comp"] }
regex = "1"
reqwest = { version = "0.11", features = ["json", "blocking"], optional = true }
reqwest = { version = "0.11.7", features = ["json", "native-tls"], optional = true }
retry = "1"
ring = { version = "0.16", optional = true, features = ["std"] }
rusoto_core = { version = "0.47", optional = true }
@@ -95,15 +95,17 @@ zip = { version = "0.5", default-features = false }
zstd = "0.6"
structopt = "0.3.25"
strum = { version = "0.23.0", features = ["derive"] }
native-tls = "0.2.8"

# dist-server only
crossbeam-utils = { version = "0.8", optional = true }
libmount = { version = "0.1.10", optional = true }
nix = { version = "0.19", optional = true }
rouille = { version = "3", optional = true, default-features = false, features = ["ssl"] }
syslog = { version = "5", optional = true }
void = { version = "1", optional = true }
version-compare = { version = "0.0.11", optional = true }
warp = { version = "0.3.2", optional = true, features = ["tls"] }
thiserror = { version = "1.0.30", optional = true }

# test only
openssl = { version = "0.10", optional = true }
@@ -145,7 +147,7 @@ unstable = []
# Enables distributed support in the cachepot client
dist-client = ["ar", "flate2", "hyper", "hyperx", "reqwest/stream", "url", "sha2", "tokio/fs"]
# Enables the cachepot-dist binary
dist-server = ["chrono", "crossbeam-utils", "jsonwebtoken", "flate2", "hyperx", "libmount", "nix", "reqwest", "rouille", "sha2", "syslog", "void", "version-compare"]
dist-server = ["chrono", "crossbeam-utils", "jsonwebtoken", "flate2", "hyperx", "libmount", "nix", "reqwest", "sha2", "syslog", "void", "version-compare", "warp", "thiserror"]
# Enables dist tests with external requirements
dist-tests = ["dist-client", "dist-server"]
# Run JWK token crypto against openssl ref impl
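The dependency changes above swap rouille (built with its "ssl" feature) for warp with the "tls" feature, moving the dist-server HTTP layer onto the tokio runtime. A minimal sketch of serving a route over TLS with warp 0.3; the route, port, and certificate paths are placeholders, not taken from this PR:

```rust
use warp::Filter;

#[tokio::main]
async fn main() {
    // Hypothetical route; the real scheduler and server wire up their own APIs.
    let status = warp::path!("api" / "v1" / "status").map(|| "ok");

    warp::serve(status)
        .tls()
        .cert_path("server.crt") // placeholder certificate and key paths
        .key_path("server.key")
        .run(([0, 0, 0, 0], 10500))
        .await;
}
```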
12 changes: 6 additions & 6 deletions src/bin/cachepot-dist/build.rs
@@ -484,7 +484,7 @@ fn docker_diff(cid: &str) -> Result<String> {
// Force remove the container
fn docker_rm(cid: &str) -> Result<()> {
Command::new("docker")
.args(&["rm", "-f", &cid])
.args(&["rm", "-f", cid])
.check_run()
.context("Failed to force delete container")
}
@@ -611,11 +611,11 @@ impl DockerBuilder {
fn clean_container(&self, cid: &str) -> Result<()> {
// Clean up any running processes
Command::new("docker")
.args(&["exec", &cid, "/busybox", "kill", "-9", "-1"])
.args(&["exec", cid, "/busybox", "kill", "-9", "-1"])
.check_run()
.context("Failed to run kill on all processes in container")?;

let diff = docker_diff(&cid)?;
let diff = docker_diff(cid)?;
if !diff.is_empty() {
let mut lastpath = None;
for line in diff.split(|c| c == '\n') {
@@ -650,15 +650,15 @@ impl DockerBuilder {
}
lastpath = Some(changepath);
if let Err(e) = Command::new("docker")
.args(&["exec", &cid, "/busybox", "rm", "-rf", changepath])
.args(&["exec", cid, "/busybox", "rm", "-rf", changepath])
.check_run()
{
// We do a final check anyway, so just continue
warn!("Failed to remove added path in a container: {}", e)
}
}

let newdiff = docker_diff(&cid)?;
let newdiff = docker_diff(cid)?;
// See note about changepath == "/tmp" above
if !newdiff.is_empty() && newdiff != "C /tmp" {
bail!(
@@ -821,7 +821,7 @@ impl DockerBuilder {
cmd.arg("-e").arg(env);
}
let shell_cmd = "cd \"$1\" && shift && exec \"$@\"";
cmd.args(&[cid, "/busybox", "sh", "-c", &shell_cmd]);
cmd.args(&[cid, "/busybox", "sh", "-c", shell_cmd]);
cmd.arg(&executable);
cmd.arg(cwd);
cmd.arg(executable);
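The build.rs edits above are mechanical clippy fixes: `cid` and `shell_cmd` are already `&str`, so the extra `&` created a `&&str` that the compiler immediately dereferences, which the `needless_borrow` lint flags. A tiny illustration with a made-up helper:

```rust
// `cid` is already a &str, so writing `&cid` below would be a needless borrow.
fn docker_rm_args(cid: &str) -> [&str; 3] {
    ["rm", "-f", cid]
}
```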
62 changes: 35 additions & 27 deletions src/bin/cachepot-dist/main.rs
@@ -6,6 +6,7 @@ extern crate log;
extern crate serde_derive;

use anyhow::{bail, Context, Error, Result};
use async_trait::async_trait;
use cachepot::config::{
scheduler as scheduler_config, server as server_config, INSECURE_DIST_CLIENT_TOKEN,
};
@@ -27,7 +28,7 @@ use std::path::Path;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};
use structopt::StructOpt;
use syslog::Facility;
@@ -93,6 +94,7 @@ struct GenerateJwtHS256ServerToken {
}

#[derive(StructOpt)]
#[allow(clippy::enum_variant_names)]
enum AuthSubcommand {
GenerateSharedToken(GenerateSharedToken),
GenerateJwtHS256Key,
@@ -101,11 +103,12 @@ enum AuthSubcommand {

// Only supported on x86_64 Linux machines
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn main() {
#[tokio::main]
async fn main() {
init_logging();
std::process::exit({
let cmd = Command::from_args();
match run(cmd) {
match run(cmd).await {
Ok(s) => s,
Err(e) => {
eprintln!("cachepot-dist: error: {}", e);
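Because warp and the rest of the new stack are async, `main` becomes an `async fn` driven by `#[tokio::main]`. Roughly, the attribute builds a multi-threaded runtime and blocks on the async body; a sketch of the desugaring, not code from this PR:

```rust
fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("failed to build the tokio runtime")
        .block_on(async {
            // the async body of main, e.g. `run(cmd).await`, goes here
        });
}
```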
@@ -152,10 +155,10 @@ fn create_jwt_server_token(
key: &[u8],
) -> Result<String> {
let key = jwt::EncodingKey::from_secret(key);
jwt::encode(&header, &ServerJwt { server_id }, &key).map_err(Into::into)
jwt::encode(header, &ServerJwt { server_id }, &key).map_err(Into::into)
}
fn dangerous_insecure_extract_jwt_server_token(server_token: &str) -> Option<ServerId> {
jwt::dangerous_insecure_decode::<ServerJwt>(&server_token)
jwt::dangerous_insecure_decode::<ServerJwt>(server_token)
.map(|res| res.claims.server_id)
.ok()
}
@@ -170,7 +173,7 @@ fn check_jwt_server_token(
.ok()
}

fn run(command: Command) -> Result<i32> {
async fn run(command: Command) -> Result<i32> {
match command {
Command::Auth(AuthSubcommand::GenerateJwtHS256Key) => {
let num_bytes = 256 / 8;
@@ -203,9 +206,7 @@ fn run(command: Command) -> Result<i32> {
bail!("Could not read config");
}
} else {
secret_key
.expect("missing secret-key in parsed subcommand")
.to_owned()
secret_key.expect("missing secret-key in parsed subcommand")
};

let secret_key = base64::decode_config(&secret_key, base64::URL_SAFE_NO_PAD)?;
@@ -251,6 +252,7 @@ fn run(command: Command) -> Result<i32> {
jwks_url,
} => Box::new(
token_check::ValidJWTCheck::new(audience, issuer, &jwks_url)
.await
.context("Failed to create a checker for valid JWTs")?,
),
scheduler_config::ClientAuth::Mozilla { required_groups } => {
@@ -265,10 +267,10 @@ fn run(command: Command) -> Result<i32> {
scheduler_config::ServerAuth::Insecure => {
warn!("Scheduler starting with DANGEROUSLY_INSECURE server authentication");
let token = INSECURE_DIST_SERVER_TOKEN;
Box::new(move |server_token| check_server_token(server_token, &token))
Arc::new(move |server_token| check_server_token(server_token, token))
}
scheduler_config::ServerAuth::Token { token } => {
Box::new(move |server_token| check_server_token(server_token, &token))
Arc::new(move |server_token| check_server_token(server_token, &token))
}
scheduler_config::ServerAuth::JwtHS256 { secret_key } => {
let secret_key = base64::decode_config(&secret_key, base64::URL_SAFE_NO_PAD)
@@ -285,7 +287,7 @@ fn run(command: Command) -> Result<i32> {
sub: None,
algorithms: vec![jwt::Algorithm::HS256],
};
Box::new(move |server_token| {
Arc::new(move |server_token| {
check_jwt_server_token(server_token, &secret_key, &validation)
})
}
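The auth callbacks switch from `Box<dyn Fn ...>` to `Arc<dyn Fn ...>` because warp filters are cloned for every request, so anything a handler captures must be cloneable and shareable across threads; an `Arc` can be cloned cheaply into each filter, while a `Box<dyn Fn>` cannot be cloned at all. A sketch of the usual pattern for threading a shared checker through a warp filter, with the alias name and the `u64` id standing in for the real types:

```rust
use std::sync::Arc;
use warp::Filter;

// Assumed shape: the real callback returns an Option<ServerId>.
type ServerAuthCheck = Arc<dyn Fn(&str) -> Option<u64> + Send + Sync>;

fn with_server_auth(
    check: ServerAuthCheck,
) -> impl Filter<Extract = (ServerAuthCheck,), Error = std::convert::Infallible> + Clone {
    // Cloning the Arc per request is cheap and keeps the filter itself Clone.
    warp::any().map(move || check.clone())
}
```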
@@ -299,7 +301,7 @@ fn run(command: Command) -> Result<i32> {
check_client_auth,
check_server_auth,
);
void::unreachable(http_scheduler.start()?);
void::unreachable(http_scheduler.start().await?);
}

Command::Server(ServerSubcommand { config, syslog }) => {
@@ -337,7 +339,7 @@ fn run(command: Command) -> Result<i32> {
let scheduler_auth = match scheduler_auth {
server_config::SchedulerAuth::Insecure => {
warn!("Server starting with DANGEROUSLY_INSECURE scheduler authentication");
create_server_token(server_id, &INSECURE_DIST_SERVER_TOKEN)
create_server_token(server_id, INSECURE_DIST_SERVER_TOKEN)
}
server_config::SchedulerAuth::Token { token } => {
create_server_token(server_id, &token)
@@ -366,7 +368,7 @@ fn run(command: Command) -> Result<i32> {
server,
)
.context("Failed to create cachepot HTTP server instance")?;
void::unreachable(http_server.start()?)
void::unreachable(http_server.start().await?)
}
}
}
@@ -464,8 +466,9 @@ impl Scheduler {
}
}

#[async_trait]
impl SchedulerIncoming for Scheduler {
fn handle_alloc_job(
async fn handle_alloc_job(
&self,
requester: &dyn SchedulerOutgoing,
tc: Toolchain,
@@ -564,6 +567,7 @@ impl SchedulerIncoming for Scheduler {
need_toolchain,
} = requester
.do_assign_job(server_id, job_id, tc, auth.clone())
.await
.with_context(|| {
// LOCKS
let mut servers = self.servers.lock().unwrap();
@@ -684,7 +688,7 @@ impl SchedulerIncoming for Scheduler {
}
Some(ref mut details) if details.server_nonce != server_nonce => {
for job_id in details.jobs_assigned.iter() {
if jobs.remove(&job_id).is_none() {
if jobs.remove(job_id).is_none() {
warn!(
"Unknown job found when replacing server {}: {}",
server_id.addr(),
@@ -782,7 +786,7 @@ impl SchedulerIncoming for Scheduler {
pub struct Server {
builder: Box<dyn BuilderIncoming>,
cache: Mutex<TcCache>,
job_toolchains: Mutex<HashMap<JobId, Toolchain>>,
job_toolchains: tokio::sync::Mutex<HashMap<JobId, Toolchain>>,
}

impl Server {
@@ -796,18 +800,19 @@ impl Server {
Ok(Server {
builder,
cache: Mutex::new(cache),
job_toolchains: Mutex::new(HashMap::new()),
job_toolchains: tokio::sync::Mutex::new(HashMap::new()),
})
}
}
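`job_toolchains` moves to `tokio::sync::Mutex` because the handlers below are async and the lock may be held across an `.await`: a `std::sync::MutexGuard` is not `Send`, so a future holding one could not run on the multi-threaded runtime, and blocking on the lock would stall a worker thread. A small sketch of the async-mutex pattern with stand-in types:

```rust
use std::collections::HashMap;

struct JobToolchains {
    // Stand-ins for JobId and Toolchain.
    inner: tokio::sync::Mutex<HashMap<u64, String>>,
}

impl JobToolchains {
    async fn take(&self, job_id: u64) -> Option<String> {
        // `lock()` is async: waiting yields to the executor instead of blocking,
        // and the guard can safely live across later await points.
        let mut map = self.inner.lock().await;
        map.remove(&job_id)
    }
}
```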

#[async_trait]
impl ServerIncoming for Server {
fn handle_assign_job(&self, job_id: JobId, tc: Toolchain) -> Result<AssignJobResult> {
async fn handle_assign_job(&self, job_id: JobId, tc: Toolchain) -> Result<AssignJobResult> {
let need_toolchain = !self.cache.lock().unwrap().contains_toolchain(&tc);
assert!(self
.job_toolchains
.lock()
.unwrap()
.await
.insert(job_id, tc)
.is_none());
let state = if need_toolchain {
Expand All @@ -821,18 +826,19 @@ impl ServerIncoming for Server {
need_toolchain,
})
}
fn handle_submit_toolchain(
async fn handle_submit_toolchain(
&self,
requester: &dyn ServerOutgoing,
job_id: JobId,
tc_rdr: ToolchainReader,
tc_rdr: ToolchainReader<'_>,
) -> Result<SubmitToolchainResult> {
requester
.do_update_job_state(job_id, JobState::Ready)
.await
.context("Updating job state failed")?;
// TODO: need to lock the toolchain until the container has started
// TODO: can start prepping container
let tc = match self.job_toolchains.lock().unwrap().get(&job_id).cloned() {
let tc = match self.job_toolchains.lock().await.get(&job_id).cloned() {
Some(tc) => tc,
None => return Ok(SubmitToolchainResult::JobNotFound),
};
Expand All @@ -848,18 +854,19 @@ impl ServerIncoming for Server {
.map(|_| SubmitToolchainResult::Success)
.unwrap_or(SubmitToolchainResult::CannotCache))
}
fn handle_run_job(
async fn handle_run_job(
&self,
requester: &dyn ServerOutgoing,
job_id: JobId,
command: CompileCommand,
outputs: Vec<String>,
inputs_rdr: InputsReader,
inputs_rdr: InputsReader<'_>,
) -> Result<RunJobResult> {
requester
.do_update_job_state(job_id, JobState::Started)
.await
.context("Updating job state failed")?;
let tc = self.job_toolchains.lock().unwrap().remove(&job_id);
let tc = self.job_toolchains.lock().await.remove(&job_id);
let res = match tc {
None => Ok(RunJobResult::JobNotFound),
Some(tc) => {
Expand All @@ -877,6 +884,7 @@ impl ServerIncoming for Server {
};
requester
.do_update_job_state(job_id, JobState::Complete)
.await
.context("Updating job state failed")?;
res
}
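The `SchedulerIncoming` and `ServerIncoming` impls gain `#[async_trait]` because async functions in traits are not natively supported on this toolchain; the macro rewrites each `async fn` into a method returning a boxed future. A sketch of the mechanism with an invented trait:

```rust
use async_trait::async_trait;

#[async_trait]
trait JobHandler {
    // Rewritten by the macro to return roughly
    // Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>.
    async fn handle(&self, job_id: u64) -> anyhow::Result<()>;
}

struct Printer;

#[async_trait]
impl JobHandler for Printer {
    async fn handle(&self, job_id: u64) -> anyhow::Result<()> {
        println!("handling job {}", job_id);
        Ok(())
    }
}
```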