Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed an edge case for incremental run failures on cloud file-systems #1185

Merged
merged 3 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -72,6 +73,8 @@ public class DwhFiles {

private static final String INCREMENTAL_DIR = "incremental_run";

static final String TIMESTAMP_PREFIX = "_TIMESTAMP_";

// TODO: It is probably better if we build all DWH files related operations using Beam's
// filesystem API such that when a new filesystem is registered, it automatically works
// everywhere in our code. Note that currently we have hardcoded the valid schema in some places,
Expand Down Expand Up @@ -121,6 +124,10 @@ static DwhFiles forRoot(String dwhRoot, FhirContext fhirContext) {
return new DwhFiles(dwhRoot, fhirContext);
}

public static String safeTimestampSuffix() {
return Instant.now().toString().replace(":", "-").replace("-", "_").replace(".", "_");
}

public String getRoot() {
return dwhRoot;
}
Expand All @@ -145,50 +152,76 @@ public String getFilePattern(String resourceType) {
"%s*%s", getResourcePath(resourceType).toString(), ParquetUtil.PARQUET_EXTENSION);
}

// TODO: Move this to a util class and make it non-static.
/**
* This returns the default incremental run path; each incremental run is relative to a full path,
* hence we put this directory under the full-run root.
* Returns all the child directories under the given base directory which are 1-level deep. Note
* in many cloud/distributed file-systems, we do not have "directories"; there are only buckets
* and files in those buckets. We use file-seprators (e.g., `/`) to simulate the concept of
* directories. So for example, this method returns an empty set if `baseDir` is `bucket/test` and
* the only file in that bucket is `bucket/test/dir1/dir2/file.txt`. If `baseDir` is
* `bucket/test/dir1`, in the above example, `dir2` is returned.
*
* @return the default incremental run path
* @param baseDir the path under which "directories" are looked for.
* @return The list of all child directories under the base directory
* @throws IOException
*/
public ResourceId getIncrementalRunPath() {
return FileSystems.matchNewResource(getRoot(), true)
.resolve(INCREMENTAL_DIR, StandardResolveOptions.RESOLVE_DIRECTORY);
}

/** This is used when we want to keep a backup of the old incremental run output. */
public ResourceId getIncrementalRunPathWithTimestamp() {
return FileSystems.matchNewResource(getRoot(), true)
.resolve(
String.format("%s_old_%d", INCREMENTAL_DIR, System.currentTimeMillis()),
StandardResolveOptions.RESOLVE_DIRECTORY);
static Set<ResourceId> getAllChildDirectories(String baseDir) throws IOException {
String fileSeparator = getFileSeparatorForDwhFiles(baseDir);
// Avoid using ResourceId.resolve(..) method to resolve the files when the path contains glob
// expressions with multiple special characters like **, */* etc as this api only supports
// single special characters like `*` or `..`. Rather use the FileSystems.match(..) if the path
// contains glob expressions.
List<MatchResult> matchResultList =
FileSystems.match(
List.of(
getPathEndingWithFileSeparator(baseDir, fileSeparator)
+ "*"
+ fileSeparator
+ "*"));
Set<ResourceId> childDirectories = new HashSet<>();
for (MatchResult matchResult : matchResultList) {
if (matchResult.status() == Status.OK && !matchResult.metadata().isEmpty()) {
for (Metadata metadata : matchResult.metadata()) {
childDirectories.add(metadata.resourceId().getCurrentDirectory());
}
} else if (matchResult.status() == Status.ERROR) {
String errorMessage = String.format("Error matching files under directory %s", baseDir);
log.error(errorMessage);
throw new IOException(errorMessage);
}
}
log.info("Child directories of {} are {}", baseDir, childDirectories);
return childDirectories;
}

/**
* Similar to {@link #getIncrementalRunPath} but also checks if that directory exists and if so,
* moves it to {@link #getIncrementalRunPathWithTimestamp()}.
* Also see {@link #newIncrementalRunPath()}
*
* @return same as {@link #getIncrementalRunPath()}
* @throws IOException if the directory move fails
* @return the current incremental run path if one found; null otherwise.
*/
public ResourceId newIncrementalRunPath() throws IOException {
ResourceId incPath = getIncrementalRunPath();
if (hasIncrementalDir()) {
ResourceId movePath = getIncrementalRunPathWithTimestamp();
log.info("Moving the old {} directory to {}", INCREMENTAL_DIR, movePath);
FileSystems.rename(Collections.singletonList(incPath), Collections.singletonList(movePath));
}
return incPath;
@Nullable
public ResourceId getLatestIncrementalRunPath() throws IOException {
List<ResourceId> dirs =
getAllChildDirectories(getRoot()).stream()
.filter(dir -> dir.getFilename().contains(INCREMENTAL_DIR + TIMESTAMP_PREFIX))
.collect(Collectors.toList());
if (dirs.isEmpty()) return null;

Collections.sort(dirs, Comparator.comparing(ResourceId::toString));
return dirs.get(dirs.size() - 1);
}

/**
* @return true iff there is already an incremental run subdirectory in this DWH.
* This returns a new incremental-run path based on the current timestamp. Note that each
* incremental-run is relative to a full-run, hence we put this directory under the full-run root.
*
* @return a new incremental run path based on the current timestamp.
*/
public boolean hasIncrementalDir() throws IOException {
List<MatchResult> matches =
FileSystems.matchResources(Collections.singletonList(getIncrementalRunPath()));
MatchResult matchResult = Iterables.getOnlyElement(matches);
return matchResult.status() == Status.OK;
public ResourceId newIncrementalRunPath() {
return FileSystems.matchNewResource(getRoot(), true)
.resolve(
String.format("%s%s%s", INCREMENTAL_DIR, TIMESTAMP_PREFIX, safeTimestampSuffix()),
StandardResolveOptions.RESOLVE_DIRECTORY);
}

public Set<String> findNonEmptyResourceDirs() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;

import ca.uhn.fhir.context.FhirContext;
import com.google.api.services.storage.model.Objects;
Expand Down Expand Up @@ -92,7 +93,9 @@ public void newIncrementalRunPathTest() throws IOException {
Mockito.when(mockGcsUtil.getObjects(Mockito.anyList())).thenReturn(items);

ResourceId resourceId = dwhFiles.newIncrementalRunPath();
assertThat(resourceId.toString(), equalTo("gs://testbucket/testdirectory/incremental_run/"));
assertThat(
resourceId.toString(),
startsWith("gs://testbucket/testdirectory/incremental_run" + DwhFiles.TIMESTAMP_PREFIX));
}

@Test
Expand Down Expand Up @@ -244,4 +247,41 @@ private StorageObject createStorageObject(String gcsFilename, long fileSize) {
.setName(gcsPath.getObject())
.setSize(size);
}

@Test
public void testGetAllChildDirectoriesOneLevelDeep() throws IOException {
Objects modelObjects = new Objects();
List<StorageObject> items = new ArrayList<>();
// Files within the directory
items.add(
createStorageObject(
"gs://testbucket/testdirectory/Patient/patient.parquet", 1L /* fileSize */));
items.add(
createStorageObject(
"gs://testbucket/testdirectory/Observation/observation.parquet", 2L /* fileSize */));
// This is not returned in this case of GCS because there is no files right "under" TEST1.
// Note in GCS we do not have "directories", we are just simulating them by `/` separators.
items.add(
createStorageObject(
"gs://testbucket/testdirectory/TEST1/TEST2/file.txt", 2L /* fileSize */));
modelObjects.setItems(items);

Mockito.when(
mockGcsUtil.listObjects(
Mockito.eq("testbucket"), Mockito.anyString(), Mockito.isNull()))
.thenReturn(modelObjects);

Set<ResourceId> childDirectories =
DwhFiles.getAllChildDirectories("gs://testbucket/testdirectory");

assertThat(childDirectories.size(), equalTo(2));
assertThat(
childDirectories.contains(
FileSystems.matchNewResource("gs://testbucket/testdirectory/Patient", true)),
equalTo(true));
assertThat(
childDirectories.contains(
FileSystems.matchNewResource("gs://testbucket/testdirectory/Observation", true)),
equalTo(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;

import ca.uhn.fhir.context.FhirContext;
import com.google.common.io.Resources;
Expand All @@ -33,6 +34,8 @@
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.commons.lang3.SystemUtils;
import org.junit.Assert;
Expand All @@ -54,12 +57,31 @@ public void getResourcePathTestWindows() {
assertThat(dwhFiles.getResourcePath("Patient").toString(), equalTo("C:\\tmp\\Patient\\"));
}

@Test
public void getIncrementalRunPathTest() throws IOException {
Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS);
DwhFiles instance = new DwhFiles("/tmp", FhirContext.forR4Cached());
ResourceId incrementalRunPath1 = instance.newIncrementalRunPath();
ResourceId file1 =
incrementalRunPath1.resolve("file1.txt", StandardResolveOptions.RESOLVE_FILE);
FileSystems.create(file1, "test");
ResourceId incrementalRunPath2 = instance.newIncrementalRunPath();
ResourceId file2 =
incrementalRunPath2.resolve("file2.txt", StandardResolveOptions.RESOLVE_FILE);
FileSystems.create(file2, "test");
// making sure that the last incremental path is returned
assertThat(
instance.getLatestIncrementalRunPath().toString(), equalTo(incrementalRunPath2.toString()));
}

@Test
public void newIncrementalRunPathTestNonWindows() throws IOException {
Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS);
DwhFiles instance = new DwhFiles("/tmp", FhirContext.forR4Cached());
ResourceId incrementalRunPath = instance.newIncrementalRunPath();
assertThat(incrementalRunPath.toString(), equalTo("/tmp/incremental_run/"));
assertThat(
incrementalRunPath.toString(),
startsWith("/tmp/incremental_run" + DwhFiles.TIMESTAMP_PREFIX));
}

@Test
Expand Down Expand Up @@ -242,4 +264,34 @@ public void passWindowsLocalPathDwhRootPrefix_returnsFileSeparator() {
private void createFile(Path path, byte[] bytes) throws IOException {
Files.write(path, bytes);
}

@Test
public void testGetAllChildDirectoriesOneLevelDeep() throws IOException {
Path rootDir = Files.createTempDirectory("DWH_FILES_TEST");
Path childDir1 = Paths.get(rootDir.toString(), "childDir1");
Files.createDirectories(childDir1);
Path fileAtChildDir1 = Path.of(childDir1.toString(), "file1.txt");
createFile(fileAtChildDir1, "SAMPLE TEXT".getBytes(StandardCharsets.UTF_8));
Path childDir2 = Paths.get(rootDir.toString(), "childDir2");
Files.createDirectories(childDir2);
Path fileAtChildDir2 = Path.of(childDir2.toString(), "file2.txt");
createFile(fileAtChildDir2, "SAMPLE TEXT".getBytes(StandardCharsets.UTF_8));

// The following directory should not appear in the results of `getAllChildDirectories`
// because only dirs at one-level deep should be returned.
Path childDir21 = Paths.get(childDir2.toString(), "childDir21");
Files.createDirectories(childDir21);
Path fileAtChildDir21 = Path.of(childDir21.toString(), "file3.txt");
createFile(fileAtChildDir21, "SAMPLE TEXT".getBytes(StandardCharsets.UTF_8));

Set<ResourceId> childDirectories = DwhFiles.getAllChildDirectories(rootDir.toString());

assertThat(childDirectories.size(), equalTo(2));
assertThat(
childDirectories.contains(FileSystems.matchNewResource(childDir1.toString(), true)),
equalTo(true));
assertThat(
childDirectories.contains(FileSystems.matchNewResource(childDir2.toString(), true)),
equalTo(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.common.base.Strings;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -53,8 +52,6 @@ public class DataProperties {

private static final Logger logger = LoggerFactory.getLogger(DataProperties.class.getName());

static final String TIMESTAMP_PREFIX = "_TIMESTAMP_";

private static final String GET_PREFIX = "get";

private static final Set<String> EXCLUDED_ARGS =
Expand Down Expand Up @@ -212,17 +209,16 @@ PipelineConfig createBatchOptions() {
}

// Using underscore for suffix as hyphens are discouraged in hive table names.
String timestampSuffix =
Instant.now().toString().replace(":", "-").replace("-", "_").replace(".", "_");
options.setOutputParquetPath(dwhRootPrefix + TIMESTAMP_PREFIX + timestampSuffix);
String timestampSuffix = DwhFiles.safeTimestampSuffix();
options.setOutputParquetPath(dwhRootPrefix + DwhFiles.TIMESTAMP_PREFIX + timestampSuffix);

PipelineConfig.PipelineConfigBuilder pipelineConfigBuilder = addFlinkOptions(options);

// Get hold of thrift server parquet directory from dwhRootPrefix config.
String thriftServerParquetPathPrefix =
dwhRootPrefix.substring(dwhRootPrefix.lastIndexOf("/") + 1, dwhRootPrefix.length());
pipelineConfigBuilder.thriftServerParquetPath(
thriftServerParquetPathPrefix + TIMESTAMP_PREFIX + timestampSuffix);
thriftServerParquetPathPrefix + DwhFiles.TIMESTAMP_PREFIX + timestampSuffix);
pipelineConfigBuilder.timestampSuffix(timestampSuffix);

return pipelineConfigBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void purgeDwhFiles() {
try {
String prefix = getPrefix(dwhRootPrefix);
List<ResourceId> paths =
getAllChildDirectories(baseDir).stream()
DwhFiles.getAllChildDirectories(baseDir).stream()
.filter(dir -> dir.getFilename().startsWith(prefix))
.collect(Collectors.toList());

Expand Down Expand Up @@ -336,7 +336,7 @@ String getPrefix(String dwhRootPrefix) {
}

List<String> findExistingResources(String dwhRoot) throws IOException {
Set<ResourceId> childPaths = getAllChildDirectories(dwhRoot);
Set<ResourceId> childPaths = DwhFiles.getAllChildDirectories(dwhRoot);
Set<String> configuredSet =
new HashSet<>(Arrays.asList(dataProperties.getResourceList().split(",")));
return childPaths.stream()
Expand All @@ -345,42 +345,6 @@ List<String> findExistingResources(String dwhRoot) throws IOException {
.collect(Collectors.toList());
}

/**
* Returns all the child directories under the given base directory which are 1-level deep.
*
* @param baseDir
* @return The list of all child directories under the base directory
* @throws IOException
*/
Set<ResourceId> getAllChildDirectories(String baseDir) throws IOException {
String fileSeparator = DwhFiles.getFileSeparatorForDwhFiles(baseDir);
// Avoid using ResourceId.resolve(..) method to resolve the files when the path contains glob
// expressions with multiple special characters like **, */* etc as this api only supports
// single special characters like `*` or `..`. Rather use the FileSystems.match(..) if the path
// contains glob expressions.
List<MatchResult> matchResultList =
FileSystems.match(
List.of(
DwhFiles.getPathEndingWithFileSeparator(baseDir, fileSeparator)
+ "*"
+ fileSeparator
+ "*"));
Set<ResourceId> childDirectories = new HashSet<>();
for (MatchResult matchResult : matchResultList) {
if (matchResult.status() == Status.OK && !matchResult.metadata().isEmpty()) {
for (Metadata metadata : matchResult.metadata()) {
childDirectories.add(metadata.resourceId().getCurrentDirectory());
}
} else if (matchResult.status() == Status.ERROR) {
String errorMessage = String.format("Error matching files under directory %s", baseDir);
logger.error(errorMessage);
throw new IOException(errorMessage);
}
}
logger.info("Child directories : {}", childDirectories);
return childDirectories;
}

private int getLastIndexOfSlash(String dwhRootPrefix) {
CloudPath cloudPath = DwhFiles.parsePath(dwhRootPrefix);
int index = -1;
Expand Down
Loading
Loading