Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: link the tracing spans between services #445

93 changes: 68 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,22 @@ STACK=shuttle-prod
APPS_FQDN=shuttleapp.rs
DB_FQDN=db.shuttle.rs
CONTAINER_REGISTRY=public.ecr.aws/shuttle
DD_ENV=production
else
DOCKER_COMPOSE_FILES=-f docker-compose.yml -f docker-compose.dev.yml
STACK=shuttle-dev
APPS_FQDN=unstable.shuttleapp.rs
DB_FQDN=db.unstable.shuttle.rs
CONTAINER_REGISTRY=public.ecr.aws/shuttle-dev
DD_ENV=unstable
endif

POSTGRES_EXTRA_PATH?=./extras/postgres
POSTGRES_TAG?=14

RUST_LOG?=debug

DOCKER_COMPOSE_ENV=STACK=$(STACK) BACKEND_TAG=$(TAG) PROVISIONER_TAG=$(TAG) POSTGRES_TAG=${POSTGRES_TAG} APPS_FQDN=$(APPS_FQDN) DB_FQDN=$(DB_FQDN) POSTGRES_PASSWORD=$(POSTGRES_PASSWORD) RUST_LOG=$(RUST_LOG) CONTAINER_REGISTRY=$(CONTAINER_REGISTRY) MONGO_INITDB_ROOT_USERNAME=$(MONGO_INITDB_ROOT_USERNAME) MONGO_INITDB_ROOT_PASSWORD=$(MONGO_INITDB_ROOT_PASSWORD)
DOCKER_COMPOSE_ENV=STACK=$(STACK) BACKEND_TAG=$(TAG) PROVISIONER_TAG=$(TAG) POSTGRES_TAG=${POSTGRES_TAG} APPS_FQDN=$(APPS_FQDN) DB_FQDN=$(DB_FQDN) POSTGRES_PASSWORD=$(POSTGRES_PASSWORD) RUST_LOG=$(RUST_LOG) CONTAINER_REGISTRY=$(CONTAINER_REGISTRY) MONGO_INITDB_ROOT_USERNAME=$(MONGO_INITDB_ROOT_USERNAME) MONGO_INITDB_ROOT_PASSWORD=$(MONGO_INITDB_ROOT_PASSWORD) DD_ENV=$(DD_ENV)

.PHONY: images clean src up down deploy shuttle-% postgres docker-compose.rendered.yml test bump-% deploy-examples publish publish-% --validate-version

Expand Down
7 changes: 4 additions & 3 deletions deployer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ hyper = { version = "0.14.20", features = ["client", "http1", "http2", "tcp" ] }
# not great, but waiting for WebSocket changes to be merged
hyper-reverse-proxy = { git = "https://github.com/chesedo/hyper-reverse-proxy", branch = "master" }
once_cell = "1.14.0"
opentelemetry = { version = "0.17.0", features = ["rt-tokio"] }
opentelemetry-datadog = { version = "0.5.0", features = ["reqwest-client"] }
opentelemetry = { version = "0.18.0", features = ["rt-tokio"] }
opentelemetry-datadog = { version = "0.6.0", features = ["reqwest-client"] }
opentelemetry-http = "0.7.0"
pipe = "0.4.0"
portpicker = "0.1.1"
serde = "1.0.137"
Expand All @@ -37,7 +38,7 @@ tonic = "0.8.0"
tower = { version = "0.4.12", features = ["make"] }
tower-http = { version = "0.3.4", features = ["auth", "trace"] }
tracing = "0.1.35"
tracing-opentelemetry = "0.17.4"
tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
uuid = { version = "1.1.2", features = ["v4"] }

Expand Down
3 changes: 3 additions & 0 deletions deployer/src/deployment/deploy_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,7 @@ mod tests {
id,
service_name: "run-test".to_string(),
service_id: Uuid::new_v4(),
tracing_context: Default::default(),
})
.await;

Expand Down Expand Up @@ -945,6 +946,7 @@ mod tests {
service_id: Uuid::new_v4(),
data: Bytes::from("violets are red").to_vec(),
will_run_tests: false,
tracing_context: Default::default(),
})
.await;

Expand Down Expand Up @@ -988,6 +990,7 @@ mod tests {
service_id: Uuid::new_v4(),
data: bytes,
will_run_tests: false,
tracing_context: Default::default(),
}
}
}
11 changes: 9 additions & 2 deletions deployer/src/deployment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use std::path::PathBuf;

pub use queue::Queued;
pub use run::{ActiveDeploymentsGetter, Built};
use tracing::instrument;
use tracing::{instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::persistence::{SecretRecorder, State};
use tokio::sync::{broadcast, mpsc};
Expand Down Expand Up @@ -54,7 +55,13 @@ impl DeploymentManager {
}

#[instrument(skip(self), fields(id = %queued.id, state = %State::Queued))]
pub async fn queue_push(&self, queued: Queued) {
pub async fn queue_push(&self, mut queued: Queued) {
let cx = Span::current().context();

opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&cx, &mut queued.tracing_context);
});

self.pipeline.queue_send.send(queued).await.unwrap();
}

Expand Down
46 changes: 33 additions & 13 deletions deployer/src/deployment/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use crate::persistence::{LogLevel, SecretRecorder};
use cargo_metadata::Message;
use chrono::Utc;
use crossbeam_channel::Sender;
use opentelemetry::global;
use serde_json::json;
use shuttle_service::loader::{build_crate, get_config};
use tracing::{debug, error, info, instrument, trace};
use tracing::{debug, debug_span, error, info, instrument, trace, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::fs::remove_file;
use std::io::Read;
Expand Down Expand Up @@ -58,27 +60,43 @@ pub async fn task(
let libs_path = libs_path.clone();

tokio::spawn(async move {
match queued
.handle(builds_path, libs_path, log_recorder, secret_recorder)
.await
{
Ok(built) => promote_to_run(built, run_send_cloned).await,
Err(err) => build_failed(&id, err),
let parent_cx = global::get_text_map_propagator(|propagator| {
propagator.extract(&queued.tracing_context)
});
let span = debug_span!("builder");
span.set_parent(parent_cx);

async move {
match queued
.handle(builds_path, libs_path, log_recorder, secret_recorder)
.await
{
Ok(built) => promote_to_run(built, run_send_cloned).await,
Err(err) => build_failed(&id, err),
}
}
.instrument(span)
.await
});
}
}

#[instrument(fields(id = %_id, state = %State::Crashed))]
fn build_failed(_id: &Uuid, err: impl std::error::Error + 'static) {
#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))]
fn build_failed(_id: &Uuid, error: impl std::error::Error + 'static) {
error!(
error = &err as &dyn std::error::Error,
error = &error as &dyn std::error::Error,
"service build encountered an error"
);
}

#[instrument(fields(id = %built.id, state = %State::Built))]
async fn promote_to_run(built: Built, run_send: RunSender) {
async fn promote_to_run(mut built: Built, run_send: RunSender) {
let cx = Span::current().context();

opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&cx, &mut built.tracing_context);
});

if let Err(err) = run_send.send(built.clone()).await {
build_failed(&built.id, err);
}
Expand All @@ -90,10 +108,11 @@ pub struct Queued {
pub service_id: Uuid,
pub data: Vec<u8>,
pub will_run_tests: bool,
pub tracing_context: HashMap<String, String>,
}

impl Queued {
#[instrument(name = "queued_handle", skip(self, builds_path, libs_path, log_recorder, secret_recorder), fields(id = %self.id, state = %State::Building))]
#[instrument(skip(self, builds_path, libs_path, log_recorder, secret_recorder), fields(id = %self.id, state = %State::Building))]
async fn handle(
self,
builds_path: PathBuf,
Expand Down Expand Up @@ -169,6 +188,7 @@ impl Queued {
id: self.id,
service_name: self.service_name,
service_id: self.service_id,
tracing_context: Default::default(),
};

Ok(built)
Expand Down
Loading