Skip to content

Commit

Permalink
feat(gateway,deployer): add more tracing events (#500)
Browse files Browse the repository at this point in the history
* feat(deployer): add more tracing events

* feat(gateway): add more tracing events
  • Loading branch information
akrantz01 authored Dec 13, 2022
1 parent 489b925 commit 8387138
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 26 deletions.
27 changes: 18 additions & 9 deletions deployer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use shuttle_common::project::ProjectName;
use shuttle_common::LogItem;
use tower_http::auth::RequireAuthorizationLayer;
use tower_http::trace::TraceLayer;
use tracing::{debug, debug_span, error, field, trace, Span};
use tracing::{debug, debug_span, error, field, instrument, trace, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

Expand Down Expand Up @@ -121,6 +121,7 @@ pub fn make_router(
.layer(Extension(project_name))
}

#[instrument(skip_all)]
async fn list_services(
Extension(persistence): Extension<Persistence>,
) -> Result<Json<Vec<shuttle_common::models::service::Response>>> {
Expand All @@ -134,9 +135,10 @@ async fn list_services(
Ok(Json(services))
}

#[instrument(skip(persistence))]
async fn get_service(
Extension(persistence): Extension<Persistence>,
Path((_project_name, service_name)): Path<(String, String)>,
Path((project_name, service_name)): Path<(String, String)>,
) -> Result<Json<shuttle_common::models::service::Detailed>> {
if let Some(service) = persistence.get_service_by_name(&service_name).await? {
let deployments = persistence
Expand Down Expand Up @@ -171,10 +173,11 @@ async fn get_service(
}
}

#[instrument(skip_all, fields(%project_name, %service_name))]
async fn get_service_summary(
Extension(persistence): Extension<Persistence>,
Extension(proxy_fqdn): Extension<FQDN>,
Path((_, service_name)): Path<(String, String)>,
Path((project_name, service_name)): Path<(String, String)>,
) -> Result<Json<shuttle_common::models::service::Summary>> {
if let Some(service) = persistence.get_service_by_name(&service_name).await? {
let deployment = persistence
Expand All @@ -201,10 +204,11 @@ async fn get_service_summary(
}
}

#[instrument(skip_all, fields(%project_name, %service_name))]
async fn post_service(
Extension(persistence): Extension<Persistence>,
Extension(deployment_manager): Extension<DeploymentManager>,
Path((_project_name, service_name)): Path<(String, String)>,
Path((project_name, service_name)): Path<(String, String)>,
Query(params): Query<HashMap<String, String>>,
mut stream: BodyStream,
) -> Result<Json<shuttle_common::models::deployment::Response>> {
Expand Down Expand Up @@ -243,10 +247,11 @@ async fn post_service(
Ok(Json(deployment.into()))
}

#[instrument(skip_all, fields(%project_name, %service_name))]
async fn delete_service(
Extension(persistence): Extension<Persistence>,
Extension(deployment_manager): Extension<DeploymentManager>,
Path((_project_name, service_name)): Path<(String, String)>,
Path((project_name, service_name)): Path<(String, String)>,
) -> Result<Json<shuttle_common::models::service::Detailed>> {
if let Some(service) = persistence.get_service_by_name(&service_name).await? {
let old_deployments = persistence
Expand Down Expand Up @@ -285,9 +290,10 @@ async fn delete_service(
}
}

#[instrument(skip_all, fields(%project_name, %deployment_id))]
async fn get_deployment(
Extension(persistence): Extension<Persistence>,
Path((_project_name, deployment_id)): Path<(String, Uuid)>,
Path((project_name, deployment_id)): Path<(String, Uuid)>,
) -> Result<Json<shuttle_common::models::deployment::Response>> {
if let Some(deployment) = persistence.get_deployment(&deployment_id).await? {
Ok(Json(deployment.into()))
Expand All @@ -296,10 +302,11 @@ async fn get_deployment(
}
}

#[instrument(skip_all, fields(%project_name, %deployment_id))]
async fn delete_deployment(
Extension(deployment_manager): Extension<DeploymentManager>,
Extension(persistence): Extension<Persistence>,
Path((_project_name, deployment_id)): Path<(String, Uuid)>,
Path((project_name, deployment_id)): Path<(String, Uuid)>,
) -> Result<Json<shuttle_common::models::deployment::Response>> {
if let Some(deployment) = persistence.get_deployment(&deployment_id).await? {
deployment_manager.kill(deployment.id).await;
Expand All @@ -310,9 +317,10 @@ async fn delete_deployment(
}
}

#[instrument(skip_all, fields(%project_name, %deployment_id))]
async fn get_logs(
Extension(persistence): Extension<Persistence>,
Path((_project_name, deployment_id)): Path<(String, Uuid)>,
Path((project_name, deployment_id)): Path<(String, Uuid)>,
) -> Result<Json<Vec<LogItem>>> {
if let Some(deployment) = persistence.get_deployment(&deployment_id).await? {
Ok(Json(
Expand Down Expand Up @@ -389,9 +397,10 @@ async fn logs_websocket_handler(mut s: WebSocket, persistence: Persistence, id:
let _ = s.close().await;
}

#[instrument(skip_all, fields(%project_name, %service_name))]
async fn get_secrets(
Extension(persistence): Extension<Persistence>,
Path((_project_name, service_name)): Path<(String, String)>,
Path((project_name, service_name)): Path<(String, String)>,
) -> Result<Json<Vec<secret::Response>>> {
if let Some(service) = persistence.get_service_by_name(&service_name).await? {
let keys = persistence
Expand Down
6 changes: 4 additions & 2 deletions deployer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ pub async fn start(
args.artifacts_path,
);

for existing_deployment in persistence.get_all_runnable_deployments().await.unwrap() {
let runnable_deployments = persistence.get_all_runnable_deployments().await.unwrap();
info!(count = %runnable_deployments.len(), "enqueuing runnable deployments");
for existing_deployment in runnable_deployments {
let built = Built {
id: existing_deployment.id,
service_name: existing_deployment.service_name,
Expand All @@ -56,7 +58,7 @@ pub async fn start(
);
let make_service = router.into_make_service();

info!("Binding to and listening at address: {}", args.api_address);
info!(address=%args.api_address, "Binding to and listening at address");

axum::Server::bind(&args.api_address)
.serve(make_service)
Expand Down
11 changes: 10 additions & 1 deletion gateway/src/api/latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use shuttle_common::models::error::ErrorKind;
use shuttle_common::models::{project, user};
use tokio::sync::mpsc::Sender;
use tower_http::trace::TraceLayer;
use tracing::{debug, debug_span, field, Span};
use tracing::{debug, debug_span, field, instrument, Span};

use crate::acme::{AcmeClient, CustomDomain};
use crate::auth::{Admin, ScopedUser, User};
Expand Down Expand Up @@ -65,6 +65,7 @@ impl StatusResponse {
}
}

#[instrument(skip_all, fields(%account_name))]
async fn get_user(
State(RouterState { service, .. }): State<RouterState>,
Path(account_name): Path<AccountName>,
Expand All @@ -75,6 +76,7 @@ async fn get_user(
Ok(AxumJson(user.into()))
}

#[instrument(skip_all, fields(%account_name))]
async fn post_user(
State(RouterState { service, .. }): State<RouterState>,
Path(account_name): Path<AccountName>,
Expand All @@ -85,6 +87,7 @@ async fn post_user(
Ok(AxumJson(user.into()))
}

#[instrument(skip(service))]
async fn get_project(
State(RouterState { service, .. }): State<RouterState>,
ScopedUser { scope, .. }: ScopedUser,
Expand All @@ -98,6 +101,7 @@ async fn get_project(
Ok(AxumJson(response))
}

#[instrument(skip_all, fields(%project))]
async fn post_project(
State(RouterState { service, sender }): State<RouterState>,
User { name, .. }: User,
Expand All @@ -121,6 +125,7 @@ async fn post_project(
Ok(AxumJson(response))
}

#[instrument(skip_all, fields(%project))]
async fn delete_project(
State(RouterState { service, sender }): State<RouterState>,
ScopedUser { scope: project, .. }: ScopedUser,
Expand Down Expand Up @@ -149,6 +154,7 @@ async fn delete_project(
Ok(AxumJson(response))
}

#[instrument(skip_all, fields(scope = %scoped_user.scope))]
async fn route_project(
State(RouterState { service, .. }): State<RouterState>,
scoped_user: ScopedUser,
Expand Down Expand Up @@ -176,6 +182,7 @@ async fn get_status(State(RouterState { sender, .. }): State<RouterState>) -> Re
.unwrap()
}

#[instrument(skip_all)]
async fn revive_projects(
_: Admin,
State(RouterState { service, sender }): State<RouterState>,
Expand All @@ -185,6 +192,7 @@ async fn revive_projects(
.map_err(|_| Error::from_kind(ErrorKind::Internal))
}

#[instrument(skip_all, fields(%email, ?acme_server))]
async fn create_acme_account(
_: Admin,
Extension(acme_client): Extension<AcmeClient>,
Expand All @@ -196,6 +204,7 @@ async fn create_acme_account(
Ok(AxumJson(res))
}

#[instrument(skip_all, fields(%project_name, %fqdn))]
async fn request_acme_certificate(
_: Admin,
State(RouterState { service, sender }): State<RouterState>,
Expand Down
32 changes: 19 additions & 13 deletions gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::io::{self, Cursor};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, info, info_span, trace, warn};
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

#[tokio::main(flavor = "multi_thread")]
Expand Down Expand Up @@ -122,20 +122,26 @@ async fn start(db: SqlitePool, fs: PathBuf, args: StartArgs) -> io::Result<()> {
"running health checks",
healthcheck.num_projects = projects.len()
);
let _ = span.enter();
for (project_name, _) in projects {
if let Ok(handle) = gateway
.new_task()
.project(project_name)
.and_then(task::check_health())
.send(&sender)
.await
{
// we wait for the check to be done before
// queuing up the next one
handle.await

let gateway = gateway.clone();
let sender = sender.clone();
async move {
for (project_name, _) in projects {
if let Ok(handle) = gateway
.new_task()
.project(project_name)
.and_then(task::check_health())
.send(&sender)
.await
{
// we wait for the check to be done before
// queuing up the next one
handle.await
}
}
}
.instrument(span)
.await;
}
}
}
Expand Down
12 changes: 11 additions & 1 deletion gateway/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use once_cell::sync::Lazy;
use rand::distributions::{Alphanumeric, DistString};
use serde::{Deserialize, Serialize};
use tokio::time::{self, timeout};
use tracing::{debug, error};
use tracing::{debug, error, instrument};

use crate::{
ContainerSettings, DockerContext, EndState, Error, ErrorKind, IntoTryState, ProjectName,
Expand Down Expand Up @@ -276,6 +276,7 @@ where
type Next = Self;
type Error = Infallible;

#[instrument(skip_all, fields(state = %self.state()))]
async fn next(self, ctx: &Ctx) -> Result<Self::Next, Self::Error> {
let previous = self.clone();
let previous_state = previous.state();
Expand Down Expand Up @@ -557,6 +558,7 @@ where
type Next = ProjectStarting;
type Error = ProjectError;

#[instrument(skip_all)]
async fn next(self, ctx: &Ctx) -> Result<Self::Next, Self::Error> {
let container_name = self.container_name(ctx);
let container = ctx
Expand Down Expand Up @@ -593,6 +595,7 @@ where
type Next = ProjectStarted;
type Error = ProjectError;

#[instrument(skip_all)]
async fn next(self, ctx: &Ctx) -> Result<Self::Next, Self::Error> {
let container_id = self.container.id.as_ref().unwrap();
ctx.docker()
Expand Down Expand Up @@ -642,6 +645,7 @@ where
type Next = ProjectReadying;
type Error = ProjectError;

#[instrument(skip_all)]
async fn next(self, ctx: &Ctx) -> Result<Self::Next, Self::Error> {
time::sleep(Duration::from_secs(1)).await;

Expand Down Expand Up @@ -688,6 +692,7 @@ where
type Next = Self;
type Error = ProjectError;

#[instrument(skip_all)]
async fn next(mut self, _ctx: &Ctx) -> Result<Self::Next, Self::Error> {
Ok(self)
}
Expand Down Expand Up @@ -781,6 +786,7 @@ where

type Error = ProjectError;

#[instrument(skip_all)]
async fn next(self, ctx: &Ctx) -> Result<Self::Next, Self::Error> {
let Self { container } = self;
ctx.docker()
Expand Down Expand Up @@ -808,6 +814,7 @@ where
type Next = ProjectStarting;
type Error = ProjectError;

#[instrument(skip_all)]
async fn next(self, ctx: &Ctx) -> Result<Self::Next, Self::Error> {
let container = self.container;

Expand Down Expand Up @@ -860,6 +867,7 @@ where
type Next = ProjectDestroyed;
type Error = ProjectError;

#[instrument(skip_all)]
async fn next(self, ctx: &Ctx) -> Result<Self::Next, Self::Error> {
let container_id = self.container.id.as_ref().unwrap();
ctx.docker()
Expand Down Expand Up @@ -895,6 +903,7 @@ where
type Next = ProjectDestroyed;
type Error = ProjectError;

#[instrument(skip_all)]
async fn next(self, _ctx: &Ctx) -> Result<Self::Next, Self::Error> {
Ok(self)
}
Expand Down Expand Up @@ -980,6 +989,7 @@ where
type Next = Self;
type Error = Infallible;

#[instrument(skip_all)]
async fn next(self, _ctx: &Ctx) -> Result<Self::Next, Self::Error> {
Ok(self)
}
Expand Down

0 comments on commit 8387138

Please sign in to comment.