
Commit

split more ee log logic
rubenfiszel committed Nov 13, 2024
1 parent 4a39e45 commit ce01fa1
Showing 4 changed files with 36 additions and 192 deletions.
2 changes: 1 addition & 1 deletion backend/ee-repo-ref.txt
@@ -1 +1 @@
5938b3f66b070ba3578aeeda89e6cc17c4cbda2d
053cf0a539c64efbe8eee0e2a330b72114a43193
198 changes: 8 additions & 190 deletions backend/windmill-worker/src/job_logger.rs
@@ -1,15 +1,6 @@
use itertools::Itertools;
use swc_ecma_parser::lexer::util::CharExt;

#[cfg(all(feature = "enterprise", feature = "parquet"))]
use object_store::path::Path;
use regex::Regex;

#[cfg(all(feature = "enterprise", feature = "parquet"))]
use windmill_common::s3_helpers::OBJECT_STORE_CACHE_SETTINGS;

use windmill_common::error::{self};
use windmill_common::worker::{CLOUD_HOSTED, TMP_DIR};
use windmill_common::worker::CLOUD_HOSTED;

use windmill_queue::append_logs;

@@ -19,6 +10,12 @@ use std::sync::Arc;
use uuid::Uuid;
use windmill_common::DB;

use crate::job_logger_ee::default_disk_log_storage;

#[cfg(all(feature = "enterprise", feature = "parquet"))]
use crate::job_logger_ee::s3_storage;


pub enum CompactLogs {
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
NotEE,
@@ -28,150 +25,7 @@ pub enum CompactLogs {
S3,
}

async fn compact_logs(
job_id: Uuid,
w_id: &str,
db: &DB,
nlogs: String,
total_size: Arc<AtomicU32>,
compact_kind: CompactLogs,
_worker_name: &str,
) -> error::Result<(String, String)> {
let mut prev_logs = sqlx::query_scalar!(
"SELECT logs FROM job_logs WHERE job_id = $1 AND workspace_id = $2",
job_id,
w_id
)
.fetch_optional(db)
.await?
.flatten()
.unwrap_or_default();
let size = prev_logs.char_indices().count() as i32;
let nlogs_len = nlogs.char_indices().count();
let to_keep_in_db = usize::max(
usize::min(nlogs_len, 3000),
nlogs_len % LARGE_LOG_THRESHOLD_SIZE,
);
let extra_split = to_keep_in_db < nlogs_len;
let stored_in_storage_len = if extra_split {
nlogs_len - to_keep_in_db
} else {
0
};
let extra_to_newline = nlogs
.chars()
.skip(stored_in_storage_len)
.find_position(|x| x.is_line_break())
.map(|(i, _)| i)
.unwrap_or(to_keep_in_db);
let stored_in_storage_to_newline = stored_in_storage_len + extra_to_newline;

let (append_to_storage, stored_in_db) = if extra_split {
if stored_in_storage_to_newline == nlogs.len() {
(nlogs.as_ref(), "".to_string())
} else {
let split_idx = nlogs
.char_indices()
.nth(stored_in_storage_to_newline)
.map(|(i, _)| i)
.unwrap_or(0);
let (append_to_storage, stored_in_db) = nlogs.split_at(split_idx);
// tracing::error!("{append_to_storage} ||||| {stored_in_db}");
// tracing::error!(
// "{:?} {:?} {} {}",
// excess_prev_logs.lines().last(),
// current_logs.lines().next(),
// split_idx,
// excess_size_modulo
// );
(append_to_storage, stored_in_db.to_string())
}
} else {
// tracing::error!("{:?}", nlogs.lines().last());
("", nlogs.to_string())
};

let new_size_with_excess = size + stored_in_storage_to_newline as i32;

let new_size = total_size.fetch_add(
new_size_with_excess as u32,
std::sync::atomic::Ordering::SeqCst,
) + new_size_with_excess as u32;

let path = format!(
"logs/{job_id}/{}_{new_size}.txt",
chrono::Utc::now().timestamp_millis()
);

let mut new_current_logs = match compact_kind {
CompactLogs::NoS3 => format!("\n[windmill] No object storage set in instance settings. Previous logs have been saved to disk at {path}"),
CompactLogs::S3 => format!("\n[windmill] Previous logs have been saved to object storage at {path}"),
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
CompactLogs::NotEE => format!("\n[windmill] Previous logs have been saved to disk at {path}"),
};
new_current_logs.push_str(&stored_in_db);

sqlx::query!(
"UPDATE job_logs SET logs = $1, log_offset = $2,
log_file_index = array_append(coalesce(log_file_index, array[]::text[]), $3)
WHERE workspace_id = $4 AND job_id = $5",
new_current_logs,
new_size as i32,
path,
w_id,
job_id
)
.execute(db)
.await?;
prev_logs.push_str(&append_to_storage);

return Ok((prev_logs, path));
}

async fn default_disk_log_storage(
job_id: Uuid,
w_id: &str,
db: &DB,
nlogs: String,
total_size: Arc<AtomicU32>,
compact_kind: CompactLogs,
worker_name: &str,
) {
match compact_logs(
job_id,
&w_id,
&db,
nlogs,
total_size,
compact_kind,
worker_name,
)
.await
{
Err(e) => tracing::error!("Could not compact logs for job {job_id}: {e:?}",),
Ok((prev_logs, path)) => {
let path = format!("{}/{}", TMP_DIR, path);
let splitted = &path.split("/").collect_vec();
tokio::fs::create_dir_all(splitted.into_iter().take(splitted.len() - 1).join("/"))
.await
.map_err(|e| {
tracing::error!("Could not create logs directory: {e:?}",);
e
})
.ok();
let created = tokio::fs::File::create(&path).await;
if let Err(e) = created {
tracing::error!("Could not create logs file {path}: {e:?}",);
return;
}
if let Err(e) = tokio::fs::write(&path, prev_logs).await {
tracing::error!("Could not write to logs file {path}: {e:?}");
} else {
tracing::info!("Logs length of {job_id} has exceeded a threshold. Previous logs have been saved to disk at {path}");
}
}
}
}

pub(crate) async fn append_job_logs(
job_id: Uuid,
@@ -184,43 +38,7 @@ pub(crate) async fn append_job_logs(
) -> () {
if must_compact_logs {
#[cfg(all(feature = "enterprise", feature = "parquet"))]
if let Some(os) = OBJECT_STORE_CACHE_SETTINGS.read().await.clone() {
match compact_logs(
job_id,
&w_id,
&db,
logs,
total_size,
CompactLogs::S3,
&worker_name,
)
.await
{
Err(e) => tracing::error!("Could not compact logs for job {job_id}: {e:?}",),
Ok((prev_logs, path)) => {
tracing::info!("Logs length of {job_id} has exceeded a threshold. Previous logs have been saved to object storage at {path}");
let path2 = path.clone();
if let Err(e) = os
.put(&Path::from(path), prev_logs.to_string().into_bytes().into())
.await
{
tracing::error!("Could not save logs to s3: {e:?}");
}
tracing::info!("Logs of {job_id} saved to object storage at {path2}");
}
}
} else {
default_disk_log_storage(
job_id,
&w_id,
&db,
logs,
total_size,
CompactLogs::NoS3,
&worker_name,
)
.await;
}
s3_storage(job_id, &w_id, &db, &logs, &total_size, &worker_name).await;

#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
{
26 changes: 26 additions & 0 deletions backend/windmill-worker/src/job_logger_ee.rs
@@ -0,0 +1,26 @@
use std::sync::atomic::AtomicU32;
use std::sync::Arc;

use uuid::Uuid;
use windmill_common::DB;

use crate::job_logger::CompactLogs;

#[cfg(all(feature = "enterprise", feature = "parquet"))]
pub(crate) async fn s3_storage(
    job_id: Uuid,
    _w_id: &String,
    _db: &sqlx::Pool<sqlx::Postgres>,
    _logs: &String,
    _total_size: &Arc<AtomicU32>,
    _worker_name: &String,
) {
    tracing::info!("Logs length of {job_id} has exceeded a threshold. Implementation to store excess on s3 is not in OSS");
}

pub(crate) async fn default_disk_log_storage(
job_id: Uuid,
_w_id: &str,
_db: &DB,
_nlogs: String,
_total_size: Arc<AtomicU32>,
_compact_kind: CompactLogs,
_worker_name: &str,
) {
tracing::info!("Logs length of {job_id} has exceeded a threshold. Implementation to store excess on disk in not OSS");
}


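Note: the two functions above are OSS stubs; the real behavior now lives in the enterprise copy of job_logger_ee.rs. For reference, below is a minimal sketch of what the enterprise-side s3_storage plausibly looks like, reconstructed from the logic this commit removes from job_logger.rs (compact_logs followed by an object-store upload, with a disk fallback). It assumes compact_logs, default_disk_log_storage, and the CompactLogs variants move to the EE module alongside it; this is an illustration under those assumptions, not the actual EE code.

#[cfg(all(feature = "enterprise", feature = "parquet"))]
pub(crate) async fn s3_storage(
    job_id: Uuid,
    w_id: &String,
    db: &DB,
    logs: &String,
    total_size: &Arc<AtomicU32>,
    worker_name: &String,
) {
    use object_store::path::Path;
    use windmill_common::s3_helpers::OBJECT_STORE_CACHE_SETTINGS;

    // Upload compacted excess logs to the configured object store,
    // or fall back to disk storage when no object store is set.
    if let Some(os) = OBJECT_STORE_CACHE_SETTINGS.read().await.clone() {
        match compact_logs(
            job_id,
            w_id,
            db,
            logs.clone(),
            total_size.clone(),
            CompactLogs::S3,
            worker_name,
        )
        .await
        {
            Err(e) => tracing::error!("Could not compact logs for job {job_id}: {e:?}"),
            Ok((prev_logs, path)) => {
                tracing::info!("Logs length of {job_id} has exceeded a threshold. Previous logs have been saved to object storage at {path}");
                if let Err(e) = os
                    .put(&Path::from(path.clone()), prev_logs.into_bytes().into())
                    .await
                {
                    tracing::error!("Could not save logs to s3: {e:?}");
                }
            }
        }
    } else {
        default_disk_log_storage(
            job_id,
            w_id,
            db,
            logs.clone(),
            total_size.clone(),
            CompactLogs::NoS3,
            worker_name,
        )
        .await;
    }
}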
2 changes: 1 addition & 1 deletion backend/windmill-worker/src/lib.rs
@@ -29,7 +29,7 @@ mod rust_executor;
mod worker;
mod worker_flow;
mod worker_lockfiles;

mod job_logger_ee;
pub use worker::*;

pub use result_processor::handle_job_error;
