Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP feat: start runtime from deployer #450

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ POSTGRES_TAG?=latest

RUST_LOG?=debug

DOCKER_COMPOSE_ENV=STACK=$(STACK) BACKEND_TAG=$(TAG) PROVISIONER_TAG=$(TAG) POSTGRES_TAG=latest APPS_FQDN=$(APPS_FQDN) DB_FQDN=$(DB_FQDN) POSTGRES_PASSWORD=$(POSTGRES_PASSWORD) RUST_LOG=$(RUST_LOG) CONTAINER_REGISTRY=$(CONTAINER_REGISTRY) MONGO_INITDB_ROOT_USERNAME=$(MONGO_INITDB_ROOT_USERNAME) MONGO_INITDB_ROOT_PASSWORD=$(MONGO_INITDB_ROOT_PASSWORD)
DOCKER_COMPOSE_ENV=STACK=$(STACK) BACKEND_TAG=$(TAG) PROVISIONER_TAG=$(TAG) POSTGRES_TAG=14 APPS_FQDN=$(APPS_FQDN) DB_FQDN=$(DB_FQDN) POSTGRES_PASSWORD=$(POSTGRES_PASSWORD) RUST_LOG=$(RUST_LOG) CONTAINER_REGISTRY=$(CONTAINER_REGISTRY) MONGO_INITDB_ROOT_USERNAME=$(MONGO_INITDB_ROOT_USERNAME) MONGO_INITDB_ROOT_PASSWORD=$(MONGO_INITDB_ROOT_PASSWORD)

.PHONY: images clean src up down deploy shuttle-% postgres docker-compose.rendered.yml test

Expand Down
4 changes: 4 additions & 0 deletions deployer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ version = "0.7.0"
path = "../service"
features = ["loader"]

[dependencies.shuttle-runtime]
version = "0.1.0"
path = "../runtime"

oddgrd marked this conversation as resolved.
Show resolved Hide resolved
[dev-dependencies]
ctor = "0.1.22"
hex = "0.4.3"
Expand Down
127 changes: 76 additions & 51 deletions deployer/src/deployment/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ use std::{
use async_trait::async_trait;
use portpicker::pick_unused_port;
use shuttle_common::project::ProjectName as ServiceName;
use shuttle_service::{
loader::{LoadedService, Loader},
Factory, Logger,
};
use shuttle_proto::runtime::{runtime_client::RuntimeClient, LoadRequest, StartRequest};

use shuttle_service::{Factory, Logger};
use tokio::task::JoinError;
use tracing::{debug, error, info, instrument, trace};
use tracing::{error, info, instrument, trace};
use uuid::Uuid;

use super::{provisioner_factory, runtime_logger, KillReceiver, KillSender, RunReceiver, State};
Expand Down Expand Up @@ -174,83 +173,109 @@ pub struct Built {
}

impl Built {
#[instrument(name = "built_handle", skip(self, libs_path, factory, logger, kill_recv, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))]
#[instrument(name = "built_handle", skip(self, libs_path, _factory, _logger, kill_recv, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))]
#[allow(clippy::too_many_arguments)]
async fn handle(
self,
address: SocketAddr,
libs_path: PathBuf,
factory: &mut dyn Factory,
logger: Logger,
_factory: &mut dyn Factory,
_logger: Logger,
kill_recv: KillReceiver,
kill_old_deployments: impl futures::Future<Output = Result<()>>,
cleanup: impl FnOnce(std::result::Result<std::result::Result<(), shuttle_service::Error>, JoinError>)
+ Send
+ 'static,
) -> Result<()> {
let service = load_deployment(&self.id, address, libs_path, factory, logger).await?;

// todo: refactor this?
kill_old_deployments.await?;

info!("got handle for deployment");
// Execute loaded service
tokio::spawn(run(self.id, service, address, kill_recv, cleanup));
tokio::spawn(run(
self.id,
self.service_name,
libs_path,
address,
kill_recv,
cleanup,
));

Ok(())
}
}

#[instrument(skip(service, kill_recv, cleanup), fields(address = %_address, state = %State::Running))]
#[instrument(skip(_kill_recv, _cleanup), fields(address = %_address, state = %State::Running))]
async fn run(
id: Uuid,
service: LoadedService,
service_name: String,
libs_path: PathBuf,
_address: SocketAddr,
mut kill_recv: KillReceiver,
cleanup: impl FnOnce(std::result::Result<std::result::Result<(), shuttle_service::Error>, JoinError>)
_kill_recv: KillReceiver,
_cleanup: impl FnOnce(std::result::Result<std::result::Result<(), shuttle_service::Error>, JoinError>)
+ Send
+ 'static,
) {
info!("starting up service");
let (mut handle, library) = service;
let result;
loop {
tokio::select! {
Ok(kill_id) = kill_recv.recv() => {
if kill_id == id {
debug!("deployment '{id}' killed");
handle.abort();
result = handle.await;
break;
}
}
rsl = &mut handle => {
result = rsl;
break;
}
}
}
info!("starting up deployer grpc client");
let mut client = RuntimeClient::connect("http://127.0.0.1:8002")
.await
.unwrap();

if let Err(err) = library.close() {
crashed_cleanup(&id, err);
} else {
cleanup(result);
}
}
info!(
"loading project from: {}",
libs_path.clone().into_os_string().into_string().unwrap()
);

#[instrument(skip(id, addr, libs_path, factory, logger))]
async fn load_deployment(
id: &Uuid,
addr: SocketAddr,
libs_path: PathBuf,
factory: &mut dyn Factory,
logger: Logger,
) -> Result<LoadedService> {
let so_path = libs_path.join(id.to_string());
let loader = Loader::from_so_file(so_path)?;

Ok(loader.load(factory, addr, logger).await?)
let load_request = tonic::Request::new(LoadRequest {
path: so_path.into_os_string().into_string().unwrap(),
service_name: service_name.clone(),
});
info!("loading service");
let response = client.load(load_request).await;

if let Err(e) = response {
info!("something went wrong {}", e);
}
info!("starting service");
let start_request = tonic::Request::new(StartRequest { service_name });
let _response = client.start(start_request).await.unwrap();
oddgrd marked this conversation as resolved.
Show resolved Hide resolved
// let (mut handle, library) = service;

// let result;
// loop {
// tokio::select! {
// Ok(kill_id) = kill_recv.recv() => {
// todo!()
// }
// route = run => {
// result = rsl;
// break;
// }
// }
// }

// if let Err(err) = library.close() {
// crashed_cleanup(&id, err);
// } else {
// cleanup(result);
// }
}

// #[instrument(skip(id, addr, libs_path, factory, logger))]
// async fn load_deployment(
// id: &Uuid,
// addr: SocketAddr,
// libs_path: PathBuf,
// factory: &mut dyn Factory,
// logger: Logger,
// ) -> Result<LoadedService> {
// let so_path = libs_path.join(id.to_string());
// let loader = Loader::from_so_file(so_path)?;

// Ok(loader.load(factory, addr, logger).await?)
// }

#[cfg(test)]
mod tests {
use std::{
Expand Down
16 changes: 10 additions & 6 deletions deployer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use hyper::{
};
pub use persistence::Persistence;
use proxy::AddressGetter;
use tokio::select;
use tracing::{error, info};

mod args;
Expand Down Expand Up @@ -54,12 +55,15 @@ pub async fn start(
);
let make_service = router.into_make_service();

info!("Binding to and listening at address: {}", args.api_address);

axum::Server::bind(&args.api_address)
.serve(make_service)
.await
.unwrap_or_else(|_| panic!("Failed to bind to address: {}", args.api_address));
select! {
_ = tokio::spawn(shuttle_runtime::start_legacy()) => {
info!("Legacy runtime stopped.")
oddgrd marked this conversation as resolved.
Show resolved Hide resolved
},
_ = axum::Server::bind(&args.api_address)
.serve(make_service) => {
info!("Handlers server error, addr: {}", &args.api_address);
},
}
}

pub async fn start_proxy(
Expand Down
3 changes: 2 additions & 1 deletion proto/runtime.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ message StartResponse {
// Was the start successful
bool success = 1;

// todo: find a way to add optional flag here
// Optional port the service was started on
// This is likely to be None for bots
optional uint32 port = 2;
oddgrd marked this conversation as resolved.
Show resolved Hide resolved
uint32 port = 2;
}
14 changes: 7 additions & 7 deletions runtime/src/legacy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ impl Runtime for Legacy {
&self,
request: Request<StartRequest>,
) -> Result<Response<StartResponse>, Status> {
let port = 8001;
let address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port);
let service_port = 8001;
oddgrd marked this conversation as resolved.
Show resolved Hide resolved
let service_address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), service_port);

let provisioner_client = ProvisionerClient::connect(self.provisioner_address.clone())
.await
Expand All @@ -84,18 +84,18 @@ impl Runtime for Legacy {
.map_err(|err| Status::from_error(Box::new(err)))?
.clone();

trace!(%address, "starting");
let service = load_service(address, so_path, &mut factory, logger)
trace!(%service_address, "starting");
let service = load_service(service_address, so_path, &mut factory, logger)
.await
.unwrap();

_ = tokio::spawn(run(service, address));
_ = tokio::spawn(run(service, service_address));

*self.port.lock().unwrap() = Some(port);
*self.port.lock().unwrap() = Some(service_port);

let message = StartResponse {
success: true,
port: Some(port as u32),
port: service_port as u32,
};

Ok(Response::new(message))
Expand Down
17 changes: 17 additions & 0 deletions runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,23 @@ mod legacy;
mod next;
pub mod provisioner_factory;

use std::net::{Ipv4Addr, SocketAddr};

pub use args::Args;
pub use legacy::Legacy;
pub use next::Next;
use shuttle_proto::runtime::runtime_server::RuntimeServer;
use tonic::transport::{Endpoint, Server};

pub async fn start_legacy() {
// starting the router on 8002 to avoid conflicts
let grpc_address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8002);
let provisioner_address = Endpoint::from_static("http://127.0.0.1:8000");

let legacy = Legacy::new(provisioner_address);
let svc = RuntimeServer::new(legacy);

let router = Server::builder().add_service(svc);

router.serve(grpc_address).await.unwrap();
}
3 changes: 2 additions & 1 deletion runtime/src/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ impl Runtime for Next {

let message = StartResponse {
success: true,
port: None,
// todo: port set here until I can set the port field to optional in the protobuf
port: 8001,
};

Ok(Response::new(message))
Expand Down