Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ New crate containing public type definitions for the notify and debouncer crates
- CHANGE: add `RecommendedCache`, which automatically enables the file ID cache on Windows and MacOS
and disables it on Linux, where it is not needed

- FIX: ordering of debounced events when multiple files are modified and renamed (eg. during a safe save performed by Blender)

## debouncer-full 0.3.1 (2023-08-21)

- CHANGE: remove serde binary experiment opt-out after it got removed [#530]
Expand Down
182 changes: 131 additions & 51 deletions notify-debouncer-full/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ struct Queue {

impl Queue {
fn was_created(&self) -> bool {
self.events.front().map_or(false, |event| {
self.events.front().is_some_and(|event| {
matches!(
event.kind,
EventKind::Create(_) | EventKind::Modify(ModifyKind::Name(RenameMode::To))
Expand All @@ -162,7 +162,7 @@ impl Queue {
}

fn was_removed(&self) -> bool {
self.events.front().map_or(false, |event| {
self.events.front().is_some_and(|event| {
matches!(
event.kind,
EventKind::Remove(_) | EventKind::Modify(ModifyKind::Name(RenameMode::From))
Expand All @@ -171,9 +171,48 @@ impl Queue {
}
}

#[derive(Debug)]
pub struct BlockEntry {
pub blocker_path: PathBuf,
pub blocker_time: Instant,
pub blockee_path: PathBuf,
}

#[derive(Debug, Default)]
pub struct BlockManager {
entries: Vec<BlockEntry>,
}

impl BlockManager {
pub fn new() -> BlockManager {
BlockManager {
entries: Vec::new(),
}
}

pub fn add_blocker(&mut self, entry: BlockEntry) {
self.entries.push(entry);
}

pub fn remove_blocker(&mut self, path: &PathBuf, time: Instant) {
self.entries
.retain(|entry| entry.blocker_path != *path || entry.blocker_time != time);
}

pub fn is_blocked_by(&self, path: &PathBuf) -> Option<(&PathBuf, Instant)> {
for entry in &self.entries {
if entry.blockee_path == *path {
return Some((&entry.blocker_path, entry.blocker_time));
}
}
None
}
}

#[derive(Debug)]
pub(crate) struct DebounceDataInner<T> {
queues: HashMap<PathBuf, Queue>,
blocking: BlockManager,
roots: Vec<(PathBuf, RecursiveMode)>,
cache: T,
rename_event: Option<(DebouncedEvent, Option<FileId>)>,
Expand All @@ -186,6 +225,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
pub(crate) fn new(cache: T, timeout: Duration) -> Self {
Self {
queues: HashMap::new(),
blocking: BlockManager::new(),
roots: Vec::new(),
cache,
rename_event: None,
Expand All @@ -195,11 +235,17 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}
}

fn contains_event(&self, path: &PathBuf, time: Instant) -> bool {
self.queues
.get(path)
.is_some_and(|queue| queue.events.iter().any(|event| event.time == time))
}

/// Retrieve a vec of debounced events, removing them if not continuous
pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
let now = Instant::now();
let mut events_expired = Vec::with_capacity(self.queues.len());
let mut queues_remaining = HashMap::with_capacity(self.queues.len());
let events_count = self.queues.values().map(|queue| queue.events.len()).sum();
let mut events_expired = Vec::with_capacity(events_count);

if let Some(event) = self.rescan_event.take() {
if now.saturating_duration_since(event.time) >= self.timeout {
Expand All @@ -210,48 +256,63 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}
}

// TODO: perfect fit for drain_filter https://github.com/rust-lang/rust/issues/59618
for (path, mut queue) in self.queues.drain() {
let mut kind_index = HashMap::new();

while let Some(event) = queue.events.pop_front() {
if now.saturating_duration_since(event.time) >= self.timeout {
// remove previous event of the same kind
if let Some(idx) = kind_index.get(&event.kind).copied() {
events_expired.remove(idx);

kind_index.values_mut().for_each(|i| {
if *i > idx {
*i -= 1
}
})
}
let mut kind_index: HashMap<PathBuf, HashMap<EventKind, usize>> = HashMap::new();

kind_index.insert(event.kind, events_expired.len());
while let Some(path) = self
.queues
// iterate over all queues
.iter()
// get the first event of every queue
.filter_map(|(path, queue)| queue.events.front().map(|event| (path, event.time)))
// filter out all blocked events
.filter(|(path, _)| {
self.blocking
.is_blocked_by(path)
.map_or(true, |(path, time)| !self.contains_event(path, time))
})
// get the event with the earliest timestamp
.min_by_key(|(_, time)| *time)
// get the path of the event
.map(|(path, _)| path.clone())
{
// unwraps are safe because only paths for existing queues with at least one event are returned by the query above
let event = self
.queues
.get_mut(&path)
.unwrap()
.events
.pop_front()
.unwrap();

events_expired.push(event);
} else {
queue.events.push_front(event);
break;
if now.saturating_duration_since(event.time) >= self.timeout {
// remove previous event of the same kind
let kind_index = kind_index.entry(path.clone()).or_default();
if let Some(idx) = kind_index.get(&event.kind).copied() {
events_expired.remove(idx);

kind_index.values_mut().for_each(|i| {
if *i > idx {
*i -= 1
}
})
}
}
kind_index.insert(event.kind, events_expired.len());

if !queue.events.is_empty() {
queues_remaining.insert(path, queue);
self.blocking.remove_blocker(&path, event.time);

events_expired.push(event);
} else {
self.queues.get_mut(&path).unwrap().events.push_front(event); // unwrap is safe because only paths for existing queues are returned by the query above

break;
}
}

self.queues = queues_remaining;
self.queues.retain(|_, queue| !queue.events.is_empty());

// order events for different files chronologically, but keep the order of events for the same file
events_expired.sort_by(|event_a, event_b| {
// use the last path because rename events are emitted for the target path
if event_a.paths.last() == event_b.paths.last() {
std::cmp::Ordering::Equal
} else {
event_a.time.cmp(&event_b.time)
}
});
if self.queues.is_empty() {
self.blocking.entries.clear();
}

events_expired
}
Expand Down Expand Up @@ -426,18 +487,6 @@ impl<T: FileIdCache> DebounceDataInner<T> {
source_queue.events.remove(remove_index);
}

// split off remove or move out event and add it back to the events map
if source_queue.was_removed() {
let event = source_queue.events.pop_front().unwrap();

self.queues.insert(
event.paths[0].clone(),
Queue {
events: [event].into(),
},
);
}

// update paths
for e in &mut source_queue.events {
e.paths = vec![event.paths[0].clone()];
Expand All @@ -456,7 +505,12 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}

if let Some(target_queue) = self.queues.get_mut(&event.paths[0]) {
if !target_queue.was_created() {
if target_queue.was_removed() {
let event = target_queue.events.pop_front().unwrap(); // unwrap is safe because `was_removed` implies that the queue is not empty
source_queue.events.push_front(event);
}

if !target_queue.was_created() && !source_queue.was_removed() {
let mut remove_event = DebouncedEvent {
event: Event {
kind: EventKind::Remove(RemoveKind::Any),
Expand All @@ -474,6 +528,8 @@ impl<T: FileIdCache> DebounceDataInner<T> {
} else {
self.queues.insert(event.paths[0].clone(), source_queue);
}

self.find_blocked_events(&event.paths[0]);
}

fn push_remove_event(&mut self, event: Event, time: Instant) {
Expand Down Expand Up @@ -519,6 +575,25 @@ impl<T: FileIdCache> DebounceDataInner<T> {
);
}
}

fn find_blocked_events(&mut self, path: &Path) {
for queue in self.queues.values_mut() {
for event in &mut queue.events {
if matches!(
event.event.kind,
EventKind::Modify(ModifyKind::Name(RenameMode::Both))
) && event.event.paths[0] == path
{
self.blocking.add_blocker(BlockEntry {
blocker_path: event.event.paths[1].clone(),
blocker_time: event.time,
blockee_path: path.to_path_buf(),
});
break;
}
}
}
}
}

/// Debouncer guard, stops the debouncer on drop.
Expand Down Expand Up @@ -756,6 +831,11 @@ mod tests {
"add_remove_parent_event_after_remove_child_event",
"add_errors",
"emit_continuous_modify_content_events",
"emit_create_event_after_safe_save_and_backup_override",
"emit_create_event_after_safe_save_and_backup_rotation_twice",
"emit_create_event_after_safe_save_and_backup_rotation",
"emit_create_event_after_safe_save_and_double_move",
"emit_create_event_after_safe_save_and_double_move_and_recreate",
"emit_events_in_chronological_order",
"emit_events_with_a_prepended_rename_event",
"emit_close_events_only_once",
Expand Down
3 changes: 2 additions & 1 deletion notify-debouncer-full/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use notify::{
Error, ErrorKind, Event, EventKind, RecursiveMode,
};

use crate::{DebounceDataInner, DebouncedEvent, FileIdCache, Queue};
use crate::{BlockManager, DebounceDataInner, DebouncedEvent, FileIdCache, Queue};

pub(crate) use schema::TestCase;

Expand Down Expand Up @@ -268,6 +268,7 @@ impl schema::State {

DebounceDataInner {
queues,
blocking: BlockManager::new(),
roots: Vec::new(),
cache,
rename_event,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Based on the Blender safe save scenario.
//
// In this scenario the backup file is not removed first.
{
state: {}
events: [
{ kind: "create-any", paths: ["/watch/file@"], time: 1 }
{ kind: "rename-from", paths: ["/watch/file"], tracker: 1, time: 3 }
{ kind: "rename-to", paths: ["/watch/file1"], tracker: 1, time: 4 }
{ kind: "rename-from", paths: ["/watch/file@"], tracker: 2, time: 5 }
{ kind: "rename-to", paths: ["/watch/file"], tracker: 2, time: 6 }
]
expected: {
queues: {
/watch/file: {
events: [
{ kind: "create-any", paths: ["*"], time: 1 }
]
}
/watch/file1: {
events: [
{ kind: "rename-both", paths: ["/watch/file", "/watch/file1"], tracker: 1, time: 3 }
]
}
}
events: {
long: [
{ kind: "rename-both", paths: ["/watch/file", "/watch/file1"], tracker: 1, time: 3 }
{ kind: "create-any", paths: ["/watch/file"], time: 1 }
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// https://github.com/notify-rs/notify/issues/587
//
// Blender causes the following events when saving a file:
//
// create test.blend@ (new content)
// delete test.blend1 (delete backup)
// rename test.blend -> test.blend1 (move current to backup)
// rename test.blend@ -> test.blend (move new to current)
{
state: {}
events: [
{ kind: "create-any", paths: ["/watch/file@"], time: 1 }
{ kind: "remove-any", paths: ["/watch/file1"], time: 2 }
{ kind: "rename-from", paths: ["/watch/file"], tracker: 1, time: 3 }
{ kind: "rename-to", paths: ["/watch/file1"], tracker: 1, time: 4 }
{ kind: "rename-from", paths: ["/watch/file@"], tracker: 2, time: 5 }
{ kind: "rename-to", paths: ["/watch/file"], tracker: 2, time: 6 }
]
expected: {
queues: {
/watch/file: {
events: [
{ kind: "create-any", paths: ["*"], time: 1 }
]
}
/watch/file1: {
events: [
{ kind: "remove-any", paths: ["*"], time: 2 }
{ kind: "rename-both", paths: ["/watch/file", "/watch/file1"], tracker: 1, time: 3 }
]
}
}
events: {
long: [
{ kind: "remove-any", paths: ["/watch/file1"], time: 2 }
{ kind: "rename-both", paths: ["/watch/file", "/watch/file1"], tracker: 1, time: 3 }
{ kind: "create-any", paths: ["/watch/file"], time: 1 }
]
}
}
}
Loading