-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AWS: Add check to create staging directory if not exists for S3OutputStream #3175
AWS: Add check to create staging directory if not exists for S3OutputStream #3175
Conversation
@thaismonti1912 @alex-shchetkov @fcvr1010 @jackye1995 Can you please review the PR. |
@@ -175,8 +175,15 @@ private void newStream() throws IOException { | |||
stream.close(); | |||
} | |||
|
|||
boolean createStagingDirectory = false; | |||
if (!stagingDirectory.exists()) { | |||
createStagingDirectory = stagingDirectory.mkdirs(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
based on the documentation, this can also throw SecurityException
that needs to be handled. We should also add some logging if staging directory is created, and if creation failed for some reason.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handled SecurityException
and added loggers for success scenario. Throwing the exception in case of failure scenario.
boolean createStagingDirectory = false; | ||
if (!stagingDirectory.exists()) { | ||
createStagingDirectory = stagingDirectory.mkdirs(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: newline needed after a control statement
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed. Also, separated out the logic in a method: createStagingDirectoryIfNotExists()
currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory); | ||
currentStagingFile.deleteOnExit(); | ||
if (createStagingDirectory) { | ||
stagingDirectory.deleteOnExit(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: newline needed after a control statement
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed. Also, separated out the logic in a method: createStagingDirectoryIfNotExists()
currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory); | ||
currentStagingFile.deleteOnExit(); | ||
if (createStagingDirectory) { | ||
stagingDirectory.deleteOnExit(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to delete on exit? I think we can assume it's safe to keep the staging directory there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it should be safe. Made the changes accordingly.
Thanks for working on this! Can you also add a test case for it? |
Added the test case |
boolean createdStagingDirectory = stagingDirectory.mkdirs(); | ||
if (createdStagingDirectory) { | ||
LOG.info("Successfully created staging directory: {}", stagingDirectory.getAbsolutePath()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
else, add a LOG saying "Staging directory {} creation failed, or it is created by another process"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually, we can do better than just logging. When creation fails, we can check directory existence again, if it exists then it's created by another process, otherwise it's still an issue, and we can throw an IOException with error message indicating staging directory creation fails for some unknown reasons.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, I have made the changes accordingly.
@@ -328,6 +330,15 @@ private static InputStream uncheckedInputStream(File file) { | |||
} | |||
} | |||
|
|||
private void createStagingDirectoryIfNotExists() throws SecurityException { | |||
if (!stagingDirectory.exists()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a LOG saying something like "staging directoy {} not exist, trying to create one"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the logger.
@@ -170,11 +171,12 @@ public void write(byte[] b, int off, int len) throws IOException { | |||
} | |||
} | |||
|
|||
private void newStream() throws IOException { | |||
private void newStream() throws IOException, SecurityException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the back and forth, I read the documentation for mkdirs, it seems like SecurityException only catches JVM level permission and it might still just return false for OS level permission failure, so it's hard to have a consistent behavior for error handling. Because of that, plus the fact that SecurityException is a runtime exception, I think we can remove the special handling of it and just let it throw to the top level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the special handling for SecurityException
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, can you also remove the SecurityException
after throws
? We don't need to throw runtime exception explicitly, just need to document it at top level which you already did.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, need not to throw explicitly. Removed it.
@@ -62,6 +63,9 @@ public PositionOutputStream createOrOverwrite() { | |||
return new S3OutputStream(client(), uri(), awsProperties()); | |||
} catch (IOException e) { | |||
throw new UncheckedIOException("Failed to create output stream for location: " + uri(), e); | |||
} catch (SecurityException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as discussed in the block below, we can remove this catch. AccessControlException also hides the original exception, so better to not use it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the special handling for SecurityException
.
@@ -48,6 +49,7 @@ | |||
* | |||
* @return an output stream that can report its position | |||
* @throws RuntimeIOException If the implementation throws an {@link IOException} | |||
* @throws AccessControlException If the implementation throws an {@link SecurityException} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as discussed in the block below, we can just document @throws SecurityException if staging directory creation fails due to missing JVM level permission.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have made the changes accordingly.
LOG.info("Successfully created staging directory: {}", stagingDirectory.getAbsolutePath()); | ||
} else { | ||
if (stagingDirectory.exists()) { | ||
LOG.info("Staging directory: {} is created by another process", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: match the log message at L339, "Successfully created staging directory by another process: {}"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made the change.
stagingDirectory.getAbsolutePath()); | ||
} else { | ||
throw new IOException(String | ||
.format("Staging directory: %s creation failed due to some unknown reason", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: match the log message at L346, "Fail to create staging directory due to some unknown reason: {}"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made the change.
@@ -86,7 +86,8 @@ | |||
private boolean closed = false; | |||
|
|||
@SuppressWarnings("StaticAssignmentInConstructor") | |||
S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties) throws IOException { | |||
S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties) | |||
throws IOException, SecurityException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as below, remove SecurityException after throws
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed it.
Thank you for the work, looks good to me overall, just some styling changes then it should be good to go! |
Fix for #2991
Seems like for EMR 6.x (Spark 3.x) Iceberg attempts to write the temp files (S3FileIO Reference) before the
tmp
directory is created in some cases. This results injava.io.IOException: No such file or directory
exception.The
tmp
files look something like this by default (hereSystem.getProperty("java.io.tmpdir")
is/mnt1/yarn/usercache/hadoop/appcache/application_1632368478936_0042/container_1632368478936_0042_01_000001/tmp
):(or) something like this if
s3fileIoStagingDirectory
is set vias3.staging-dir
(/mnt/tmp
for this case):Adding a defensive check to ensure we check the existence of the staging directory before writing the temp files. With this check I could see the executor tasks succeeding in
cluster
deploy mode consistently.Configurations for testing:
Final Status of jobs deployed in cluster mode: