Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP feat: start runtime from deployer #450

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ POSTGRES_TAG?=latest

RUST_LOG?=debug

DOCKER_COMPOSE_ENV=STACK=$(STACK) BACKEND_TAG=$(TAG) PROVISIONER_TAG=$(TAG) POSTGRES_TAG=latest APPS_FQDN=$(APPS_FQDN) DB_FQDN=$(DB_FQDN) POSTGRES_PASSWORD=$(POSTGRES_PASSWORD) RUST_LOG=$(RUST_LOG) CONTAINER_REGISTRY=$(CONTAINER_REGISTRY) MONGO_INITDB_ROOT_USERNAME=$(MONGO_INITDB_ROOT_USERNAME) MONGO_INITDB_ROOT_PASSWORD=$(MONGO_INITDB_ROOT_PASSWORD)
DOCKER_COMPOSE_ENV=STACK=$(STACK) BACKEND_TAG=$(TAG) PROVISIONER_TAG=$(TAG) POSTGRES_TAG=14 APPS_FQDN=$(APPS_FQDN) DB_FQDN=$(DB_FQDN) POSTGRES_PASSWORD=$(POSTGRES_PASSWORD) RUST_LOG=$(RUST_LOG) CONTAINER_REGISTRY=$(CONTAINER_REGISTRY) MONGO_INITDB_ROOT_USERNAME=$(MONGO_INITDB_ROOT_USERNAME) MONGO_INITDB_ROOT_PASSWORD=$(MONGO_INITDB_ROOT_PASSWORD)

.PHONY: images clean src up down deploy shuttle-% postgres docker-compose.rendered.yml test

Expand Down
3 changes: 1 addition & 2 deletions deployer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ once_cell = "1.14.0"
opentelemetry = { version = "0.17.0", features = ["rt-tokio"] }
opentelemetry-datadog = { version = "0.5.0", features = ["reqwest-client"] }
pipe = "0.4.0"
portpicker = "0.1.1"
serde = "1.0.137"
serde_json = "1.0.81"
sqlx = { version = "0.6.0", features = ["runtime-tokio-native-tls", "sqlite", "chrono", "json", "migrate", "uuid"] }
strum = { version = "0.24.1", features = ["derive"] }
tar = "0.4.38"
thiserror = "1.0.24"
tokio = { version = "1.19.2", features = ["fs"] }
tokio = { version = "1.19.2", features = ["fs", "process"] }
toml = "0.5.9"
tonic = "0.8.2"
tower = { version = "0.4.12", features = ["make"] }
Expand Down
6 changes: 1 addition & 5 deletions deployer/src/deployment/deploy_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,11 +455,7 @@ mod tests {
impl provisioner_factory::AbstractFactory for StubAbstractProvisionerFactory {
type Output = StubProvisionerFactory;

fn get_factory(
&self,
_project_name: shuttle_common::project::ProjectName,
_service_id: Uuid,
) -> Self::Output {
fn get_factory(&self) -> Self::Output {
StubProvisionerFactory
}
}
Expand Down
111 changes: 48 additions & 63 deletions deployer/src/deployment/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ use std::{
};

use async_trait::async_trait;
use portpicker::pick_unused_port;
use shuttle_common::project::ProjectName as ServiceName;
use shuttle_service::{
loader::{LoadedService, Loader},
Factory, Logger,
};
use shuttle_proto::runtime::{runtime_client::RuntimeClient, LoadRequest, StartRequest};

use shuttle_service::{Factory, Logger};
use tokio::task::JoinError;
use tracing::{debug, error, info, instrument, trace};
use tracing::{error, info, instrument, trace};
use uuid::Uuid;

use super::{provisioner_factory, runtime_logger, KillReceiver, KillSender, RunReceiver, State};
Expand Down Expand Up @@ -42,18 +40,9 @@ pub async fn task(
let kill_send = kill_send.clone();
let kill_recv = kill_send.subscribe();

let port = match pick_unused_port() {
Some(port) => port,
None => {
start_crashed_cleanup(
&id,
Error::PrepareLoad(
"could not find a free port to deploy service on".to_string(),
),
);
continue;
}
};
// todo: this is the port the legacy runtime is hardcoded to start services on
let port = 7001;

let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port);
let _service_name = match ServiceName::from_str(&built.service_name) {
Ok(name) => name,
Expand Down Expand Up @@ -174,81 +163,77 @@ pub struct Built {
}

impl Built {
#[instrument(name = "built_handle", skip(self, libs_path, factory, logger, kill_recv, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))]
#[instrument(name = "built_handle", skip(self, libs_path, _factory, _logger, kill_recv, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))]
#[allow(clippy::too_many_arguments)]
async fn handle(
self,
address: SocketAddr,
libs_path: PathBuf,
factory: &mut dyn Factory,
logger: Logger,
_factory: &mut dyn Factory,
_logger: Logger,
kill_recv: KillReceiver,
kill_old_deployments: impl futures::Future<Output = Result<()>>,
cleanup: impl FnOnce(std::result::Result<std::result::Result<(), shuttle_service::Error>, JoinError>)
+ Send
+ 'static,
) -> Result<()> {
let service = load_deployment(&self.id, address, libs_path, factory, logger).await?;

// todo: refactor this?
kill_old_deployments.await?;

info!("got handle for deployment");
// Execute loaded service
tokio::spawn(run(self.id, service, address, kill_recv, cleanup));
tokio::spawn(run(
self.id,
self.service_name,
libs_path,
address,
kill_recv,
cleanup,
));

Ok(())
}
}

#[instrument(skip(service, kill_recv, cleanup), fields(address = %_address, state = %State::Running))]
#[instrument(skip(_kill_recv, _cleanup), fields(address = %_address, state = %State::Running))]
async fn run(
id: Uuid,
service: LoadedService,
service_name: String,
libs_path: PathBuf,
_address: SocketAddr,
mut kill_recv: KillReceiver,
cleanup: impl FnOnce(std::result::Result<std::result::Result<(), shuttle_service::Error>, JoinError>)
_kill_recv: KillReceiver,
_cleanup: impl FnOnce(std::result::Result<std::result::Result<(), shuttle_service::Error>, JoinError>)
+ Send
+ 'static,
) {
info!("starting up service");
let (mut handle, library) = service;
let result;
loop {
tokio::select! {
Ok(kill_id) = kill_recv.recv() => {
if kill_id == id {
debug!("deployment '{id}' killed");
handle.abort();
result = handle.await;
break;
}
}
rsl = &mut handle => {
result = rsl;
break;
}
}
}
info!("starting up deployer grpc client");
let mut client = RuntimeClient::connect("http://127.0.0.1:6001")
.await
.unwrap();

if let Err(err) = library.close() {
crashed_cleanup(&id, err);
} else {
cleanup(result);
}
}
info!(
"loading project from: {}",
libs_path.clone().into_os_string().into_string().unwrap()
);

#[instrument(skip(id, addr, libs_path, factory, logger))]
async fn load_deployment(
id: &Uuid,
addr: SocketAddr,
libs_path: PathBuf,
factory: &mut dyn Factory,
logger: Logger,
) -> Result<LoadedService> {
let so_path = libs_path.join(id.to_string());
let loader = Loader::from_so_file(so_path)?;
let load_request = tonic::Request::new(LoadRequest {
path: so_path.into_os_string().into_string().unwrap(),
service_name: service_name.clone(),
});
info!("loading service");
let response = client.load(load_request).await;

if let Err(e) = response {
info!("failed to load service: {}", e);
}

let start_request = tonic::Request::new(StartRequest { service_name });

info!("starting service");
let response = client.start(start_request).await.unwrap();

Ok(loader.load(factory, addr, logger).await?)
info!(response = ?response, "client response: ");
}

#[cfg(test)]
Expand Down
2 changes: 0 additions & 2 deletions deployer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ pub enum Error {
InputOutput(#[from] io::Error),
#[error("Build error: {0}")]
Build(#[source] Box<dyn StdError + Send>),
#[error("Prepare to load error: {0}")]
PrepareLoad(String),
#[error("Load error: {0}")]
Load(#[from] LoaderError),
#[error("Run error: {0}")]
Expand Down
28 changes: 22 additions & 6 deletions deployer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{convert::Infallible, net::SocketAddr};
use std::{convert::Infallible, env, net::SocketAddr, path::PathBuf};

pub use args::Args;
pub use deployment::{
Expand All @@ -13,6 +13,7 @@ use hyper::{
};
pub use persistence::Persistence;
use proxy::AddressGetter;
use tokio::select;
use tracing::{error, info};

mod args;
Expand Down Expand Up @@ -54,12 +55,27 @@ pub async fn start(
);
let make_service = router.into_make_service();

info!("Binding to and listening at address: {}", args.api_address);
let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.parent()
.unwrap()
.to_path_buf();

axum::Server::bind(&args.api_address)
.serve(make_service)
.await
.unwrap_or_else(|_| panic!("Failed to bind to address: {}", args.api_address));
let runtime_dir = workspace_root.join("target/debug");

let mut runtime = tokio::process::Command::new(runtime_dir.join("shuttle-runtime"))
.args(&["--legacy", "--provisioner-address", "http://localhost:8000"])
.current_dir(&runtime_dir)
.spawn()
.unwrap();

select! {
_ = runtime.wait() => {
info!("Legacy runtime stopped.")
},
_ = axum::Server::bind(&args.api_address).serve(make_service) => {
info!("Handlers server stopped serving addr: {}", &args.api_address);
},
}
}

pub async fn start_proxy(
Expand Down
3 changes: 2 additions & 1 deletion proto/runtime.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ message StartResponse {
// Was the start successful
bool success = 1;

// todo: find a way to add optional flag here
// Optional port the service was started on
// This is likely to be None for bots
optional uint32 port = 2;
oddgrd marked this conversation as resolved.
Show resolved Hide resolved
uint32 port = 2;
}
8 changes: 4 additions & 4 deletions proto/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
pub mod provisioner {
// This clippy is disabled as per this prost comment
// https://github.com/tokio-rs/prost/issues/661#issuecomment-1156606409
#![allow(clippy::derive_partial_eq_without_eq)]
// This clippy is disabled as per this prost comment
// https://github.com/tokio-rs/prost/issues/661#issuecomment-1156606409
#![allow(clippy::derive_partial_eq_without_eq)]

pub mod provisioner {
use std::fmt::Display;

use shuttle_common::{
Expand Down
11 changes: 10 additions & 1 deletion runtime/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,21 @@ To test, first start a provisioner from the root directory using:
docker-compose -f docker-compose.rendered.yml up provisioner
```

Then in another shell, start the runtime using:
Then in another shell, start the runtime using the clap CLI:

```bash
cargo run -- --legacy --provisioner-address http://localhost:8000
```

Or directly (this is the path hardcoded in `deployer::start`):
```bash
# first, make sure the shuttle-runtime binary is built
cargo build
# then
/home/<path to shuttle repo>/target/debug/shuttle-runtime --legacy --provisioner-address http://localhost:8000
```

Pass the path to `deployer::start`
Then in another shell, load a `.so` file and start it up:

``` bash
Expand Down
14 changes: 7 additions & 7 deletions runtime/src/legacy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ impl Runtime for Legacy {
&self,
request: Request<StartRequest>,
) -> Result<Response<StartResponse>, Status> {
let port = 8001;
let address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port);
let service_port = 7001;
let service_address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), service_port);

let provisioner_client = ProvisionerClient::connect(self.provisioner_address.clone())
.await
Expand All @@ -84,18 +84,18 @@ impl Runtime for Legacy {
.map_err(|err| Status::from_error(Box::new(err)))?
.clone();

trace!(%address, "starting");
let service = load_service(address, so_path, &mut factory, logger)
trace!(%service_address, "starting");
let service = load_service(service_address, so_path, &mut factory, logger)
.await
.unwrap();

_ = tokio::spawn(run(service, address));
_ = tokio::spawn(run(service, service_address));

*self.port.lock().unwrap() = Some(port);
*self.port.lock().unwrap() = Some(service_port);

let message = StartResponse {
success: true,
port: Some(port as u32),
port: service_port as u32,
};

Ok(Response::new(message))
Expand Down
2 changes: 1 addition & 1 deletion runtime/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn main() {

trace!(args = ?args, "parsed args");

let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8000);
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 6001);

let provisioner_address = args.provisioner_address;

Expand Down
3 changes: 2 additions & 1 deletion runtime/src/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ impl Runtime for Next {

let message = StartResponse {
success: true,
port: None,
// todo: port set here until I can set the port field to optional in the protobuf
port: 8001,
};

Ok(Response::new(message))
Expand Down