diff --git a/CHANGELOG.md b/CHANGELOG.md index f42dc8b..48b1949 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Clear in-memory subscriptions when a SIGHUP signal is received, resulting in all file descriptors used by subscriptions being closed (#37) - `heartbeats_queue_size` now defaults to 2048 instead of 32 (#37) - **Breaking change**: Keytab file path must be specified only once for all collectors (using Kerberos authentication) +- A malformed event will no longer stop the event stream (for a computer/subscription) because formatters are not allowed to fail. In problematic cases, some work is done to try to recover the raw data of the event, and an `OpenWEC.Error` field is added (in the JSON formatter) to help catch the problem (#47) ## [0.1.0] - 2023-05-30 diff --git a/doc/formats.md b/doc/formats.md index 32e61c7..e600c83 100644 --- a/doc/formats.md +++ b/doc/formats.md @@ -44,6 +44,12 @@ openwec_data := { "Version": string, "Uuid": string, "Uri": string + }, + /* Only in case of error during event parsing or serializing */ + "Error": { + "OriginalContent": string, + "Type": string, + "Message": string } } diff --git a/server/src/event.rs b/server/src/event.rs index feee3b0..57f9a6d 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -1,7 +1,7 @@ use anyhow::{anyhow, bail, Context, Result}; use chrono::{DateTime, Utc}; -use log::{info, trace}; -use roxmltree::{Document, Node}; +use log::{debug, info, trace, warn}; +use roxmltree::{Document, Error, Node}; use serde::Serialize; use std::{collections::HashMap, net::SocketAddr}; @@ -67,10 +67,57 @@ impl DataType { } } +#[derive(Serialize, Debug, Clone, Default)] +#[serde(tag = "Type")] +pub enum ErrorType { + /// Initial XML parsing failed but Raw content could be recovered + RawContentRecovered { + #[serde(rename = "Message")] + message: String, + }, + /// Initial XML parsing failed and recovering failed again + FailedToRecoverRawContent { + #[serde(rename = "Message")] + message: String, + }, + /// Initial XML parsing failed and no recovering strategy was usable + Unrecoverable { + #[serde(rename = "Message")] + message: String, + }, + /// Failed to feed event from parsed XML document + FailedToFeedEvent { + #[serde(rename = "Message")] + message: String, + }, + #[default] + Unknown, +} + +impl ToString for ErrorType { + fn to_string(&self) -> String { + match self { + ErrorType::RawContentRecovered { message } => message.clone(), + ErrorType::FailedToRecoverRawContent { message } => message.clone(), + ErrorType::Unrecoverable { message } => message.clone(), + ErrorType::FailedToFeedEvent { message } => message.clone(), + ErrorType::Unknown => "Unknown error".to_string(), + } + } +} + +#[derive(Debug, Default, Serialize, Clone)] +struct ErrorInfo { + #[serde(rename = "OriginalContent")] + original_content: String, + #[serde(flatten)] + _type: ErrorType, +} + #[derive(Debug, Default, Serialize, Clone)] pub struct Event { - #[serde(rename = "System")] - system: System, + #[serde(rename = "System", skip_serializing_if = "Option::is_none")] + system: Option, #[serde(flatten, skip_serializing_if = "DataType::is_unknown")] data: DataType, #[serde(rename = "RenderingInfo", skip_serializing_if = "Option::is_none")] @@ -80,25 +127,16 @@ pub struct Event { } impl Event { - pub fn from_str(metadata: &EventMetadata, content: &str) -> Result { - let doc = Document::parse(content).context("Failed to parse event XML")?; - let mut event = Event::default(); - event.additional = Additional { - addr: metadata.addr().ip().to_string(), - principal: metadata.principal().to_owned(), // TODO : change to something that works for TLS as well (modify db and output) - node: metadata.node_name().cloned(), - time_received: metadata.time_received().to_rfc3339(), - subscription: SubscriptionType { - uuid: metadata.subscription_uuid().to_owned(), - version: metadata.subscription_version().to_owned(), - name: metadata.subscription_name().to_owned(), - uri: metadata.subscription_uri().cloned(), - }, - }; + /// Reads a parsed XML document and feeds an Event struct + fn feed_event_from_document( + event: &mut Event, + doc: &Document<'_>, + content: &str, // Only used for logging + ) -> Result<()> { let root = doc.root_element(); for node in root.children() { if node.tag_name().name() == "System" { - event.system = System::from(&node).context("Parsing failure in System")? + event.system = Some(System::from(&node).context("Parsing failure in System")?) } else if node.tag_name().name() == "EventData" { event.data = parse_event_data(&node).context("Parsing failure in EventData")? } else if node.tag_name().name() == "UserData" { @@ -120,8 +158,120 @@ impl Event { trace!("Event was: {}", content); } } + Ok(()) + } - Ok(event) + fn add_event_parsing_error( + event: &mut Event, + content: &str, + error_type: ErrorType, + warn: bool, + ) { + event.additional.error = Some(ErrorInfo { + original_content: content.to_string(), + _type: error_type.clone(), + }); + let error_message = error_type.to_string(); + if warn { + warn!("{}. Context: {:?}", error_message, event.additional); + } else { + debug!("{}. Context: {:?}", error_message, event.additional); + } + } + + fn try_to_recover(event: &mut Event, initial_error: Error, content: &str) { + // Sometimes, `RenderingInfo` content is malformed, meaning that + // the event content is cut off in the middle without appropriate closing + // tags resulting in invalid XML (see issue #46 for more details). + // + // When this problem occurs, we try to remove the RenderingInfo + // element and recover the Raw event content. Such an operation should + // be valid because RenderingInfo is the last "specified" child node + // of the Event element according to the Event schema. + // + // See tests: + // - test_serialize_malformed_raw_content_recovered + // - test_serialize_malformed_unrecoverable_1 + // - test_serialize_malformed_unrecoverable_2 + // - test_serialize_failed_to_recover + // - test_serialize_malformed_failed_to_feed_event + let (error_type, do_warn) = match content.rsplit_once(" { + let clean_content = beginning.to_string() + ""; + match Document::parse(&clean_content) { + Ok(doc) => { + match Event::feed_event_from_document(event, &doc, &clean_content) { + Ok(_) => + (ErrorType::RawContentRecovered { message: format!( + "Failed to parse event XML ({}) but Raw content could be recovered.", + initial_error + ) }, false), + Err(feed_error) => + (ErrorType::FailedToFeedEvent { message: format!( + "Could not feed event from document: {}", + feed_error + ) }, true), + } + } + Err(recovering_error) => { + (ErrorType::FailedToRecoverRawContent { message: format!( + "Failed to parse event XML ({}) and Raw content recovering failed ({})", + initial_error, recovering_error + ) }, true) + } + } + } + None => ( + ErrorType::Unrecoverable { + message: format!("Failed to parse event XML: {}", initial_error), + }, + true, + ), + }; + Event::add_event_parsing_error(event, content, error_type, do_warn); + } + + pub fn from_str(metadata: &EventMetadata, content: &str) -> Self { + let mut event = Event { + additional: Additional { + addr: metadata.addr().ip().to_string(), + principal: metadata.principal().to_owned(), // TODO : change to something that works for TLS as well (modify db and output) + node: metadata.node_name().cloned(), + time_received: metadata.time_received().to_rfc3339(), + subscription: SubscriptionType { + uuid: metadata.subscription_uuid().to_owned(), + version: metadata.subscription_version().to_owned(), + name: metadata.subscription_name().to_owned(), + uri: metadata.subscription_uri().cloned(), + }, + error: None, + }, + ..Default::default() + }; + + let doc_parse_attempt = Document::parse(content); + match doc_parse_attempt { + Ok(doc) => { + if let Err(feed_error) = Event::feed_event_from_document(&mut event, &doc, content) + { + let message = format!("Could not feed event from document: {}", feed_error); + Event::add_event_parsing_error( + &mut event, + content, + ErrorType::FailedToFeedEvent { message }, + true, + ); + } + } + Err(initial_error) => { + debug!( + "Failed to parse XML event: {}. Lets try to recover it.", + initial_error + ); + Event::try_to_recover(&mut event, initial_error, content); + } + } + event } } @@ -213,6 +363,8 @@ struct Additional { subscription: SubscriptionType, #[serde(rename = "Node", skip_serializing_if = "Option::is_none")] node: Option, + #[serde(rename = "Error", skip_serializing_if = "Option::is_none")] + error: Option, } #[derive(Debug, Default, Serialize, Clone)] @@ -427,12 +579,14 @@ struct RenderingInfo { impl RenderingInfo { fn from(rendering_info_node: &Node) -> Result { - let mut rendering_info = RenderingInfo::default(); + let mut rendering_info = RenderingInfo { + culture: rendering_info_node + .attribute("Culture") + .unwrap_or_default() + .to_owned(), + ..Default::default() + }; - rendering_info.culture = rendering_info_node - .attribute("Culture") - .unwrap_or_default() - .to_owned(); for node in rendering_info_node.children() { let tag = node.tag_name(); if tag.name() == "Message" { @@ -855,7 +1009,7 @@ mod tests { #[test] fn test_4689_parsing() { - Event::from_str( + let event = Event::from_str( &EventMetadata { addr: SocketAddr::from_str("192.168.0.1:5985").unwrap(), principal: "win10.windomain.local".to_owned(), @@ -871,8 +1025,8 @@ mod tests { subscription_uri: Some("/this/is/a/test".to_string()), }, EVENT_4689, - ) - .expect("Failed to parse Event"); + ); + assert!(event.additional.error.is_none()) } const EVENT_4688: &str = r#"4688201331200x8020000000000000114689Securitywin10.windomain.localS-1-5-18WIN10$WINDOMAIN0x3e70x3a8C:\Program Files (x86)\Microsoft\EdgeUpdate\MicrosoftEdgeUpdate.exe%%19360x240S-1-0-0--0x0C:\Windows\System32\services.exeS-1-16-16384A new process has been created. @@ -925,16 +1079,14 @@ Type 3 is a limited token with administrative privileges removed and administrat subscription_uri: Some("/this/is/a/test".to_string()), }, EVENT_4688, - ) - .expect("Failed to parse Event"); + ); + assert!(event.additional.error.is_none()); let event_json = serde_json::to_string(&event).unwrap(); let event_json_value: Value = serde_json::from_str(&event_json).unwrap(); let expected_value: Value = serde_json::from_str(EVENT_4688_JSON).unwrap(); - println!("{}", event_json_value); - println!("{}", expected_value); assert_eq!(event_json_value, expected_value); } @@ -968,16 +1120,14 @@ Licensing Status= subscription_uri: None, }, EVENT_1003, - ) - .expect("Failed to parse Event"); + ); + assert!(event.additional.error.is_none()); let event_json = serde_json::to_string(&event).unwrap(); let event_json_value: Value = serde_json::from_str(&event_json).unwrap(); let expected_value: Value = serde_json::from_str(EVENT_1003_JSON).unwrap(); - println!("{}", event_json_value); - println!("{}", expected_value); assert_eq!(event_json_value, expected_value); } @@ -1007,16 +1157,14 @@ If this computer is a domain controller for the specified domain, it sets up the subscription_uri: Some("/this/is/a/test".to_string()), }, EVENT_5719, - ) - .expect("Failed to parse Event"); + ); + assert!(event.additional.error.is_none()); let event_json = serde_json::to_string(&event).unwrap(); let event_json_value: Value = serde_json::from_str(&event_json).unwrap(); let expected_value: Value = serde_json::from_str(EVENT_5719_JSON).unwrap(); - println!("{}", event_json_value); - println!("{}", expected_value); assert_eq!(event_json_value, expected_value); } @@ -1041,8 +1189,8 @@ If this computer is a domain controller for the specified domain, it sets up the subscription_uri: Some("/this/is/a/test".to_string()), }, EVENT_6013, - ) - .expect("Failed to parse Event"); + ); + assert!(event.additional.error.is_none()); let event_json = serde_json::to_string(&event).unwrap(); @@ -1075,16 +1223,14 @@ If this computer is a domain controller for the specified domain, it sets up the subscription_uri: Some("/this/is/a/test".to_string()), }, EVENT_1100, - ) - .expect("Failed to parse Event"); + ); + assert!(event.additional.error.is_none()); let event_json = serde_json::to_string(&event).unwrap(); let event_json_value: Value = serde_json::from_str(&event_json).unwrap(); let expected_value: Value = serde_json::from_str(EVENT_1100_JSON).unwrap(); - println!("{}", event_json_value); - println!("{}", expected_value); assert_eq!(event_json_value, expected_value); } @@ -1109,8 +1255,8 @@ If this computer is a domain controller for the specified domain, it sets up the subscription_uri: Some("/this/is/a/test".to_string()), }, EVENT_111, - ) - .expect("Failed to parse Event"); + ); + assert!(event.additional.error.is_none()); let event_json = serde_json::to_string(&event).expect("Failed to serialize event"); @@ -1119,4 +1265,174 @@ If this computer is a domain controller for the specified domain, it sets up the assert_eq!(event_json_value, expected_value); } + + const RAW_CONTENT_RECOVERED: &str = r#"4798001382400x8020000000000000980236Securitydvas0004_xpsdavevxxxxx_xpsS-1-5-21-1604529354-1295832394-4197355770-1001S-1-5-18xxxxx_XPS$WORKGROUP0x3e70x28d4C:\\Windows\\System32\\svchost.exeA user's local group membership was enumerated. Subject: "#; + const RAW_CONTENT_RECOVERED_JSON: &str = r#"{"System":{ "Provider":{ "Name":"Microsoft-Windows-Security-Auditing", "Guid":"{54849625-5478-4994-a5ba-3e3b0328c30d}" }, "EventID":4798, "Version":0, "Level":0, "Task":13824, "Opcode":0, "Keywords":"0x8020000000000000", "TimeCreated":"2023-09-29T13:39:08.7234692Z", "EventRecordID":980236, "Correlation":{ "ActivityID":"{f59bb999-ec5b-0008-f6b9-9bf55becd901}" }, "Execution":{ "ProcessID":1440, "ThreadID":16952 }, "Channel":"Security", "Computer":"dvas0004_xps" }, "EventData":{ "SubjectLogonId":"0x3e7", "TargetDomainName":"xxxxx_xps", "CallerProcessId":"0x28d4", "CallerProcessName":"C:\\\\Windows\\\\System32\\\\svchost.exe", "TargetUserName":"davev", "SubjectDomainName":"WORKGROUP", "SubjectUserName":"xxxxx_XPS$", "TargetSid":"S-1-5-21-1604529354-1295832394-4197355770-1001", "SubjectUserSid":"S-1-5-18" }, "OpenWEC":{ "IpAddress":"127.0.0.1", "TimeReceived":"2023-09-29T14:33:12.574363325+00:00", "Principal":"demo-client", "Subscription":{ "Uuid":"91E05B32-F8F6-48CF-8AB4-4038233B83AC", "Version":"523D1886-E73E-4A96-A95D-F0326CB282F0", "Name":"my-test-subscription" }, "Error":{ "OriginalContent":"4798001382400x8020000000000000980236Securitydvas0004_xpsdavevxxxxx_xpsS-1-5-21-1604529354-1295832394-4197355770-1001S-1-5-18xxxxx_XPS$WORKGROUP0x3e70x28d4C:\\\\Windows\\\\System32\\\\svchost.exeA user's local group membership was enumerated. Subject: ", "Type": "RawContentRecovered", "Message":"Failed to parse event XML (the root node was opened but never closed) but Raw content could be recovered." } } }"#; + + #[test] + fn test_serialize_malformed_raw_content_recovered() { + // Try to serialize a malformed event, and use the recovering strategy to + // recover its Raw content + let event = Event::from_str( + &EventMetadata { + addr: SocketAddr::from_str("127.0.0.1:5985").unwrap(), + principal: "demo-client".to_string(), + node_name: None, + time_received: chrono::DateTime::parse_from_rfc3339( + "2023-09-29T14:33:12.574363325+00:00", + ) + .unwrap() + .with_timezone(&Utc), + subscription_name: "my-test-subscription".to_string(), + subscription_uuid: "91E05B32-F8F6-48CF-8AB4-4038233B83AC".to_string(), + subscription_version: "523D1886-E73E-4A96-A95D-F0326CB282F0".to_string(), + subscription_uri: None, + }, + RAW_CONTENT_RECOVERED, + ); + assert!(event.additional.error.is_some()); + + let event_json = serde_json::to_string(&event).expect("Failed to serialize event"); + + let event_json_value: Value = serde_json::from_str(&event_json).unwrap(); + let expected_value: Value = serde_json::from_str(RAW_CONTENT_RECOVERED_JSON).unwrap(); + + assert_eq!(event_json_value, expected_value); + } + + const UNRECOVERABLE_1: &str = r#"4798001382400x8020000000000000980236Securitydvas0004_xpsdavevxxxxx_xpsS-1-5-21-1604529354-1295832394-4197355770-1001S-1-5-18xxxxx_XPS$WORKGROUP0x3e70x28d4C:\\Windows\\System32\\svchost.exe"#; + const UNRECOVERABLE_1_JSON: &str = r#"{"OpenWEC":{ "IpAddress":"127.0.0.1", "TimeReceived":"2023-09-29T14:33:12.574363325+00:00", "Principal":"demo-client", "Subscription":{ "Uuid":"91E05B32-F8F6-48CF-8AB4-4038233B83AC", "Version":"523D1886-E73E-4A96-A95D-F0326CB282F0", "Name":"my-test-subscription" }, "Error":{ "Type": "Unrecoverable", "OriginalContent":"4798001382400x8020000000000000980236Securitydvas0004_xpsdavevxxxxx_xpsS-1-5-21-1604529354-1295832394-4197355770-1001S-1-5-18xxxxx_XPS$WORKGROUP0x3e70x28d4C:\\\\Windows\\\\System32\\\\svchost.exe", "Message":"Failed to parse event XML: the root node was opened but never closed" } } }"#; + + #[test] + fn test_serialize_malformed_unrecoverable_1() { + // Try to serialize an event for which there is no recovering strategy + let event = Event::from_str( + &EventMetadata { + addr: SocketAddr::from_str("127.0.0.1:5985").unwrap(), + principal: "demo-client".to_string(), + node_name: None, + time_received: chrono::DateTime::parse_from_rfc3339( + "2023-09-29T14:33:12.574363325+00:00", + ) + .unwrap() + .with_timezone(&Utc), + subscription_name: "my-test-subscription".to_string(), + subscription_uuid: "91E05B32-F8F6-48CF-8AB4-4038233B83AC".to_string(), + subscription_version: "523D1886-E73E-4A96-A95D-F0326CB282F0".to_string(), + subscription_uri: None, + }, + UNRECOVERABLE_1, + ); + assert!(event.additional.error.is_some()); + + let event_json = serde_json::to_string(&event).expect("Failed to serialize event"); + + let event_json_value: Value = serde_json::from_str(&event_json).unwrap(); + let expected_value: Value = serde_json::from_str(UNRECOVERABLE_1_JSON).unwrap(); + + assert_eq!(event_json_value, expected_value); + } + + const UNRECOVERABLE_2: &str = r#"4798001382400x8020000000000000980236Securitydvas0004_xpsdavevxxxxx_xpsS-1-5-21-1604529354-1295832394-4197355770-1001S-1-5-18xxxxx_XPS$WORKGROUP0x3e70x28d4C:\\Windows\\System32\\svchost.exe4798001382400x8020000000000000980236Securitydvas0004_xpsdavevxxxxx_xpsS-1-5-21-1604529354-1295832394-4197355770-1001S-1-5-18xxxxx_XPS$WORKGROUP0x3e70x28d4C:\\\\Windows\\\\System32\\\\svchost.exe4798001382400x8020000000000000980236Securitydvas0004_xpsdavevxxxxx_xpsS-1-5-21-1604529354-1295832394-4197355770-1001S-1-5-18xxxxx_XPS$WORKGROUP0x3e70x28d4C:\\Windows\\System32\\svchost.exeA use"#; + const FAILED_TO_FEED_EVENT_JSON: &str = r#"{"OpenWEC":{ "IpAddress":"127.0.0.1", "TimeReceived":"2023-09-29T14:33:12.574363325+00:00", "Principal":"demo-client", "Subscription":{ "Uuid":"91E05B32-F8F6-48CF-8AB4-4038233B83AC", "Version":"523D1886-E73E-4A96-A95D-F0326CB282F0", "Name":"my-test-subscription" }, "Error":{ "Type": "FailedToFeedEvent", "OriginalContent":"4798001382400x8020000000000000980236Securitydvas0004_xpsdavevxxxxx_xpsS-1-5-21-1604529354-1295832394-4197355770-1001S-1-5-18xxxxx_XPS$WORKGROUP0x3e70x28d4C:\\\\Windows\\\\System32\\\\svchost.exeA use", "Message":"Could not feed event from document: Parsing failure in System" } } }"#; + + #[test] + fn test_serialize_malformed_failed_to_feed_event() { + // Try to serialize a malformed event for which the recovering strategy can + // not succeed because is invalid. + let event = Event::from_str( + &EventMetadata { + addr: SocketAddr::from_str("127.0.0.1:5985").unwrap(), + principal: "demo-client".to_string(), + node_name: None, + time_received: chrono::DateTime::parse_from_rfc3339( + "2023-09-29T14:33:12.574363325+00:00", + ) + .unwrap() + .with_timezone(&Utc), + subscription_name: "my-test-subscription".to_string(), + subscription_uuid: "91E05B32-F8F6-48CF-8AB4-4038233B83AC".to_string(), + subscription_version: "523D1886-E73E-4A96-A95D-F0326CB282F0".to_string(), + subscription_uri: None, + }, + FAILED_TO_FEED_EVENT, + ); + assert!(event.additional.error.is_some()); + + let event_json = serde_json::to_string(&event).expect("Failed to serialize event"); + + let event_json_value: Value = serde_json::from_str(&event_json).unwrap(); + let expected_value: Value = serde_json::from_str(FAILED_TO_FEED_EVENT_JSON).unwrap(); + + assert_eq!(event_json_value, expected_value); + } } diff --git a/server/src/formatter.rs b/server/src/formatter.rs index f583c8e..a7da978 100644 --- a/server/src/formatter.rs +++ b/server/src/formatter.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use anyhow::{Context, Result}; +use log::warn; use crate::event::{Event, EventMetadata}; use common::subscription::SubscriptionOutputFormat; @@ -21,22 +21,26 @@ impl From<&SubscriptionOutputFormat> for Format { } impl Format { - pub fn format(&self, metadata: &EventMetadata, raw: Arc) -> Result> { + pub fn format(&self, metadata: &EventMetadata, raw: Arc) -> Option> { + // Formatters are allowed to return None when they can't do + // anything else... match &self { Format::Json => format_json(metadata, raw), - Format::Raw => format_raw(raw), + Format::Raw => Some(raw), } } } -fn format_json(metadata: &EventMetadata, raw: Arc) -> Result> { - let event = Event::from_str(metadata, raw.as_ref()) - .with_context(|| format!("Failed to parse event: {:?}", raw))?; - Ok(Arc::new(serde_json::to_string(&event).with_context( - || format!("Failed to format event: {:?}", event), - )?)) -} - -fn format_raw(raw: Arc) -> Result> { - Ok(raw) +fn format_json(metadata: &EventMetadata, raw: Arc) -> Option> { + let event = Event::from_str(metadata, raw.as_ref()); + match serde_json::to_string(&event) { + Ok(str) => Some(Arc::new(str)), + Err(e) => { + warn!( + "Failed to serialize event in JSON: {:?}. Event was: {:?}", + e, event + ); + None + } + } } diff --git a/server/src/logic.rs b/server/src/logic.rs index 055645f..5d05c80 100644 --- a/server/src/logic.rs +++ b/server/src/logic.rs @@ -356,11 +356,9 @@ async fn handle_events( for format in subscription.formats() { let mut content = Vec::new(); for raw in events.iter() { - content.push( - format - .format(&metadata, raw.clone()) - .with_context(|| format!("Failed to format event with {:?}", format))?, - ); + if let Some(str) = format.format(&metadata, raw.clone()) { + content.push(str.clone()) + } } formatted_events.insert(format.clone(), Arc::new(content)); }