Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS: Add check to create staging directory if not exists for S3OutputStream #3175

Conversation

rajarshisarkar
Copy link
Contributor

@rajarshisarkar rajarshisarkar commented Sep 24, 2021

Fix for #2991

Seems like for EMR 6.x (Spark 3.x) Iceberg attempts to write the temp files (S3FileIO Reference) before the tmp directory is created in some cases. This results in java.io.IOException: No such file or directory exception.

The tmp files look something like this by default (here System.getProperty("java.io.tmpdir") is /mnt1/yarn/usercache/hadoop/appcache/application_1632368478936_0042/container_1632368478936_0042_01_000001/tmp):

/mnt1/yarn/usercache/hadoop/appcache/application_1632368478936_0042/container_1632368478936_0042_01_000001/tmp/s3fileio-2279038237918274355.tmp
/mnt1/yarn/usercache/hadoop/appcache/application_1632368478936_0042/container_1632368478936_0042_01_000001/tmp/s3fileio-2279038237918274355.tmp
/mnt1/yarn/usercache/hadoop/appcache/application_1632368478936_0042/container_1632368478936_0042_01_000001/tmp/s3fileio-2279038237918274355.tmp

(or) something like this if s3fileIoStagingDirectory is set via s3.staging-dir (/mnt/tmp for this case):

/mnt/tmp/s3fileio-6715538145691725671.tmp
/mnt/tmp/s3fileio-5266666342669697861.tmp
/mnt/tmp/s3fileio-4592572815837116260.tmp

Adding a defensive check to ensure we check the existence of the staging directory before writing the temp files. With this check I could see the executor tasks succeeding in cluster deploy mode consistently.

Configurations for testing:

  • EMR 6.4.0, Iceberg 0.12.0, Master node: 1 instance, Core node: 10 instances, Spark 3.1.2
  • EMR 6.2.0, Iceberg 0.11.1, Master node: 1 instance, Core node: 10 instances, Spark 3.0.1

Final Status of jobs deployed in cluster mode:

Screenshot 2021-09-27 at 4 54 27 PM

@rajarshisarkar
Copy link
Contributor Author

@thaismonti1912 @alex-shchetkov @fcvr1010 @jackye1995 Can you please review the PR.

@@ -175,8 +175,15 @@ private void newStream() throws IOException {
stream.close();
}

boolean createStagingDirectory = false;
if (!stagingDirectory.exists()) {
createStagingDirectory = stagingDirectory.mkdirs();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

based on the documentation, this can also throw SecurityException that needs to be handled. We should also add some logging if staging directory is created, and if creation failed for some reason.

Copy link
Contributor Author

@rajarshisarkar rajarshisarkar Sep 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled SecurityException and added loggers for success scenario. Throwing the exception in case of failure scenario.

boolean createStagingDirectory = false;
if (!stagingDirectory.exists()) {
createStagingDirectory = stagingDirectory.mkdirs();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline needed after a control statement

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. Also, separated out the logic in a method: createStagingDirectoryIfNotExists()

currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
currentStagingFile.deleteOnExit();
if (createStagingDirectory) {
stagingDirectory.deleteOnExit();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline needed after a control statement

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. Also, separated out the logic in a method: createStagingDirectoryIfNotExists()

currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
currentStagingFile.deleteOnExit();
if (createStagingDirectory) {
stagingDirectory.deleteOnExit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to delete on exit? I think we can assume it's safe to keep the staging directory there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it should be safe. Made the changes accordingly.

@jackye1995
Copy link
Contributor

Thanks for working on this! Can you also add a test case for it?

@github-actions github-actions bot added the API label Sep 27, 2021
@rajarshisarkar
Copy link
Contributor Author

Thanks for working on this! Can you also add a test case for it?

Added the test case testStagingDirectoryCreation() in S3OutputStreamTest

boolean createdStagingDirectory = stagingDirectory.mkdirs();
if (createdStagingDirectory) {
LOG.info("Successfully created staging directory: {}", stagingDirectory.getAbsolutePath());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else, add a LOG saying "Staging directory {} creation failed, or it is created by another process"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, we can do better than just logging. When creation fails, we can check directory existence again, if it exists then it's created by another process, otherwise it's still an issue, and we can throw an IOException with error message indicating staging directory creation fails for some unknown reasons.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I have made the changes accordingly.

@@ -328,6 +330,15 @@ private static InputStream uncheckedInputStream(File file) {
}
}

private void createStagingDirectoryIfNotExists() throws SecurityException {
if (!stagingDirectory.exists()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a LOG saying something like "staging directoy {} not exist, trying to create one"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the logger.

@@ -170,11 +171,12 @@ public void write(byte[] b, int off, int len) throws IOException {
}
}

private void newStream() throws IOException {
private void newStream() throws IOException, SecurityException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the back and forth, I read the documentation for mkdirs, it seems like SecurityException only catches JVM level permission and it might still just return false for OS level permission failure, so it's hard to have a consistent behavior for error handling. Because of that, plus the fact that SecurityException is a runtime exception, I think we can remove the special handling of it and just let it throw to the top level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the special handling for SecurityException.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, can you also remove the SecurityException after throws? We don't need to throw runtime exception explicitly, just need to document it at top level which you already did.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, need not to throw explicitly. Removed it.

@@ -62,6 +63,9 @@ public PositionOutputStream createOrOverwrite() {
return new S3OutputStream(client(), uri(), awsProperties());
} catch (IOException e) {
throw new UncheckedIOException("Failed to create output stream for location: " + uri(), e);
} catch (SecurityException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed in the block below, we can remove this catch. AccessControlException also hides the original exception, so better to not use it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the special handling for SecurityException.

@@ -48,6 +49,7 @@
*
* @return an output stream that can report its position
* @throws RuntimeIOException If the implementation throws an {@link IOException}
* @throws AccessControlException If the implementation throws an {@link SecurityException}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed in the block below, we can just document @throws SecurityException if staging directory creation fails due to missing JVM level permission.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have made the changes accordingly.

LOG.info("Successfully created staging directory: {}", stagingDirectory.getAbsolutePath());
} else {
if (stagingDirectory.exists()) {
LOG.info("Staging directory: {} is created by another process",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: match the log message at L339, "Successfully created staging directory by another process: {}"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the change.

stagingDirectory.getAbsolutePath());
} else {
throw new IOException(String
.format("Staging directory: %s creation failed due to some unknown reason",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: match the log message at L346, "Fail to create staging directory due to some unknown reason: {}"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the change.

@@ -86,7 +86,8 @@
private boolean closed = false;

@SuppressWarnings("StaticAssignmentInConstructor")
S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties) throws IOException {
S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties)
throws IOException, SecurityException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as below, remove SecurityException after throws

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed it.

@jackye1995
Copy link
Contributor

Thank you for the work, looks good to me overall, just some styling changes then it should be good to go!

@jackye1995 jackye1995 merged commit 1fde388 into apache:master Sep 28, 2021
@kbendick kbendick added this to the Java 0.12.1 Release milestone Oct 26, 2021
kbendick pushed a commit to kbendick/iceberg that referenced this pull request Oct 27, 2021
kbendick pushed a commit to kbendick/iceberg that referenced this pull request Oct 27, 2021
izchen pushed a commit to izchen/iceberg that referenced this pull request Dec 7, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants