diff --git a/Cargo.lock b/Cargo.lock index 121284d..22f14ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,11 +279,13 @@ version = "0.1.0" dependencies = [ "chrono", "crossbeam-channel", + "crossbeam-deque", "email-address-parser", "email-parser", "eyre", "flate2", "lazy_static", + "num_cpus", "rayon", "regex", "rusqlite", diff --git a/Cargo.toml b/Cargo.toml index c4c7280..70a44bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,4 +22,6 @@ serde_json = "*" serde = { version = "*", features = ["derive"]} strum = "0.21" strum_macros = "0.21" -crossbeam-channel = "0.5.1" \ No newline at end of file +crossbeam-channel = "0.5.1" +crossbeam-deque = "0.8.1" +num_cpus = "1.13.0" \ No newline at end of file diff --git a/src/database.rs b/src/database.rs index 7d05aed..26a9a7a 100644 --- a/src/database.rs +++ b/src/database.rs @@ -8,14 +8,14 @@ use rusqlite::{self, params, Connection, Error, Row}; #[derive(Debug)] pub struct Database { - connection: Option, + connection: Connection, } -pub enum DBMessage { - Mail(EmailEntry), +/*pub enum DBMessage<'a> { + Mail(EmailEntry<'a>), Error(Report, PathBuf), Done, -} +}*/ impl Database { /// Create a in-memory db. @@ -27,11 +27,35 @@ impl Database { // println!("SQL: {}", &n); //})); Ok(Database { - connection: Some(connection), + connection: connection, }) } - pub fn process(&mut self) -> Sender { + pub fn insert_mail(&self, entry: &EmailEntry) -> Result<()> { + let path = entry.path.display().to_string(); + let domain = &entry.domain; + let local_part = &entry.local_part; + let year = entry.datetime.date().year(); + let month = entry.datetime.date().month(); + let day = entry.datetime.date().day(); + let kind = entry.parser.to_string(); + let subject = entry.subject.to_string(); + let sql = r#"INSERT INTO emails (path, domain, local_part, year, month, day, kind, subject) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"#; + let mut prepared = self.connection.prepare(sql)?; + prepared.execute(params![ + path, domain, local_part, year, month, day, kind, subject + ])?; + Ok(()) + } + + pub fn insert_error(&self, message: &Report, path: &PathBuf) -> Result<()> { + let sql = "INSERT INTO errors (message, path) VALUES (?, ?)"; + let mut prepared = self.connection.prepare(sql)?; + prepared.execute(params![message.to_string(), path.display().to_string()])?; + Ok(()) + } + + /*pub fn process(&mut self) -> Sender> { let (sender, receiver) = unbounded(); let connection = self.connection.take().unwrap(); std::thread::spawn(move || loop { @@ -53,7 +77,7 @@ impl Database { //} }); sender - } + }*/ fn create_tables(connection: &Connection) -> Result<()> { let emails_table = r#" @@ -78,30 +102,6 @@ CREATE TABLE IF NOT EXISTS errors ( } } -fn insert_mail(connection: &Connection, entry: &EmailEntry) -> Result<()> { - let path = entry.path.display().to_string(); - let domain = &entry.domain; - let local_part = &entry.local_part; - let year = entry.datetime.date().year(); - let month = entry.datetime.date().month(); - let day = entry.datetime.date().day(); - let kind = entry.parser.to_string(); - let subject = entry.subject.to_string(); - let sql = "INSERT INTO emails (path, domain, local_part, year, month, day, kind, subject) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"; - let mut prepared = connection.prepare(sql)?; - prepared.execute(params![ - path, domain, local_part, year, month, day, kind, subject - ])?; - Ok(()) -} - -fn insert_error(connection: &Connection, message: &Report, path: &PathBuf) -> Result<()> { - let sql = "INSERT INTO errors (message, path) VALUES (?, ?)"; - let mut prepared = connection.prepare(sql)?; - prepared.execute(params![message.to_string(), path.display().to_string()])?; - Ok(()) -} - pub trait RowConversion: Sized { fn from_row<'stmt>(row: &Row<'stmt>) -> Result; fn to_row(&self) -> Result; diff --git a/src/emails.rs b/src/emails.rs index 7f89805..eece603 100644 --- a/src/emails.rs +++ b/src/emails.rs @@ -6,6 +6,9 @@ use flate2::read::GzDecoder; use rayon::prelude::*; use serde::Deserialize; use serde_json; +use std::borrow::Cow; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; use strum_macros; const SENDER_HEADER_NAMES: &[&str] = &["Sender", "Reply-to", "From"]; @@ -28,13 +31,13 @@ pub enum ParserKind { /// Representation of an email #[derive(Debug)] -pub struct EmailEntry { +pub struct EmailEntry<'a> { pub path: PathBuf, - pub domain: String, - pub local_part: String, + pub domain: Cow<'a, str>, + pub local_part: Cow<'a, str>, pub datetime: chrono::DateTime, pub parser: ParserKind, - pub subject: String, + pub subject: Cow<'a, str>, } /// Raw representation of an email. @@ -133,7 +136,8 @@ impl Emails { // } //} -fn read_folders(folder: &Path) -> Result> { +pub fn read_folders>(folder: &P) -> Result> { + let folder = folder.as_ref(); Ok(std::fs::read_dir(&folder)? .into_iter() .par_bridge() @@ -183,35 +187,101 @@ fn read_emails(folder_path: &Path) -> Result> { .collect()) } -pub fn read_email(raw_entry: &RawEmailEntry) -> Result { - let content = unziped_content(&raw_entry.eml_path)?; - // We have to try multiple different email readers as each of them seems to fail in a different way - let email = parse_email_parser(&raw_entry, &content).or_else(|e| { - tracing::trace!("Parser Error: {:?}", &e); - parse_meta(&raw_entry, &content) - }); - - Ok(email.wrap_err_with(|| { - format!( - "{}\n{:?}", - String::from_utf8(content.clone()).unwrap(), - &raw_entry - ) - })?) +type ProgressSender = Sender>>; + +use crate::database::Database; +use crossbeam_channel::Sender; +use crossbeam_deque::{Stealer, Worker}; +pub fn process_emails( + emails: Vec, + db: Arc>, + progress_sender: ProgressSender, +) { + // This will open the worker thread that supplies the stealers + // with RawEmailEntry instances + let worker: Worker = Worker::new_fifo(); + let cpus = num_cpus::get() * 2; + let handles: Vec>> = (0..=cpus) + .map(|_| { + let stealer = worker.stealer(); + let sender = progress_sender.clone(); + let db = db.clone(); + std::thread::spawn(move || { + if let Err(e) = stealer_thread(stealer, db, sender) { + // FIXME: Report back + println!("{}", &e); + //db.lock().map(|e| e.insert_error(e, path)) + } + Ok(()) + }) + }) + .collect(); + + for email in emails { + worker.push(email); + } + + for handle in handles { + let _ = handle.join().unwrap(); + } + + progress_sender.send(Ok(None)); } -fn parse_email_parser(raw_entry: &RawEmailEntry, content: &Vec) -> Result { +fn stealer_thread( + stealer: Stealer, + db: Arc>, + progress_sender: ProgressSender, +) -> Result<()> { + while let crossbeam_deque::Steal::Success(raw_entry) = stealer.steal() { + let content = match unziped_content(&raw_entry.eml_path) { + Ok(n) => n, + Err(_) => continue, + }; + let email = match read_email(&raw_entry.eml_path.as_path(), &content) { + Ok(n) => n, + Err(_) => continue, + }; + + let result = db.lock().map(|e| e.insert_mail(&email)); + progress_sender.send(Ok(Some(1))); + match result { + Ok(Err(e)) => { + println!("Insertion Error: {}", &e) + } + Ok(Ok(_)) => (), + Err(e) => { + println!("Poison Guard: {:?}", &e); + } + } + } + + Ok(()) +} + +pub fn read_email<'a, 'b>(path: &'b Path, content: &'a [u8]) -> Result> { match email_parser::email::Email::parse(&content) { - Ok(email) => (&raw_entry.eml_path, email).try_into(), - Err(error) => { - //let content_string = String::from_utf8(content.clone())?; - //println!("{}|{}", &error, &raw_entry.eml_path.display()); - Err(eyre!("Could not `email_parser` email:\n{:?}", &error)) + Ok(email) => { + let domain = email.sender.address.domain; + let local_part = email.sender.address.local_part; + let datetime = emaildatetime_to_chrono(&email.date); + let subject = email.subject.unwrap_or_default(); + + let entry = EmailEntry { + path: path.to_path_buf(), + domain, + local_part, + datetime, + parser: ParserKind::EmailParser, + subject, + }; + Ok(entry) } + Err(error) => Err(eyre!("Could not `email_parser` email:\n{:?}", &error)), } } -fn parse_meta(raw_entry: &RawEmailEntry, _content: &Vec) -> Result { +/*fn parse_meta(raw_entry: &RawEmailEntry, _content: &Vec) -> Result { use chrono::prelude::*; #[derive(Deserialize)] struct Meta { @@ -232,19 +302,22 @@ fn parse_meta(raw_entry: &RawEmailEntry, _content: &Vec) -> Result TryFrom<(&PathBuf, email_parser::email::Email<'a>)> for EmailEntry { +/*impl<'a, 'b> TryFrom<(&'b Path, email_parser::email::Email<'a>)> for EmailEntry<'a> { type Error = eyre::Report; - fn try_from(content: (&PathBuf, email_parser::email::Email)) -> Result { + fn try_from(content: (&'b Path, email_parser::email::Email<'a>)) -> Result + where + 'b: 'a, + { let (path, email) = content; - let domain = email.sender.address.domain.to_string(); - let local_part = email.sender.address.local_part.to_string(); + let domain = email.sender.address.domain; + let local_part = email.sender.address.local_part; let datetime = emaildatetime_to_chrono(&email.date); - let subject = email.subject.map(|e| e.to_string()).unwrap_or_default(); + let subject = email.subject.unwrap_or_default(); Ok(EmailEntry { - path: path.to_path_buf(), + path, domain, local_part, datetime, @@ -252,7 +325,7 @@ impl<'a> TryFrom<(&PathBuf, email_parser::email::Email<'a>)> for EmailEntry { subject, }) } -} +}*/ fn emaildatetime_to_chrono(dt: &email_parser::time::DateTime) -> chrono::DateTime { Utc.ymd( @@ -312,7 +385,7 @@ mod tests { use super::RawEmailEntry; - #[test] + //#[test] //fn test_weird_email1() { // let data = "No Reply , terhechte.5cffa@m.evernote.com"; // let address = super::parse_unstructured(&data).unwrap(); @@ -323,7 +396,7 @@ mod tests { // } // ); //} - #[test] + //#[test] //fn test_weird_email2() { // let data = r#"info@sport-news.denReply-To:info"@sport-news.denX-Mailer:Sport-News.de"#; // let address = super::parse_unstructured(&data).unwrap(); @@ -334,45 +407,45 @@ mod tests { // } // ); //} - #[test] - fn test_weird_email3() { - crate::setup(); - let eml_path = PathBuf::from_str( - "/Users/terhechte/Documents/gmail_backup/db/2014-09/1479692635489080640.eml.gz", - ) - .unwrap(); - let meta_path = PathBuf::from_str( - "/Users/terhechte/Documents/gmail_backup/db/2014-09/1479692635489080640.meta", - ) - .unwrap(); - let r = RawEmailEntry { - folder_name: "2014-09".to_owned(), - eml_path, - meta_path, - }; - //let result = super::read_email(&r).expect(""); - let content = Vec::new(); - let result = super::parse_meta(&r, &content).expect(""); - dbg!(&result); - } + //#[test] + // fn test_weird_email3() { + // crate::setup(); + // let eml_path = PathBuf::from_str( + // "/Users/terhechte/Documents/gmail_backup/db/2014-09/1479692635489080640.eml.gz", + // ) + // .unwrap(); + // let meta_path = PathBuf::from_str( + // "/Users/terhechte/Documents/gmail_backup/db/2014-09/1479692635489080640.meta", + // ) + // .unwrap(); + // let r = RawEmailEntry { + // folder_name: "2014-09".to_owned(), + // eml_path, + // meta_path, + // }; + // //let result = super::read_email(&r).expect(""); + // let content = Vec::new(); + // let result = super::parse_meta(&r, &content).expect(""); + // dbg!(&result); + // } - #[test] - fn test_weird_email4() { - crate::setup(); - let eml_path = PathBuf::from_str( - "/Users/terhechte/Documents/gmail_backup/db/2014-08/1475705321427236077.eml.gz", - ) - .unwrap(); - let meta_path = PathBuf::from_str( - "/Users/terhechte/Documents/gmail_backup/db/2014-08/1475705321427236077.meta", - ) - .unwrap(); - let r = RawEmailEntry { - folder_name: "2014-08".to_owned(), - eml_path, - meta_path, - }; - let result = super::read_email(&r).expect(""); - dbg!(&result); - } + // #[test] + // fn test_weird_email4() { + // crate::setup(); + // let eml_path = PathBuf::from_str( + // "/Users/terhechte/Documents/gmail_backup/db/2014-08/1475705321427236077.eml.gz", + // ) + // .unwrap(); + // let meta_path = PathBuf::from_str( + // "/Users/terhechte/Documents/gmail_backup/db/2014-08/1475705321427236077.meta", + // ) + // .unwrap(); + // let r = RawEmailEntry { + // folder_name: "2014-08".to_owned(), + // eml_path, + // meta_path, + // }; + // let result = super::read_email(&r).expect(""); + // dbg!(&result); + // } } diff --git a/src/main.rs b/src/main.rs index 1e7235a..ae80154 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,6 +26,10 @@ enum GmailDBError { #[error("Missing folder argument")] MissingFolder, } +// ________________________________________________________ +// Executed in 355.52 secs fish external +// usr time 121.22 secs 95.00 micros 121.22 secs +// sys time 456.33 secs 598.00 micros 456.33 secs fn main() -> Result<()> { setup(); @@ -73,11 +77,11 @@ fn main() -> Result<()> { Ok(()) } -fn process_email(path: &str) -> Result<()> { - let entry = emails::RawEmailEntry::new(&path); - let mail = emails::read_email(&entry).unwrap(); - Ok(()) -} +//fn process_email(path: &str) -> Result<()> { +// let entry = emails::RawEmailEntry::new(&path); +// let mail = emails::read_email(&entry).unwrap(); +// Ok(()) +//} enum FolderProgress { Total(usize), @@ -90,7 +94,7 @@ fn process_folder(folder: &str) -> Result n, Err(e) => { tx.send(Err(e)).unwrap(); @@ -103,11 +107,11 @@ fn process_folder(folder: &str) -> Result Result