fixed an edge case for incremental run failures on cloud filesystems
bashir2 committed Sep 13, 2024
1 parent 5727920 commit 464ab39
Showing 8 changed files with 177 additions and 150 deletions.
@@ -35,7 +35,9 @@
import java.nio.file.FileAlreadyExistsException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -72,6 +74,8 @@ public class DwhFiles {

private static final String INCREMENTAL_DIR = "incremental_run";

static final String TIMESTAMP_PREFIX = "_TIMESTAMP_";

// TODO: It is probably better if we build all DWH files related operations using Beam's
// filesystem API such that when a new filesystem is registered, it automatically works
// everywhere in our code. Note that currently we have hardcoded the valid schema in some places,
@@ -121,6 +125,10 @@ static DwhFiles forRoot(String dwhRoot, FhirContext fhirContext) {
return new DwhFiles(dwhRoot, fhirContext);
}

public static String safeTimestampSuffix() {
return Instant.now().toString().replace(":", "-").replace("-", "_").replace(".", "_");
}
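
For illustration only (not part of this commit), here is a minimal sketch of the suffix this helper produces; the chained replace calls leave only letters, digits and underscores, which is safe to embed in file paths and in Hive table names:

import java.time.Instant;

class TimestampSuffixDemo {
  // Mirrors DwhFiles.safeTimestampSuffix(); reproduced here only to show the output format.
  static String safeTimestampSuffix(Instant instant) {
    return instant.toString().replace(":", "-").replace("-", "_").replace(".", "_");
  }

  public static void main(String[] args) {
    // Prints 2024_09_13T10_15_30_123Z -- no ':', '-' or '.' characters remain.
    System.out.println(safeTimestampSuffix(Instant.parse("2024-09-13T10:15:30.123Z")));
  }
}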

public String getRoot() {
return dwhRoot;
}
@@ -145,50 +153,75 @@ public String getFilePattern(String resourceType) {
"%s*%s", getResourcePath(resourceType).toString(), ParquetUtil.PARQUET_EXTENSION);
}

// TODO: Move this to a util class and make it non-static.
/**
* This returns the default incremental run path; each incremental run is relative to a full path,
* hence we put this directory under the full-run root.
* Returns all the child directories under the given base directory which are one level deep. Note
* that in many cloud/distributed file-systems, we do not have "directories"; there are only buckets
* and files in those buckets. We use file separators (e.g., `/`) to simulate the concept of
* directories. So, for example, this method returns an empty set if `baseDir` is `bucket/test` and
* the only file in that bucket is `bucket/test/dir1/dir2/file.txt`. If `baseDir` is
* `bucket/test/dir1` in the same example, `dir2` is returned.
*
* @return the default incremental run path
* @param baseDir the path under which "directories" are looked for.
* @return the set of all child directories under the base directory
* @throws IOException if matching files under {@code baseDir} fails
*/
public ResourceId getIncrementalRunPath() {
return FileSystems.matchNewResource(getRoot(), true)
.resolve(INCREMENTAL_DIR, StandardResolveOptions.RESOLVE_DIRECTORY);
}

/** This is used when we want to keep a backup of the old incremental run output. */
public ResourceId getIncrementalRunPathWithTimestamp() {
return FileSystems.matchNewResource(getRoot(), true)
.resolve(
String.format("%s_old_%d", INCREMENTAL_DIR, System.currentTimeMillis()),
StandardResolveOptions.RESOLVE_DIRECTORY);
static Set<ResourceId> getAllChildDirectories(String baseDir) throws IOException {
String fileSeparator = getFileSeparatorForDwhFiles(baseDir);
// Avoid using the ResourceId.resolve(..) method to resolve files when the path contains glob
// expressions with multiple special characters like **, */*, etc., as this API only supports
// single special characters like `*` or `..`. Rather, use FileSystems.match(..) if the path
// contains glob expressions.
List<MatchResult> matchResultList =
FileSystems.match(
List.of(
getPathEndingWithFileSeparator(baseDir, fileSeparator)
+ "*"
+ fileSeparator
+ "*"));
Set<ResourceId> childDirectories = new HashSet<>();
for (MatchResult matchResult : matchResultList) {
if (matchResult.status() == Status.OK && !matchResult.metadata().isEmpty()) {
for (Metadata metadata : matchResult.metadata()) {
childDirectories.add(metadata.resourceId().getCurrentDirectory());
}
} else if (matchResult.status() == Status.ERROR) {
String errorMessage = String.format("Error matching files under directory %s", baseDir);
log.error(errorMessage);
throw new IOException(errorMessage);
}
}
log.info("Child directories of {} are {}", baseDir, childDirectories);
return childDirectories;
}
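
As a usage sketch (bucket and file names below are hypothetical), matching the glob `<baseDir>/*/*` finds files exactly one level below the immediate children, and calling `getCurrentDirectory()` on each match collapses those files back to the child directories themselves:

// Hypothetical layout:
//   gs://bucket/dwh/Patient/patient.parquet
//   gs://bucket/dwh/Observation/observation.parquet
//   gs://bucket/dwh/tmp/deep/nested.txt   (no file sits directly under `tmp`)
Set<ResourceId> children = DwhFiles.getAllChildDirectories("gs://bucket/dwh");
// children -> { gs://bucket/dwh/Patient/, gs://bucket/dwh/Observation/ }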

/**
* Similar to {@link #getIncrementalRunPath} but also checks if that directory exists and if so,
* moves it to {@link #getIncrementalRunPathWithTimestamp()}.
*
* @return same as {@link #getIncrementalRunPath()}
* @throws IOException if the directory move fails
* Also see {@link #newIncrementalRunPath()}.
* @return the current incremental run path if one is found; null otherwise.
*/
public ResourceId newIncrementalRunPath() throws IOException {
ResourceId incPath = getIncrementalRunPath();
if (hasIncrementalDir()) {
ResourceId movePath = getIncrementalRunPathWithTimestamp();
log.info("Moving the old {} directory to {}", INCREMENTAL_DIR, movePath);
FileSystems.rename(Collections.singletonList(incPath), Collections.singletonList(movePath));
}
return incPath;
@Nullable
public ResourceId getIncrementalRunPath() throws IOException {
List<ResourceId> dirs =
getAllChildDirectories(getRoot()).stream()
.filter(dir -> dir.getFilename().contains(INCREMENTAL_DIR + TIMESTAMP_PREFIX))
.collect(Collectors.toList());
if (dirs.isEmpty()) return null;

Collections.sort(dirs, Comparator.comparing(ResourceId::toString));
return dirs.get(dirs.size() - 1);
}

/**
* @return true iff there is already an incremental run subdirectory in this DWH.
* This returns a new incremental-run path based on the current timestamp. Note that each
* incremental-run is relative to a full-run, hence we put this directory under the full-run root.
*
* @return a new incremental run path based on the current timestamp.
*/
public boolean hasIncrementalDir() throws IOException {
List<MatchResult> matches =
FileSystems.matchResources(Collections.singletonList(getIncrementalRunPath()));
MatchResult matchResult = Iterables.getOnlyElement(matches);
return matchResult.status() == Status.OK;
public ResourceId newIncrementalRunPath() {
return FileSystems.matchNewResource(getRoot(), true)
.resolve(
String.format("%s%s%s", INCREMENTAL_DIR, TIMESTAMP_PREFIX, safeTimestampSuffix()),
StandardResolveOptions.RESOLVE_DIRECTORY);
}
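
To make the new layout concrete (paths below are hypothetical, and a FhirContext instance is assumed to be in scope), every incremental run now gets its own timestamped directory under the full-run root; newIncrementalRunPath() builds a fresh name and getIncrementalRunPath() picks the latest existing one by sorting the names:

// Hypothetical DWH root that already holds two incremental runs:
//   gs://bucket/dwh/controller_DWH_TIMESTAMP_X/incremental_run_TIMESTAMP_2024_09_12T08_00_00Z/
//   gs://bucket/dwh/controller_DWH_TIMESTAMP_X/incremental_run_TIMESTAMP_2024_09_13T08_00_00Z/
DwhFiles dwh = DwhFiles.forRoot("gs://bucket/dwh/controller_DWH_TIMESTAMP_X", fhirContext);
ResourceId latest = dwh.getIncrementalRunPath();
// latest -> .../incremental_run_TIMESTAMP_2024_09_13T08_00_00Z/ (largest name when sorted)
ResourceId next = dwh.newIncrementalRunPath();
// next   -> .../incremental_run_TIMESTAMP_<safeTimestampSuffix() of the current instant>/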

public Set<String> findNonEmptyResourceDirs() throws IOException {
@@ -17,6 +17,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;

import ca.uhn.fhir.context.FhirContext;
import com.google.api.services.storage.model.Objects;
@@ -92,7 +93,9 @@ public void newIncrementalRunPathTest() throws IOException {
Mockito.when(mockGcsUtil.getObjects(Mockito.anyList())).thenReturn(items);

ResourceId resourceId = dwhFiles.newIncrementalRunPath();
assertThat(resourceId.toString(), equalTo("gs://testbucket/testdirectory/incremental_run/"));
assertThat(
resourceId.toString(),
startsWith("gs://testbucket/testdirectory/incremental_run" + DwhFiles.TIMESTAMP_PREFIX));
}

@Test
@@ -244,4 +247,41 @@ private StorageObject createStorageObject(String gcsFilename, long fileSize) {
.setName(gcsPath.getObject())
.setSize(size);
}

@Test
public void testGetAllChildDirectoriesOneLevelDeep() throws IOException {
Objects modelObjects = new Objects();
List<StorageObject> items = new ArrayList<>();
// Files within the directory
items.add(
createStorageObject(
"gs://testbucket/testdirectory/Patient/patient.parquet", 1L /* fileSize */));
items.add(
createStorageObject(
"gs://testbucket/testdirectory/Observation/observation.parquet", 2L /* fileSize */));
// This is not returned in the case of GCS because there are no files directly "under" TEST1.
// Note that in GCS we do not have "directories"; we are just simulating them with `/` separators.
items.add(
createStorageObject(
"gs://testbucket/testdirectory/TEST1/TEST2/file.txt", 2L /* fileSize */));
modelObjects.setItems(items);

Mockito.when(
mockGcsUtil.listObjects(
Mockito.eq("testbucket"), Mockito.anyString(), Mockito.isNull()))
.thenReturn(modelObjects);

Set<ResourceId> childDirectories =
DwhFiles.getAllChildDirectories("gs://testbucket/testdirectory");

assertThat(childDirectories.size(), equalTo(2));
assertThat(
childDirectories.contains(
FileSystems.matchNewResource("gs://testbucket/testdirectory/Patient", true)),
equalTo(true));
assertThat(
childDirectories.contains(
FileSystems.matchNewResource("gs://testbucket/testdirectory/Observation", true)),
equalTo(true));
}
}
@@ -17,6 +17,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;

import ca.uhn.fhir.context.FhirContext;
import com.google.common.io.Resources;
@@ -33,6 +34,8 @@
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.commons.lang3.SystemUtils;
import org.junit.Assert;
@@ -54,12 +57,31 @@ public void getResourcePathTestWindows() {
assertThat(dwhFiles.getResourcePath("Patient").toString(), equalTo("C:\\tmp\\Patient\\"));
}

@Test
public void getIncrementalRunPathTest() throws IOException {
Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS);
DwhFiles instance = new DwhFiles("/tmp", FhirContext.forR4Cached());
ResourceId incrementalRunPath1 = instance.newIncrementalRunPath();
ResourceId file1 =
incrementalRunPath1.resolve("file1.txt", StandardResolveOptions.RESOLVE_FILE);
FileSystems.create(file1, "test");
ResourceId incrementalRunPath2 = instance.newIncrementalRunPath();
ResourceId file2 =
incrementalRunPath2.resolve("file2.txt", StandardResolveOptions.RESOLVE_FILE);
FileSystems.create(file2, "test");
// Making sure that the latest incremental run path is returned.
assertThat(
instance.getIncrementalRunPath().toString(), equalTo(incrementalRunPath2.toString()));
}

@Test
public void newIncrementalRunPathTestNonWindows() throws IOException {
Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS);
DwhFiles instance = new DwhFiles("/tmp", FhirContext.forR4Cached());
ResourceId incrementalRunPath = instance.newIncrementalRunPath();
assertThat(incrementalRunPath.toString(), equalTo("/tmp/incremental_run/"));
assertThat(
incrementalRunPath.toString(),
startsWith("/tmp/incremental_run" + DwhFiles.TIMESTAMP_PREFIX));
}

@Test
@@ -242,4 +264,34 @@ public void passWindowsLocalPathDwhRootPrefix_returnsFileSeparator() {
private void createFile(Path path, byte[] bytes) throws IOException {
Files.write(path, bytes);
}

@Test
public void testGetAllChildDirectoriesOneLevelDeep() throws IOException {
Path rootDir = Files.createTempDirectory("DWH_FILES_TEST");
Path childDir1 = Paths.get(rootDir.toString(), "childDir1");
Files.createDirectories(childDir1);
Path fileAtChildDir1 = Path.of(childDir1.toString(), "file1.txt");
createFile(fileAtChildDir1, "SAMPLE TEXT".getBytes(StandardCharsets.UTF_8));
Path childDir2 = Paths.get(rootDir.toString(), "childDir2");
Files.createDirectories(childDir2);
Path fileAtChildDir2 = Path.of(childDir2.toString(), "file2.txt");
createFile(fileAtChildDir2, "SAMPLE TEXT".getBytes(StandardCharsets.UTF_8));

// The following directory should not appear in the results of `getAllChildDirectories`
// because only directories one level deep should be returned.
Path childDir21 = Paths.get(childDir2.toString(), "childDir21");
Files.createDirectories(childDir21);
Path fileAtChildDir21 = Path.of(childDir21.toString(), "file3.txt");
createFile(fileAtChildDir21, "SAMPLE TEXT".getBytes(StandardCharsets.UTF_8));

Set<ResourceId> childDirectories = DwhFiles.getAllChildDirectories(rootDir.toString());

assertThat(childDirectories.size(), equalTo(2));
assertThat(
childDirectories.contains(FileSystems.matchNewResource(childDir1.toString(), true)),
equalTo(true));
assertThat(
childDirectories.contains(FileSystems.matchNewResource(childDir2.toString(), true)),
equalTo(true));
}
}
@@ -20,7 +20,6 @@
import com.google.common.base.Strings;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@@ -53,8 +52,6 @@ public class DataProperties {

private static final Logger logger = LoggerFactory.getLogger(DataProperties.class.getName());

static final String TIMESTAMP_PREFIX = "_TIMESTAMP_";

private static final String GET_PREFIX = "get";

private static final Set<String> EXCLUDED_ARGS =
@@ -212,17 +209,16 @@ PipelineConfig createBatchOptions() {
}

// Using underscores for the suffix as hyphens are discouraged in Hive table names.
String timestampSuffix =
Instant.now().toString().replace(":", "-").replace("-", "_").replace(".", "_");
options.setOutputParquetPath(dwhRootPrefix + TIMESTAMP_PREFIX + timestampSuffix);
String timestampSuffix = DwhFiles.safeTimestampSuffix();
options.setOutputParquetPath(dwhRootPrefix + DwhFiles.TIMESTAMP_PREFIX + timestampSuffix);

PipelineConfig.PipelineConfigBuilder pipelineConfigBuilder = addFlinkOptions(options);

// Get hold of thrift server parquet directory from dwhRootPrefix config.
String thriftServerParquetPathPrefix =
dwhRootPrefix.substring(dwhRootPrefix.lastIndexOf("/") + 1, dwhRootPrefix.length());
pipelineConfigBuilder.thriftServerParquetPath(
thriftServerParquetPathPrefix + TIMESTAMP_PREFIX + timestampSuffix);
thriftServerParquetPathPrefix + DwhFiles.TIMESTAMP_PREFIX + timestampSuffix);
pipelineConfigBuilder.timestampSuffix(timestampSuffix);

return pipelineConfigBuilder.build();
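
For illustration (all values below are hypothetical), this is roughly how the two paths are assembled from a dwhRootPrefix such as gs://bucket/dwh/controller_DWH:

// Hypothetical inputs, for illustration only:
String dwhRootPrefix = "gs://bucket/dwh/controller_DWH";
String timestampSuffix = "2024_09_13T10_15_30_123Z"; // DwhFiles.safeTimestampSuffix()

String outputParquetPath = dwhRootPrefix + DwhFiles.TIMESTAMP_PREFIX + timestampSuffix;
// -> gs://bucket/dwh/controller_DWH_TIMESTAMP_2024_09_13T10_15_30_123Z

// The Thrift-server path keeps only the last segment of the prefix:
String thriftServerParquetPathPrefix =
    dwhRootPrefix.substring(dwhRootPrefix.lastIndexOf("/") + 1);
// -> controller_DWH, so the Thrift-server parquet path becomes
//    controller_DWH_TIMESTAMP_2024_09_13T10_15_30_123Z
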
@@ -140,7 +140,7 @@ private void purgeDwhFiles() {
try {
String prefix = getPrefix(dwhRootPrefix);
List<ResourceId> paths =
getAllChildDirectories(baseDir).stream()
DwhFiles.getAllChildDirectories(baseDir).stream()
.filter(dir -> dir.getFilename().startsWith(prefix))
.collect(Collectors.toList());

@@ -336,7 +336,7 @@ String getPrefix(String dwhRootPrefix) {
}

List<String> findExistingResources(String dwhRoot) throws IOException {
Set<ResourceId> childPaths = getAllChildDirectories(dwhRoot);
Set<ResourceId> childPaths = DwhFiles.getAllChildDirectories(dwhRoot);
Set<String> configuredSet =
new HashSet<>(Arrays.asList(dataProperties.getResourceList().split(",")));
return childPaths.stream()
@@ -345,42 +345,6 @@ List<String> findExistingResources(String dwhRoot) throws IOException {
.collect(Collectors.toList());
}
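
A hedged reading of findExistingResources (part of its body is collapsed above): it intersects the child directories that actually exist under dwhRoot with the configured resource list, so for a hypothetical layout it should behave roughly like this:

// Hypothetical: data.resourceList = "Patient,Observation,Encounter" and dwhRoot contains
//   .../Patient/   .../Observation/   .../incremental_run_TIMESTAMP_.../
// Only names that are both configured and present as child directories are expected back:
List<String> existing = findExistingResources(dwhRoot); // expected: ["Patient", "Observation"]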

/**
* Returns all the child directories under the given base directory which are 1-level deep.
*
* @param baseDir
* @return The list of all child directories under the base directory
* @throws IOException
*/
Set<ResourceId> getAllChildDirectories(String baseDir) throws IOException {
String fileSeparator = DwhFiles.getFileSeparatorForDwhFiles(baseDir);
// Avoid using ResourceId.resolve(..) method to resolve the files when the path contains glob
// expressions with multiple special characters like **, */* etc as this api only supports
// single special characters like `*` or `..`. Rather use the FileSystems.match(..) if the path
// contains glob expressions.
List<MatchResult> matchResultList =
FileSystems.match(
List.of(
DwhFiles.getPathEndingWithFileSeparator(baseDir, fileSeparator)
+ "*"
+ fileSeparator
+ "*"));
Set<ResourceId> childDirectories = new HashSet<>();
for (MatchResult matchResult : matchResultList) {
if (matchResult.status() == Status.OK && !matchResult.metadata().isEmpty()) {
for (Metadata metadata : matchResult.metadata()) {
childDirectories.add(metadata.resourceId().getCurrentDirectory());
}
} else if (matchResult.status() == Status.ERROR) {
String errorMessage = String.format("Error matching files under directory %s", baseDir);
logger.error(errorMessage);
throw new IOException(errorMessage);
}
}
logger.info("Child directories : {}", childDirectories);
return childDirectories;
}

private int getLastIndexOfSlash(String dwhRootPrefix) {
CloudPath cloudPath = DwhFiles.parsePath(dwhRootPrefix);
int index = -1;

