Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update notify-rs to Version 5 #453

Merged
merged 24 commits into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 80 additions & 176 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions bin/src/_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ pub async fn _main(
executor.init();

// Use an internal env var to support running integration test w/o additional delays
let event_delay = std::env::var(config::env_vars::INTERNAL_FS_DELAY)
let _event_delay = std::env::var(config::env_vars::INTERNAL_FS_DELAY)
.map(|s| Duration::from_millis(s.parse().unwrap()))
.unwrap_or(FS_EVENT_DELAY);

Expand Down Expand Up @@ -339,7 +339,7 @@ pub async fn _main(
let rules = params.1.clone();
let lookback = params.2.clone();
let offsets = params.3.clone();
let tailer = tail::Tailer::new(watched_dirs, rules, lookback, offsets, event_delay);
let tailer = tail::Tailer::new(watched_dirs, rules, lookback, offsets);
async move { tail::process(tailer).expect("except Failed to create FS Tailer") }
},
)
Expand Down
7 changes: 3 additions & 4 deletions bin/tests/it/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,13 @@ fn test_append_and_delete() {
File::create(&file_path).expect("Could not create file");

let mut agent_handle = common::spawn_agent(AgentSettings::new(dir.to_str().unwrap()));

let mut stderr_reader = BufReader::new(agent_handle.stderr.take().unwrap());

common::wait_for_file_event("initialize", &file_path, &mut stderr_reader);
thread::sleep(std::time::Duration::from_millis(1000));

debug!("got event, appending to file");
common::append_to_file(&file_path, 10_000, 50).expect("Could not append");
common::append_to_file(&file_path, 1000, 50).expect("Could not append");
fs::remove_file(&file_path).expect("Could not remove file");

// Immediately, start appending in a new file
Expand Down Expand Up @@ -262,7 +261,7 @@ fn test_signals(signal: nix::sys::signal::Signal) {
let mut stderr_reader = BufReader::new(agent_handle.stderr.as_mut().unwrap());

common::wait_for_file_event("initialize", &file_path, &mut stderr_reader);
common::append_to_file(&file_path, 100, 50).expect("Could not append");
common::append_to_file(&file_path, 10, 10).expect("Could not append");

// Verify that the file is shown in the open files
assert!(is_file_open(&file_path));
Expand Down Expand Up @@ -294,7 +293,7 @@ fn test_append_and_move() {
let mut stderr_reader = BufReader::new(agent_handle.stderr.as_mut().unwrap());

common::wait_for_file_event("initialize", &file1_path, &mut stderr_reader);
common::append_to_file(&file1_path, 10_000, 50).expect("Could not append");
common::append_to_file(&file1_path, 1000, 50).expect("Could not append");
fs::rename(&file1_path, &file2_path).expect("Could not move file");
fs::remove_file(&file2_path).expect("Could not remove file");

Expand Down
2 changes: 2 additions & 0 deletions bin/tests/it/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ where
let mut lines_buffer = String::new();
let instant = std::time::Instant::now();

debug!("event info: {:?}", event_info);
for _safeguard in 0..100_000 {
assert!(
instant.elapsed() < delay.unwrap_or(std::time::Duration::from_secs(20)),
Expand All @@ -291,6 +292,7 @@ where
lines_buffer.push_str(&line);
lines_buffer.push('\n');
if condition(&line) {
debug!("condition found: {:?}", line);
return lines_buffer;
}
line.clear();
Expand Down
173 changes: 162 additions & 11 deletions common/fs/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ impl FileSystem {
lookback_config: Lookback,
initial_offsets: HashMap<FileId, SpanVec>,
rules: Rules,
delay: Duration,
) -> Self {
let (resume_events_send, resume_events_recv) = async_channel::unbounded();
let (retry_events_send, retry_events_recv) = async_channel::unbounded();
Expand All @@ -305,8 +304,8 @@ impl FileSystem {

let mut missing_dirs: Vec<PathBuf> = Vec::new();

let watcher = Watcher::new(delay);
let mut missing_dir_watcher = Watcher::new(delay);
let watcher = Watcher::new();
let mut missing_dir_watcher = Watcher::new();
let entries = SlotMap::new();

let mut initial_dir_rules = Rules::new();
Expand Down Expand Up @@ -468,10 +467,7 @@ impl FileSystem {
let mut _mfs = mfs.try_lock().expect("could not lock filesystem cache");

let missing_dirs = _mfs.missing_dirs.clone();
let missing_dir_watcher = _mfs
.missing_dir_watcher
.take()
.unwrap_or_else(|| Watcher::new(Duration::new(0, 10000000)));
let missing_dir_watcher = _mfs.missing_dir_watcher.take().unwrap_or_else(Watcher::new);

let missing_dir_event_stream = missing_dir_watcher.receive();
let retry_event_sender = _mfs.retry_events_send.clone();
Expand Down Expand Up @@ -1506,6 +1502,9 @@ mod tests {
use std::{io, panic};
use tempfile::{tempdir, TempDir};

#[cfg(windows)]
use std::sync::mpsc;

static DELAY: Duration = Duration::from_millis(200);

macro_rules! take_events {
Expand Down Expand Up @@ -1596,7 +1595,6 @@ mod tests {
Lookback::Start,
HashMap::new(),
rules,
DELAY,
)
}

Expand Down Expand Up @@ -1658,6 +1656,7 @@ mod tests {

// Simulates the `create_copy` log rotation strategy
#[tokio::test]
#[cfg(unix)]
async fn filesystem_rotate_create_copy() -> io::Result<()> {
let tempdir = TempDir::new()?;
let path = tempdir.path().to_path_buf();
Expand Down Expand Up @@ -1696,6 +1695,42 @@ mod tests {
Ok(())
}

#[tokio::test]
#[cfg(windows)]
async fn filesystem_rotate_create_copy_win() -> io::Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Windows specific filesystem tests have an _win suffix. The intermediate test macros cause the deletes to fail so to work around this I've added specific file asserts within the thread where the files are created and deleted. They don't mimic the original tests exactly but provide some test coverage.

let tempdir = TempDir::new()?;
let path = tempdir.path().to_path_buf();
let fs = create_fs(&path);

// Copy and remove
let a = path.join("a");
let old = path.join("a.old");

let file_handle = std::thread::spawn({
let a_file = a.clone();
let old_file = old.clone();
move || {
File::create(&a_file).unwrap();
assert!(&a_file.is_file());
copy(&a_file, &old_file).unwrap();
remove_file(&a_file).unwrap();
assert!(!&a_file.is_file());
}
});
file_handle.join().unwrap();

take_events!(fs);
take_events!(fs);

take_events!(fs);
let entry_key = lookup!(fs, a);
assert!(entry_key.is_none());
let entry_key = lookup!(fs, old);
assert_is_file!(fs, entry_key);

Ok(())
}

// Creates a plain old dir
#[tokio::test]
async fn filesystem_create_dir() {
Expand Down Expand Up @@ -1887,6 +1922,7 @@ mod tests {

// Deletes a file
#[tokio::test]
#[cfg(unix)]
async fn filesystem_delete_file() -> io::Result<()> {
let _ = env_logger::Builder::from_default_env().try_init();
let tempdir = TempDir::new()?;
Expand All @@ -1902,6 +1938,41 @@ mod tests {
remove_file(&file_path)?;
take_events!(fs);

take_events!(fs);
take_events!(fs);
assert!(lookup!(fs, file_path).is_none());
Ok(())
}

#[tokio::test]
#[cfg(windows)]
async fn filesystem_delete_file_win() -> io::Result<()> {
let (tx_main, rx_main) = mpsc::channel();
let (tx_thread, rx_thread) = mpsc::channel();

let _ = env_logger::Builder::from_default_env().try_init();
let tempdir = TempDir::new()?;
let path = tempdir.path().to_path_buf();
let fs = create_fs(&path);
let file_path = path.join("file.log");

let file_handle = std::thread::spawn({
let file = file_path.clone();
move || {
File::create(&file).unwrap();
assert!(&file.is_file());
tx_thread.send(true).unwrap();
rx_main.recv().unwrap();
remove_file(&file).unwrap();
assert!(!file.is_file());
}
});
rx_thread.recv().unwrap();

tx_main.send(true).unwrap();
file_handle.join().unwrap();

take_events!(fs);
take_events!(fs);
take_events!(fs);

Expand Down Expand Up @@ -2031,6 +2102,7 @@ mod tests {

// Deletes a hardlink
#[tokio::test]
#[cfg(unix)]
async fn filesystem_delete_hardlink() -> io::Result<()> {
let tempdir = TempDir::new()?;
let path = tempdir.path().to_path_buf();
Expand All @@ -2053,9 +2125,50 @@ mod tests {
Ok(())
}

#[tokio::test]
#[cfg(windows)]
async fn filesystem_delete_hardlink_win() -> io::Result<()> {
let (tx_main, rx_main) = mpsc::channel();
let (tx_thread, rx_thread) = mpsc::channel();

let tempdir = TempDir::new()?;
let path = tempdir.path().to_path_buf();
let fs = create_fs(&path);

// Copy and remove
let a = path.join("a");
let b = path.join("b");

let file_handle = std::thread::spawn({
let a_file = a.clone();
let b_file = b.clone();
move || {
File::create(&a_file).unwrap();
hard_link(&a_file, &b_file).unwrap();
assert!(&a_file.is_file());
assert!(&b_file.is_file());
tx_thread.send(true).unwrap();
rx_main.recv().unwrap();
remove_file(&b_file).unwrap();
assert!(!b_file.is_file());
}
});
rx_thread.recv().unwrap();

tx_main.send(true).unwrap();
file_handle.join().unwrap();

take_events!(fs);
assert!(lookup!(fs, a).is_some());
assert!(lookup!(fs, b).is_none());

Ok(())
}

// Deletes the pointee of a hardlink (not totally accurate since we're not deleting the inode
// entry, but what evs)
#[tokio::test]
#[cfg(unix)]
async fn filesystem_delete_hardlink_pointee() -> io::Result<()> {
let tempdir = TempDir::new()?;
let path = tempdir.path().to_path_buf();
Expand All @@ -2075,6 +2188,44 @@ mod tests {
Ok(())
}

#[tokio::test]
#[cfg(windows)]
async fn filesystem_delete_hardlink_pointee_win() -> io::Result<()> {
let (tx_main, rx_main) = mpsc::channel();
let (tx_thread, rx_thread) = mpsc::channel();

let tempdir = TempDir::new()?;
let path = tempdir.path().to_path_buf();
let fs = create_fs(&path);

let a = path.join("a");
let b = path.join("b");
let file_handle = std::thread::spawn({
let a_file = a.clone();
let b_file = b.clone();
move || {
File::create(&a_file).unwrap();
hard_link(&a_file, &b_file).unwrap();
assert!(&a_file.is_file());
assert!(&b_file.is_file());
tx_thread.send(true).unwrap();
rx_main.recv().unwrap();
remove_file(&a_file).unwrap();
assert!(!a_file.is_file());
}
});
rx_thread.recv().unwrap();

tx_main.send(true).unwrap();
file_handle.join().unwrap();

take_events!(fs);
assert!(lookup!(fs, a).is_none());
assert!(lookup!(fs, b).is_some());

Ok(())
}

/// Moves a directory within the watched directory
///
/// Only run on unix-like systems as moving a directory on Windows with file handles open is
Expand Down Expand Up @@ -2461,7 +2612,7 @@ mod tests {
let events = take_events!(fs);
assert_eq!(
events.len(),
1,
2, // version 5 of notify-rs throws 2 write events
"events: {:#?}",
events
.into_iter()
Expand All @@ -2479,11 +2630,11 @@ mod tests {

writeln!(file2, "hello")?;
let events = take_events!(fs);
assert_eq!(events.len(), 1, "events: {:#?}", events);
assert_eq!(events.len(), 2, "events: {:#?}", events);

writeln!(file3, "hello")?;
let events = take_events!(fs);
assert_eq!(events.len(), 1, "events: {:#?}", events);
assert_eq!(events.len(), 2, "events: {:#?}", events);

drop(file1);
drop(file2);
Expand Down
Loading