Commit

feat: add cache to disk support for jsonl
Due to the design of chainsaw and the need to aggregate over a file, RAM
usage can grow erratically depending on the number of detections within
a file. For streamable formats like JSONL we can instead cache the
detections on disk, saving RAM at the cost of some performance. This
commit enables that approach.

Fixes WithSecureLabs#102
Alex Kornitzer committed Jul 3, 2023
1 parent 93a1eeb commit e60655c
Showing 5 changed files with 129 additions and 20 deletions.
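
In outline, the change serialises each matching document to JSON as it is hunted, appends it to a temporary file, and keeps only the byte offset and length; when the JSONL output is printed it seeks back to that range and re-reads the raw JSON. Below is a minimal sketch of that bookkeeping, using the tempfile crate added to Cargo.toml in this commit; the CachedSlot struct and the literal payloads are illustrative stand-ins, not chainsaw's actual types.

use std::io::{Read, Seek, SeekFrom, Write};

// Illustrative stand-in for the (offset, size) pair this commit stores in Kind::Cached.
struct CachedSlot {
    offset: u64,
    size: usize,
}

fn main() -> std::io::Result<()> {
    // Anonymous temporary file; removed automatically when dropped.
    let mut cache = tempfile::tempfile()?;

    // Write phase: append each serialised detection, remembering only its range.
    let mut slots = Vec::new();
    let mut offset = 0u64;
    for json in ["{\"rule\":\"a\"}", "{\"rule\":\"b\"}"] {
        cache.write_all(json.as_bytes())?;
        slots.push(CachedSlot { offset, size: json.len() });
        offset += json.len() as u64;
    }

    // Read phase: seek to each recorded range and recover the raw JSON for output.
    for slot in &slots {
        cache.seek(SeekFrom::Start(slot.offset))?;
        let mut buf = vec![0u8; slot.size];
        cache.read_exact(&mut buf)?;
        println!("{}", String::from_utf8(buf).expect("cached data is UTF-8"));
    }
    Ok(())
}

In the actual diff the range is recorded in a new Kind::Cached variant, and the re-read bytes are wrapped in serde_json's RawValue so they can be emitted verbatim without a second parse.
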
1 change: 1 addition & 0 deletions Cargo.lock


3 changes: 2 additions & 1 deletion Cargo.toml
@@ -30,9 +30,10 @@ rayon = "1.5"
regex = "1.6"
rustc-hash = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_json = { version = "1.0", features = ["raw_value"] }
serde_yaml = "0.9"
tau-engine = { version = "1.0", features = ["core", "json", "sync"] }
tempfile = "3.2"
term_size = "0.3"
uuid = { version = "1.1", features = ["serde", "v4"] }

58 changes: 54 additions & 4 deletions src/cli.rs
@@ -1,14 +1,15 @@
use std::collections::{hash_map::DefaultHasher, BTreeMap, HashMap, HashSet};
use std::fs;
use std::hash::{Hash, Hasher};
use std::io::*;
use std::time::Duration;

use chrono::{DateTime, NaiveDateTime, SecondsFormat, TimeZone, Utc};
use chrono_tz::Tz;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use prettytable::{cell, format, Row, Table};
use serde::Serialize;
use serde_json::{Map, Number, Value as Json};
use serde_json::{value::RawValue, Map, Number, Value as Json};
use tau_engine::{Document, Value as Tau};
use uuid::Uuid;

@@ -167,6 +168,7 @@ pub fn print_log(
count = documents.len();
documents.first().expect("could not get document")
}
_ => unimplemented!(),
};

let name = match rule {
@@ -353,6 +355,7 @@ pub fn print_detections(
count = documents.len();
documents.first().expect("could not get document")
}
_ => unimplemented!(),
};

let mut rows = vec![];
@@ -754,6 +757,7 @@ pub fn print_csv(
count = documents.len();
documents.first().expect("could not get document")
}
_ => unimplemented!(),
};

let mut rows = vec![];
@@ -886,6 +890,7 @@ pub fn print_json(
local: bool,
timezone: Option<Tz>,
jsonl: bool,
cache: Option<fs::File>,
) -> crate::Result<()> {
let hunts: HashMap<_, _> = hunts.iter().map(|h| (&h.id, h)).collect();
let mut detections = detections
@@ -950,9 +955,54 @@
.collect::<Vec<Detection>>();
detections.sort_by(|x, y| x.timestamp.cmp(&y.timestamp));
if jsonl {
for det in &detections {
cs_print_json!(det)?;
cs_println!();
if let Some(cache) = cache.as_ref() {
let mut f = BufReader::new(cache);
for det in detections {
match det.kind {
Kind::Cached {
document,
offset,
size,
} => {
let _ = f.seek(SeekFrom::Start(*offset as u64));
let mut buf = vec![0u8; *size];
let _ = f.read_exact(&mut buf).expect("could not read cached data");
let data = String::from_utf8(buf).expect("could not convert cached data");
let raw =
RawValue::from_string(data).expect("could not serialize cached data");
let kind = Kind::Cached {
document: crate::hunt::RawDocument {
kind: document.kind.clone(),
path: document.path.clone(),
data: Some(&*raw),
},
offset: *offset,
size: *size,
};

cs_print_json!(&Detection {
authors: &det.authors,
group: &det.group,
kind: &kind,
level: &det.level,
name: &det.name,
source: det.source,
status: &det.status,
timestamp: det.timestamp,
sigma: det.sigma,
})?;
}
_ => {
cs_print_json!(&det)?;
}
}
cs_println!();
}
} else {
for det in detections {
cs_print_json!(&det)?;
cs_println!();
}
}
} else {
cs_print_json!(&detections)?;
74 changes: 60 additions & 14 deletions src/hunt.rs
@@ -2,7 +2,7 @@ use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs;
use std::hash::{BuildHasherDefault, Hash, Hasher};
use std::io::Read;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::Mutex;

@@ -16,7 +16,7 @@ use serde::{
ser::{SerializeStruct, Serializer},
Deserialize, Serialize,
};
use serde_json::Value as Json;
use serde_json::{value::RawValue, Value as Json};
use tau_engine::{
core::parser::{Expression, ModSym, Pattern},
Document as TauDocument, Value as Tau,
@@ -103,11 +103,30 @@ impl<'a> Serialize for Document<'a> {
}
}

#[derive(Debug, Serialize)]
pub struct RawDocument<'a> {
pub kind: FileKind,
pub path: &'a Path,
#[serde(borrow)]
pub data: Option<&'a RawValue>,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case", tag = "kind")]
pub enum Kind<'a> {
Aggregate { documents: Vec<Document<'a>> },
Individual { document: Document<'a> },
Aggregate {
documents: Vec<Document<'a>>,
},
Individual {
document: Document<'a>,
},
Cached {
document: RawDocument<'a>,
#[serde(skip)]
offset: usize,
#[serde(skip)]
size: usize,
},
}

#[derive(Default)]
@@ -693,13 +712,18 @@ impl Hunter {
HunterBuilder::new()
}

pub fn hunt<'a>(&'a self, file: &'a Path) -> crate::Result<Vec<Detections>> {
pub fn hunt<'a>(
&'a self,
file: &'a Path,
cache: &Option<std::fs::File>,
) -> crate::Result<Vec<Detections>> {
let mut reader = Reader::load(file, self.inner.load_unknown, self.inner.skip_errors)?;
let kind = reader.kind();
let aggregates: Mutex<FxHashMap<(Uuid, Uuid), (&Aggregate, FxHashMap<u64, Vec<Uuid>>)>> =
Mutex::new(FxHashMap::default());
let files: Mutex<FxHashMap<Uuid, (Value, NaiveDateTime)>> =
Mutex::new(FxHashMap::default());
let offset = Mutex::new(0);
let mut detections = reader
.documents()
.par_bridge()
@@ -906,16 +930,38 @@ impl Hunter {
}
}
if !hits.is_empty() {
Some(Ok(Detections {
hits,
kind: Kind::Individual {
document: Document {
kind,
path: file,
data: bincode::serialize(&value).ok()?,
if let Some(mut cache) = cache.as_ref() {
let mut offset = offset.lock().expect("could not lock offset");
let json = serde_json::to_string(&Json::from(value))
.expect("could not serialise data");
let _ = cache.write_all(json.as_bytes());
let val = *offset;
let size = json.as_bytes().len();
*offset += size;
Some(Ok(Detections {
hits,
kind: Kind::Cached {
document: RawDocument {
kind,
path: file,
data: None,
},
offset: val,
size,
},
},
}))
}))
} else {
Some(Ok(Detections {
hits,
kind: Kind::Individual {
document: Document {
kind,
path: file,
data: bincode::serialize(&value).ok()?,
},
},
}))
}
} else {
None
}
13 changes: 12 additions & 1 deletion src/main.rs
@@ -92,6 +92,9 @@ enum Command {
#[arg(short = 'r', long = "rule", number_of_values = 1)]
rule: Option<Vec<PathBuf>>,

/// Cache results to disk to reduce memory usage at the cost of performance.
#[arg(short = 'c', long = "cache-to-disk", requires("jsonl"))]
cache: bool,
/// Set the column width for the tabular output.
#[arg(long = "column-width", conflicts_with = "json")]
column_width: Option<u32>,
@@ -421,6 +424,7 @@ fn run() -> Result<()> {
rule,

load_unknown,
cache,
mut column_width,
csv,
extension,
@@ -638,7 +642,12 @@ fn run() -> Result<()> {
let pb = cli::init_progress_bar(files.len() as u64, "Hunting".to_string());
for file in &files {
pb.tick();
let scratch = hunter.hunt(file).with_context(|| {
let cache = if cache {
tempfile::tempfile().ok()
} else {
None
};
let scratch = hunter.hunt(file, &cache).with_context(|| {
format!("Failed to hunt through file '{}'", file.to_string_lossy())
})?;
hits += scratch.iter().map(|d| d.hits.len()).sum::<usize>();
@@ -651,6 +660,7 @@
local,
timezone,
jsonl,
cache,
)?;
} else {
detections.extend(scratch);
@@ -671,6 +681,7 @@
local,
timezone,
jsonl,
None,
)?;
} else if jsonl {
// Work already done
