From 4ce7a32755a1b4f668da4c0693bda77eb6c627ba Mon Sep 17 00:00:00 2001 From: Luke Steensen Date: Wed, 11 Nov 2020 20:53:13 -0600 Subject: [PATCH] chore(file source)!: migrate checkpoints to single JSON file (#4899) * fix bug in decoding Signed-off-by: Luke Steensen * transition to storing checkpoints in a json file Signed-off-by: Luke Steensen * fix mysterious issue Signed-off-by: Luke Steensen * clarify comment Signed-off-by: Luke Steensen * add ignore_before handling Signed-off-by: Luke Steensen * clippy fixes Signed-off-by: Luke Steensen * clippy fixes for tests Signed-off-by: Luke Steensen Signed-off-by: Brian Menges --- Cargo.lock | 3 + lib/file-source/Cargo.toml | 3 + lib/file-source/src/checkpointer.rs | 318 ++++++++++++++++++++++++--- lib/file-source/src/file_server.rs | 8 +- lib/file-source/src/file_watcher.rs | 16 +- lib/file-source/src/fingerprinter.rs | 3 +- src/sources/file.rs | 5 +- 7 files changed, 318 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 01f6616f30d05..7a7484d7845b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1615,6 +1615,7 @@ name = "file-source" version = "0.1.0" dependencies = [ "bytes 0.5.6", + "chrono", "crc", "flate2", "futures 0.3.5", @@ -1624,6 +1625,8 @@ dependencies = [ "quickcheck", "rand 0.7.3", "scan_fmt", + "serde", + "serde_json", "tempfile", "tokio", "tracing 0.1.21", diff --git a/lib/file-source/Cargo.toml b/lib/file-source/Cargo.toml index 0242776952fd2..a0b9b8fb246ab 100644 --- a/lib/file-source/Cargo.toml +++ b/lib/file-source/Cargo.toml @@ -17,6 +17,9 @@ flate2 = "1.0.19" winapi = { version = "0.3", features = ["winioctl"] } libc = "0.2" tokio = { version = "0.2.13", features = ["time"] } +serde = { version = "1.0.117", features = ["derive"] } +serde_json = "1.0.33" +chrono = { version = "0.4.19", features = ["serde"] } [dev-dependencies] quickcheck = "0.9" diff --git a/lib/file-source/src/checkpointer.rs b/lib/file-source/src/checkpointer.rs index aa7e452838d0c..7ed8b418ee6f7 100644 --- 
a/lib/file-source/src/checkpointer.rs +++ b/lib/file-source/src/checkpointer.rs @@ -1,26 +1,58 @@ use super::{fingerprinter::FileFingerprint, FilePosition}; +use chrono::{DateTime, Utc}; use glob::glob; +use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, fs, io, path::{Path, PathBuf}, - time, }; +const TMP_FILE_NAME: &str = "checkpoints.new.json"; +const STABLE_FILE_NAME: &str = "checkpoints.json"; + +/// This enum represents the file format of checkpoints persisted to disk. Right now there is only +/// one variant, but any incompatible changes will require an additional variant to be added here +/// and handled anywhere that we transit this format. +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "version", rename_all = "snake_case")] +enum State { + #[serde(rename = "1")] + V1 { checkpoints: Vec }, +} + +/// A simple JSON-friendly struct of the fingerprint/position pair, since fingerprints as objects +/// cannot be keys in a plain JSON map. +#[derive(Debug, Serialize, Deserialize)] +struct Checkpoint { + fingerprint: FileFingerprint, + position: FilePosition, + modified: DateTime, +} + pub struct Checkpointer { directory: PathBuf, + tmp_file_path: PathBuf, + stable_file_path: PathBuf, glob_string: String, checkpoints: HashMap, + modified_times: HashMap>, } impl Checkpointer { pub fn new(data_dir: &Path) -> Checkpointer { let directory = data_dir.join("checkpoints"); let glob_string = directory.join("*").to_string_lossy().into_owned(); + let tmp_file_path = data_dir.join(TMP_FILE_NAME); + let stable_file_path = data_dir.join(STABLE_FILE_NAME); + Checkpointer { directory, glob_string, + tmp_file_path, + stable_file_path, checkpoints: HashMap::new(), + modified_times: HashMap::new(), } } @@ -29,6 +61,7 @@ impl Checkpointer { /// For each of the non-legacy variants, prepend an identifier byte that falls outside of the /// hex range used by the legacy implementation. 
This allows them to be differentiated by /// simply peeking at the first byte. + #[cfg(test)] fn encode(&self, fng: FileFingerprint, pos: FilePosition) -> PathBuf { use FileFingerprint::*; @@ -62,7 +95,7 @@ impl Checkpointer { } 'i' => { let (dev, ino, pos) = - scan_fmt!(file_name, "i{x}.{y}.{}", [hex u64], [hex u64], FilePosition) + scan_fmt!(file_name, "i{x}.{x}.{}", [hex u64], [hex u64], FilePosition) .unwrap(); (DevInode(dev, ino), pos) } @@ -73,14 +106,55 @@ impl Checkpointer { } } - pub fn set_checkpoint(&mut self, fng: FileFingerprint, pos: FilePosition) { + pub fn update_checkpoint(&mut self, fng: FileFingerprint, pos: FilePosition) { self.checkpoints.insert(fng, pos); + self.modified_times.insert(fng, Utc::now()); } pub fn get_checkpoint(&self, fng: FileFingerprint) -> Option { self.checkpoints.get(&fng).cloned() } + fn load_checkpoint(&mut self, checkpoint: Checkpoint) { + self.checkpoints + .insert(checkpoint.fingerprint, checkpoint.position); + self.modified_times + .insert(checkpoint.fingerprint, checkpoint.modified); + } + + fn set_state(&mut self, state: State, ignore_before: Option>) { + match state { + State::V1 { checkpoints } => { + for checkpoint in checkpoints { + if let Some(ignore_before) = ignore_before { + if checkpoint.modified < ignore_before { + continue; + } + } + self.load_checkpoint(checkpoint); + } + } + } + } + + fn get_state(&self) -> State { + State::V1 { + checkpoints: self + .checkpoints + .iter() + .map(|(&fingerprint, &position)| Checkpoint { + fingerprint, + position, + modified: self + .modified_times + .get(&fingerprint) + .cloned() + .unwrap_or_else(Utc::now), + }) + .collect(), + } + } + /// Scan through a given list of fresh fingerprints (i.e. not legacy Unknown) to see if any /// match an existing legacy fingerprint. If so, upgrade the existing fingerprint. 
pub fn maybe_upgrade(&mut self, fresh: impl Iterator) { @@ -94,7 +168,28 @@ impl Checkpointer { } } + /// Persist the current checkpoints state to disk, making our best effort to do so in an atomic + /// way that allows for recovering the previous state in the event of a crash. pub fn write_checkpoints(&mut self) -> Result { + // First write the new checkpoints to a tmp file and flush it fully to disk. If vector + // dies anywhere during this section, the existing stable file will still be in its current + // valid state and we'll be able to recover. + let mut f = fs::File::create(&self.tmp_file_path)?; + serde_json::to_writer(&mut f, &self.get_state())?; + f.sync_all()?; + + // Once the temp file is fully flushed, rename the tmp file to replace the previous stable + // file. This is an atomic operation on POSIX systems (and the stdlib claims to provide + // equivalent behavior on Windows), which should prevent scenarios where we don't have at + // least one full valid file to recover from. + fs::rename(&self.tmp_file_path, &self.stable_file_path)?; + + Ok(self.checkpoints.len()) + } + + /// Write checkpoints to disk in the legacy format. Used for compatibility testing only. + #[cfg(test)] + pub fn write_legacy_checkpoints(&mut self) -> Result { fs::remove_dir_all(&self.directory).ok(); fs::create_dir_all(&self.directory)?; for (&fng, &pos) in self.checkpoints.iter() { @@ -103,62 +198,192 @@ impl Checkpointer { Ok(self.checkpoints.len()) } - pub fn read_checkpoints(&mut self, ignore_before: Option) { + /// Read persisted checkpoints from disk, preferring the new JSON file format but falling back + /// to the legacy system when those files are found instead. + pub fn read_checkpoints(&mut self, ignore_before: Option>) { + // First try reading from the tmp file location. If this works, it means that the previous + // process was interrupted in the process of checkpointing and the tmp file should contain + // more recent data that should be preferred. 
+ match self.read_checkpoints_file(&self.tmp_file_path) { + Ok(state) => { + warn!(message = "Recovered checkpoint data from interrupted process."); + self.set_state(state, ignore_before); + + // Try to move this tmp file to the stable location so we don't immediately overwrite + // it when we next persist checkpoints. + if let Err(error) = fs::rename(&self.tmp_file_path, &self.stable_file_path) { + warn!(message = "Error persisting recovered checkpoint file.", %error); + } + return; + } + Err(error) if error.kind() == io::ErrorKind::NotFound => { + // This is expected, so no warning needed + } + Err(error) => { + error!(message = "Unable to recover checkpoint data from interrupted process.", %error); + } + } + + // Next, attempt to read checkpoints from the stable file location. This is the + // expected location, so warn more aggressively if something goes wrong. + match self.read_checkpoints_file(&self.stable_file_path) { + Ok(state) => { + info!(message = "Loaded checkpoint data."); + self.set_state(state, ignore_before); + return; + } + Err(error) if error.kind() == io::ErrorKind::NotFound => { + // This is expected, so no warning needed + } + Err(error) => { + warn!(message = "Unable to load checkpoint data.", %error); + return; + } + } + + // If we haven't returned yet, go ahead and look for the legacy files and try to read them. 
+ info!("Attempting to read legacy checkpoint files."); + self.read_legacy_checkpoints(ignore_before); + + if self.write_checkpoints().is_ok() { + fs::remove_dir_all(&self.directory).ok(); + } + } + + fn read_checkpoints_file(&self, path: &Path) -> Result { + let reader = io::BufReader::new(fs::File::open(path)?); + serde_json::from_reader(reader).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + } + + fn read_legacy_checkpoints(&mut self, ignore_before: Option>) { for path in glob(&self.glob_string).unwrap().flatten() { + let mut mtime = None; if let Some(ignore_before) = ignore_before { if let Ok(Ok(modified)) = fs::metadata(&path).map(|metadata| metadata.modified()) { + let modified = DateTime::::from(modified); if modified < ignore_before { fs::remove_file(path).ok(); continue; } + mtime = Some(modified); } } let (fng, pos) = self.decode(&path); self.checkpoints.insert(fng, pos); + if let Some(mtime) = mtime { + self.modified_times.insert(fng, mtime); + } } } } #[cfg(test)] mod test { - use super::{Checkpointer, FileFingerprint, FilePosition}; + use super::{ + Checkpoint, Checkpointer, FileFingerprint, FilePosition, STABLE_FILE_NAME, TMP_FILE_NAME, + }; + use chrono::{Duration, Utc}; use tempfile::tempdir; #[test] fn test_checkpointer_basics() { - let fingerprint: FileFingerprint = 0x1234567890abcdef.into(); - let position: FilePosition = 1234; - let data_dir = tempdir().unwrap(); - let mut chkptr = Checkpointer::new(&data_dir.path()); - assert_eq!( - chkptr.decode(&chkptr.encode(fingerprint, position)), - (fingerprint, position) - ); - chkptr.set_checkpoint(fingerprint, position); - assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position)); + let fingerprints = vec![ + FileFingerprint::DevInode(1, 2), + FileFingerprint::Checksum(3456), + FileFingerprint::FirstLineChecksum(78910), + FileFingerprint::Unknown(1337), + ]; + for fingerprint in fingerprints { + let position: FilePosition = 1234; + let data_dir = tempdir().unwrap(); + let mut chkptr = 
Checkpointer::new(&data_dir.path()); + assert_eq!( + chkptr.decode(&chkptr.encode(fingerprint, position)), + (fingerprint, position) + ); + chkptr.update_checkpoint(fingerprint, position); + assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position)); + } } #[test] - fn test_checkpointer_restart() { - let fingerprint: FileFingerprint = 0x1234567890abcdef.into(); + fn test_checkpointer_expiration() { + let newer = ( + FileFingerprint::DevInode(1, 2), + Utc::now() - Duration::seconds(5), + ); + let newish = ( + FileFingerprint::Checksum(3456), + Utc::now() - Duration::seconds(10), + ); + let oldish = ( + FileFingerprint::FirstLineChecksum(78910), + Utc::now() - Duration::seconds(15), + ); + let older = ( + FileFingerprint::Unknown(1337), + Utc::now() - Duration::seconds(20), + ); + let ignore_before = Some(Utc::now() - Duration::seconds(12)); + let position: FilePosition = 1234; let data_dir = tempdir().unwrap(); + + // load and persist the checkpoints { let mut chkptr = Checkpointer::new(&data_dir.path()); - chkptr.set_checkpoint(fingerprint, position); - assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position)); - chkptr.write_checkpoints().ok(); + + for (fingerprint, modified) in &[&newer, &newish, &oldish, &older] { + chkptr.load_checkpoint(Checkpoint { + fingerprint: *fingerprint, + position, + modified: *modified, + }); + assert_eq!(chkptr.get_checkpoint(*fingerprint), Some(position)); + chkptr.write_checkpoints().unwrap(); + } } + + // read them back and assert old are removed { let mut chkptr = Checkpointer::new(&data_dir.path()); - assert_eq!(chkptr.get_checkpoint(fingerprint), None); - chkptr.read_checkpoints(None); - assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position)); + chkptr.read_checkpoints(ignore_before); + + assert_eq!(chkptr.get_checkpoint(newish.0), Some(position)); + assert_eq!(chkptr.get_checkpoint(newer.0), Some(position)); + assert_eq!(chkptr.get_checkpoint(oldish.0), None); + assert_eq!(chkptr.get_checkpoint(older.0), None); 
+ } + } + + #[test] + fn test_checkpointer_restart() { + let fingerprints = vec![ + FileFingerprint::DevInode(1, 2), + FileFingerprint::Checksum(3456), + FileFingerprint::FirstLineChecksum(78910), + FileFingerprint::Unknown(1337), + ]; + for fingerprint in fingerprints { + let position: FilePosition = 1234; + let data_dir = tempdir().unwrap(); + { + let mut chkptr = Checkpointer::new(&data_dir.path()); + chkptr.update_checkpoint(fingerprint, position); + assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position)); + chkptr.write_checkpoints().ok(); + } + { + let mut chkptr = Checkpointer::new(&data_dir.path()); + assert_eq!(chkptr.get_checkpoint(fingerprint), None); + chkptr.read_checkpoints(None); + assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position)); + } } } #[test] - fn test_checkpointer_upgrades() { + fn test_checkpointer_fingerprint_upgrades() { let new_fingerprint = FileFingerprint::DevInode(1, 2); let old_fingerprint = FileFingerprint::Unknown(new_fingerprint.to_legacy()); let position: FilePosition = 1234; @@ -166,7 +391,7 @@ mod test { let data_dir = tempdir().unwrap(); { let mut chkptr = Checkpointer::new(&data_dir.path()); - chkptr.set_checkpoint(old_fingerprint, position); + chkptr.update_checkpoint(old_fingerprint, position); assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position)); chkptr.write_checkpoints().ok(); } @@ -181,4 +406,47 @@ mod test { assert_eq!(chkptr.get_checkpoint(old_fingerprint), None); } } + + #[test] + fn test_checkpointer_file_upgrades() { + let fingerprint = FileFingerprint::DevInode(1, 2); + let position: FilePosition = 1234; + + let data_dir = tempdir().unwrap(); + + // Write out checkpoints in the legacy file format + { + let mut chkptr = Checkpointer::new(&data_dir.path()); + chkptr.update_checkpoint(fingerprint, position); + assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position)); + chkptr.write_legacy_checkpoints().unwrap(); + } + + // Ensure that the new files were not written but the old 
style of files were + assert_eq!(false, data_dir.path().join(TMP_FILE_NAME).exists()); + assert_eq!(false, data_dir.path().join(STABLE_FILE_NAME).exists()); + assert_eq!(true, data_dir.path().join("checkpoints").is_dir()); + + // Read from those old files, ensure the checkpoints were loaded properly, and then write + // them normally (i.e. in the new format) + { + let mut chkptr = Checkpointer::new(&data_dir.path()); + chkptr.read_checkpoints(None); + assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position)); + chkptr.write_checkpoints().unwrap(); + } + + // Ensure that the stable file is present, the tmp file is not, and the legacy files have + // been cleaned up + assert_eq!(false, data_dir.path().join(TMP_FILE_NAME).exists()); + assert_eq!(true, data_dir.path().join(STABLE_FILE_NAME).exists()); + assert_eq!(false, data_dir.path().join("checkpoints").is_dir()); + + // Ensure one last time that we can reread from the new files and get the same result + { + let mut chkptr = Checkpointer::new(&data_dir.path()); + chkptr.read_checkpoints(None); + assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position)); + } + } } diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index 453f92bb3a2ed..e07eb03bfa7bc 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -5,6 +5,7 @@ use crate::{ FileSourceInternalEvents, }; use bytes::Bytes; +use chrono::{DateTime, Utc}; use futures::{ executor::block_on, future::{select, Either}, @@ -37,7 +38,7 @@ where pub paths_provider: PP, pub max_read_bytes: usize, pub start_at_beginning: bool, - pub ignore_before: Option, + pub ignore_before: Option>, pub max_line_bytes: usize, pub data_dir: PathBuf, pub glob_minimum_cooldown: Duration, @@ -101,7 +102,8 @@ where existing_files.sort_by_key(|(path, _file_id)| { fs::metadata(&path) .and_then(|m| m.created()) - .unwrap_or_else(|_| time::SystemTime::now()) + .map(DateTime::::from) + .unwrap_or_else(|_| Utc::now()) }); 
checkpointer.maybe_upgrade(existing_files.iter().map(|(_, id)| id).cloned()); @@ -252,7 +254,7 @@ where if bytes_read > 0 { global_bytes_read = global_bytes_read.saturating_add(bytes_read); - checkpointer.set_checkpoint(file_id, watcher.get_file_position()); + checkpointer.update_checkpoint(file_id, watcher.get_file_position()); } else { // Should the file be removed if let Some(grace_period) = self.remove_after { diff --git a/lib/file-source/src/file_watcher.rs b/lib/file-source/src/file_watcher.rs index 5893e3e98d079..e3d8e7b9ba8ef 100644 --- a/lib/file-source/src/file_watcher.rs +++ b/lib/file-source/src/file_watcher.rs @@ -1,11 +1,12 @@ use crate::FilePosition; use bytes::{Bytes, BytesMut}; +use chrono::{DateTime, Utc}; use flate2::bufread::MultiGzDecoder; use std::{ fs::{self, File}, io::{self, BufRead, Seek}, path::PathBuf, - time::{Duration, Instant, SystemTime}, + time::{Duration, Instant}, }; use crate::metadata_ext::PortableFileExt; @@ -40,7 +41,7 @@ impl FileWatcher { pub fn new( path: PathBuf, file_position: FilePosition, - ignore_before: Option, + ignore_before: Option>, max_line_bytes: usize, ) -> Result { let f = fs::File::open(&path)?; @@ -48,9 +49,10 @@ impl FileWatcher { let metadata = f.metadata()?; let mut reader = io::BufReader::new(f); - let too_old = if let (Some(ignore_before), Ok(modified_time)) = - (ignore_before, metadata.modified()) - { + let too_old = if let (Some(ignore_before), Ok(modified_time)) = ( + ignore_before, + metadata.modified().map(DateTime::::from), + ) { modified_time < ignore_before } else { false @@ -314,7 +316,7 @@ mod test { let p = read_until_with_max_size(&mut buf, &mut pos, b'3', &mut v, 1000).unwrap(); assert_eq!(pos, 4); assert_eq!(p, None); - assert_eq!(&*v, []); + assert_eq!(&*v, [0; 0]); let mut buf = Cursor::new(&b"short\nthis is too long\nexact size\n11 eleven11\n"[..]); let mut pos = 0; @@ -332,6 +334,6 @@ mod test { let p = read_until_with_max_size(&mut buf, &mut pos, b'\n', &mut v, 10).unwrap(); 
assert_eq!(pos, 46); assert_eq!(p, None); - assert_eq!(&*v, []); + assert_eq!(&*v, [0; 0]); } } diff --git a/lib/file-source/src/fingerprinter.rs b/lib/file-source/src/fingerprinter.rs index 7c427156cff56..c079addc06db9 100644 --- a/lib/file-source/src/fingerprinter.rs +++ b/lib/file-source/src/fingerprinter.rs @@ -1,4 +1,5 @@ use crate::{metadata_ext::PortableFileExt, FileSourceInternalEvents}; +use serde::{Deserialize, Serialize}; use std::{ collections::HashSet, fs::{self, File}, @@ -19,7 +20,7 @@ pub enum Fingerprinter { DevInode, } -#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)] pub enum FileFingerprint { Checksum(u64), FirstLineChecksum(u64), diff --git a/src/sources/file.rs b/src/sources/file.rs index 13a4730d2c975..d1242ad751665 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -9,6 +9,7 @@ use crate::{ Pipeline, }; use bytes::Bytes; +use chrono::Utc; use file_source::{ paths_provider::glob::{Glob, MatchOptions}, FileServer, Fingerprinter, @@ -24,7 +25,7 @@ use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; use std::convert::TryInto; use std::path::PathBuf; -use std::time::{Duration, SystemTime}; +use std::time::Duration; use tokio::task::spawn_blocking; #[derive(Debug, Snafu)] @@ -193,7 +194,7 @@ pub fn file_source( ) -> super::Source { let ignore_before = config .ignore_older - .map(|secs| SystemTime::now() - Duration::from_secs(secs)); + .map(|secs| Utc::now() - chrono::Duration::seconds(secs as i64)); let glob_minimum_cooldown = Duration::from_millis(config.glob_minimum_cooldown); let paths_provider = Glob::new(&config.include, &config.exclude, MatchOptions::default())