2 changes: 1 addition & 1 deletion engine/build.rs
@@ -3,5 +3,5 @@ fn main() {
     let y = x.build();

     let git_info = y.version_control.map(|v| v.git().cloned());
-    println!("Git information: {:?}", git_info);
+    println!("Git information: {git_info:?}");
 }
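Aside: `{git_info:?}` is Rust's inlined format-args syntax (stable since 1.58), which clippy's uninlined_format_args lint nudges toward. A minimal, self-contained sketch; the value is a hypothetical stand-in for what the build-info crate produces:

fn main() {
    let git_info: Option<&str> = Some("abc1234"); // hypothetical stand-in value
    println!("Git information: {:?}", git_info); // positional argument
    println!("Git information: {git_info:?}"); // inlined format args, same output
}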
42 changes: 23 additions & 19 deletions engine/src/assets/mod.rs
@@ -21,18 +21,22 @@ impl From<String> for AssetFile {
 
 impl AssetFile {
     #[tracing::instrument(name = "from_buffer", skip(state, buffer))]
-    pub async fn from_buffer(state: &State, buffer: &[u8], path: impl AsRef<str> + Debug) -> Result<(Self, NewlyCreatedFile, String, String, i64), sqlx::Error> {
-        let file_hash = hash_file(&buffer);
-        let content_type = infer::get(&buffer)
-            .map(|t| t.mime_type().to_string()).unwrap_or_else(|| {
-                content_type_from_file_name(path.as_ref())
-            });
+    pub async fn from_buffer(
+        state: &State,
+        buffer: &[u8],
+        path: impl AsRef<str> + Debug,
+    ) -> Result<(Self, NewlyCreatedFile, String, String, i64), sqlx::Error> {
+        let file_hash = hash_file(buffer);
+        let content_type = infer::get(buffer).map_or_else(
+            || content_type_from_file_name(path.as_ref()),
+            |t| t.mime_type().to_string(),
+        );
 
         let file_size = buffer.len() as i64;
 
         let newly_created_file = query_as!(
-                NewlyCreatedFile,
-                r#"
+            NewlyCreatedFile,
+            r#"
         WITH ins AS (
             INSERT INTO files (file_hash, file_size)
             VALUES ($1, $2)
@@ -47,16 +51,16 @@ impl AssetFile
         WHERE file_hash = $1
         LIMIT 1;
         "#,
-                &file_hash,
-                file_size
-            )
-            .fetch_one(&state.database.pool)
-            .await?;
+            &file_hash,
+            file_size
+        )
+        .fetch_one(&state.database.pool)
+        .await?;
 
         tracing::info!("File: {:?}", newly_created_file);
 
         let file = AssetFile {
-            path: file_hash.to_string()
+            path: file_hash.to_string(),
         };
 
         Ok((file, newly_created_file, file_hash, content_type, file_size))
@@ -68,11 +72,11 @@ fn hash_file(file: &[u8]) -> String {
     let mut hasher = Sha256::new();
     hasher.update(file);
     let hash = hasher.finalize();
-    format!("{:x}", hash)
+    format!("{hash:x}")
 }
 
 fn content_type_from_file_name(file_name: &str) -> String {
-    let extension = file_name.split('.').last().unwrap_or_default();
+    let extension = file_name.split('.').next_back().unwrap_or_default();
 
     info!("Content type from file name: {:?}", extension);
 
@@ -90,10 +94,10 @@ fn content_type_from_file_name(file_name: &str) -> String {
         "woff" => "font/woff".to_string(),
         "woff2" => "font/woff2".to_string(),
         "ttf" => "font/ttf".to_string(),
-        "otf" => "font/otf".to_string(),
+        "otf" => "font/otf".to_string(),
         _ => {
             info!("Unknown file extension: {:?}", extension);
             "application/octet-stream".to_string()
         },
     }
 }
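Two of the rewrites above are worth unpacking: map(...).unwrap_or_else(...) collapses into Option::map_or_else (fallback closure first, mapping closure second), and split('.').last() becomes next_back(), which pulls the final segment straight off the double-ended split iterator. A self-contained sketch with hypothetical inputs and a made-up fallback:

fn extension(file_name: &str) -> &str {
    // split() returns a DoubleEndedIterator, so next_back() yields the last
    // '.'-separated segment; this is what clippy suggests over .last().
    file_name.split('.').next_back().unwrap_or_default()
}

fn content_type(detected: Option<&str>, file_name: &str) -> String {
    // map_or_else: the first closure handles None, the second maps the Some value.
    detected.map_or_else(
        || format!("fallback/{}", extension(file_name)), // hypothetical fallback
        |mime| mime.to_string(),
    )
}

fn main() {
    assert_eq!(extension("logo.svg"), "svg");
    assert_eq!(content_type(None, "logo.svg"), "fallback/svg");
    assert_eq!(content_type(Some("image/png"), "logo.png"), "image/png");
}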
19 changes: 12 additions & 7 deletions engine/src/cache.rs
@@ -1,17 +1,17 @@
-use std::time::Duration;
-use crate::models::deployment::{Deployment, DeploymentFileEntry};
 use bytes::Bytes;
 use serde_json::Value;
+use crate::models::deployment::{Deployment, DeploymentFileEntry};
+use std::time::Duration;
 
 #[derive(Debug)]
 pub struct Cache {
     /// Raw JSON-based cache used elsewhere in the app.
     pub raw: moka::future::Cache<String, Value>,
     /// Cache domain -> Deployment
     pub domain: moka::future::Cache<String, Option<Deployment>>,
-    /// Cache deployment_id:path -> DeploymentFileEntry
+    /// Cache `deployment_id:path` -> `DeploymentFileEntry`
     pub file_entry: moka::future::Cache<String, Option<DeploymentFileEntry>>,
-    /// Cache file_hash -> Bytes for small files
+    /// Cache `file_hash` -> Bytes for small files
     pub file_bytes: moka::future::Cache<String, Bytes>,
 }
 
@@ -33,13 +33,18 @@ impl Default for Cache {
             .max_capacity(1000)
             .time_to_live(Duration::from_secs(60 * 60))
             .build();
-        Self { raw, domain, file_entry, file_bytes }
+        Self {
+            raw,
+            domain,
+            file_entry,
+            file_bytes,
+        }
     }
 }
 
 impl Cache {
     /// Invalidate cached deployment for this domain.
-    pub fn bump_domain(&self, domain: &str) {
-        self.domain.invalidate(domain);
+    pub async fn bump_domain(&self, domain: &str) {
+        self.domain.invalidate(domain).await;
    }
 }
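bump_domain becomes async because in moka's future::Cache, invalidate (like insert and get in the 0.12-style API) returns a future. A minimal sketch of that behavior, assuming a tokio runtime and illustrative capacities:

use std::time::Duration;

#[tokio::main]
async fn main() {
    // Build a small async cache shaped like the ones above.
    let cache: moka::future::Cache<String, String> = moka::future::Cache::builder()
        .max_capacity(1000)
        .time_to_live(Duration::from_secs(60 * 60))
        .build();

    cache.insert("example.com".into(), "deployment-1".into()).await;
    cache.invalidate("example.com").await; // async, hence bump_domain's .await
    assert!(cache.get("example.com").await.is_none());
}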
18 changes: 12 additions & 6 deletions engine/src/handlers/car/mod.rs
@@ -1,14 +1,20 @@
 use async_std::stream::StreamExt;
 use lapin::{
-    options::{BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, QueueDeclareOptions},
+    options::{
+        BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
+        QueueDeclareOptions,
+    },
     publisher_confirm::Confirmation,
     types::FieldTable,
     BasicProperties, Channel, Connection,
 };
 use serde::{Deserialize, Serialize};
 use tracing::info;
 
-use crate::{models::deployment::Deployment, state::{AMQPConfig, AppState}};
+use crate::{
+    models::deployment::Deployment,
+    state::{AMQPConfig, AppState},
+};
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct CarRequest {
@@ -37,7 +43,7 @@ pub struct CarHandler {
 }
 
 impl CarHandler {
-    pub async fn init(config: &AMQPConfig, connection: &Connection) -> Option<CarHandler> {
+    pub async fn init(config: &AMQPConfig, connection: &Connection) -> Option<Self> {
         let channel = connection.create_channel().await.unwrap();
 
         let generation_key = config.car_queue.as_deref().unwrap_or("car").to_string();
@@ -72,7 +78,7 @@ impl CarHandler {
             .await
             .unwrap();
 
-        Some(CarHandler {
+        Some(Self {
             channel,
             generation_key,
             result_key,
@@ -106,7 +112,7 @@ impl CarHandler {
                if let Some(ipfs_cid) = payload.cid {
                    Deployment::update_ipfs_cid(&state.database, &payload.deployment_id, &ipfs_cid)
                        .await
-                            .ok();
+                        .ok();
                }
 
                // if let Some(file_path) = payload.file_path {
@@ -117,7 +123,7 @@ impl CarHandler {
                // // download from ipfs
 
                // // POST /add?local=true&format=car
-                // // form-data, file: deploy.car
+                // // form-data, file: deploy.car
                // }
 
                delivery.ack(BasicAckOptions::default()).await.unwrap();
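The .ok() on update_ipfs_cid above is the usual way to deliberately discard a Result inside a consumer loop: the error is dropped and processing continues. Illustrated in isolation with a hypothetical parser:

// Sketch: Result::ok converts Result<T, E> into Option<T>, discarding the error.
fn parse_port(input: &str) -> Option<u16> {
    input.parse::<u16>().ok()
}

fn main() {
    assert_eq!(parse_port("8080"), Some(8080));
    assert_eq!(parse_port("not-a-port"), None); // parse error swallowed, not propagated
}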
8 changes: 4 additions & 4 deletions engine/src/handlers/mod.rs
@@ -17,18 +17,18 @@ pub struct TaskRabbit {
 }
 
 impl TaskRabbit {
-    pub async fn init(config: &AMQPConfig) -> TaskRabbit {
+    pub async fn init(config: &AMQPConfig) -> Self {
         let connection = Connection::connect(config.addr.as_str(), ConnectionProperties::default())
             .await
             .unwrap();
 
         info!("Connected to RabbitMQ");
 
-        let previews = PreviewHandler::init(&config, &connection).await;
+        let previews = PreviewHandler::init(config, &connection).await;
 
-        let car = CarHandler::init(&config, &connection).await;
+        let car = CarHandler::init(config, &connection).await;
 
-        TaskRabbit {
+        Self {
             connection,
             previews,
             car,
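Both substitutions here are mechanical clippy fixes: use_self replaces the repeated type name in return position and construction, and needless_borrow drops the extra ampersand (config is already a &AMQPConfig, so &config would be a &&AMQPConfig that immediately re-derefs). A hedged sketch with simplified, made-up types:

struct Config {
    addr: String,
}

struct TaskRabbit {
    addr: String,
}

impl TaskRabbit {
    // `-> Self` instead of `-> TaskRabbit`: renaming the type later touches less code.
    fn init(config: &Config) -> Self {
        // Passing `config` straight through; `&config` would create a needless `&&Config`.
        Self {
            addr: config.addr.clone(),
        }
    }
}

fn main() {
    let config = Config { addr: "amqp://localhost:5672".into() }; // hypothetical address
    let rabbit = TaskRabbit::init(&config);
    println!("connecting to {}", rabbit.addr);
}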
11 changes: 7 additions & 4 deletions engine/src/handlers/preview/mod.rs
@@ -1,4 +1,9 @@
-use lapin::{options::{BasicPublishOptions, QueueDeclareOptions}, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Channel, Connection};
+use lapin::{
+    options::{BasicPublishOptions, QueueDeclareOptions},
+    publisher_confirm::Confirmation,
+    types::FieldTable,
+    BasicProperties, Channel, Connection,
+};
 use serde::{Deserialize, Serialize};
 use tracing::info;
 
@@ -39,11 +44,9 @@ impl PreviewHandler {
             .await
             .unwrap();
 
-        Some(PreviewHandler { channel, queue_key })
+        Some(Self { channel, queue_key })
     }
 
-    ///
-
     pub async fn queue_bunshot(&self, site_id: &str, deployment_id: &str, domain: &str) {
         let payload = serde_json::to_string(&BunshotPayload {
             site_id: site_id.to_string(),
8 changes: 6 additions & 2 deletions engine/src/ipfs/mod.rs
@@ -5,7 +5,11 @@ pub struct IPFSModule {
 }
 
 impl IPFSModule {
-    pub fn new(cluster_url: String, public_cluster_url: String) -> Self {
-        Self { cluster_url, public_cluster_url }
+    #[must_use]
+    pub const fn new(cluster_url: String, public_cluster_url: String) -> Self {
+        Self {
+            cluster_url,
+            public_cluster_url,
+        }
     }
 }
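Two attributes worth noting on the new constructor: #[must_use] warns any caller that drops the returned module, and const fn works here because the body only moves its arguments into the struct. A standalone sketch (the URLs are made up):

pub struct IPFSModule {
    cluster_url: String,
    public_cluster_url: String,
}

impl IPFSModule {
    // const fn: moving owned Strings into a struct is allowed in const evaluation,
    // so clippy's missing_const_for_fn fires on trivial constructors like this one.
    #[must_use]
    pub const fn new(cluster_url: String, public_cluster_url: String) -> Self {
        Self {
            cluster_url,
            public_cluster_url,
        }
    }
}

fn main() {
    let module = IPFSModule::new(
        "http://127.0.0.1:9094".to_string(),
        "https://ipfs.example.com".to_string(),
    );
    println!("{} / {}", module.cluster_url, module.public_cluster_url);
}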
16 changes: 7 additions & 9 deletions engine/src/main.rs
@@ -13,15 +13,15 @@ use tracing::{error, info};
 pub mod assets;
 pub mod cache;
 pub mod database;
+pub mod handlers;
+pub mod ipfs;
 pub mod middlewares;
 pub mod models;
 pub mod routes;
+pub mod server;
 pub mod state;
 pub mod storage;
 pub mod utils;
-pub mod handlers;
-pub mod ipfs;
-pub mod server;
 
 use tracing_subscriber::prelude::*;
 
@@ -33,7 +33,7 @@ async fn main() {
 
     if let Some(endpoint) = otlp_endpoint {
         info!("Starting Edgerouter with OTLP tracing");
-        
+
         // Set up propagator for trace context
         opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
 
@@ -61,25 +61,23 @@ async fn main() {
         let tracer = trace_provider.tracer("edgeserver");
 
         // Simple telemetry layer
-        let telemetry_layer = tracing_opentelemetry::layer()
-            .with_tracer(tracer);
+        let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
 
         // Create a formatting layer with span closure events
         let fmt_layer = tracing_subscriber::fmt::layer()
             .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE);
 
         // Set up filter for relevant components
         let filter = tracing_subscriber::EnvFilter::from_default_env()
             .add_directive("poem=info".parse().unwrap())
             .add_directive("edgeserver=debug".parse().unwrap());
 
         // Register layers with the subscriber
         tracing_subscriber::registry()
             .with(filter)
             .with(fmt_layer)
             .with(telemetry_layer)
             .init();
-
     } else {
         info!("Starting Edgerouter without OTLP tracing, provide OTLP_ENDPOINT to enable tracing");
         tracing_subscriber::fmt::init();