Skip to content

Commit adc3cfe

Browse files
aasha authored and Stephen O'Donnell committed
CDPD-35109. HDFS-14869 Copy renamed files which are not excluded anymore by filter (apache#1530)
(cherry picked from commit fccccc9) Change-Id: I6acd2358f438849311c2e31e489bfdb5447fbefc
1 parent ae077cd commit adc3cfe

File tree

3 files changed: 185 additions and 8 deletions.

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -84,7 +84,7 @@ private void prepareFileListing(Job job) throws Exception {
8484
if (context.shouldUseSnapshotDiff()) {
8585
// When "-diff" or "-rdiff" is passed, do sync() first, then
8686
// create copyListing based on snapshot diff.
87-
DistCpSync distCpSync = new DistCpSync(context, getConf());
87+
DistCpSync distCpSync = new DistCpSync(context, job.getConfiguration());
8888
if (distCpSync.sync()) {
8989
createInputFileListingWithDiff(job, distCpSync);
9090
} else {

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java

Lines changed: 24 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -57,10 +57,13 @@ class DistCpSync {
5757
//
5858
private EnumMap<SnapshotDiffReport.DiffType, List<DiffInfo>> diffMap;
5959
private DiffInfo[] renameDiffs;
60+
private CopyFilter copyFilter;
6061

6162
DistCpSync(DistCpContext context, Configuration conf) {
6263
this.context = context;
6364
this.conf = conf;
65+
this.copyFilter = CopyFilter.getCopyFilter(conf);
66+
this.copyFilter.initialize();
6467
}
6568

6669
private boolean isRdiff() {
@@ -213,18 +216,32 @@ private boolean getAllDiffs() throws IOException {
213216
}
214217
SnapshotDiffReport.DiffType dt = entry.getType();
215218
List<DiffInfo> list = diffMap.get(dt);
219+
final Path source =
220+
new Path(DFSUtilClient.bytes2String(entry.getSourcePath()));
221+
final Path relativeSource = new Path(Path.SEPARATOR + source);
216222
if (dt == SnapshotDiffReport.DiffType.MODIFY ||
217223
dt == SnapshotDiffReport.DiffType.CREATE ||
218224
dt == SnapshotDiffReport.DiffType.DELETE) {
219-
final Path source =
220-
new Path(DFSUtilClient.bytes2String(entry.getSourcePath()));
221-
list.add(new DiffInfo(source, null, dt));
225+
if (copyFilter.shouldCopy(relativeSource)) {
226+
list.add(new DiffInfo(source, null, dt));
227+
}
222228
} else if (dt == SnapshotDiffReport.DiffType.RENAME) {
223-
final Path source =
224-
new Path(DFSUtilClient.bytes2String(entry.getSourcePath()));
225229
final Path target =
226-
new Path(DFSUtilClient.bytes2String(entry.getTargetPath()));
227-
list.add(new DiffInfo(source, target, dt));
230+
new Path(DFSUtilClient.bytes2String(entry.getTargetPath()));
231+
final Path relativeTarget = new Path(Path.SEPARATOR + target);
232+
if (copyFilter.shouldCopy(relativeSource)) {
233+
if (copyFilter.shouldCopy(relativeTarget)) {
234+
list.add(new DiffInfo(source, target, dt));
235+
} else {
236+
list = diffMap.get(SnapshotDiffReport.DiffType.DELETE);
237+
list.add(new DiffInfo(source, target,
238+
SnapshotDiffReport.DiffType.DELETE));
239+
}
240+
} else if (copyFilter.shouldCopy(relativeTarget)) {
241+
list = diffMap.get(SnapshotDiffReport.DiffType.CREATE);
242+
list.add(new DiffInfo(target, null,
243+
SnapshotDiffReport.DiffType.CREATE));
244+
}
228245
}
229246
}
230247
return true;

hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@
3939
import org.junit.Before;
4040
import org.junit.Test;
4141

42+
43+
import java.io.IOException;
44+
import java.io.FileWriter;
45+
import java.io.BufferedWriter;
46+
import java.nio.file.Files;
47+
import java.util.Arrays;
48+
import java.util.ArrayList;
4249
import java.util.Collections;
4350
import java.util.HashMap;
4451
import java.util.Map;
@@ -747,4 +754,157 @@ public void testSyncSnapshotTimeStampChecking() throws Exception {
747754
}
748755
Assert.assertTrue(threwException);
749756
}
757+
758+
private void initData10(Path dir) throws Exception {
759+
final Path staging = new Path(dir, ".staging");
760+
final Path stagingF1 = new Path(staging, "f1");
761+
final Path data = new Path(dir, "data");
762+
final Path dataF1 = new Path(data, "f1");
763+
764+
DFSTestUtil.createFile(dfs, stagingF1, BLOCK_SIZE, DATA_NUM, 0L);
765+
DFSTestUtil.createFile(dfs, dataF1, BLOCK_SIZE, DATA_NUM, 0L);
766+
}
767+
768+
private void changeData10(Path dir) throws Exception {
769+
final Path staging = new Path(dir, ".staging");
770+
final Path prod = new Path(dir, "prod");
771+
dfs.rename(staging, prod);
772+
}
773+
774+
private java.nio.file.Path generateFilterFile(String fileName)
775+
throws IOException {
776+
java.nio.file.Path tmpFile = Files.createTempFile(fileName, "txt");
777+
String str = ".*\\.staging.*";
778+
try (BufferedWriter writer = new BufferedWriter(
779+
new FileWriter(tmpFile.toString()))) {
780+
writer.write(str);
781+
}
782+
return tmpFile;
783+
}
784+
785+
private void deleteFilterFile(java.nio.file.Path filePath)
786+
throws IOException {
787+
Files.delete(filePath);
788+
}
789+
790+
@Test
791+
public void testSync10() throws Exception {
792+
java.nio.file.Path filterFile = null;
793+
try {
794+
Path sourcePath = new Path(dfs.getWorkingDirectory(), "source");
795+
initData10(sourcePath);
796+
dfs.allowSnapshot(sourcePath);
797+
dfs.createSnapshot(sourcePath, "s1");
798+
filterFile = generateFilterFile("filters");
799+
final DistCpOptions.Builder builder = new DistCpOptions.Builder(
800+
new ArrayList<>(Arrays.asList(sourcePath)),
801+
target)
802+
.withFiltersFile(filterFile.toString())
803+
.withSyncFolder(true);
804+
new DistCp(conf, builder.build()).execute();
805+
verifySync(dfs.getFileStatus(sourcePath),
806+
dfs.getFileStatus(target), false, ".staging");
807+
808+
dfs.allowSnapshot(target);
809+
dfs.createSnapshot(target, "s1");
810+
changeData10(sourcePath);
811+
dfs.createSnapshot(sourcePath, "s2");
812+
813+
final DistCpOptions.Builder diffBuilder = new DistCpOptions.Builder(
814+
new ArrayList<>(Arrays.asList(sourcePath)),
815+
target)
816+
.withUseDiff("s1", "s2")
817+
.withFiltersFile(filterFile.toString())
818+
.withSyncFolder(true);
819+
new DistCp(conf, diffBuilder.build()).execute();
820+
verifyCopy(dfs.getFileStatus(sourcePath),
821+
dfs.getFileStatus(target), false);
822+
} finally {
823+
deleteFilterFile(filterFile);
824+
}
825+
}
826+
827+
private void initData11(Path dir) throws Exception {
828+
final Path staging = new Path(dir, "prod");
829+
final Path stagingF1 = new Path(staging, "f1");
830+
final Path data = new Path(dir, "data");
831+
final Path dataF1 = new Path(data, "f1");
832+
833+
DFSTestUtil.createFile(dfs, stagingF1, BLOCK_SIZE, DATA_NUM, 0L);
834+
DFSTestUtil.createFile(dfs, dataF1, BLOCK_SIZE, DATA_NUM, 0L);
835+
}
836+
837+
private void changeData11(Path dir) throws Exception {
838+
final Path staging = new Path(dir, "prod");
839+
final Path prod = new Path(dir, ".staging");
840+
dfs.rename(staging, prod);
841+
}
842+
843+
private void verifySync(FileStatus s, FileStatus t, boolean compareName,
844+
String deletedName)
845+
throws Exception {
846+
Assert.assertEquals(s.isDirectory(), t.isDirectory());
847+
if (compareName) {
848+
Assert.assertEquals(s.getPath().getName(), t.getPath().getName());
849+
}
850+
if (!s.isDirectory()) {
851+
// verify the file content is the same
852+
byte[] sbytes = DFSTestUtil.readFileBuffer(dfs, s.getPath());
853+
byte[] tbytes = DFSTestUtil.readFileBuffer(dfs, t.getPath());
854+
Assert.assertArrayEquals(sbytes, tbytes);
855+
} else {
856+
FileStatus[] slist = dfs.listStatus(s.getPath());
857+
FileStatus[] tlist = dfs.listStatus(t.getPath());
858+
int minFiles = tlist.length;
859+
if (slist.length < tlist.length) {
860+
minFiles = slist.length;
861+
}
862+
for (int i = 0; i < minFiles; i++) {
863+
if (slist[i].getPath().getName().contains(deletedName)) {
864+
if (tlist[i].getPath().getName().contains(deletedName)) {
865+
throw new Exception("Target is not synced as per exclusion filter");
866+
}
867+
continue;
868+
}
869+
verifySync(slist[i], tlist[i], true, deletedName);
870+
}
871+
}
872+
}
873+
874+
@Test
875+
public void testSync11() throws Exception {
876+
java.nio.file.Path filterFile = null;
877+
try {
878+
Path sourcePath = new Path(dfs.getWorkingDirectory(), "source");
879+
initData11(sourcePath);
880+
dfs.allowSnapshot(sourcePath);
881+
dfs.createSnapshot(sourcePath, "s1");
882+
filterFile = generateFilterFile("filters");
883+
final DistCpOptions.Builder builder = new DistCpOptions.Builder(
884+
new ArrayList<>(Arrays.asList(sourcePath)),
885+
target)
886+
.withFiltersFile(filterFile.toString())
887+
.withSyncFolder(true);
888+
new DistCp(conf, builder.build()).execute();
889+
verifyCopy(dfs.getFileStatus(sourcePath),
890+
dfs.getFileStatus(target), false);
891+
892+
dfs.allowSnapshot(target);
893+
dfs.createSnapshot(target, "s1");
894+
changeData11(sourcePath);
895+
dfs.createSnapshot(sourcePath, "s2");
896+
897+
final DistCpOptions.Builder diffBuilder = new DistCpOptions.Builder(
898+
new ArrayList<>(Arrays.asList(sourcePath)),
899+
target)
900+
.withUseDiff("s1", "s2")
901+
.withFiltersFile(filterFile.toString())
902+
.withSyncFolder(true);
903+
new DistCp(conf, diffBuilder.build()).execute();
904+
verifySync(dfs.getFileStatus(sourcePath),
905+
dfs.getFileStatus(target), false, ".staging");
906+
} finally {
907+
deleteFilterFile(filterFile);
908+
}
909+
}
750910
}

0 commit comments

Comments
 (0)