Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: gateway admin revive #412

Merged
merged 9 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions gateway/src/api/latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,7 @@ pub mod tests {
#[tokio::test]
async fn api_create_get_delete_projects() -> anyhow::Result<()> {
let world = World::new().await;
let service =
Arc::new(GatewayService::init(world.args(), world.fqdn(), world.pool()).await);
let service = Arc::new(GatewayService::init(world.args(), world.pool()).await);

let (sender, mut receiver) = channel::<Work>(256);
tokio::spawn(async move {
Expand Down Expand Up @@ -326,8 +325,7 @@ pub mod tests {
#[tokio::test]
async fn api_create_get_users() -> anyhow::Result<()> {
let world = World::new().await;
let service =
Arc::new(GatewayService::init(world.args(), world.fqdn(), world.pool()).await);
let service = Arc::new(GatewayService::init(world.args(), world.pool()).await);

let (sender, mut receiver) = channel::<Work>(256);
tokio::spawn(async move {
Expand Down Expand Up @@ -416,8 +414,7 @@ pub mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn status() {
let world = World::new().await;
let service =
Arc::new(GatewayService::init(world.args(), world.fqdn(), world.pool()).await);
let service = Arc::new(GatewayService::init(world.args(), world.pool()).await);

let (sender, mut receiver) = channel::<Work>(1);
let (ctl_send, ctl_recv) = oneshot::channel();
Expand Down
46 changes: 33 additions & 13 deletions gateway/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct Args {
pub enum Commands {
Start(StartArgs),
Init(InitArgs),
Exec(ExecCmds),
}

#[derive(clap::Args, Debug, Clone)]
Expand All @@ -29,6 +30,35 @@ pub struct StartArgs {
/// Address to bind the user plane to
#[arg(long, default_value = "127.0.0.1:8000")]
pub user: SocketAddr,
#[command(flatten)]
pub context: ContextArgs,
}

#[derive(clap::Args, Debug, Clone)]
pub struct InitArgs {
/// Name of initial account to create
#[arg(long)]
pub name: String,
/// Key to assign to initial account
#[arg(long)]
pub key: Option<Key>,
}

#[derive(clap::Args, Debug, Clone)]
pub struct ExecCmds {
#[command(flatten)]
pub context: ContextArgs,
#[command(subcommand)]
pub command: ExecCmd,
}

#[derive(Subcommand, Debug, Clone)]
pub enum ExecCmd {
Revive,
}

#[derive(clap::Args, Debug, Clone)]
pub struct ContextArgs {
/// Default image to deploy user runtimes into
#[arg(long, default_value = "public.ecr.aws/shuttle/deployer:latest")]
pub image: String,
Expand All @@ -40,23 +70,13 @@ pub struct StartArgs {
/// the provisioner service
#[arg(long, default_value = "provisioner")]
pub provisioner_host: String,
/// The path to the docker daemon socket
#[arg(long, default_value = "/var/run/docker.sock")]
pub docker_host: String,
/// The Docker Network name in which to deploy user runtimes
#[arg(long, default_value = "shuttle_default")]
pub network_name: String,
/// FQDN where the proxy can be reached at
#[arg(long)]
pub proxy_fqdn: FQDN,
}

#[derive(clap::Args, Debug, Clone)]
pub struct InitArgs {
/// Name of initial account to create
#[arg(long)]
pub name: String,
/// Key to assign to initial account
#[arg(long)]
pub key: Option<Key>,
/// The path to the docker daemon socket
#[arg(long, default_value = "/var/run/docker.sock")]
pub docker_host: String,
}
38 changes: 19 additions & 19 deletions gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ pub mod tests {
use tracing::info;

use crate::api::make_api;
use crate::args::StartArgs;
use crate::args::{ContextArgs, StartArgs};
use crate::auth::User;
use crate::proxy::make_proxy;
use crate::service::{ContainerSettings, GatewayService, MIGRATIONS};
Expand Down Expand Up @@ -485,21 +485,18 @@ pub mod tests {
args: StartArgs,
hyper: HyperClient<HttpConnector, Body>,
pool: SqlitePool,
fqdn: String,
}

#[derive(Clone, Copy)]
pub struct WorldContext<'c> {
pub docker: &'c Docker,
pub container_settings: &'c ContainerSettings,
pub hyper: &'c HyperClient<HttpConnector, Body>,
pub fqdn: &'c str,
}

impl World {
pub async fn new() -> Self {
let docker = Docker::connect_with_local_defaults().unwrap();
let fqdn = "test.shuttleapp.rs".to_string();

docker
.list_images::<&str>(None)
Expand Down Expand Up @@ -529,17 +526,19 @@ pub mod tests {

let args = StartArgs {
control,
docker_host,
user,
image,
prefix,
provisioner_host,
network_name,
proxy_fqdn: FQDN::from_str(&fqdn).unwrap(),
context: ContextArgs {
docker_host,
image,
prefix,
provisioner_host,
network_name,
proxy_fqdn: FQDN::from_str("test.shuttleapp.rs").unwrap(),
},
};

let settings = ContainerSettings::builder(&docker, fqdn.clone())
.from_args(&args)
let settings = ContainerSettings::builder(&docker)
.from_args(&args.context)
.await;

let hyper = HyperClient::builder().build(HttpConnector::new());
Expand All @@ -553,12 +552,11 @@ pub mod tests {
args,
hyper,
pool,
fqdn,
}
}

pub fn args(&self) -> StartArgs {
self.args.clone()
pub fn args(&self) -> ContextArgs {
self.args.context.clone()
}

pub fn pool(&self) -> SqlitePool {
Expand All @@ -570,7 +568,11 @@ pub mod tests {
}

pub fn fqdn(&self) -> String {
self.fqdn.clone()
self.args()
.proxy_fqdn
.to_string()
.trim_end_matches('.')
.to_string()
}
}

Expand All @@ -580,7 +582,6 @@ pub mod tests {
docker: &self.docker,
container_settings: &self.settings,
hyper: &self.hyper,
fqdn: &self.fqdn,
}
}
}
Expand All @@ -598,8 +599,7 @@ pub mod tests {
#[tokio::test]
async fn end_to_end() {
let world = World::new().await;
let service =
Arc::new(GatewayService::init(world.args(), world.fqdn(), world.pool()).await);
let service = Arc::new(GatewayService::init(world.args(), world.pool()).await);
let worker = Worker::new(Arc::clone(&service));

let (log_out, mut log_in) = channel(256);
Expand Down
20 changes: 17 additions & 3 deletions gateway/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use clap::Parser;
use futures::prelude::*;
use shuttle_gateway::args::{Args, Commands, InitArgs};
use shuttle_gateway::args::{Args, Commands, ExecCmd, ExecCmds, InitArgs};
use shuttle_gateway::auth::Key;
use shuttle_gateway::proxy::make_proxy;
use shuttle_gateway::service::{GatewayService, MIGRATIONS};
use shuttle_gateway::worker::{Work, Worker};
use shuttle_gateway::{api::make_api, args::StartArgs};
use shuttle_gateway::{Refresh, Service};
use shuttle_gateway::{project, Refresh, Service};
use sqlx::migrate::MigrateDatabase;
use sqlx::{query, Sqlite, SqlitePool};
use std::io;
Expand Down Expand Up @@ -55,16 +55,18 @@ async fn main() -> io::Result<()> {
match args.command {
Commands::Start(start_args) => start(db, start_args).await,
Commands::Init(init_args) => init(db, init_args).await,
Commands::Exec(exec_cmd) => exec(db, exec_cmd).await,
}
}

async fn start(db: SqlitePool, args: StartArgs) -> io::Result<()> {
let fqdn = args
.context
.proxy_fqdn
.to_string()
.trim_end_matches('.')
.to_string();
let gateway = Arc::new(GatewayService::init(args.clone(), fqdn.clone(), db).await);
let gateway = Arc::new(GatewayService::init(args.context.clone(), db).await);

let worker = Worker::new(Arc::clone(&gateway));

Expand Down Expand Up @@ -146,3 +148,15 @@ async fn init(db: SqlitePool, args: InitArgs) -> io::Result<()> {
println!("`{}` created as super user with key: {key}", args.name);
Ok(())
}

async fn exec(db: SqlitePool, exec_cmd: ExecCmds) -> io::Result<()> {
let gateway = GatewayService::init(exec_cmd.context.clone(), db).await;

match exec_cmd.command {
ExecCmd::Revive => project::exec::revive(gateway)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?,
};

Ok(())
}
56 changes: 56 additions & 0 deletions gateway/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,62 @@ impl<'c> State<'c> for ProjectError {
}
}

pub mod exec {
use bollard::service::ContainerState;

use crate::{
service::GatewayService,
worker::{do_work, Work},
};

use super::*;

pub async fn revive(gateway: GatewayService) -> Result<(), ProjectError> {
let mut mutations = Vec::new();

for Work {
project_name,
account_name,
work,
} in gateway
.iter_projects()
.await
.expect("could not list projects")
{
if let Project::Errored(ProjectError { ctx: Some(ctx), .. }) = work {
if let Some(container) = ctx.container() {
if let Ok(container) = gateway
.context()
.docker()
.inspect_container(safe_unwrap!(container.id), None)
.await
{
if let Some(ContainerState {
status: Some(ContainerStateStatusEnum::EXITED),
..
}) = container.state
{
mutations.push(Work {
project_name,
account_name,
work: Project::Stopped(ProjectStopped { container }),
});
}
}
}
}
}

for work in mutations {
debug!(?work, "project will be revived");

do_work(work, &gateway).await;
}

Ok(())
}
}

#[cfg(test)]
pub mod tests {

Expand Down
Loading