Skip to content
This repository was archived by the owner on Feb 28, 2023. It is now read-only.

Commit 05a9ca1

Browse files
committed
fix tests
1 parent da34528 commit 05a9ca1

File tree

8 files changed

+91
-90
lines changed

8 files changed

+91
-90
lines changed

src/bin/cachepot-dist/build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -858,7 +858,7 @@ impl DockerBuilder {
858858
}
859859

860860
impl BuilderIncoming for DockerBuilder {
861-
// From Server
861+
// From Worker
862862
fn run_build(
863863
&self,
864864
tc: Toolchain,

src/bin/cachepot-dist/main.rs

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,13 @@ extern crate serde_derive;
88
use anyhow::{bail, Context, Error, Result};
99
use async_trait::async_trait;
1010
use cachepot::config::{
11-
coordinator as coordinator_config, scheduler as scheduler_config, WorkerUrl,
12-
INSECURE_DIST_CLIENT_TOKEN,
11+
scheduler as scheduler_config, worker, WorkerUrl, INSECURE_DIST_CLIENT_TOKEN,
1312
};
1413
use cachepot::dist::{
1514
self, AllocJobResult, AssignJobResult, BuilderIncoming, CompileCommand, CoordinatorIncoming,
16-
CoordinatorOutgoing, HeartbeatServerResult, InputsReader, JobAlloc, JobAuthorizer, JobComplete,
15+
CoordinatorOutgoing, HeartbeatWorkerResult, InputsReader, JobAlloc, JobAuthorizer, JobComplete,
1716
JobId, JobState, RunJobResult, SchedulerIncoming, SchedulerOutgoing, SchedulerStatusResult,
18-
ServerNonce, SubmitToolchainResult, TcCache, Toolchain, ToolchainReader, UpdateJobStateResult,
17+
SubmitToolchainResult, TcCache, Toolchain, ToolchainReader, UpdateJobStateResult, WorkerNonce,
1918
};
2019
use cachepot::util::daemonize;
2120
use jsonwebtoken as jwt;
@@ -41,7 +40,7 @@ pub const INSECURE_DIST_WORKER_TOKEN: &str = "dangerously_insecure_server";
4140
enum Command {
4241
Auth(AuthSubcommand),
4342
Scheduler(SchedulerSubcommand),
44-
Server(ServerSubcommand),
43+
Worker(WorkerSubcommand),
4544
}
4645

4746
#[derive(StructOpt)]
@@ -58,7 +57,7 @@ struct SchedulerSubcommand {
5857

5958
#[derive(StructOpt)]
6059
#[structopt(rename_all = "kebab-case")]
61-
struct ServerSubcommand {
60+
struct WorkerSubcommand {
6261
/// Use the server config file at PATH
6362
#[structopt(long, value_name = "PATH")]
6463
config: PathBuf,
@@ -303,15 +302,15 @@ async fn run(command: Command) -> Result<i32> {
303302
void::unreachable(http_scheduler.start().await?);
304303
}
305304

306-
Command::Server(ServerSubcommand { config, syslog }) => {
307-
let coordinator_config::Config {
305+
Command::Worker(WorkerSubcommand { config, syslog }) => {
306+
let worker::Config {
308307
builder,
309308
cache_dir,
310309
public_addr,
311310
scheduler_url,
312311
scheduler_auth,
313312
toolchain_cache_size,
314-
} = if let Some(config) = coordinator_config::from_path(&config)? {
313+
} = if let Some(config) = worker::from_path(&config)? {
315314
config
316315
} else {
317316
bail!("Could not load config!");
@@ -322,10 +321,10 @@ async fn run(command: Command) -> Result<i32> {
322321
}
323322

324323
let builder: Box<dyn dist::BuilderIncoming> = match builder {
325-
coordinator_config::BuilderType::Docker => {
324+
worker::BuilderType::Docker => {
326325
Box::new(build::DockerBuilder::new().context("Docker builder failed to start")?)
327326
}
328-
coordinator_config::BuilderType::Overlay {
327+
worker::BuilderType::Overlay {
329328
bwrap_path,
330329
build_dir,
331330
} => Box::new(
@@ -336,14 +335,12 @@ async fn run(command: Command) -> Result<i32> {
336335

337336
let server_id = public_addr.clone();
338337
let scheduler_auth = match scheduler_auth {
339-
coordinator_config::SchedulerAuth::Insecure => {
338+
worker::SchedulerAuth::Insecure => {
340339
warn!("Server starting with DANGEROUSLY_INSECURE scheduler authentication");
341340
create_server_token(server_id, INSECURE_DIST_WORKER_TOKEN)
342341
}
343-
coordinator_config::SchedulerAuth::Token { token } => {
344-
create_server_token(server_id, &token)
345-
}
346-
coordinator_config::SchedulerAuth::JwtToken { token } => {
342+
worker::SchedulerAuth::Token { token } => create_server_token(server_id, &token),
343+
worker::SchedulerAuth::JwtToken { token } => {
347344
let token_server_id: WorkerUrl =
348345
dangerous_insecure_extract_jwt_server_token(&token)
349346
.context("Could not decode scheduler auth jwt")?;
@@ -358,9 +355,9 @@ async fn run(command: Command) -> Result<i32> {
358355
}
359356
};
360357

361-
let server = Server::new(builder, &cache_dir, toolchain_cache_size)
358+
let server = Worker::new(builder, &cache_dir, toolchain_cache_size)
362359
.context("Failed to create cachepot server instance")?;
363-
let http_server = dist::http::Server::new(
360+
let http_server = dist::http::Worker::new(
364361
public_addr.0.to_url().clone(),
365362
scheduler_url.to_url().clone(),
366363
scheduler_auth,
@@ -400,18 +397,18 @@ pub struct Scheduler {
400397
// Currently running jobs, can never be Complete
401398
jobs: Mutex<BTreeMap<JobId, JobDetail>>,
402399

403-
servers: Mutex<HashMap<WorkerUrl, ServerDetails>>,
400+
servers: Mutex<HashMap<WorkerUrl, WorkerDetails>>,
404401
}
405402

406-
struct ServerDetails {
403+
struct WorkerDetails {
407404
jobs_assigned: HashSet<JobId>,
408405
// Jobs assigned that haven't seen a state change. Can only be pending
409406
// or ready.
410407
jobs_unclaimed: HashMap<JobId, Instant>,
411408
last_seen: Instant,
412409
last_error: Option<Instant>,
413410
num_cpus: usize,
414-
server_nonce: ServerNonce,
411+
server_nonce: WorkerNonce,
415412
job_authorizer: Box<dyn JobAuthorizer>,
416413
}
417414

@@ -426,7 +423,7 @@ impl Scheduler {
426423

427424
fn prune_workers(
428425
&self,
429-
servers: &mut MutexGuard<HashMap<WorkerUrl, ServerDetails>>,
426+
servers: &mut MutexGuard<HashMap<WorkerUrl, WorkerDetails>>,
430427
jobs: &mut MutexGuard<BTreeMap<JobId, JobDetail>>,
431428
) {
432429
let now = Instant::now();
@@ -492,7 +489,7 @@ impl SchedulerIncoming for Scheduler {
492489
match best_err {
493490
Some((
494491
_,
495-
&mut ServerDetails {
492+
&mut WorkerDetails {
496493
last_error: Some(best_last_err),
497494
..
498495
},
@@ -611,13 +608,13 @@ impl SchedulerIncoming for Scheduler {
611608
})
612609
}
613610

614-
fn handle_heartbeat_server(
611+
fn handle_heartbeat_worker(
615612
&self,
616613
server_id: WorkerUrl,
617-
server_nonce: ServerNonce,
614+
server_nonce: WorkerNonce,
618615
num_cpus: usize,
619616
job_authorizer: Box<dyn JobAuthorizer>,
620-
) -> Result<HeartbeatServerResult> {
617+
) -> Result<HeartbeatWorkerResult> {
621618
if num_cpus == 0 {
622619
bail!("Invalid number of CPUs (0) specified in heartbeat")
623620
}
@@ -680,7 +677,7 @@ impl SchedulerIncoming for Scheduler {
680677
}
681678
}
682679

683-
return Ok(HeartbeatServerResult { is_new: false });
680+
return Ok(HeartbeatWorkerResult { is_new: false });
684681
}
685682
Some(ref mut details) if details.server_nonce != server_nonce => {
686683
for job_id in details.jobs_assigned.iter() {
@@ -697,7 +694,7 @@ impl SchedulerIncoming for Scheduler {
697694
info!("Registered new server {:?}", server_id);
698695
servers.insert(
699696
server_id,
700-
ServerDetails {
697+
WorkerDetails {
701698
last_seen: Instant::now(),
702699
last_error: None,
703700
jobs_assigned: HashSet::new(),
@@ -707,7 +704,7 @@ impl SchedulerIncoming for Scheduler {
707704
job_authorizer,
708705
},
709706
);
710-
Ok(HeartbeatServerResult { is_new: true })
707+
Ok(HeartbeatWorkerResult { is_new: true })
711708
}
712709

713710
fn handle_update_job_state(
@@ -778,21 +775,21 @@ impl SchedulerIncoming for Scheduler {
778775
}
779776
}
780777

781-
pub struct Server {
778+
pub struct Worker {
782779
builder: Box<dyn BuilderIncoming>,
783780
cache: Mutex<TcCache>,
784781
job_toolchains: tokio::sync::Mutex<HashMap<JobId, Toolchain>>,
785782
}
786783

787-
impl Server {
784+
impl Worker {
788785
pub fn new(
789786
builder: Box<dyn BuilderIncoming>,
790787
cache_dir: &Path,
791788
toolchain_cache_size: u64,
792-
) -> Result<Server> {
789+
) -> Result<Worker> {
793790
let cache = TcCache::new(&cache_dir.join("tc"), toolchain_cache_size)
794791
.context("Failed to create toolchain cache")?;
795-
Ok(Server {
792+
Ok(Worker {
796793
builder,
797794
cache: Mutex::new(cache),
798795
job_toolchains: tokio::sync::Mutex::new(HashMap::new()),
@@ -801,7 +798,7 @@ impl Server {
801798
}
802799

803800
#[async_trait]
804-
impl CoordinatorIncoming for Server {
801+
impl CoordinatorIncoming for Worker {
805802
async fn handle_assign_job(&self, job_id: JobId, tc: Toolchain) -> Result<AssignJobResult> {
806803
let need_toolchain = !self.cache.lock().unwrap().contains_toolchain(&tc);
807804
assert!(self

src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -835,7 +835,7 @@ pub mod scheduler {
835835
}
836836

837837
#[cfg(feature = "dist-worker")]
838-
pub mod coordinator {
838+
pub mod worker {
839839
use super::{HTTPUrl, WorkerUrl};
840840
use std::path::{Path, PathBuf};
841841

src/coordinator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1354,7 +1354,7 @@ pub struct CoordinatorStats {
13541354
pub dist_errors: u64,
13551355
}
13561356

1357-
/// Info and stats about the server.
1357+
/// Info and stats about the coordinator.
13581358
#[derive(Serialize, Deserialize, Clone, Debug)]
13591359
pub struct CoordinatorInfo {
13601360
pub stats: CoordinatorStats,

0 commit comments

Comments
 (0)