From 5fed35ddfeca21697d59f49dd6a0794662cc59cf Mon Sep 17 00:00:00 2001 From: Alva Swanson Date: Sat, 23 Mar 2024 14:12:44 +0100 Subject: [PATCH] Implement DirectoryEvent Flow.Publisher and Flow.Subscriber Consumers can asynchronously subscribe to directory events with the Flow components. Ref: #1798 --- .../io_watcher/DirectoryEventPublisher.java | 42 +++++++++++++++ .../io_watcher/DirectoryEventSubscriber.java | 53 +++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 common/src/main/java/bisq/common/io_watcher/DirectoryEventPublisher.java create mode 100644 common/src/main/java/bisq/common/io_watcher/DirectoryEventSubscriber.java diff --git a/common/src/main/java/bisq/common/io_watcher/DirectoryEventPublisher.java b/common/src/main/java/bisq/common/io_watcher/DirectoryEventPublisher.java new file mode 100644 index 0000000000..356361a128 --- /dev/null +++ b/common/src/main/java/bisq/common/io_watcher/DirectoryEventPublisher.java @@ -0,0 +1,42 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.common.io_watcher; + +import java.nio.file.Path; +import java.nio.file.WatchEvent; +import java.nio.file.WatchService; +import java.util.Set; +import java.util.concurrent.Flow; + +public class DirectoryEventPublisher implements Flow.Publisher { + private final WatchService watchService; + private final Path directoryPath; + private final Set> watchEventKinds; + + public DirectoryEventPublisher(WatchService watchService, Path directoryPath, Set> watchEventKinds) { + this.watchService = watchService; + this.directoryPath = directoryPath; + this.watchEventKinds = watchEventKinds; + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + var subscription = new DirectoryEventSubscription(subscriber, watchService, directoryPath, watchEventKinds); + subscriber.onSubscribe(subscription); + } +} diff --git a/common/src/main/java/bisq/common/io_watcher/DirectoryEventSubscriber.java b/common/src/main/java/bisq/common/io_watcher/DirectoryEventSubscriber.java new file mode 100644 index 0000000000..a7845e6c86 --- /dev/null +++ b/common/src/main/java/bisq/common/io_watcher/DirectoryEventSubscriber.java @@ -0,0 +1,53 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.common.io_watcher; + +import java.nio.file.Path; +import java.util.concurrent.Flow; +import java.util.function.Consumer; + +public class DirectoryEventSubscriber implements Flow.Subscriber { + private final Consumer consumer; + private Flow.Subscription subscription; + + public DirectoryEventSubscriber(Consumer consumer) { + this.consumer = consumer; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(Path path) { + consumer.accept(path); + subscription.request(1); + } + + @Override + public void onError(Throwable throwable) { + throw new RuntimeException(throwable); + } + + @Override + public void onComplete() { + throw new UnsupportedOperationException(); + } +}