Worker services now start after listeners to prevent error confusion
To prevent a spurious error message from the worker stating it cannot connect
to the scheduler due to startup ordering, we now always start workers
after the schedulers are listening.
allada committed Sep 13, 2023
1 parent 637c8a9 commit 16f2ca5
Showing 7 changed files with 69 additions and 65 deletions.
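The gist of the fix, as a minimal standalone sketch (tokio-based, with an illustrative address taken from the README's error message; none of this is the repository's code): bind the listener before spawning anything that dials it.

```rust
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Bind first: once this await returns, the OS is queueing connections,
    // so a client started afterwards cannot observe "connection refused".
    let listener = TcpListener::bind("127.0.0.1:50061").await?;

    // Only now start the "worker"; its connect cannot race the bind above.
    let worker = tokio::spawn(async {
        TcpStream::connect("127.0.0.1:50061").await.map(|_| ())
    });

    // Accept the worker's connection.
    let (_stream, remote_addr) = listener.accept().await?;
    println!("worker connected from {remote_addr}");
    worker.await??;
    Ok(())
}
```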
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

7 changes: 0 additions & 7 deletions README.md
@@ -35,12 +35,6 @@ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# --release causes link-time optimization to be enabled, which can take a while
# to compile, but will result in a much faster binary.
#
# Note: If you see an error like:
# "Could not connect to endpoint grpc://127.0.0.1:50061"
# You can ignore it; it occurs because the worker & scheduler are in the same
# process and the worker tried to connect to the scheduler before it was ready.
# It will auto-reconnect when able and everything will work fine.
cargo run --release --bin cas -- ./config/examples/basic_cas.json
```
In a separate terminal session, run the following command to connect the running server launched above to Bazel or another BRE client:
@@ -69,7 +63,6 @@ This project can be considered ~stable~ and is currently used in production syst
We support building with Bazel or Cargo. Cargo **might** produce faster binaries because LTO (Link Time Optimization) is enabled for release versions, whereas Bazel currently does not support LTO for Rust.

### Bazel requirements
* Linux (recent versions; untested on Windows, but it might work)
* Bazel 5.0.0+
* gcc
* g++
1 change: 1 addition & 0 deletions cas/BUILD
@@ -18,6 +18,7 @@ rust_binary(
"//util:common",
"//util:error",
"//util:metrics_utils",
"@crate_index//:async-lock",
"@crate_index//:clap",
"@crate_index//:drop_guard",
"@crate_index//:env_logger",
117 changes: 63 additions & 54 deletions cas/cas_main.rs
@@ -18,6 +18,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use async_lock::Mutex as AsyncMutex;
use axum::Router;
use clap::Parser;
use drop_guard::guard;
@@ -103,55 +104,6 @@ async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
}
}

let mut futures: Vec<BoxFuture<Result<(), Error>>> = Vec::new();
{
let worker_cfgs = cfg.workers.unwrap_or(vec![]);
let root_worker_metrics = root_metrics_registry.sub_registry_with_prefix("workers");
let mut worker_names = HashSet::with_capacity(worker_cfgs.len());
for (i, worker_cfg) in worker_cfgs.into_iter().enumerate() {
let spawn_fut = match worker_cfg {
WorkerConfig::local(local_worker_cfg) => {
let fast_slow_store =
store_manager
.get_store(&local_worker_cfg.cas_fast_slow_store)
.err_tip(|| {
format!(
"Failed to find store for cas_store_ref in worker config : {}",
local_worker_cfg.cas_fast_slow_store
)
})?;
let ac_store = store_manager.get_store(&local_worker_cfg.ac_store).err_tip(|| {
format!(
"Failed to find store for ac_store_ref in worker config : {}",
local_worker_cfg.ac_store
)
})?;
let local_worker =
new_local_worker(Arc::new(local_worker_cfg), fast_slow_store.clone(), ac_store.clone())
.await
.err_tip(|| "Could not make LocalWorker")?;
let name = if local_worker.name().is_empty() {
format!("worker_{}", i)
} else {
local_worker.name().clone()
};
if worker_names.contains(&name) {
Err(Box::new(make_err!(
Code::InvalidArgument,
"Duplicate worker name '{}' found in config",
name
)))?;
}
let worker_metrics = root_worker_metrics.sub_registry_with_prefix(&name);
local_worker.register_metrics(worker_metrics);
worker_names.insert(name);
tokio::spawn(local_worker.run())
}
};
futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v)));
}
}

fn into_encoding(from: &CompressionAlgorithm) -> Option<CompressionEncoding> {
match from {
CompressionAlgorithm::Gzip => Some(CompressionEncoding::Gzip),
@@ -201,8 +153,10 @@ async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
})
.collect();

let mut root_futures: Vec<BoxFuture<Result<(), Error>>> = Vec::new();

// Wrap our registry in an async mutex: it still needs to be shared with the
// services below, but must also accept worker metrics registered later.
let root_metrics_registry = Arc::new(root_metrics_registry);
let root_metrics_registry = Arc::new(AsyncMutex::new(root_metrics_registry));
for (server_cfg, connected_clients_mux) in servers_and_clients {
let services = server_cfg.services.ok_or("'services' must be configured")?;
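Why the registry gains an async mutex here: it used to be frozen in a plain `Arc` before the server loop, but the worker block (moved below in this commit) must still add sub-registries after the servers have captured their handle. A sketch of the sharing pattern under assumptions — `Registry` below is a stand-in, not prometheus_client's type:

```rust
use std::sync::Arc;

use async_lock::Mutex as AsyncMutex;

// Stand-in for prometheus_client::registry::Registry.
#[derive(Default)]
struct Registry;

impl Registry {
    fn sub_registry_with_prefix(&mut self, _prefix: &str) -> &mut Registry {
        self
    }
}

async fn demo() {
    let registry = Arc::new(AsyncMutex::new(Registry::default()));

    // Each server future holds a cheap clone of the Arc for its /metrics handler...
    let for_metrics_endpoint = Arc::clone(&registry);

    // ...while the main task can still lock the same registry mutably to
    // register worker metrics after the listeners are already up.
    let mut guard = registry.lock().await;
    let _workers = guard.sub_registry_with_prefix("workers");
    drop(guard);
    drop(for_metrics_endpoint);
}

fn main() {
    futures::executor::block_on(demo());
}
```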

@@ -387,7 +341,8 @@ async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
// if it needs to wait on a future.
spawn_blocking(move || {
let mut buf = String::new();
prometheus_client::encoding::text::encode(&mut buf, &root_metrics_registry)
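// Blocking on the async lock is safe here: spawn_blocking runs this closure
// on a dedicated blocking thread, so parking it does not stall the async
// runtime's worker threads.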
let root_metrics_registry_guard = futures::executor::block_on(root_metrics_registry.lock());
prometheus_client::encoding::text::encode(&mut buf, &root_metrics_registry_guard)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
.map(|_| {
// This is a hack to get around this bug: https://github.com/prometheus/client_rust/issues/155
@@ -413,9 +368,12 @@
)
}

let tcp_listener = TcpListener::bind(&server_cfg.listen_address.parse::<SocketAddr>()?).await?;
futures.push(Box::pin(async move {
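// The listener is bound right here during server setup, before the worker
// startup block below runs, so a local worker's first connection attempt
// cannot hit a port that is not yet listening.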
let socket_addr = server_cfg.listen_address.parse::<SocketAddr>()?;
let tcp_listener = TcpListener::bind(&socket_addr).await?;
log::warn!("Ready, listening on {}", socket_addr);
root_futures.push(Box::pin(async move {
loop {
// Wait for client to connect.
let (tcp_stream, remote_addr) = tcp_listener.accept().await?;
connected_clients_mux.inner.lock().insert(remote_addr);
// This is the safest way to guarantee that if our future
@@ -435,7 +393,58 @@ async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
}));
}

if let Err(e) = select_all(futures).await.0 {
{
// We start workers after our TcpListener is set up, so a worker that connects
// to one of these services will find it already listening.
let worker_cfgs = cfg.workers.unwrap_or(vec![]);
let mut root_metrics_registry_guard = root_metrics_registry.lock().await;
let root_worker_metrics = root_metrics_registry_guard.sub_registry_with_prefix("workers");
let mut worker_names = HashSet::with_capacity(worker_cfgs.len());
for (i, worker_cfg) in worker_cfgs.into_iter().enumerate() {
let spawn_fut = match worker_cfg {
WorkerConfig::local(local_worker_cfg) => {
let fast_slow_store =
store_manager
.get_store(&local_worker_cfg.cas_fast_slow_store)
.err_tip(|| {
format!(
"Failed to find store for cas_store_ref in worker config : {}",
local_worker_cfg.cas_fast_slow_store
)
})?;
let ac_store = store_manager.get_store(&local_worker_cfg.ac_store).err_tip(|| {
format!(
"Failed to find store for ac_store_ref in worker config : {}",
local_worker_cfg.ac_store
)
})?;
let local_worker =
new_local_worker(Arc::new(local_worker_cfg), fast_slow_store.clone(), ac_store.clone())
.await
.err_tip(|| "Could not make LocalWorker")?;
let name = if local_worker.name().is_empty() {
format!("worker_{}", i)
} else {
local_worker.name().clone()
};
if worker_names.contains(&name) {
Err(Box::new(make_err!(
Code::InvalidArgument,
"Duplicate worker name '{}' found in config",
name
)))?;
}
let worker_metrics = root_worker_metrics.sub_registry_with_prefix(&name);
local_worker.register_metrics(worker_metrics);
worker_names.insert(name);
tokio::spawn(local_worker.run())
}
};
root_futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v)));
}
}

if let Err(e) = select_all(root_futures).await.0 {
panic!("{:?}", e);
}
unreachable!("None of the futures should resolve in main()");
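For readers unfamiliar with the `select_all` pattern that ends `inner_main`: it resolves as soon as the first root future finishes, so one failing service (or worker) takes the whole process down. A minimal sketch of the same shape, with the error type simplified to `String`:

```rust
use futures::future::{select_all, BoxFuture};

async fn run(root_futures: Vec<BoxFuture<'static, Result<(), String>>>) {
    // Resolves with the first future to complete; the rest are dropped.
    // Note: select_all panics if given an empty Vec.
    let (result, _index, _remaining) = select_all(root_futures).await;
    if let Err(e) = result {
        panic!("{e:?}");
    }
    unreachable!("root services are infinite loops and should never return Ok");
}
```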
6 changes: 2 additions & 4 deletions cas/grpc_service/bytestream_server.rs
@@ -462,11 +462,9 @@ impl ByteStreamServer {
}

let digest = DigestInfo::try_new(resource_info.hash, resource_info.expected_size)?;
let result = tokio::spawn(async move { Pin::new(store_clone.as_ref()).has(digest).await })
.await
.err_tip(|| "Failed to join spawn")?;

let Some(item_size) = result.err_tip(|| "Failed to call .has() on store")? else {
let has_fut = Pin::new(store_clone.as_ref()).has(digest);
let Some(item_size) = has_fut.await.err_tip(|| "Failed to call .has() on store")? else {
return Err(make_err!(Code::NotFound, "{}", "not found"));
};
Ok(Response::new(QueryWriteStatusResponse {
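The change above drops a `tokio::spawn` that existed only to be awaited immediately. A sketch of the equivalence (the generic future stands in for the store's `has` call; not the repository's types): spawning adds a task plus a `JoinError` path without adding any concurrency.

```rust
use std::future::Future;

// `has_fut` stands in for `Pin::new(store.as_ref()).has(digest)`.
async fn query_size<F>(has_fut: F) -> Result<Option<u64>, String>
where
    F: Future<Output = Result<Option<u64>, String>>,
{
    // Before: tokio::spawn(async move { has_fut.await }).await — an extra
    // task and a join error to handle, with nothing gained because the
    // result is awaited immediately. Awaiting in place is equivalent.
    has_fut.await
}
```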
1 change: 1 addition & 0 deletions cas/worker/local_worker.rs
@@ -470,6 +470,7 @@ impl<T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorker<T, U> {
update_for_worker_stream,
),
};
log::warn!("Worker {} connected to scheduler", inner.worker_id);

// Now listen for connections and run all other services.
if let Err(e) = inner.run(update_for_worker_stream).await {
1 change: 1 addition & 0 deletions gencargo/cas/Cargo.toml
@@ -19,6 +19,7 @@ path = "../../cas/cas_main.rs"
doctest = false

[dependencies]
async-lock = { workspace = true }
axum = { workspace = true }
clap = { workspace = true }
drop_guard = { workspace = true }
