Skip to content

Commit 22c4db2

Browse files
committed
GH-8792: Fix File StreamingMS for one file filter
Fixes #8792 1. Use a `SftpStreamingMessageSource` with a `maxFetchSize = 5` and a `ChainFileListFilter` filter composed with `SftpSystemMarkerFilePresentFileListFilter` which `supportsSingleFileFiltering == false` 2. Put 2 files in the folder and invoke `SftpStreamingMessageSource.receive()` method twice. 3. Put 5 files in the folder and invoke `SftpStreamingMessageSource.receive()`` method five times. 4. The last two files won't be received. When you set max fetch size to a number bigger than one (for example 5) and at a certain point it is necessary to `this.toBeReceived.clear()` inside `AbstractRemoteFileStreamingMessageSource.doReceive()`, those removed elements from toBeReceived are not rolled back. * Fix `AbstractRemoteFileStreamingMessageSource.listFiles()` to calculate `maxFetchSize` as `getMaxFetchSize() - this.fetched.get()` **Cherry-pick to `6.1.x`, `6.0.x` & `5.5.x`**
1 parent d95bc68 commit 22c4db2

File tree

2 files changed

+74
-4
lines changed

2 files changed

+74
-4
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ protected Object doReceive(int maxFetchSize) {
215215
if (this.filter != null && this.filter.supportsSingleFileFiltering()
216216
&& !this.filter.accept(file.getFileInfo())) {
217217

218-
if (this.toBeReceived.size() > 0) { // don't re-fetch already filtered files
218+
if (!this.toBeReceived.isEmpty()) { // don't re-fetch already filtered files
219219
file = poll();
220220
continue;
221221
}
@@ -267,7 +267,7 @@ private Object remoteFileToMessage(AbstractFileInfo<F> file) {
267267
}
268268

269269
protected AbstractFileInfo<F> poll() {
270-
if (this.toBeReceived.size() == 0) {
270+
if (this.toBeReceived.isEmpty()) {
271271
listFiles();
272272
}
273273
return this.toBeReceived.poll();
@@ -297,7 +297,7 @@ private void listFiles() {
297297
if (!ObjectUtils.isEmpty(files)) {
298298
List<AbstractFileInfo<F>> fileInfoList;
299299
if (this.filter != null && !this.filter.supportsSingleFileFiltering()) {
300-
int maxFetchSize = getMaxFetchSize();
300+
int maxFetchSize = getMaxFetchSize() - this.fetched.get();
301301
List<F> filteredFiles = this.filter.filterFiles(files);
302302
if (maxFetchSize > 0 && filteredFiles.size() > maxFetchSize) {
303303
rollbackFromFileToListEnd(filteredFiles, filteredFiles.get(maxFetchSize));

spring-integration-sftp/src/test/java/org/springframework/integration/sftp/inbound/SftpStreamingMessageSourceTests.java

+71-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,13 +16,17 @@
1616

1717
package org.springframework.integration.sftp.inbound;
1818

19+
import java.io.File;
20+
import java.io.IOException;
1921
import java.io.InputStream;
22+
import java.nio.charset.StandardCharsets;
2023
import java.time.Duration;
2124
import java.util.Arrays;
2225
import java.util.Comparator;
2326
import java.util.concurrent.ConcurrentHashMap;
2427
import java.util.concurrent.ConcurrentMap;
2528

29+
import org.apache.commons.io.FileUtils;
2630
import org.apache.sshd.sftp.client.SftpClient;
2731
import org.junit.jupiter.api.Test;
2832

@@ -39,11 +43,14 @@
3943
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
4044
import org.springframework.integration.file.FileHeaders;
4145
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
46+
import org.springframework.integration.file.filters.ChainFileListFilter;
4247
import org.springframework.integration.file.remote.session.SessionFactory;
4348
import org.springframework.integration.metadata.SimpleMetadataStore;
4449
import org.springframework.integration.scheduling.PollerMetadata;
4550
import org.springframework.integration.sftp.SftpTestSupport;
4651
import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter;
52+
import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
53+
import org.springframework.integration.sftp.filters.SftpSystemMarkerFilePresentFileListFilter;
4754
import org.springframework.integration.sftp.session.SftpFileInfo;
4855
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
4956
import org.springframework.integration.transformer.StreamTransformer;
@@ -168,6 +175,69 @@ public void testMaxFetchLambdaFilter() throws Exception {
168175
StaticMessageHeaderAccessor.getCloseableResource(received).close();
169176
}
170177

178+
179+
@Test
180+
public void maxFetchIsAdjustedWhenNoSupportsSingleFileFiltering() throws Exception {
181+
SftpStreamingMessageSource messageSource = buildSource();
182+
ChainFileListFilter<SftpClient.DirEntry> chainFileListFilter = new ChainFileListFilter<>();
183+
SftpSystemMarkerFilePresentFileListFilter sftpSystemMarkerFilePresentFileListFilter =
184+
new SftpSystemMarkerFilePresentFileListFilter(
185+
new SftpSimplePatternFileListFilter("*"), ".trg");
186+
SftpPersistentAcceptOnceFileListFilter sftpPersistentAcceptOnceFileListFilter =
187+
new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "prefix");
188+
chainFileListFilter.addFilter(sftpSystemMarkerFilePresentFileListFilter);
189+
chainFileListFilter.addFilter(sftpPersistentAcceptOnceFileListFilter);
190+
messageSource.setFilter(chainFileListFilter);
191+
messageSource.setMaxFetchSize(5);
192+
messageSource.afterPropertiesSet();
193+
messageSource.start();
194+
195+
addFileAndTrigger("file001");
196+
addFileAndTrigger("file002");
197+
198+
Message<InputStream> received = messageSource.receive();
199+
assertThat(received).isNotNull();
200+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("file001");
201+
202+
received = messageSource.receive();
203+
assertThat(received).isNotNull();
204+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("file002");
205+
206+
addFileAndTrigger("file003");
207+
addFileAndTrigger("file004");
208+
addFileAndTrigger("file005");
209+
addFileAndTrigger("file006");
210+
addFileAndTrigger("file007");
211+
212+
received = messageSource.receive();
213+
assertThat(received).isNotNull();
214+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("file003");
215+
216+
received = messageSource.receive();
217+
assertThat(received).isNotNull();
218+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("file004");
219+
220+
received = messageSource.receive();
221+
assertThat(received).isNotNull();
222+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("file005");
223+
224+
received = messageSource.receive();
225+
assertThat(received).isNotNull();
226+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("file006");
227+
228+
received = messageSource.receive();
229+
assertThat(received).isNotNull();
230+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("file007");
231+
}
232+
233+
private void addFileAndTrigger(String filename) throws IOException {
234+
File file = new File(this.sourceRemoteDirectory, filename);
235+
FileUtils.writeStringToFile(file, "source1", StandardCharsets.UTF_8);
236+
237+
file = new File(this.sourceRemoteDirectory, filename + ".trg");
238+
file.createNewFile();
239+
}
240+
171241
private SftpStreamingMessageSource buildSource() {
172242
SftpStreamingMessageSource messageSource =
173243
new SftpStreamingMessageSource(this.config.template(),

0 commit comments

Comments
 (0)