From ab72795d825614a56f2d48f52251f5e67a0b4cea Mon Sep 17 00:00:00 2001 From: abilan Date: Thu, 13 Apr 2023 16:32:00 -0400 Subject: [PATCH] GH-3557: Add maxDepth, dirPredicate to FileReadMS Fixes https://github.com/spring-projects/spring-integration/issues/3557 * Expose a `watchMaxDepth` on the `FileReadingMessageSource` for its `Files.walkFileTree()` API usage * Add `watchDirPredicate` option to the `FileReadingMessageSource` to skip a sub-tree for `Files.walkFileTree()` scanning according to some condition against directory `Path` --- .../file/FileReadingMessageSource.java | 103 ++++++++++++------ .../dsl/FileInboundChannelAdapterSpec.java | 30 ++++- .../WatchServiceDirectoryScannerTests.java | 92 +++++++++------- src/reference/asciidoc/file.adoc | 5 + src/reference/asciidoc/whats-new.adoc | 6 + 5 files changed, 159 insertions(+), 77 deletions(-) diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/FileReadingMessageSource.java b/spring-integration-file/src/main/java/org/springframework/integration/file/FileReadingMessageSource.java index 5cb7df4abbb..13a519733bc 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/FileReadingMessageSource.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/FileReadingMessageSource.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.file.FileSystems; import java.nio.file.FileVisitResult; +import java.nio.file.FileVisitor; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; @@ -29,6 +30,7 @@ import java.nio.file.WatchService; import java.nio.file.attribute.BasicFileAttributes; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.LinkedHashSet; @@ -39,6 +41,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; import java.util.regex.Matcher; import org.springframework.context.Lifecycle; @@ -122,6 +125,10 @@ public class FileReadingMessageSource extends AbstractMessageSource implem private WatchEventType[] watchEvents = {WatchEventType.CREATE}; + private int watchMaxDepth = Integer.MAX_VALUE; + + private Predicate watchDirPredicate = path -> true; + /** * Create a FileReadingMessageSource with a naturally ordered queue of unbounded capacity. */ @@ -237,15 +244,14 @@ public void setLocker(FileLocker locker) { * Set this flag if you want to make sure the internal queue is * refreshed with the latest content of the input directory on each poll. *

- * By default this implementation will empty its queue before looking at the + * By default, this implementation will empty its queue before looking at the * directory again. In cases where order is relevant it is important to * consider the effects of setting this flag. The internal * {@link java.util.concurrent.BlockingQueue} that this class is keeping * will more likely be out of sync with the file system if this flag is set * to false, but it will change more often (causing expensive reordering) if it is set to true. - * @param scanEachPoll - * whether or not the component should re-scan (as opposed to not - * rescanning until the entire backlog has been delivered) + * @param scanEachPoll whether the component should re-scan (as opposed to not + * rescanning until the entire backlog has been delivered) */ public void setScanEachPoll(boolean scanEachPoll) { this.scanEachPoll = scanEachPoll; @@ -282,6 +288,28 @@ public void setWatchEvents(WatchEventType... watchEvents) { this.watchEvents = Arrays.copyOf(watchEvents, watchEvents.length); } + /** + * Set a max depth for the {@link Files#walkFileTree(Path, Set, int, FileVisitor)} API when + * {@link #useWatchService} is enabled. + * Defaults to {@link Integer#MAX_VALUE} - walk the whole tree. + * @param watchMaxDepth the depth for {@link Files#walkFileTree(Path, Set, int, FileVisitor)}. + * @since 6.1 + */ + public void setWatchMaxDepth(int watchMaxDepth) { + this.watchMaxDepth = watchMaxDepth; + } + + /** + * Set a {@link Predicate} to check a directory in the {@link Files#walkFileTree(Path, Set, int, FileVisitor)} call + * if it is eligible for {@link WatchService}. + * @param watchDirPredicate the {@link Predicate} to check dirs for walking. 
+ * @since 6.1 + */ + public void setWatchDirPredicate(Predicate watchDirPredicate) { + Assert.notNull(watchDirPredicate, "'watchDirPredicate' must not be null."); + this.watchDirPredicate = watchDirPredicate; + } + @Override public String getComponentType() { return "file:inbound-channel-adapter"; @@ -299,16 +327,16 @@ public void start() { () -> "Source path [" + this.directory + "] does not point to a directory."); Assert.isTrue(this.directory.canRead(), () -> "Source directory [" + this.directory + "] is not readable."); - if (this.scanner instanceof Lifecycle) { - ((Lifecycle) this.scanner).start(); + if (this.scanner instanceof Lifecycle lifecycle) { + lifecycle.start(); } } } @Override public void stop() { - if (this.running.getAndSet(false) && this.scanner instanceof Lifecycle) { - ((Lifecycle) this.scanner).stop(); + if (this.running.getAndSet(false) && this.scanner instanceof Lifecycle lifecycle) { + lifecycle.stop(); } } @@ -418,8 +446,8 @@ private class WatchServiceDirectoryScanner extends DefaultDirectoryScanner imple @Override public void setFilter(FileListFilter filter) { - if (filter instanceof DiscardAwareFileListFilter) { - ((DiscardAwareFileListFilter) filter).addDiscardCallback(this.filesToPoll::add); + if (filter instanceof DiscardAwareFileListFilter discardAwareFileListFilter) { + discardAwareFileListFilter.addDiscardCallback(this.filesToPoll::add); } super.setFilter(filter); } @@ -505,8 +533,8 @@ private void processFilesFromNormalEvent(Set files, File parentDir, WatchE logger.debug(() -> "Watch event [" + event.kind() + "] for file [" + file + "]"); if (StandardWatchEventKinds.ENTRY_DELETE.equals(event.kind())) { - if (getFilter() instanceof ResettableFileListFilter) { - ((ResettableFileListFilter) getFilter()).remove(file); + if (getFilter() instanceof ResettableFileListFilter resettableFileListFilter) { + resettableFileListFilter.remove(file); } boolean fileRemoved = files.remove(file); if (fileRemoved) { @@ -540,8 +568,8 @@ private void 
processFilesFromOverflowEvent(Set files, WatchEvent event) } this.pathKeys.clear(); - if (event.context() != null && event.context() instanceof Path) { - files.addAll(walkDirectory((Path) event.context(), event.kind())); + if (event.context() != null && event.context() instanceof Path path) { + files.addAll(walkDirectory(path, event.kind())); } else { files.addAll(walkDirectory(FileReadingMessageSource.this.directory.toPath(), event.kind())); @@ -552,25 +580,32 @@ private Set walkDirectory(Path directory, final WatchEvent.Kind kind) { final Set walkedFiles = new LinkedHashSet<>(); try { registerWatch(directory); - Files.walkFileTree(directory, new SimpleFileVisitor() { - - @Override - public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { - FileVisitResult fileVisitResult = super.preVisitDirectory(dir, attrs); - registerWatch(dir); - return fileVisitResult; - } - - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - FileVisitResult fileVisitResult = super.visitFile(file, attrs); - if (!StandardWatchEventKinds.ENTRY_MODIFY.equals(kind)) { - walkedFiles.add(file.toFile()); - } - return fileVisitResult; - } - - }); + Files.walkFileTree(directory, Collections.emptySet(), FileReadingMessageSource.this.watchMaxDepth, + new SimpleFileVisitor<>() { + + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) + throws IOException { + + if (FileReadingMessageSource.this.watchDirPredicate.test(dir)) { + registerWatch(dir); + return FileVisitResult.CONTINUE; + } + else { + return FileVisitResult.SKIP_SUBTREE; + } + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + FileVisitResult fileVisitResult = super.visitFile(file, attrs); + if (!StandardWatchEventKinds.ENTRY_MODIFY.equals(kind)) { + walkedFiles.add(file.toFile()); + } + return fileVisitResult; + } + + }); } catch (IOException ex) { 
logger.error(ex, () -> "Failed to walk directory: " + directory.toString()); diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/dsl/FileInboundChannelAdapterSpec.java b/spring-integration-file/src/main/java/org/springframework/integration/file/dsl/FileInboundChannelAdapterSpec.java index f026a974515..342297a30fc 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/dsl/FileInboundChannelAdapterSpec.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/dsl/FileInboundChannelAdapterSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,10 +17,12 @@ package org.springframework.integration.file.dsl; import java.io.File; +import java.nio.file.Path; import java.util.Collections; import java.util.Comparator; import java.util.Map; import java.util.function.Function; +import java.util.function.Predicate; import org.springframework.beans.DirectFieldAccessor; import org.springframework.integration.dsl.ComponentsRegistration; @@ -276,6 +278,32 @@ public FileInboundChannelAdapterSpec watchEvents(FileReadingMessageSource.WatchE return this; } + /** + * Set a depth for files walk API. + * @param watchMaxDepth the depth for files walk. + * @return the spec. + * @since 6.1 + * @see #useWatchService + * @see FileReadingMessageSource#setWatchMaxDepth(int) + */ + public FileInboundChannelAdapterSpec watchMaxDepth(int watchMaxDepth) { + target.setWatchMaxDepth(watchMaxDepth); + return this; + } + + /** + * Set a {@link Predicate} to check if it is eligible for {@link java.nio.file.WatchService}. + * @param watchDirPredicate the {@link Predicate} to check dirs for walking. + * @return the spec. 
+ * @since 6.1 + * @see #useWatchService + * @see FileReadingMessageSource#setWatchDirPredicate(Predicate) + */ + public FileInboundChannelAdapterSpec watchDirPredicate(Predicate watchDirPredicate) { + target.setWatchDirPredicate(watchDirPredicate); + return this; + } + @Override public Map getComponentsToRegister() { if (this.scanner == null || this.filtersSet) { diff --git a/spring-integration-file/src/test/java/org/springframework/integration/file/WatchServiceDirectoryScannerTests.java b/spring-integration-file/src/test/java/org/springframework/integration/file/WatchServiceDirectoryScannerTests.java index 2a5efce2730..e22e355f549 100644 --- a/spring-integration-file/src/test/java/org/springframework/integration/file/WatchServiceDirectoryScannerTests.java +++ b/spring-integration-file/src/test/java/org/springframework/integration/file/WatchServiceDirectoryScannerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -25,10 +25,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import org.springframework.beans.factory.BeanFactory; import org.springframework.integration.file.filters.ChainFileListFilter; @@ -44,13 +43,14 @@ /** * @author Gary Russell * @author Artem Bilan + * * @since 4.2 * */ public class WatchServiceDirectoryScannerTests { - @Rule - public TemporaryFolder folder = new TemporaryFolder(); + @TempDir + public File rootDir; private File foo; @@ -62,24 +62,33 @@ public class WatchServiceDirectoryScannerTests { private File bar1; - @Before + private File skipped; + + private File skippedFile; + + @BeforeEach public void setUp() throws IOException { - this.foo = this.folder.newFolder("foo"); - this.bar = this.folder.newFolder("bar"); - this.top1 = this.folder.newFile(); + this.foo = new File(rootDir, "foo"); + this.foo.mkdir(); + this.bar = new File(rootDir, "bar"); + this.bar.mkdir(); + this.top1 = File.createTempFile("tmp", null, this.rootDir); this.foo1 = File.createTempFile("foo", ".txt", this.foo); this.bar1 = File.createTempFile("bar", ".txt", this.bar); + this.skipped = new File(rootDir, "skipped"); + this.skipped.mkdir(); + this.skippedFile = File.createTempFile("skippedFile", null, this.skipped); } @Test - @SuppressWarnings("unchecked") public void testWatchServiceDirectoryScanner() throws Exception { FileReadingMessageSource fileReadingMessageSource = new FileReadingMessageSource(); - fileReadingMessageSource.setDirectory(folder.getRoot()); + fileReadingMessageSource.setDirectory(this.rootDir); fileReadingMessageSource.setUseWatchService(true); fileReadingMessageSource.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE, FileReadingMessageSource.WatchEventType.MODIFY, 
FileReadingMessageSource.WatchEventType.DELETE); + fileReadingMessageSource.setWatchDirPredicate(path -> !path.getFileName().toString().equals("skipped")); fileReadingMessageSource.setBeanFactory(mock(BeanFactory.class)); final CountDownLatch removeFileLatch = new CountDownLatch(1); @@ -107,35 +116,36 @@ public boolean remove(File fileToRemove) { assertThat(scanner.getClass().getName()).contains("FileReadingMessageSource$WatchServiceDirectoryScanner"); // Files are skipped by the LastModifiedFileListFilter - List files = scanner.listFiles(folder.getRoot()); - assertThat(files.size()).isEqualTo(0); + List files = scanner.listFiles(this.rootDir); + assertThat(files).hasSize(0); // Consider all the files as one day old fileLastModifiedFileListFilter.setAge(-60 * 60 * 24); - files = scanner.listFiles(folder.getRoot()); - assertThat(files.size()).isEqualTo(3); - assertThat(files.contains(top1)).isTrue(); - assertThat(files.contains(foo1)).isTrue(); - assertThat(files.contains(bar1)).isTrue(); + files = scanner.listFiles(this.rootDir); + assertThat(files).hasSize(3); + assertThat(files).contains(top1); + assertThat(files).contains(foo1); + assertThat(files).contains(bar1); + assertThat(files).doesNotContain(this.skippedFile); fileReadingMessageSource.start(); - File top2 = this.folder.newFile(); + File top2 = File.createTempFile("tmp", null, this.rootDir); File foo2 = File.createTempFile("foo", ".txt", this.foo); File bar2 = File.createTempFile("bar", ".txt", this.bar); File baz = new File(this.foo, "baz"); baz.mkdir(); File baz1 = File.createTempFile("baz", ".txt", baz); - files = scanner.listFiles(folder.getRoot()); + files = scanner.listFiles(this.rootDir); int n = 0; - Set accum = new HashSet(files); + Set accum = new HashSet<>(files); while (n++ < 300 && accum.size() != 4) { Thread.sleep(100); - files = scanner.listFiles(folder.getRoot()); + files = scanner.listFiles(this.rootDir); accum.addAll(files); } - assertThat(accum.size()).isEqualTo(4); - 
assertThat(accum.contains(top2)).isTrue(); - assertThat(accum.contains(foo2)).isTrue(); - assertThat(accum.contains(bar2)).isTrue(); - assertThat(accum.contains(baz1)).isTrue(); + assertThat(accum).hasSize(4); + assertThat(accum).contains(top2); + assertThat(accum).contains(foo2); + assertThat(accum).contains(bar2); + assertThat(accum).contains(baz1); /*See AbstractWatchKey#signalEvent source code: if(var5 >= 512) { @@ -143,35 +153,33 @@ public boolean remove(File fileToRemove) { } */ fileReadingMessageSource.start(); - List filesForOverflow = new ArrayList(600); + List filesForOverflow = new ArrayList<>(600); for (int i = 0; i < 600; i++) { - filesForOverflow.add(this.folder.newFile("" + i)); + filesForOverflow.add(File.createTempFile("tmp" + i, null, this.rootDir)); } n = 0; while (n++ < 300 && accum.size() < 604) { Thread.sleep(100); - files = scanner.listFiles(folder.getRoot()); + files = scanner.listFiles(this.rootDir); accum.addAll(files); } - assertThat(accum.size()).isEqualTo(604); + assertThat(accum).hasSize(604); - for (File fileForOverFlow : filesForOverflow) { - accum.contains(fileForOverFlow); - } + assertThat(accum).containsAll(filesForOverflow); File baz2 = File.createTempFile("baz2", ".txt", baz); n = 0; while (n++ < 300 && accum.size() < 605) { Thread.sleep(100); - files = scanner.listFiles(folder.getRoot()); + files = scanner.listFiles(this.rootDir); accum.addAll(files); } - assertThat(accum.contains(baz2)).isTrue(); + assertThat(accum).contains(baz2); File baz2Copy = new File(baz2.getAbsolutePath()); @@ -181,12 +189,12 @@ public boolean remove(File fileToRemove) { files.clear(); while (n++ < 300 && files.size() < 1) { Thread.sleep(100); - files = scanner.listFiles(folder.getRoot()); + files = scanner.listFiles(this.rootDir); accum.addAll(files); } - assertThat(files.size()).isEqualTo(1); - assertThat(files.contains(baz2)).isTrue(); + assertThat(files).hasSize(1); + assertThat(files).contains(baz2); baz2.delete(); @@ -194,7 +202,7 @@ public 
boolean remove(File fileToRemove) { n = 0; while (n++ < 300 && removeFileLatch.getCount() > 0) { Thread.sleep(100); - scanner.listFiles(folder.getRoot()); + scanner.listFiles(this.rootDir); } assertThat(removeFileLatch.await(10, TimeUnit.SECONDS)).isTrue(); diff --git a/src/reference/asciidoc/file.adoc b/src/reference/asciidoc/file.adoc index c307ccee2d4..a8f8b5e7b2f 100644 --- a/src/reference/asciidoc/file.adoc +++ b/src/reference/asciidoc/file.adoc @@ -386,6 +386,11 @@ The following example shows how to configure different logic for create and modi ---- ==== +Starting with version 6.1, the `FileReadingMessageSource` exposes two new `WatchService`-related options: + +* `watchMaxDepth` - an argument for the `Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor)` API; +* `watchDirPredicate` - a `Predicate` to test if a directory in the scanned tree should be walked and registered for `WatchService` with its modification events. + ==== Limiting Memory Consumption You can use a `HeadDirectoryScanner` to limit the number of files retained in memory. diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 3ead59b3e4d..d3a153078f4 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -63,3 +63,9 @@ See <<./jms.adoc#jms-inbound-gateway, JMS Inbound Gateway>> for more information The (previously deprecated) `ImapIdleChannelAdapter.sendingTaskExecutor` property has been removed in favor of an asynchronous message process downstream in the flow. See <<./mail.adoc#mail-inbound, Mail-receiving Channel Adapter>> for more information. + +[[x6.1-file]] +=== Files Changes + +The `FileReadingMessageSource` now exposes `watchMaxDepth` and `watchDirPredicate` options for the `WatchService`. +See <<./file.adoc#watch-service-directory-scanner, `WatchServiceDirectoryScanner`>> for more information.