Skip to content

Commit

Permalink
feat: deployer next (#575)
Browse files Browse the repository at this point in the history
* feat: propagate next runtime

* feat: store is_next in DB

* feat: runtime manager to allow deployer to start up both runtimes

* feat: make sure tests run

* refactor: better migration query

* refactor: handle runtime errors better

* feat: shutdown runtimes

* bug: missing so

* bug: stop services

* bug: ffi and runtime manager not living long enough

* bug: missing so error

* refactor: run cleanups

* refactor: clippy suggestions
  • Loading branch information
chesedo authored Jan 16, 2023
1 parent b17b3a1 commit cc072b2
Show file tree
Hide file tree
Showing 21 changed files with 525 additions and 250 deletions.
1 change: 1 addition & 0 deletions cargo-shuttle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ impl Shuttle {
is_wasm,
runtime::StorageManagerType::WorkingDir(working_directory.to_path_buf()),
&format!("http://localhost:{}", run_args.port + 1),
run_args.port + 2,
)
.await
.map_err(|err| {
Expand Down
1 change: 1 addition & 0 deletions codegen/src/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ pub(crate) fn wasi_bindings(app: App) -> proc_macro2::TokenStream {
quote!(
#app

#[cfg(not(test))]
#[no_mangle]
#[allow(non_snake_case)]
pub extern "C" fn __SHUTTLE_Axum_call(
Expand Down
1 change: 1 addition & 0 deletions deployer/migrations/0001_next.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE deployments ADD COLUMN is_next BOOLEAN DEFAULT 0 NOT NULL;
96 changes: 30 additions & 66 deletions deployer/src/deployment/deploy_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use chrono::{DateTime, Utc};
use serde_json::json;
use shuttle_common::{tracing::JsonVisitor, STATE_MESSAGE};
use shuttle_proto::runtime;
use std::{net::SocketAddr, str::FromStr, time::SystemTime};
use tracing::{error, field::Visit, span, warn, Metadata, Subscriber};
use std::{str::FromStr, time::SystemTime};
use tracing::{field::Visit, span, warn, Metadata, Subscriber};
use tracing_subscriber::Layer;
use uuid::Uuid;

Expand Down Expand Up @@ -63,8 +63,6 @@ pub struct Log {
pub fields: serde_json::Value,

pub r#type: LogType,

pub address: Option<String>,
}

impl From<Log> for persistence::Log {
Expand Down Expand Up @@ -106,23 +104,10 @@ impl From<Log> for shuttle_common::LogItem {

impl From<Log> for DeploymentState {
fn from(log: Log) -> Self {
let address = if let Some(address_str) = log.address {
match SocketAddr::from_str(&address_str) {
Ok(address) => Some(address),
Err(err) => {
error!(error = %err, "failed to convert to [SocketAddr]");
None
}
}
} else {
None
};

Self {
id: log.id,
state: log.state,
last_update: log.timestamp,
address,
}
}
}
Expand All @@ -139,7 +124,6 @@ impl From<runtime::LogItem> for Log {
target: log.target,
fields: serde_json::from_slice(&log.fields).unwrap(),
r#type: LogType::Event,
address: None,
}
}
}
Expand Down Expand Up @@ -230,7 +214,6 @@ where
.unwrap_or_else(|| metadata.target().to_string()),
fields: serde_json::Value::Object(visitor.fields),
r#type: LogType::Event,
address: None,
});
break;
}
Expand Down Expand Up @@ -274,7 +257,6 @@ where
target: metadata.target().to_string(),
fields: Default::default(),
r#type: LogType::State,
address: details.address.clone(),
});

extensions.insert::<ScopeDetails>(details);
Expand All @@ -286,7 +268,6 @@ where
struct ScopeDetails {
id: Uuid,
state: State,
address: Option<String>,
}

impl From<&tracing::Level> for LogLevel {
Expand Down Expand Up @@ -314,9 +295,6 @@ impl NewStateVisitor {
/// Field containing the deployment state identifier
const STATE_IDENT: &'static str = "state";

/// Field containing the deployment address identifier
const ADDRESS_IDENT: &'static str = "address";

fn is_valid(metadata: &Metadata) -> bool {
metadata.is_span()
&& metadata.fields().field(Self::ID_IDENT).is_some()
Expand All @@ -330,8 +308,6 @@ impl Visit for NewStateVisitor {
self.details.state = State::from_str(&format!("{value:?}")).unwrap_or_default();
} else if field.name() == Self::ID_IDENT {
self.details.id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default();
} else if field.name() == Self::ADDRESS_IDENT {
self.details.address = Some(format!("{value:?}"));
}
}
}
Expand All @@ -340,17 +316,18 @@ impl Visit for NewStateVisitor {
mod tests {
use std::{
fs::read_dir,
net::SocketAddr,
path::PathBuf,
sync::{Arc, Mutex},
time::Duration,
};

use crate::{persistence::DeploymentUpdater, RuntimeManager};
use axum::body::Bytes;
use ctor::ctor;
use flate2::{write::GzEncoder, Compression};
use shuttle_proto::runtime::runtime_client::RuntimeClient;
use tempdir::TempDir;
use tokio::{select, time::sleep};
use tonic::transport::Channel;
use tracing_subscriber::prelude::*;
use uuid::Uuid;

Expand Down Expand Up @@ -383,15 +360,13 @@ mod tests {
struct StateLog {
id: Uuid,
state: State,
has_address: bool,
}

impl From<Log> for StateLog {
fn from(log: Log) -> Self {
Self {
id: log.id,
state: log.state,
has_address: log.address.is_some(),
}
}
}
Expand Down Expand Up @@ -423,10 +398,12 @@ mod tests {
}
}

async fn get_runtime_client() -> RuntimeClient<Channel> {
RuntimeClient::connect("http://127.0.0.1:6001")
.await
.unwrap()
fn get_runtime_manager() -> Arc<tokio::sync::Mutex<RuntimeManager>> {
let tmp_dir = TempDir::new("shuttle_run_test").unwrap();
let path = tmp_dir.into_path();
let (tx, _rx) = crossbeam_channel::unbounded();

RuntimeManager::new(path, "http://provisioner:8000".to_string(), tx)
}

#[async_trait::async_trait]
Expand All @@ -449,6 +426,22 @@ mod tests {
}
}

#[derive(Clone)]
struct StubDeploymentUpdater;

#[async_trait::async_trait]
impl DeploymentUpdater for StubDeploymentUpdater {
type Err = std::io::Error;

async fn set_address(&self, _id: &Uuid, _address: &SocketAddr) -> Result<(), Self::Err> {
Ok(())
}

async fn set_is_next(&self, _id: &Uuid, _is_next: bool) -> Result<(), Self::Err> {
Ok(())
}
}

#[derive(Clone)]
struct StubActiveDeploymentGetter;

Expand Down Expand Up @@ -527,27 +520,22 @@ mod tests {
StateLog {
id,
state: State::Queued,
has_address: false,
},
StateLog {
id,
state: State::Building,
has_address: false,
},
StateLog {
id,
state: State::Built,
has_address: false,
},
StateLog {
id,
state: State::Loading,
has_address: true,
},
StateLog {
id,
state: State::Running,
has_address: true,
},
]
);
Expand Down Expand Up @@ -577,32 +565,26 @@ mod tests {
StateLog {
id,
state: State::Queued,
has_address: false,
},
StateLog {
id,
state: State::Building,
has_address: false,
},
StateLog {
id,
state: State::Built,
has_address: false,
},
StateLog {
id,
state: State::Loading,
has_address: true,
},
StateLog {
id,
state: State::Running,
has_address: true,
},
StateLog {
id,
state: State::Stopped,
has_address: false,
},
]
);
Expand Down Expand Up @@ -639,32 +621,26 @@ mod tests {
StateLog {
id,
state: State::Queued,
has_address: false,
},
StateLog {
id,
state: State::Building,
has_address: false,
},
StateLog {
id,
state: State::Built,
has_address: false,
},
StateLog {
id,
state: State::Loading,
has_address: true,
},
StateLog {
id,
state: State::Running,
has_address: true,
},
StateLog {
id,
state: State::Completed,
has_address: false,
},
]
);
Expand Down Expand Up @@ -712,32 +688,26 @@ mod tests {
StateLog {
id,
state: State::Queued,
has_address: false,
},
StateLog {
id,
state: State::Building,
has_address: false,
},
StateLog {
id,
state: State::Built,
has_address: false,
},
StateLog {
id,
state: State::Loading,
has_address: true,
},
StateLog {
id,
state: State::Running,
has_address: true,
},
StateLog {
id,
state: State::Crashed,
has_address: false,
},
]
);
Expand Down Expand Up @@ -785,27 +755,22 @@ mod tests {
StateLog {
id,
state: State::Queued,
has_address: false,
},
StateLog {
id,
state: State::Building,
has_address: false,
},
StateLog {
id,
state: State::Built,
has_address: false,
},
StateLog {
id,
state: State::Loading,
has_address: true,
},
StateLog {
id,
state: State::Crashed,
has_address: false,
},
]
);
Expand Down Expand Up @@ -833,6 +798,7 @@ mod tests {
service_name: "run-test".to_string(),
service_id: Uuid::new_v4(),
tracing_context: Default::default(),
is_next: false,
})
.await;

Expand All @@ -854,17 +820,14 @@ mod tests {
StateLog {
id,
state: State::Built,
has_address: false,
},
StateLog {
id,
state: State::Loading,
has_address: true,
},
StateLog {
id,
state: State::Crashed,
has_address: false,
},
]
);
Expand Down Expand Up @@ -905,7 +868,8 @@ mod tests {
.active_deployment_getter(StubActiveDeploymentGetter)
.artifacts_path(PathBuf::from("/tmp"))
.secret_getter(StubSecretGetter)
.runtime(get_runtime_client().await)
.runtime(get_runtime_manager())
.deployment_updater(StubDeploymentUpdater)
.queue_client(StubBuildQueueClient)
.build()
}
Expand Down
Loading

0 comments on commit cc072b2

Please sign in to comment.