diff --git a/Makefile b/Makefile index 3b547af18..8ba720726 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,7 @@ DD_ENV=production USE_TLS=enable else DOCKER_COMPOSE_FILES=-f docker-compose.yml -f docker-compose.dev.yml -STACK=shuttle-dev +STACK?=shuttle-dev APPS_FQDN=unstable.shuttleapp.rs DB_FQDN=db.unstable.shuttle.rs CONTAINER_REGISTRY=public.ecr.aws/shuttle-dev diff --git a/docker-compose.yml b/docker-compose.yml index 0291d99b9..5dd493f5b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,7 +8,7 @@ networks: ipam: driver: default config: - - subnet: 10.99.0.0/24 + - subnet: 10.99.0.0/16 services: gateway: image: "${CONTAINER_REGISTRY}/gateway:${BACKEND_TAG}" diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index 4f6d00d3a..aa0ec05f5 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -54,3 +54,4 @@ colored = "2" portpicker = "0.1" snailquote = "0.3" tempfile = "3.3.0" + diff --git a/gateway/src/project.rs b/gateway/src/project.rs index f5bae93e1..e36f105f0 100644 --- a/gateway/src/project.rs +++ b/gateway/src/project.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::convert::Infallible; +use std::convert::{identity, Infallible}; use std::net::{IpAddr, SocketAddr}; use std::time::Duration; @@ -7,8 +7,9 @@ use bollard::container::{ Config, CreateContainerOptions, RemoveContainerOptions, StopContainerOptions, }; use bollard::errors::Error as DockerError; -use bollard::models::{ContainerConfig, ContainerInspectResponse, ContainerStateStatusEnum}; +use bollard::models::{ContainerInspectResponse, ContainerStateStatusEnum}; use bollard::system::EventsOptions; +use fqdn::FQDN; use futures::prelude::*; use http::uri::InvalidUri; use http::Uri; @@ -81,6 +82,59 @@ where } } +pub trait ContainerInspectResponseExt { + fn container(&self) -> &ContainerInspectResponse; + + fn project_name(&self, prefix: &str) -> Result { + // This version can't be enabled while there are active + // deployers before v0.8.0 since the don't have this label + // TODO: switch to this version when you notice all deployers + // are greater than v0.8.0 + // let name = safe_unwrap!(container.config.labels.get("project.name")).to_string(); + + let container = self.container(); + let container_name = safe_unwrap!(container.name.strip_prefix("/")).to_string(); + safe_unwrap!(container_name.strip_prefix(prefix).strip_suffix("_run")) + .parse::() + .map_err(|_| ProjectError::internal("invalid project name")) + } + + fn find_arg_and_then<'s, F, O>(&'s self, find: &str, and_then: F) -> Result + where + F: FnOnce(&'s str) -> O, + O: 's, + { + let mut args = self.args()?.iter(); + let out = if args.any(|arg| arg.as_str() == find) { + args.next().map(|s| and_then(s.as_str())) + } else { + None + }; + out.ok_or_else(|| ProjectError::internal(format!("no such argument: {find}"))) + } + + fn args(&self) -> Result<&Vec, ProjectError> { + let container = self.container(); + Ok(safe_unwrap!(container.args)) + } + + fn fqdn(&self) -> Result { + self.find_arg_and_then("--proxy-fqdn", identity)? + .parse() + .map_err(|_| ProjectError::internal("invalid value for --proxy-fqdn")) + } + + fn initial_key(&self) -> Result { + self.find_arg_and_then("--admin-secret", str::to_owned) + } +} + +impl ContainerInspectResponseExt for ContainerInspectResponse { + fn container(&self) -> &ContainerInspectResponse { + self + } +} + impl From for Error { fn from(err: DockerError) -> Self { error!(error = %err, "internal Docker error"); @@ -185,9 +239,9 @@ impl Project { } } - pub fn initial_key(&self) -> Option<&String> { - if let Self::Creating(ProjectCreating { initial_key, .. }) = self { - Some(initial_key) + pub fn initial_key(&self) -> Option<&str> { + if let Self::Creating(creating) = self { + Some(creating.initial_key()) } else { None } @@ -293,24 +347,20 @@ where /// project into the wrong state if the docker is transitioning /// the state of its resources under us async fn refresh(self, ctx: &Ctx) -> Result { - let _container = if let Some(container_id) = self.container_id() { - Some(ctx.docker().inspect_container(&container_id, None).await?) - } else { - None - }; - let refreshed = match self { Self::Creating(creating) => Self::Creating(creating), Self::Starting(ProjectStarting { container }) | Self::Started(ProjectStarted { container, .. }) | Self::Ready(ProjectReady { container, .. }) | Self::Stopping(ProjectStopping { container }) - | Self::Stopped(ProjectStopped { container }) => { - let container = container.refresh(ctx).await?; - match container.state.as_ref().unwrap().status.as_ref().unwrap() { + | Self::Stopped(ProjectStopped { container }) => match container + .clone() + .refresh(ctx) + .await + { + Ok(container) => match container.state.as_ref().unwrap().status.as_ref().unwrap() { ContainerStateStatusEnum::RUNNING => { - let service = Service::from_container(container.clone())?; - Self::Started(ProjectStarted { container, service }) + Self::Started(ProjectStarted::new(container)) } ContainerStateStatusEnum::CREATED => { Self::Starting(ProjectStarting { container }) @@ -322,8 +372,19 @@ where "container resource has drifted out of sync: cannot recover", )) } + }, + Err(DockerError::DockerResponseServerError { + status_code: 404, .. + }) => { + // container not found, let's try to recreate it + // with the same image + let project_name = container.project_name(&ctx.container_settings().prefix)?; + let initial_key = container.initial_key()?; + let creating = ProjectCreating::new(project_name, initial_key).from(container); + Self::Creating(creating) } - } + Err(err) => return Err(err.into()), + }, Self::Destroying(destroying) => Self::Destroying(destroying), Self::Destroyed(destroyed) => Self::Destroyed(destroyed), Self::Errored(err) => Self::Errored(err), @@ -332,11 +393,18 @@ where } } -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct ProjectCreating { project_name: ProjectName, + /// The admin secret with which the start deployer initial_key: String, + /// Override the default fqdn (`${project_name}.${public}`) fqdn: Option, + /// Override the default image (specified in the args to this gateway) + image: Option, + /// Configuration will be extracted from there if specified (will + /// take precedence over other overrides) + from: Option, } impl ProjectCreating { @@ -345,9 +413,16 @@ impl ProjectCreating { project_name, initial_key, fqdn: None, + image: None, + from: None, } } + pub fn from(mut self, from: ContainerInspectResponse) -> Self { + self.from = Some(from); + self + } + pub fn with_fqdn(mut self, fqdn: String) -> Self { self.fqdn = Some(fqdn); self @@ -358,10 +433,19 @@ impl ProjectCreating { Self::new(project_name, initial_key) } + pub fn with_image(mut self, image: String) -> Self { + self.image = Some(image); + self + } + pub fn project_name(&self) -> &ProjectName { &self.project_name } + pub fn initial_key(&self) -> &str { + &self.initial_key + } + fn container_name(&self, ctx: &C) -> String { let prefix = &ctx.container_settings().prefix; @@ -375,7 +459,7 @@ impl ProjectCreating { ctx: &C, ) -> (CreateContainerOptions, Config) { let ContainerSettings { - image, + image: default_image, prefix, provisioner_host, network_name, @@ -388,43 +472,51 @@ impl ProjectCreating { initial_key, project_name, fqdn, + image, + .. } = &self; let create_container_options = CreateContainerOptions { name: self.container_name(ctx), }; - let container_config: ContainerConfig = deserialize_json!({ - "Image": image, - "Hostname": format!("{prefix}{project_name}"), - "Labels": { - "shuttle_prefix": prefix, - "project.name": project_name, - }, - "Cmd": [ - "--admin-secret", - initial_key, - "--project", - project_name, - "--api-address", - format!("0.0.0.0:{RUNTIME_API_PORT}"), - "--provisioner-address", - provisioner_host, - "--provisioner-port", - "8000", - "--proxy-address", - "0.0.0.0:8000", - "--proxy-fqdn", - fqdn.clone().unwrap_or(format!("{project_name}.{public}")), - "--artifacts-path", - "/opt/shuttle", - "--state", - "/opt/shuttle/deployer.sqlite", - ], - "Env": [ - "RUST_LOG=debug", - ] - }); + let container_config = self + .from + .as_ref() + .and_then(|container| container.config.clone()) + .unwrap_or_else(|| { + deserialize_json!({ + "Image": image.as_ref().unwrap_or(default_image), + "Hostname": format!("{prefix}{project_name}"), + "Labels": { + "shuttle.prefix": prefix, + "shuttle.project": project_name, + }, + "Cmd": [ + "--admin-secret", + initial_key, + "--project", + project_name, + "--api-address", + format!("0.0.0.0:{RUNTIME_API_PORT}"), + "--provisioner-address", + provisioner_host, + "--provisioner-port", + "8000", + "--proxy-address", + "0.0.0.0:8000", + "--proxy-fqdn", + fqdn.clone().unwrap_or(format!("{project_name}.{public}")), + "--artifacts-path", + "/opt/shuttle", + "--state", + "/opt/shuttle/deployer.sqlite", + ], + "Env": [ + "RUST_LOG=debug", + ] + }) + }); let mut config = Config::::from(container_config); @@ -517,16 +609,23 @@ where let container = self.container.refresh(ctx).await?; - let service = Service::from_container(container.clone())?; - - Ok(Self::Next { container, service }) + Ok(Self::Next::new(container)) } } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct ProjectStarted { container: ContainerInspectResponse, - service: Service, + service: Option, +} + +impl ProjectStarted { + pub fn new(container: ContainerInspectResponse) -> Self { + Self { + container, + service: None, + } + } } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -547,7 +646,10 @@ where time::sleep(Duration::from_secs(1)).await; let container = self.container.refresh(ctx).await?; - let mut service = self.service; + let mut service = match self.service { + Some(service) => service, + None => Service::from_container(ctx, container.clone())?, + }; if service.is_healthy().await { Ok(Self::Next::Ready(ProjectReady { container, service })) @@ -564,7 +666,10 @@ where )); } - Ok(Self::Next::Started(ProjectStarted { container, service })) + Ok(Self::Next::Started(ProjectStarted { + container, + service: Some(service), + })) } } } @@ -589,7 +694,7 @@ where } impl ProjectReady { - pub fn name(&self) -> &str { + pub fn name(&self) -> &ProjectName { &self.service.name } @@ -619,26 +724,26 @@ impl HealthCheckRecord { #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Service { - name: String, + name: ProjectName, target: IpAddr, last_check: Option, } impl Service { - pub fn from_container(container: ContainerInspectResponse) -> Result { - // This version can't be enabled while there are active deployers before v0.8.0 since the don't have this label - // TODO: switch to this version when you notice all deployers are greater than v0.8.0 - // let name = safe_unwrap!(container.config.labels.get("project.name")).to_string(); - let container_name = safe_unwrap!(container.name.strip_prefix("/")).to_string(); - let prefix = safe_unwrap!(container.config.labels.get("shuttle_prefix")).to_string(); - let resource_name = - safe_unwrap!(container_name.strip_prefix(&prefix).strip_suffix("_run")).to_string(); + pub fn from_container( + ctx: &Ctx, + container: ContainerInspectResponse, + ) -> Result { + let resource_name = container.project_name(&ctx.container_settings().prefix)?; let network = safe_unwrap!(container.network_settings.networks) .values() .next() .ok_or_else(|| ProjectError::internal("project was not linked to a network"))?; - let target = safe_unwrap!(network.ip_address).parse().unwrap(); + + let target = safe_unwrap!(network.ip_address) + .parse() + .map_err(|_| ProjectError::internal("project did not join the network"))?; Ok(Self { name: resource_name, @@ -962,6 +1067,8 @@ pub mod tests { project_name: "my-project-test".parse().unwrap(), initial_key: "test".to_string(), fqdn: None, + image: None, + from: None, }), #[assertion = "Container created, assigned an `id`"] Ok(Project::Starting(ProjectStarting { diff --git a/gateway/src/service.rs b/gateway/src/service.rs index de535c3fe..4fe54ff1c 100644 --- a/gateway/src/service.rs +++ b/gateway/src/service.rs @@ -273,11 +273,18 @@ impl GatewayService { project_name: &ProjectName, project: &Project, ) -> Result<(), Error> { - query("UPDATE projects SET project_state = ?1 WHERE project_name = ?2") + let query = match project { + Project::Creating(state) => query( + "UPDATE projects SET initial_key = ?1, project_state = ?2 WHERE project_name = ?3", + ) + .bind(state.initial_key()) .bind(SqlxJson(project)) - .bind(project_name) - .execute(&self.db) - .await?; + .bind(project_name), + _ => query("UPDATE projects SET project_state = ?1 WHERE project_name = ?2") + .bind(SqlxJson(project)) + .bind(project_name), + }; + query.execute(&self.db).await?; Ok(()) } @@ -419,14 +426,9 @@ impl GatewayService { let project = row.get::, _>("project_state").0; if project.is_destroyed() { // But is in `::Destroyed` state, recreate it - let project = SqlxJson(Project::create(project_name.clone())); - query("UPDATE projects SET project_state = ?1, initial_key = ?2 WHERE project_name = ?3") - .bind(&project) - .bind(project.initial_key().unwrap()) - .bind(&project_name) - .execute(&self.db) - .await?; - Ok(project.0) + let project = Project::create(project_name.clone()); + self.update_project(&project_name, &project).await?; + Ok(project) } else { // Otherwise it already exists Err(Error::from_kind(ErrorKind::ProjectAlreadyExists)) diff --git a/gateway/src/task.rs b/gateway/src/task.rs index f0e1012da..b6abedb21 100644 --- a/gateway/src/task.rs +++ b/gateway/src/task.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::mpsc::Sender; use tokio::time::{sleep, timeout}; -use tracing::warn; +use tracing::{info, warn}; use uuid::Uuid; use crate::project::*; @@ -108,17 +108,16 @@ pub fn destroy() -> impl Task { pub fn check_health() -> impl Task { run(|ctx| async move { - if let Project::Ready(mut ready) = ctx.state { - if ready.is_healthy().await { - TaskResult::Done(Project::Ready(ready)) - } else { - match Project::Ready(ready).refresh(&ctx.gateway).await { - Ok(update) => TaskResult::Done(update), - Err(err) => TaskResult::Err(err), + match ctx.state.refresh(&ctx.gateway).await { + Ok(Project::Ready(mut ready)) => { + if ready.is_healthy().await { + TaskResult::Done(Project::Ready(ready)) + } else { + TaskResult::Done(Project::Ready(ready).stop().unwrap()) } } - } else { - TaskResult::Err(Error::from_kind(ErrorKind::NotReady)) + Ok(update) => TaskResult::Done(update), + Err(err) => TaskResult::Err(err), } }) } @@ -323,6 +322,8 @@ where let ctx = self.service.context(); + info!(%self.project_name, "starting work on project"); + let project = match self.service.find_project(&self.project_name).await { Ok(project) => project, Err(err) => return TaskResult::Err(err),