Skip to content

Commit

Permalink
spring-projectsGH-3557: Add maxDepth, dirPredicate to FileReadMS
Browse files Browse the repository at this point in the history
Fixes spring-projects#3557

* Expose a `watchMaxDepth` on the `FileReadingMessageSource` for its `Files.walkFileTree()` API usage
* Add `watchDirPredicate` option ot the `FileReadingMessageSource` to skip sub-tree for `Files.walkFileTree()`
scanning according to some condition against directory `Path`
  • Loading branch information
artembilan committed Apr 13, 2023
1 parent aaaa489 commit ab72795
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 77 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
Expand All @@ -29,6 +30,7 @@
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashSet;
Expand All @@ -39,6 +41,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.regex.Matcher;

import org.springframework.context.Lifecycle;
Expand Down Expand Up @@ -122,6 +125,10 @@ public class FileReadingMessageSource extends AbstractMessageSource<File> implem

private WatchEventType[] watchEvents = {WatchEventType.CREATE};

private int watchMaxDepth = Integer.MAX_VALUE;

private Predicate<Path> watchDirPredicate = path -> true;

/**
* Create a FileReadingMessageSource with a naturally ordered queue of unbounded capacity.
*/
Expand Down Expand Up @@ -237,15 +244,14 @@ public void setLocker(FileLocker locker) {
* Set this flag if you want to make sure the internal queue is
* refreshed with the latest content of the input directory on each poll.
* <p>
* By default this implementation will empty its queue before looking at the
* By default, this implementation will empty its queue before looking at the
* directory again. In cases where order is relevant it is important to
* consider the effects of setting this flag. The internal
* {@link java.util.concurrent.BlockingQueue} that this class is keeping
* will more likely be out of sync with the file system if this flag is set
* to false, but it will change more often (causing expensive reordering) if it is set to true.
* @param scanEachPoll
* whether or not the component should re-scan (as opposed to not
* rescanning until the entire backlog has been delivered)
* @param scanEachPoll whether the component should re-scan (as opposed to not
* rescanning until the entire backlog has been delivered)
*/
public void setScanEachPoll(boolean scanEachPoll) {
this.scanEachPoll = scanEachPoll;
Expand Down Expand Up @@ -282,6 +288,28 @@ public void setWatchEvents(WatchEventType... watchEvents) {
this.watchEvents = Arrays.copyOf(watchEvents, watchEvents.length);
}

/**
* Set a max depth for the {@link Files#walkFileTree(Path, Set, int, FileVisitor)} API when
* {@link #useWatchService} is enabled.
* Defaults to {@link Integer#MAX_VALUE} - walk the whole tree.
* @param watchMaxDepth the depth for {@link Files#walkFileTree(Path, Set, int, FileVisitor)}.
* @since 6.1
*/
public void setWatchMaxDepth(int watchMaxDepth) {
this.watchMaxDepth = watchMaxDepth;
}

/**
* Set a {@link Predicate} to check a directory in the {@link Files#walkFileTree(Path, Set, int, FileVisitor)} call
* if it is eligible for {@link WatchService}.
* @param watchDirPredicate the {@link Predicate} to check dirs for walking.
* @since 6.1
*/
public void setWatchDirPredicate(Predicate<Path> watchDirPredicate) {
Assert.notNull(watchDirPredicate, "'watchDirPredicate' must not be null.");
this.watchDirPredicate = watchDirPredicate;
}

@Override
public String getComponentType() {
return "file:inbound-channel-adapter";
Expand All @@ -299,16 +327,16 @@ public void start() {
() -> "Source path [" + this.directory + "] does not point to a directory.");
Assert.isTrue(this.directory.canRead(),
() -> "Source directory [" + this.directory + "] is not readable.");
if (this.scanner instanceof Lifecycle) {
((Lifecycle) this.scanner).start();
if (this.scanner instanceof Lifecycle lifecycle) {
lifecycle.start();
}
}
}

@Override
public void stop() {
if (this.running.getAndSet(false) && this.scanner instanceof Lifecycle) {
((Lifecycle) this.scanner).stop();
if (this.running.getAndSet(false) && this.scanner instanceof Lifecycle lifecycle) {
lifecycle.stop();
}
}

Expand Down Expand Up @@ -418,8 +446,8 @@ private class WatchServiceDirectoryScanner extends DefaultDirectoryScanner imple

@Override
public void setFilter(FileListFilter<File> filter) {
if (filter instanceof DiscardAwareFileListFilter) {
((DiscardAwareFileListFilter<File>) filter).addDiscardCallback(this.filesToPoll::add);
if (filter instanceof DiscardAwareFileListFilter<File> discardAwareFileListFilter) {
discardAwareFileListFilter.addDiscardCallback(this.filesToPoll::add);
}
super.setFilter(filter);
}
Expand Down Expand Up @@ -505,8 +533,8 @@ private void processFilesFromNormalEvent(Set<File> files, File parentDir, WatchE
logger.debug(() -> "Watch event [" + event.kind() + "] for file [" + file + "]");

if (StandardWatchEventKinds.ENTRY_DELETE.equals(event.kind())) {
if (getFilter() instanceof ResettableFileListFilter) {
((ResettableFileListFilter<File>) getFilter()).remove(file);
if (getFilter() instanceof ResettableFileListFilter<File> resettableFileListFilter) {
resettableFileListFilter.remove(file);
}
boolean fileRemoved = files.remove(file);
if (fileRemoved) {
Expand Down Expand Up @@ -540,8 +568,8 @@ private void processFilesFromOverflowEvent(Set<File> files, WatchEvent<?> event)
}
this.pathKeys.clear();

if (event.context() != null && event.context() instanceof Path) {
files.addAll(walkDirectory((Path) event.context(), event.kind()));
if (event.context() != null && event.context() instanceof Path path) {
files.addAll(walkDirectory(path, event.kind()));
}
else {
files.addAll(walkDirectory(FileReadingMessageSource.this.directory.toPath(), event.kind()));
Expand All @@ -552,25 +580,32 @@ private Set<File> walkDirectory(Path directory, final WatchEvent.Kind<?> kind) {
final Set<File> walkedFiles = new LinkedHashSet<>();
try {
registerWatch(directory);
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {

@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
FileVisitResult fileVisitResult = super.preVisitDirectory(dir, attrs);
registerWatch(dir);
return fileVisitResult;
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
FileVisitResult fileVisitResult = super.visitFile(file, attrs);
if (!StandardWatchEventKinds.ENTRY_MODIFY.equals(kind)) {
walkedFiles.add(file.toFile());
}
return fileVisitResult;
}

});
Files.walkFileTree(directory, Collections.emptySet(), FileReadingMessageSource.this.watchMaxDepth,
new SimpleFileVisitor<>() {

@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
throws IOException {

if (FileReadingMessageSource.this.watchDirPredicate.test(dir)) {
registerWatch(dir);
return FileVisitResult.CONTINUE;
}
else {
return FileVisitResult.SKIP_SUBTREE;
}
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
FileVisitResult fileVisitResult = super.visitFile(file, attrs);
if (!StandardWatchEventKinds.ENTRY_MODIFY.equals(kind)) {
walkedFiles.add(file.toFile());
}
return fileVisitResult;
}

});
}
catch (IOException ex) {
logger.error(ex, () -> "Failed to walk directory: " + directory.toString());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,10 +17,12 @@
package org.springframework.integration.file.dsl;

import java.io.File;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.integration.dsl.ComponentsRegistration;
Expand Down Expand Up @@ -276,6 +278,32 @@ public FileInboundChannelAdapterSpec watchEvents(FileReadingMessageSource.WatchE
return this;
}

/**
* Set a depth for files walk API.
* @param watchMaxDepth the depth for files walk.
* @return the spec.
* @since 6.1
* @see #useWatchService
* @see FileReadingMessageSource#setWatchMaxDepth(int)
*/
public FileInboundChannelAdapterSpec watchMaxDepth(int watchMaxDepth) {
target.setWatchMaxDepth(watchMaxDepth);
return this;
}

/**
* Set a {@link Predicate} to check if it is eligible for {@link java.nio.file.WatchService}.
* @param watchDirPredicate the {@link Predicate} to check dirs for walking.
* @return the spec.
* @since 6.1
* @see #useWatchService
* @see FileReadingMessageSource#setWatchDirPredicate(Predicate)
*/
public FileInboundChannelAdapterSpec watchDirPredicate(Predicate<Path> watchDirPredicate) {
target.setWatchDirPredicate(watchDirPredicate);
return this;
}

@Override
public Map<Object, String> getComponentsToRegister() {
if (this.scanner == null || this.filtersSet) {
Expand Down
Loading

0 comments on commit ab72795

Please sign in to comment.