Skip to content

Commit

Permalink
[libprocess] Add io::Watcher for fs notifications.
Browse files Browse the repository at this point in the history
Adds a basic watcher class for filesystem watch notifications. We
currently only support Linux, via inotify.

We currently support inotify events for writing, deleting, and renaming
a file. We do not support watching directories.

Review: https://reviews.apache.org/r/75182/
  • Loading branch information
ZhouJas authored and bmahler committed Aug 20, 2024
1 parent 8ff65de commit 23703fb
Show file tree
Hide file tree
Showing 3 changed files with 531 additions and 0 deletions.
120 changes: 120 additions & 0 deletions 3rdparty/libprocess/include/process/io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <string>

#include <process/future.hpp>
#include <process/queue.hpp>

#include <stout/nothing.hpp>
#ifdef __WINDOWS__
Expand Down Expand Up @@ -166,6 +167,125 @@ Future<Nothing> redirect(
size_t chunk = 4096,
const std::vector<lambda::function<void(const std::string&)>>& hooks = {});


// Forward declarations. `testing::watcher_read_loop` has to be declared
// before `Watcher` is defined so that the class can name it as a friend;
// its definition follows the class further below.
class Watcher;
namespace testing {
Future<Nothing> watcher_read_loop(Watcher w);
} // namespace testing {


// This provides a high level interface for cross-platform filesystem watch
// notifications. Currently, only Linux is supported via inotify, but macOS,
// BSD, and Windows implementations can be added.
//
// On Linux, inotify provides a vast set of features and comes with a vast
// amount of subtleties to deal with and providing a cross-platform filesystem
// watcher while exposing all these subtleties is quite challenging. Therefore,
// our initial implementation only provides basic functionality in order to
// simplify the life of the user, and to make cross platform implementation
// viable.
//
// A Watcher keeps its state behind a `std::shared_ptr<Data>`, so copies of
// a Watcher refer to the same set of watches and the same event queue.
//
// TODO(bmahler): Add support for directories.
class Watcher
{
public:
// A single notification delivered through the queue returned by `events()`.
struct Event
{
// Path to the file for the event. In the case of a Failure event type,
// this will be a failure message instead.
std::string path;

// TODO(bmahler): Add more events (e.g. access events, close events,
// attribute changes).
enum {
// The read loop encountered an unrecoverable failure, the watcher is
// no longer running and the caller must create a new watcher if desired!
Failure,

// File was modified, note that more writes may follow.
Write,

// The path was removed; any watches on it will be removed.
// Some "remove" operations may trigger a Rename if the file is
// actually moved (for example "remove to trash" is often a rename).
Remove,

// The path was renamed to something else; any watches on it will be
// removed.
Rename,
} type;
};

// Adds the file for event monitoring.
//
// Returns an error if:
// * we don't have read access to the provided path
// * the path has already been watched (and not implicitly or
// explicitly removed)
// * the path doesn't exist
// * the path is a directory (not currently supported)
//
// In order for the caller to not miss any updates to the file, you
// *must* read the file yourself after calling add(). Otherwise, if
// you were to read the file first, updates between reading the file
// and add() the file will be missed!
Try<Nothing> add(const std::string& path);

// Removes the file for event monitoring, removing an already removed
// file is a no-op and also returns Nothing.
Try<Nothing> remove(const std::string& path);

// Returns the queue onto which the read loop puts an Event for each
// notification on a watched file, followed by a single Failure event
// if the read loop fails.
//
// NOTE(review): presumably the returned Queue shares its state with the
// watcher's internal queue rather than being a snapshot; confirm
// against process/queue.hpp.
Queue<Event> events();

private:
// Construction goes through create_watcher(), which needs access to the
// private constructor and to run(). The testing helper needs access to
// `data` in order to expose the read loop future.
friend Try<Watcher> create_watcher();
friend Future<Nothing> testing::watcher_read_loop(Watcher w);

// Takes the descriptor of an already created inotify instance.
Watcher(int inotify_fd);

// Start the inotify read loop.
void run();

// State shared between all copies of a Watcher and (weakly) with the
// read loop.
struct Data
{
Data() = default;

// Discards `read_loop` so that the loop stops once the last reference
// to the data goes away.
~Data();

// Rather than use a process to serialize access to the two maps
// below we use a 'std::atomic_flag' which will spin lock.
std::atomic_flag lock = ATOMIC_FLAG_INIT;

// We need a bidirectional mapping between watch descriptors and
// the path the watch descriptor maps to. Both maps must be accessed
// with `lock` held.
hashmap<int, std::string> wd_to_path;
hashmap<std::string, int> path_to_wd;

// Future of the read loop started by run().
process::Future<Nothing> read_loop;

// Queue is already thread safe and doesn't require locking.
Queue<Watcher::Event> events;
};

// Descriptor of the inotify instance; it gets closed when the read loop
// completes (see run()).
const int inotify_fd;
std::shared_ptr<Data> data;
};


// Creates a watcher that can be used to monitor for fs changes.
//
// Returns an error if the underlying notification facility cannot be
// initialized (on Linux: if the inotify instance cannot be created).
Try<Watcher> create_watcher();


namespace testing {

// Test-only accessor: hands out the future of the watcher's read loop so
// that tests can observe the loop being discarded.
inline Future<Nothing> watcher_read_loop(Watcher w)
{
  const Future<Nothing>& loop_future = w.data->read_loop;
  return loop_future;
}

} // namespace testing {
} // namespace io {
} // namespace process {

Expand Down
213 changes: 213 additions & 0 deletions 3rdparty/libprocess/src/posix/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,18 @@
// See the License for the specific language governing permissions and
// limitations under the License

#ifdef __linux__
#include <memory>

#include <sys/inotify.h>
#endif

#include <process/future.hpp>
#include <process/io.hpp>
#include <process/loop.hpp>

#include <stout/error.hpp>
#include <stout/hashmap.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
Expand All @@ -26,6 +33,11 @@

#include "io_internal.hpp"

using std::default_delete;
using std::shared_ptr;
using std::string;
using std::weak_ptr;

namespace process {
namespace io {
namespace internal {
Expand Down Expand Up @@ -136,5 +148,206 @@ Try<bool> is_async(int_fd fd)
}

} // namespace internal {

#ifdef __linux__

// Stores the descriptor of an already created inotify instance and
// allocates the state that is shared between all copies of this watcher.
Watcher::Watcher(int inotify_fd)
  : inotify_fd(inotify_fd),
    data(new Data())
{}


// Runs when the last `shared_ptr<Data>` reference is released, i.e. when
// the last copy of the owning Watcher goes away (the read loop itself only
// holds a weak reference).
Watcher::Data::~Data()
{
// When the last reference of data goes away, we discard the read loop
// which should ensure that the loop future transitions, at which point
// the inotify fd will get closed (by the `onAny` callback that run()
// registers on the loop future).
read_loop.discard();
}


// Starts the asynchronous inotify read loop.
//
// Each iteration reads a batch of `inotify_event` records from the inotify
// fd, translates every record into a `Watcher::Event` and puts it onto
// `data->events`. The loop stops when:
//   * the shared `Data` has been destroyed (all Watcher copies are gone),
//   * the read fails, hits EOF, or returns a malformed batch, or
//   * the kernel reports an event queue overflow.
// On failure a final `Event::Failure` is put onto the queue (its `path`
// carries the failure message). Whenever the loop future completes the
// inotify fd is closed.
void Watcher::run()
{
  // For now, we only use a small buffer that is sufficient for reading
  // *at least* 32 events, but the caller may want to customize the buffer
  // size depending on the expected volume of events.
  const size_t buffer_size = 32 * (sizeof(inotify_event) + NAME_MAX + 1);
  shared_ptr<char> buffer(new char[buffer_size], default_delete<char[]>());

  // We take a weak pointer here to avoid keeping Data alive forever, when
  // the caller throws away the last Watcher copy, we want Data to be
  // destroyed and the loop to be stopped.
  weak_ptr<Data> weak_data = data;
  int fd = inotify_fd;

  data->read_loop = loop(
      [fd, buffer, buffer_size]() {
        return io::read(fd, buffer.get(), buffer_size);
      },
      [weak_data, buffer](size_t read) -> Future<ControlFlow<Nothing>> {
        if (read == 0) {
          return Failure("Unexpected EOF");
        }

        // If we can't get the shared pointer, Data is destroyed and we
        // need to stop the loop.
        shared_ptr<Data> data = weak_data.lock();
        if (!data) {
          return Break();
        }

        // Walk the variable-length records. The bound is written as an
        // addition on purpose: `read - sizeof(inotify_event)` would wrap
        // around (both operands are unsigned) if fewer bytes than one
        // event header were read, and we would parse garbage.
        size_t offset = 0;
        while (offset + sizeof(inotify_event) <= read) {
          inotify_event* event =
            reinterpret_cast<inotify_event*>(buffer.get() + offset);

          // Advance first so that every `continue` below moves on to the
          // next record.
          offset += sizeof(inotify_event) + event->len;

          if (event->mask & IN_Q_OVERFLOW) {
            return Failure("inotify event overflow");
          }

          // For IN_IGNORED generated by inotify_rm_watch, we've already
          // removed the path from our tracking maps. For other cases of
          // IN_IGNORED (e.g. IN_DELETE_SELF, IN_UNMOUNT, etc), we remove
          // the path during those events rather than the subsequent
          // IN_IGNORED.
          if (event->mask & IN_IGNORED) {
            continue;
          }

          const bool removed =
            (event->mask & IN_DELETE_SELF) || (event->mask & IN_UNMOUNT);
          const bool renamed = (event->mask & IN_MOVE_SELF) != 0;
          const bool modified = (event->mask & IN_MODIFY) != 0;

          // Same precedence as before when several bits are set at once:
          // Remove wins over Rename, which wins over Write.
          Event e;
          if (removed) {
            e.type = Event::Remove;
          } else if (renamed) {
            e.type = Event::Rename;
          } else if (modified) {
            e.type = Event::Write;
          } else {
            // Not an event we report; skip it rather than emitting an
            // Event whose `type` was never initialized.
            continue;
          }

          synchronized (data->lock) {
            Option<string> path = data->wd_to_path.get(event->wd);

            if (path.isNone()) {
              continue; // Unknown watch, likely we just removed this watch.
            }

            // The kernel drops the watch when the file is removed or
            // renamed, so forget it on our side too. This must happen
            // *before* `*path` is moved from below: erasing with a
            // moved-from string would leave a stale `path_to_wd` entry
            // and make a later add() of the same path fail.
            if (removed || renamed) {
              data->wd_to_path.erase(event->wd);
              data->path_to_wd.erase(*path);
            }

            e.path = std::move(*path);

            data->events.put(std::move(e));
          }
        }

        if (offset != read) {
          return Failure("Unexpected partial read from inotify");
        }

        return Continue();
      });

  // Surface a loop failure to the consumer as a final Failure event, but
  // only if somebody still holds on to the watcher.
  data->read_loop
    .onFailed([weak_data](const string& message) {
      shared_ptr<Data> data = weak_data.lock();
      if (data) {
        Watcher::Event e;
        e.type = Watcher::Event::Failure;
        e.path = message;
        data->events.put(e);
      }
    });

  // We need to close the inotify fd whenever the loop stops.
  data->read_loop
    .onAny([fd]() {
      ::close(fd);
    });
}


// Starts watching the file at `path` for modification, deletion and
// rename.
//
// Returns an error if `path` is a directory, if `path` is already being
// watched (directly, or through another path that refers to the same
// inode), or if the kernel refuses the watch (`inotify_add_watch` fails,
// e.g. because the path does not exist or is not readable).
Try<Nothing> Watcher::add(const string& path)
{
  // Since we only currently support watching a file (not directories),
  // and we're only interested in modifications to the file contents,
  // we only need to watch for the following relevant events:
  int mask = IN_MODIFY | IN_DELETE_SELF | IN_MOVE_SELF;

#ifdef IN_MASK_CREATE
  // Fail with EEXIST if a watch already exists. This ensures that new
  // watches don't modify existing ones, either because the path gets
  // watched multiple times, or when two paths refer to the same inode.
  mask = mask | IN_MASK_CREATE;
#endif

  if (os::stat::isdir(path)) {
    return Error("Directories are not supported");
  }

  synchronized (data->lock) {
    if (data->path_to_wd.get(path).isSome()) {
      return Error("Path is already added");
    }

    int wd = inotify_add_watch(inotify_fd, path.c_str(), mask);
    if (wd < 0) {
      return ErrnoError("Failed to inotify_add_watch");
    }

    // When IN_MASK_CREATE is not available, inotify hands back the
    // *existing* watch descriptor if `path` refers to an inode that is
    // already watched under a different path. Recording it would silently
    // overwrite `wd_to_path[wd]` and leave the two maps inconsistent, so
    // reject it just like the IN_MASK_CREATE case does. The existing watch
    // is left untouched (we always register the same mask).
    if (data->wd_to_path.get(wd).isSome()) {
      return Error("Path is already watched via another path");
    }

    data->wd_to_path[wd] = path;
    data->path_to_wd[path] = wd;
  }

  return Nothing();
}


// Stops watching `path`. A path that is not (or no longer) tracked is
// treated as a no-op and yields Nothing; an error is only returned when
// the kernel rejects the removal of the watch.
Try<Nothing> Watcher::remove(const string& path)
{
  Option<int> descriptor;

  synchronized (data->lock) {
    descriptor = data->path_to_wd.get(path);

    // Drop both directions of the mapping while holding the lock.
    if (descriptor.isSome()) {
      data->path_to_wd.erase(path);
      data->wd_to_path.erase(*descriptor);
    }
  }

  if (descriptor.isNone()) {
    // Note that the path may have been implicitly removed via the
    // read loop when the file gets removed. Should we treat this as
    // a failure? Or a no-op?
    return Nothing();
  }

  // Note that removing the watch will trigger an IN_IGNORED event.
  const int result = inotify_rm_watch(inotify_fd, *descriptor);
  if (result < 0) {
    return ErrnoError("Failed to inotify_rm_watch");
  }

  return Nothing();
}


// Returns the queue that the read loop puts events onto (see run()).
// NOTE(review): the queue is returned by value; presumably Queue copies
// share their underlying state - confirm against process/queue.hpp.
Queue<Watcher::Event> Watcher::events()
{
return data->events;
}


// Creates an inotify instance (non-blocking, close-on-exec), wraps it in a
// Watcher and starts the watcher's read loop before handing it back.
// Returns an ErrnoError if the inotify instance cannot be created.
Try<Watcher> create_watcher()
{
  const int fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);

  if (fd >= 0) {
    Watcher watcher(fd);
    watcher.run();
    return watcher;
  }

  return ErrnoError("Failed to inotify_init1");
}

#endif // __linux__

} // namespace io {
} // namespace process {
Loading

0 comments on commit 23703fb

Please sign in to comment.