Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.config.table.TableConfig;
Expand Down Expand Up @@ -229,4 +235,58 @@ private static String fetchUrl(URL url, String authToken)
}
return IOUtils.toString(connection.getInputStream(), StandardCharsets.UTF_8);
}


/**
* @param pinotFs root directory fs
* @param fileUri root directory uri
* @param includePattern optional glob patterns for files to include
* @param excludePattern optional glob patterns for files to exclude
* @param searchRecursively if ture, search files recursively from directory specified in fileUri
* @return list of matching files.
* @throws IOException on IO failure for list files in root directory.
* @throws URISyntaxException for matching file URIs
* @throws RuntimeException if there is no matching file.
*/
public static List<String> listMatchedFilesWithRecursiveOption(PinotFS pinotFs, URI fileUri,
@Nullable String includePattern, @Nullable String excludePattern, boolean searchRecursively)
throws Exception {
String[] files;
// listFiles throws IOException
files = pinotFs.listFiles(fileUri, searchRecursively);
//TODO: sort input files based on creation time
PathMatcher includeFilePathMatcher = null;
if (includePattern != null) {
includeFilePathMatcher = FileSystems.getDefault().getPathMatcher(includePattern);
}
PathMatcher excludeFilePathMatcher = null;
if (excludePattern != null) {
excludeFilePathMatcher = FileSystems.getDefault().getPathMatcher(excludePattern);
}
List<String> filteredFiles = new ArrayList<>();
for (String file : files) {
if (includeFilePathMatcher != null) {
if (!includeFilePathMatcher.matches(Paths.get(file))) {
continue;
}
}
if (excludeFilePathMatcher != null) {
if (excludeFilePathMatcher.matches(Paths.get(file))) {
continue;
}
}
if (!pinotFs.isDirectory(new URI(file))) {
// In case PinotFS implementations list files without a scheme (e.g. hdfs://), then we may lose it in the
// input file path. Call SegmentGenerationUtils.getFileURI() to fix this up.
// getFileURI throws URISyntaxException
filteredFiles.add(SegmentGenerationUtils.getFileURI(file, fileUri).toString());
}
}
if (filteredFiles.isEmpty()) {
throw new RuntimeException(String.format(
"No file found in the input directory: %s matching includeFileNamePattern: %s,"
+ " excludeFileNamePattern: %s", fileUri, includePattern, excludePattern));
}
return filteredFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,18 @@

package org.apache.pinot.common.segment.generation;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.file.Files;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -124,4 +132,114 @@ private void validateFileURI(URI directoryURI)
throws URISyntaxException {
validateFileURI(directoryURI, directoryURI.toString());
}

// Test that we search files recursively when recursiveSearch option is set to true.
@Test
public void testMatchFilesRecursiveSearchOnRecursiveInputFilePattern()
throws Exception {
File testDir = makeTestDir();
File inputDir = new File(testDir, "input");
File inputSubDir1 = new File(inputDir, "2009");
inputSubDir1.mkdirs();

File inputFile1 = new File(inputDir, "input.csv");
FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2", "value1,1", "value2,2"));

File inputFile2 = new File(inputSubDir1, "input.csv");
FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2", "value3,3", "value4,4"));
URI inputDirURI = new URI(inputDir.getAbsolutePath());
if (inputDirURI.getScheme() == null) {
inputDirURI = new File(inputDir.getAbsolutePath()).toURI();
}
PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
String includePattern = "glob:" + inputDir.getAbsolutePath() + "/**.csv";
List<String> files =
SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, includePattern, null, true);
Assert.assertEquals(files.size(), 2);
}

// Test that we don't search files recursively when recursiveSearch option is set to false.
@Test
public void testMatchFilesRecursiveSearchOnNonRecursiveInputFilePattern()
throws Exception {
File testDir = makeTestDir();
File inputDir = new File(testDir, "dir");
File inputSubDir1 = new File(inputDir, "2009");
inputSubDir1.mkdirs();

File inputFile1 = new File(inputDir, "input.csv");
FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2", "value1,1", "value2,2"));

File inputFile2 = new File(inputSubDir1, "input.csv");
FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2", "value3,3", "value4,4"));
URI inputDirURI = new URI(inputDir.getAbsolutePath());
if (inputDirURI.getScheme() == null) {
inputDirURI = new File(inputDir.getAbsolutePath()).toURI();
}
PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
String includePattern = "glob:" + inputDir.getAbsolutePath() + "/*.csv";

List<String> files =
SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, includePattern, null,
false);
Assert.assertEquals(files.size(), 1);
}

// Test that we exclude files that match exclude pattern.
@Test
public void testMatchFilesRecursiveSearchExcludeFilePattern()
throws Exception {
File testDir = makeTestDir();
File inputDir = new File(testDir, "dir");
File inputSubDir1 = new File(inputDir, "2009");
inputSubDir1.mkdirs();

File inputFile1 = new File(inputDir, "input1.csv");
FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2", "value1,1", "value2,2"));

File inputFile2 = new File(inputSubDir1, "input2.csv");
FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2", "value3,3", "value4,4"));
URI inputDirURI = new URI(inputDir.getAbsolutePath());
if (inputDirURI.getScheme() == null) {
inputDirURI = new File(inputDir.getAbsolutePath()).toURI();
}
PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
String includePattern = "glob:" + inputDir.getAbsolutePath() + "/**.csv";
String excludePattern = "glob:" + inputDir.getAbsolutePath() + "/2009/input2.csv";

List<String> files =
SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, includePattern,
excludePattern, true);
Assert.assertEquals(files.size(), 1);
}

// Test that we throw an exception when there is no file matching.
@Test
public void testEmptyMatchFiles()
throws Exception {
File testDir = makeTestDir();
File inputDir = new File(testDir, "dir");
File inputSubDir1 = new File(inputDir, "2009");
inputSubDir1.mkdirs();

File inputFile1 = new File(inputDir, "input1.csv");
FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2", "value1,1", "value2,2"));

File inputFile2 = new File(inputSubDir1, "input2.csv");
FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2", "value3,3", "value4,4"));
URI inputDirURI = new File(inputDir.getAbsolutePath()).toURI();
PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
String includePattern = "glob:" + inputDir.getAbsolutePath() + "/**.json";
Assert.assertThrows(RuntimeException.class, () -> {
SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, includePattern, null, true);
});
}

private File makeTestDir()
throws IOException {
File testDir = Files.createTempDirectory("testSegmentGeneration-").toFile();
testDir.delete();
testDir.mkdirs();
return testDir;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -142,13 +140,14 @@ public void run()
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}

//Get pinotFS for input
//Get list of files to process
URI inputDirURI = new URI(_spec.getInputDirURI());
if (inputDirURI.getScheme() == null) {
inputDirURI = new File(_spec.getInputDirURI()).toURI();
}
PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
List<String> filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI,
_spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively());

//Get outputFS for writing output pinot segments
URI outputDirURI = new URI(_spec.getOutputDirURI());
Expand Down Expand Up @@ -181,71 +180,35 @@ public void run()
Path stagingSegmentTarUri = new Path(stagingDirURI.toString(), SEGMENT_TAR_SUBDIR_NAME);
outputDirFS.mkdir(stagingSegmentTarUri.toUri());

//Get list of files to process
String[] files = inputDirFS.listFiles(inputDirURI, true);

//TODO: sort input files based on creation time
List<String> filteredFiles = new ArrayList<>();
PathMatcher includeFilePathMatcher = null;
if (_spec.getIncludeFileNamePattern() != null) {
includeFilePathMatcher = FileSystems.getDefault().getPathMatcher(_spec.getIncludeFileNamePattern());
}
PathMatcher excludeFilePathMatcher = null;
if (_spec.getExcludeFileNamePattern() != null) {
excludeFilePathMatcher = FileSystems.getDefault().getPathMatcher(_spec.getExcludeFileNamePattern());
}
// numDataFiles is guaranteed to be greater than zero since listMatchedFilesWithRecursiveOption will throw
// runtime exception if the matched files list is empty.
int numDataFiles = filteredFiles.size();

for (String file : files) {
if (includeFilePathMatcher != null) {
if (!includeFilePathMatcher.matches(Paths.get(file))) {
continue;
LOGGER.info("Creating segments with data files: {}", filteredFiles);
if (!SegmentGenerationJobUtils.useGlobalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
Map<String, List<String>> localDirIndex = new HashMap<>();
for (String filteredFile : filteredFiles) {
java.nio.file.Path filteredParentPath = Paths.get(filteredFile).getParent();
if (!localDirIndex.containsKey(filteredParentPath.toString())) {
localDirIndex.put(filteredParentPath.toString(), new ArrayList<>());
}
localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
}
if (excludeFilePathMatcher != null) {
if (excludeFilePathMatcher.matches(Paths.get(file))) {
continue;
for (String parentPath : localDirIndex.keySet()) {
List<String> siblingFiles = localDirIndex.get(parentPath);
Collections.sort(siblingFiles);
for (int i = 0; i < siblingFiles.size(); i++) {
URI inputFileURI = SegmentGenerationUtils.getFileURI(siblingFiles.get(i),
SegmentGenerationUtils.getDirectoryURI(parentPath));
createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i);
}
}
if (!inputDirFS.isDirectory(new URI(file))) {
filteredFiles.add(file);
}
}

int numDataFiles = filteredFiles.size();
if (numDataFiles == 0) {
String errorMessage = String
.format("No data file founded in [%s], with include file pattern: [%s] and exclude file pattern [%s]",
_spec.getInputDirURI(), _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern());
LOGGER.error(errorMessage);
throw new RuntimeException(errorMessage);
} else {
LOGGER.info("Creating segments with data files: {}", filteredFiles);
if (!SegmentGenerationJobUtils.useGlobalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
Map<String, List<String>> localDirIndex = new HashMap<>();
for (String filteredFile : filteredFiles) {
java.nio.file.Path filteredParentPath = Paths.get(filteredFile).getParent();
if (!localDirIndex.containsKey(filteredParentPath.toString())) {
localDirIndex.put(filteredParentPath.toString(), new ArrayList<>());
}
localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
}
for (String parentPath : localDirIndex.keySet()) {
List<String> siblingFiles = localDirIndex.get(parentPath);
Collections.sort(siblingFiles);
for (int i = 0; i < siblingFiles.size(); i++) {
URI inputFileURI = SegmentGenerationUtils
.getFileURI(siblingFiles.get(i), SegmentGenerationUtils.getDirectoryURI(parentPath));
createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i);
}
}
} else {
for (int i = 0; i < numDataFiles; i++) {
URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i);
}
for (int i = 0; i < numDataFiles; i++) {
URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i);
}
}

try {
// Set up the job
Job job = Job.getInstance(getConf());
Expand Down
Loading