Skip to content

Commit

Permalink
switch server from log to tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
djugei committed May 16, 2024
1 parent 085fc66 commit 0b87cef
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 26 deletions.
2 changes: 1 addition & 1 deletion async_file_cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"

[dependencies]
tokio = { version = "*", features = ["sync", "fs", "io-util", "macros"] }
log = "*"
tracing = "*"

[dev-dependencies]
tempfile = "*"
Expand Down
2 changes: 1 addition & 1 deletion async_file_cache/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::future::Future;
use log::{debug, trace};
use std::ffi::OsString;
use std::{collections::HashMap, hash::Hash, io::ErrorKind, path::PathBuf, pin::pin};
use tracing::{debug, trace};

use tokio::io::AsyncSeekExt;
use tokio::{fs::File, sync::Mutex, sync::Semaphore};
Expand Down
25 changes: 15 additions & 10 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
axum = { version = "*", features = [] }
hyper = { version = "*", features = ["client", "http1", "stream", "runtime"] }
tokio = { version = "*", features = ["rt", "rt-multi-thread", "sync", "fs", "io-util", "macros"] }
thiserror = "*"
anyhow = "*"

tracing = "*"
console-subscriber = "*"

tokio = { version = "*", features = ["rt", "rt-multi-thread", "sync", "fs", "io-util", "macros", "tracing"] }
tokio-util = { version = "*", features = ["io"] }
futures-util = "*"
log = "*"
env_logger = "*"
ddelta = "*"
zstd = { version = "*", features = ["zstdmt"] }

axum = { version = "*", features = [] }
hyper = { version = "*", features = ["client", "http1", "stream", "runtime"] }
reqwest = { version = "*", default-features = false, features = ["rustls-tls"]}

ddelta = { path = "../../ddelta-rs"}
zstd = { version = "*", features = ["zstdmt"] }
strsim = "*"

parsing = {path = "../parsing"}
thiserror = "*"
anyhow = "*"
async_file_cache = {path = "../async_file_cache"}
strsim = "*"
29 changes: 15 additions & 14 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use axum::{
Router,
};
use core::future::Future;
use log::{debug, error, info};
use std::{panic, path::PathBuf, sync::Arc};
use thiserror::Error;
use tokio::fs::File;
use tracing::{debug, error, info, Instrument};

use async_file_cache::FileCache;

Expand All @@ -25,7 +25,7 @@ type Str = Box<str>;
use reqwest::Client;

fn main() {
env_logger::init();
console_subscriber::init();

let mut path = PathBuf::from(LOCAL);
path.push("pkg");
Expand All @@ -41,20 +41,18 @@ fn main() {
path.push(p.to_string());
path
};
#[tracing::instrument(level = "info", skip(client, file), "Downloading")]
async fn inner_f(client: Client, key: Package, mut file: File) -> Result<File, DownloadError> {
use tokio::io::AsyncWriteExt;

let mut uri = String::new();
uri.push_str(MIRROR);
uri.push_str(&key.to_string());
let uri = format!("{MIRROR}{key}");
debug!(key = key.to_string(), uri, "getting from primary");
let mut response = client.get(uri).send().await?;

if response.status() == reqwest::StatusCode::NOT_FOUND {
// fall back to archive mirror
info!("using fallback mirror for {key}");
let mut uri = String::new();
uri.push_str(FALLBACK_MIRROR);
uri.push_str(&key.to_string());
info!(key = key.to_string(), "using fallback mirror");
let uri = format!("{FALLBACK_MIRROR}{key}");
response = client.get(uri).send().await?;
}
if !response.status().is_success() {
Expand Down Expand Up @@ -91,6 +89,7 @@ fn main() {
F: Send + Fn(S, Package, File) -> FF,
FF: Future<Output = Result<File, DownloadError>> + Send,
{
let keystring = key.to_string();
let old = state.get_or_generate(key.clone().get_old());
let new = state.get_or_generate(key.get_new());
let (old, new) = tokio::join!(old, new);
Expand All @@ -101,6 +100,7 @@ fn main() {
let mut old = zstd::Decoder::new(old)?;
let new = new.into_std().await;
let mut new = zstd::Decoder::new(new)?;
let span = tracing::info_span!("delta request", key = keystring);

let f: tokio::task::JoinHandle<Result<_, DeltaError>> = tokio::task::spawn_blocking(move || {
let mut zpatch = zstd::Encoder::new(patch, 22)?;
Expand All @@ -110,19 +110,19 @@ fn main() {
}
let mut last_report = 0;
ddelta::generate_chunked(&mut old, &mut new, &mut zpatch, None, |s| match s {
ddelta::State::Reading => debug!("reading"),
ddelta::State::Sorting => debug!("sorting"),
ddelta::State::Reading => debug!(key = keystring, "reading"),
ddelta::State::Sorting => debug!(key = keystring, "sorting"),
ddelta::State::Working(p) => {
const MB: u64 = 1024 * 1024;
if p > last_report + (8 * MB) {
debug!("working: {}MB done", p / MB);
debug!(key = keystring, "working: {}MB done", p / MB);
last_report = p;
}
}
})?;
Ok(zpatch.finish()?)
});
let f = f.await.expect("threading error")?;
let f = f.instrument(span).await.expect("threading error")?;

let f = File::from_std(f);

Expand Down Expand Up @@ -182,6 +182,7 @@ where
F: Send + Sync + 'static + Fn(S, Delta, File) -> FF,
FF: Send + 'static + Future<Output = Result<File, DeltaError>>,
{
let c_span = tracing::info_span!("delta request", from, to);
let c = || async {
let from = Package::try_from(&*from)?;
let to = Package::try_from(&*to)?;
Expand Down Expand Up @@ -217,7 +218,7 @@ where
];
anyhow::Ok((StatusCode::OK, headers, body))
};
c().await.map_err(|e| {
c().instrument(c_span).await.map_err(|e| {
error!("{:?}", e);
(
axum::http::status::StatusCode::INTERNAL_SERVER_ERROR,
Expand Down

0 comments on commit 0b87cef

Please sign in to comment.