-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-26825][SS] Fix temp checkpoint creation in cluster mode when default filesystem is not local. #23764
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
when default filesystem is not local.
val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath | ||
val tempDir = System.getProperty("java.io.tmpdir") | ||
val cpTempDir = new Path("file://" + tempDir + "/temporary-" | ||
+ UUID.randomUUID.toString).toString |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've intentionally not used toUri
here because of #23733.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont think this handles cleaning up the tmp directory on shutdown which is already handled in Utils.createTempDir
. I assume here we want the checkpoint location to be on the local file system and not in HDFS, so why not just re-use the Utils.createTempDir and just prefix "file://" to the canonical path to fix the issues with resolution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to agree with @arunmahadevan for leveraging Utils.createTempDir
, but file://
should be added as prefix of root
parameter to Utils.createTempDir
to prevent choosing default schema of filesystem and finally shutdown hook to delete directory from wrong filesystem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure adding "file://" prefix to root parameter would cause the canonical path to return the final path with the same prefix. The attempted resolution with HDFS Filesystem happens later so we may be ok to just prepend the "file://" to canonical path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure adding "file://" prefix to root parameter would cause the canonical path to return the final path with the same prefix.
That actually sounds serious to me and even sounds one reason to avoid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure why Utils.createTempDir doesn't work here? What is the problem with mkdir being called twice?
Utils.createTempDir
would work but an optional fix not to do it twice. The main problem is filesystem changes all of a sudden when default fs is not local. Please see the added test.
Also I'm not sure I understand why you're not using
.toUri
.
scala> import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.Path
scala> val f = new java.io.File("/tmp/chk %#chk")
f: java.io.File = /tmp/chk %#chk
scala> new Path(new Path(f.toURI()), "foo").toString
res0: String = file:/tmp/chk %#chk/foo
scala> new Path(new Path(f.toURI()), "foo").toUri.toString
res1: String = file:/tmp/chk%20%25%23chk/foo
I know it can happen rarely but since it's discovered and we know the solution it's better to prepare the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also the "file:" scheme doesn't generally like "//".
Not sure what you mean here (agree that manual URI creation is not the most appropriate way but considering the mentioned URI resolution this seemed the most reasonable way).
scala> val cpTempDir = new Path("file://" + tempDir + "/temporary1")
cpTempDir: org.apache.hadoop.fs.Path = file:/var/folders/t_/w90m85fn2gjb1v0c24wrfrxm0000gp/T/temporary1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There have been bugs in the past caused by the host part (the "//") being included in file URIs. It's generally better to not add it since I don't believe it's even part of the respective RFC (although I didn't bother to go look).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your point and agree. Let's wait to see the approach and together with that I'll modify this as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gaborgsomogyi , me confused , how to overwrite this Utils.createTempDir in spark-streaming code which uses spark-sql.2.4.1 version ?
Test build #102253 has finished for PR 23764 at commit
|
Could you rebase with current master? The code change itself looks good. |
@HeartSaVioR thanks for reviewing it, I've merged the master. |
retest this please |
Test build #102592 has finished for PR 23764 at commit
|
It would be ideal to have UT preventing regression, but not sure whether it is easy to craft. Could you have a quick look whether we can add it or not? |
Before the PR I've checked the possibilities but couldn't come up something which can be added as UT. What I was trying is to set default FS to something which is not |
OK, I'll play with the PR when I get time. If I can achieve before merging this PR, I'll share the code. IMHO it doesn't need to be a blocker for merging. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Test build #102635 has finished for PR 23764 at commit
|
@gaborgsomogyi |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we want the tmp dir to be local, could just re-use the Utils.createTempDir
to create the tmp checkpoint directory instead of a separate logic.
val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath | ||
val tempDir = System.getProperty("java.io.tmpdir") | ||
val cpTempDir = new Path("file://" + tempDir + "/temporary-" | ||
+ UUID.randomUUID.toString).toString |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont think this handles cleaning up the tmp directory on shutdown which is already handled in Utils.createTempDir
. I assume here we want the checkpoint location to be on the local file system and not in HDFS, so why not just re-use the Utils.createTempDir and just prefix "file://" to the canonical path to fix the issues with resolution.
|
@HeartSaVioR not yet checked all the details of your test but high level rocks. Many thanks investing your time! I think it would be better put it into this PR in order to help committers in the evaluation. Hope I can help you back the same way... |
@gaborgsomogyi Hey no worries! Happy to help. :) |
@gaborgsomogyi thanks for the clarification, the |
Add UT on checking temporary checkpoint creation in local filesystem
@HeartSaVioR I've taken over your suggested test and made some minor modifications. Feel free to comment on this as well. |
val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf()) | ||
assert(fs.getScheme === "file") | ||
assert(fs.exists(checkpointDir)) | ||
assert(MkdirRecordingFileSystem.requests === 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice to have verifying with checkpointDir!
Btw, if we know the path of checkpointDir, we can record actual path of request in MkdirRecordingFileSystem (like before) and let them just created, and find whether recorded directories have checkpointDir. This would make test still pass when Spark will be changed to create multiple directories including checkpointDir.
This change may disable the impact of fs.exist(checkpointDir)
, but we can determine which filesystem creates that directory so IMHO it's not a big deal.
|
||
class StreamingCheckpointSuite extends SparkFunSuite with LocalSparkSession { | ||
|
||
test("temp checkpoint dir should stay local even if default filesystem is not local") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess having SPARK-26825 as prefix helps to the future readers why the change was necessary. Please ignore if test name is too long to add it.
Do queries actually break without this change, or is the checkpoint just located in a different place? I think I'm missing context, because it seems correct for a temporary checkpoint to use the default filesystem instead of |
Hmm... I might be wrong, I had to take a deep look at origin issue. I agree with @jose-torres statement. We may need to re-analyze origin issue SPARK-26825 again - especially why file/directory creation fails in cluster mode. If temporary directory is correctly given, as it is running on YARN container, I would suspect why it fails to make a change on temporary directory.
Could we find any weird thing here? |
Test build #102799 has finished for PR 23764 at commit
|
Yes, it fails to start. Additionally having an implementation where
is wrong in general. |
I'm not 100% sure which is right, but if we have to force creating temp checkpoint directory in local filesystem, we may need to know available temp directory of local filesystem instead of HDFS path what YARN provides us, because I'm not sure below directory can be created in local filesystem by account running Spark (this is picked up from tmp dir in cluster mode):
|
What do you think @jose-torres ? |
@HeartSaVioR |
If the suggestion is to create a temp dir on default FS (HDFS for instance) then a mechanism should be suggested to find a proper temp directory which I'm not aware of. |
My bad. Now I realized the temporary directory YARN provides for container is local filesystem, not HDFS. If my understanding is right, then LGTM on the patch.
Same understanding as of now. |
cc @zsxwing since Jose not responding. |
My concern is that this change could break other scenarios that currently work. For example, I don't think stateful operations will work properly with a local FS checkpoint; executors won't be able to see the same checkpoint data. |
We're only dealing with temporary checkpoint here, so while I can see what @jose-torres mentioned, it is unlikely to happen in happy case and I haven't seen cache miss from running query. That could still happen in bad case like if some executors are becoming bad. Btw, may be better to document somewhere that the checkpoint should be accessible from executors if query is stateful? I guess that's only thing where non-driver reads and writes into checkpoint directory, but I might be missing something. Maybe better to also mention the statement means end users should point to remote storage unless they run all executors as well as driver in local machine. |
@jose-torres Thank you for your time and good point. Not considered this scenario and agree that it may happen when executor becoming bad. I've double checked the sinks where temporary checkpoints are created and none of them guarantee fault tolerance. This change still fulfills this. On the other hand if you still has concerns we can change the approach to create the temp checkpoint on default FS but then I would document that the directory configured by @HeartSaVioR I agree with you and I would write out a warning when checkpoint (not just temp) is created on local FS (a good example is what Jose mentioned). |
@zsxwing what do you think? |
ping @jose-torres |
Can one of the admins verify this patch? |
Having another view on this I think this has to be solved in a different way so closing. |
What changes were proposed in this pull request?
There are situations where temporary checkpoint directory created by Spark. One example when one uses console sink. Such cases in the actual implementation
StreamingQueryManager
creates directory withUtils.createTempDir
which will be passed to the appropriateStreamExecution
.StreamExecution
then does the following:The problem comes when resolving happens. The
StreamingQueryManager
provided path doesn't containfile://
scheme and because of this from local filesystem it can switch to HDFS for example (such case HDFS is the default filesystem).In this PR I've added the following changes:
StreamExecution
file://
scheme added to the directoryHow was this patch tested?
Existing unit tests + started a query in client/cluster mode.