
[SPARK-26825][SS] Fix temp checkpoint creation in cluster mode when default filesystem is not local. #23764


Closed
wants to merge 6 commits

Conversation

gaborgsomogyi
Contributor

What changes were proposed in this pull request?

There are situations where the temporary checkpoint directory is created by Spark, for example when one uses the console sink. In such cases the current implementation of StreamingQueryManager creates the directory with Utils.createTempDir and passes it to the appropriate StreamExecution. StreamExecution then does the following:

  • Creates the directory again
  • Resolves the provided directory

The problem comes when the resolution happens. The path provided by StreamingQueryManager doesn't contain the file:// scheme, so the resolution can switch from the local filesystem to, for example, HDFS (when HDFS is the default filesystem).
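
A minimal sketch of this resolution drift (the HDFS URI, working directory, and temp path below are made-up values; only Hadoop's Path API is used, so no cluster is needed):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical default filesystem of the cluster and a hypothetical working dir.
val defaultFsUri = new URI("hdfs://ns1")
val workingDir = new Path("/user/spark")

// A scheme-less temp dir, as handed over by StreamingQueryManager before this PR:
val raw = new Path("/tmp/temporary-1234")
// Qualification picks up the default scheme, silently moving the checkpoint to HDFS:
println(raw.makeQualified(defaultFsUri, workingDir)) // hdfs://ns1/tmp/temporary-1234

// With an explicit file scheme, the path dispatches to the local filesystem,
// regardless of what fs.defaultFS says:
val conf = new Configuration()
conf.set("fs.defaultFS", "hdfs://ns1")
val pinned = new Path("file:///tmp/temporary-1234")
println(pinned.getFileSystem(conf).getScheme) // file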

In this PR I've added the following changes (a rough sketch of the combined approach follows this list):

  • The directory is created only in StreamExecution
  • The file:// scheme is added to the directory
  • Since it was not obvious that the checkpoint directory could not be created because of permission issues, an exception is now thrown when the checkpoint directory doesn't exist and cannot be created
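
Putting the pieces together, a rough sketch of the approach (the method name and error message are illustrative, not the exact code in the patch):

import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

def createTempCheckpointDir(hadoopConf: Configuration): String = {
  // Pin the scheme so later resolution cannot drift to the default filesystem.
  val tmpRoot = System.getProperty("java.io.tmpdir")
  val dir = new Path("file://" + tmpRoot + "/temporary-" + UUID.randomUUID.toString)
  // Because of the file scheme this returns the local filesystem.
  val fs = dir.getFileSystem(hadoopConf)
  // Fail loudly (e.g. on permission problems) instead of surfacing a confusing
  // error later in the query.
  if (!fs.exists(dir) && !fs.mkdirs(dir)) {
    throw new IllegalStateException(s"Failed to create checkpoint directory $dir")
  }
  dir.toString
}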

How was this patch tested?

Existing unit tests + started a query in client/cluster mode.

- val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+ val tempDir = System.getProperty("java.io.tmpdir")
+ val cpTempDir = new Path("file://" + tempDir + "/temporary-"
+   + UUID.randomUUID.toString).toString
Contributor Author


I've intentionally not used toUri here because of #23733.

Contributor


I don't think this handles cleaning up the tmp directory on shutdown, which is already handled in Utils.createTempDir. I assume here we want the checkpoint location to be on the local filesystem and not in HDFS, so why not just reuse Utils.createTempDir and prefix "file://" to the canonical path to fix the resolution issue?
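
A hypothetical two-liner for this suggestion (Utils here is Spark's internal org.apache.spark.util.Utils, so this only compiles inside Spark itself):

import org.apache.spark.util.Utils

val cpTempDir = "file://" +
  Utils.createTempDir(namePrefix = "temporary").getCanonicalPath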

Contributor


I tend to agree with @arunmahadevan about leveraging Utils.createTempDir, but file:// should be added as a prefix to the root parameter of Utils.createTempDir, both to prevent picking up the default filesystem scheme and to keep the shutdown hook from deleting the directory from the wrong filesystem.

Contributor


I am not sure adding the "file://" prefix to the root parameter would make the canonical path keep that prefix in the final path. The attempted resolution with the HDFS filesystem happens later, so we may be OK just prepending "file://" to the canonical path.

Contributor


> I am not sure adding the "file://" prefix to the root parameter would make the canonical path keep that prefix in the final path.

That actually sounds serious to me and even sounds like a reason to avoid it.

Contributor Author

@gaborgsomogyi Mar 18, 2019


> I'm not sure why Utils.createTempDir doesn't work here? What is the problem with mkdir being called twice?

Utils.createTempDir would work, but not creating the directory twice is just an optional fix. The main problem is that the filesystem changes all of a sudden when the default FS is not local. Please see the added test.

> Also I'm not sure I understand why you're not using .toUri.

scala> import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.Path

scala> val f = new java.io.File("/tmp/chk %#chk")
f: java.io.File = /tmp/chk %#chk

scala> new Path(new Path(f.toURI()), "foo").toString
res0: String = file:/tmp/chk %#chk/foo

scala> new Path(new Path(f.toURI()), "foo").toUri.toString
res1: String = file:/tmp/chk%20%25%23chk/foo

I know it happens rarely, but since it has been discovered and we know the solution, it's better to prepare the code for it.

Contributor Author


Also the "file:" scheme doesn't generally like "//".

Not sure what you mean here (agree that manual URI creation is not the most appropriate way but considering the mentioned URI resolution this seemed the most reasonable way).

scala> val cpTempDir = new Path("file://" + tempDir + "/temporary1")
cpTempDir: org.apache.hadoop.fs.Path = file:/var/folders/t_/w90m85fn2gjb1v0c24wrfrxm0000gp/T/temporary1

Contributor


There have been bugs in the past caused by the host part (the "//") being included in file URIs. It's generally better not to add it, since I don't believe it's even part of the respective RFC (although I didn't bother to go look).

Contributor Author


I see your point and agree. Let's wait and see which approach we settle on, and I'll modify this along with it.


@gaborgsomogyi, I'm confused: how can this Utils.createTempDir behavior be overridden in spark-streaming code that uses the spark-sql 2.4.1 version?

@SparkQA

SparkQA commented Feb 12, 2019

Test build #102253 has finished for PR 23764 at commit 9d40037.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor Author

cc @jose-torres @dongjoon-hyun

@HeartSaVioR
Contributor

Could you rebase with current master? The code change itself looks good.

@gaborgsomogyi
Contributor Author

@HeartSaVioR thanks for reviewing it, I've merged in the current master.

@gaborgsomogyi
Contributor Author

retest this please

@SparkQA

SparkQA commented Feb 21, 2019

Test build #102592 has finished for PR 23764 at commit e2697bf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor

It would be ideal to have a UT preventing regression, but I'm not sure whether one is easy to craft. Could you have a quick look at whether we can add one?

@gaborgsomogyi
Contributor Author

Before the PR I checked the possibilities but couldn't come up with something that could be added as a UT. What I tried was setting the default FS to something which is not file (for instance hdfs) and then checking that the resolved checkpoint path doesn't change its scheme. This approach fails inside the resolve process. To get around that the filesystem has to be mocked somehow, and I didn't find a solution for it. If you have an idea feel free to share it, because testing manually is brittle in the long term.

@HeartSaVioR
Contributor

OK, I'll play with the PR when I get time. If I can manage it before this PR is merged, I'll share the code. IMHO it doesn't need to be a blocker for merging.

Contributor

@HeartSaVioR left a comment


LGTM.

@SparkQA

SparkQA commented Feb 22, 2019

Test build #102635 has finished for PR 23764 at commit 3e7a0df.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor

HeartSaVioR commented Feb 25, 2019

@gaborgsomogyi
Just raised a PR on your fork regarding the UT: gaborgsomogyi#1
Please review and merge it if it makes sense. (The test failed without your patch, and passed with your patch.)
I'm also OK with raising a follow-up PR on top of this patch if you would prefer to let me handle further reviews of my patch there. Thanks!

Contributor

@arunmahadevan left a comment


Since we want the tmp dir to be local, we could just reuse Utils.createTempDir to create the tmp checkpoint directory instead of having separate logic.

@gaborgsomogyi
Contributor Author

gaborgsomogyi commented Feb 25, 2019

Utils.createTempDir is not required for the following reasons:

  • The separate logic is already there and works in a generic way, while Utils.createTempDir works for the local filesystem only
  • There is no need to create the directory twice
  • The shutdown hook doesn't help if the process is killed with kill -9
  • StreamingQueryManager's responsibility is to come up with a directory and pass it to StreamExecution
  • StreamExecution's responsibility is to create and delete these directories

@gaborgsomogyi
Contributor Author

@HeartSaVioR I haven't yet checked all the details of your test, but at a high level it rocks. Many thanks for investing your time! I think it would be better to put it into this PR in order to help committers with the evaluation. Hope I can help you back the same way...

@HeartSaVioR
Contributor

@gaborgsomogyi Hey no worries! Happy to help. :)

@arunmahadevan
Contributor

@gaborgsomogyi thanks for the clarification, the StreamExecution apparently handles the deletion. However, there's a slight behavior change here: earlier, Utils.createTempDir would handle automatic deletion on shutdown, while with the proposed one there's a possibility of the tmp dir being left over on shutdown. I don't think it's a big deal though, so I'm OK with the proposed approach as long as we have validated that creating the local dir via the HDFS APIs (the proposed way) has no other differences from Utils.createTempDir.

Add UT on checking temporary checkpoint creation in local filesystem
@gaborgsomogyi
Contributor Author

@HeartSaVioR I've taken over your suggested test and made some minor modifications. Feel free to comment on this as well.

val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
assert(fs.getScheme === "file")
assert(fs.exists(checkpointDir))
assert(MkdirRecordingFileSystem.requests === 0,
Contributor


Nice to have verification with checkpointDir!

Btw, if we know the path of checkpointDir, we can record the actual path of each request in MkdirRecordingFileSystem (like before), let the directories just be created, and check whether the recorded directories include checkpointDir. This would keep the test passing even if Spark is later changed to create multiple directories including checkpointDir.

This change may reduce the value of fs.exists(checkpointDir), but we can still determine which filesystem created that directory, so IMHO it's not a big deal.
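
For context, a rough guess at what such a mkdir-recording filesystem could look like; the class actually shipped with the test may differ:

import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
import org.apache.hadoop.fs.permission.FsPermission

// Counts every mkdirs request it receives; registering it for the default
// filesystem scheme in the test lets us detect checkpoint directories that
// were (wrongly) created on the default filesystem.
object MkdirRecordingFileSystem {
  @volatile var requests: Int = 0
  def reset(): Unit = requests = 0
}

class MkdirRecordingFileSystem extends RawLocalFileSystem {
  override def mkdirs(f: Path): Boolean = {
    MkdirRecordingFileSystem.requests += 1
    super.mkdirs(f)
  }
  override def mkdirs(f: Path, permission: FsPermission): Boolean = {
    MkdirRecordingFileSystem.requests += 1
    super.mkdirs(f, permission)
  }
}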


class StreamingCheckpointSuite extends SparkFunSuite with LocalSparkSession {

test("temp checkpoint dir should stay local even if default filesystem is not local") {
Contributor

@HeartSaVioR Feb 26, 2019


I guess having SPARK-26825 as a prefix would help future readers understand why the change was necessary. Please ignore this if it makes the test name too long.

@jose-torres
Contributor

Do queries actually break without this change, or is the checkpoint just located in a different place? I think I'm missing context, because it seems correct for a temporary checkpoint to use the default filesystem instead of file:// - after all, it is the default.

@HeartSaVioR
Contributor

HeartSaVioR commented Feb 27, 2019

Hmm... I might be wrong; I had to take a deeper look at the original issue. I agree with @jose-torres' statement.

We may need to re-analyze the original issue SPARK-26825 again, especially why file/directory creation fails in cluster mode.

If the temporary directory is given correctly, since this is running in a YARN container, I would want to understand why it fails to write to that temporary directory.

*Cluster mode:*
java.io.tmpdir=/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/
createTempDir(namePrefix = s"temporary") => /yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
19/02/01 12:53:14 ERROR streaming.StreamMetadata: Error writing stream metadata StreamMetadata(68f9fb30-5853-49b4-b192-f1e0483e0d95) to hdfs://ns1/data/yarn/nm/usercache/root/appcache/application_1548823131831_0160/container_1548823131831_0160_02_000001/tmp/temporary-3789423a-6ded-4084-aab3-3b6301c34e07/metadata
org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
scala> val fs = checkpointPath.getFileSystem(spark.sessionState.newHadoopConf())
fs: org.apache.hadoop.fs.FileSystem = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_632752661_1, ugi=root (auth:SIMPLE)]]
scala> checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
res1: String = hdfs://ns1/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e

Can we spot anything weird here?

@SparkQA

SparkQA commented Feb 27, 2019

Test build #102799 has finished for PR 23764 at commit 54f86b7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor Author

gaborgsomogyi commented Feb 27, 2019

> Do queries actually break without this change, or is the checkpoint just located in a different place?

Yes, it fails to start. Additionally, an implementation where

  • One part of the checkpoint is on the local FS and the other part is on the default FS (HDFS in this case), so users who check the checkpoint data after a query failure now find it in 2 different locations
  • A locally created temp directory path is assumed to be writable on HDFS
  • The filesystem is switched all of a sudden

is wrong in general.

@HeartSaVioR
Contributor

HeartSaVioR commented Feb 27, 2019

I'm not 100% sure which is right, but if we have to force creating the temp checkpoint directory on the local filesystem, we may need to find an available temp directory on the local filesystem instead of the HDFS path YARN provides us, because I'm not sure the directory below can be created on the local filesystem by the account running Spark (this is picked up from the tmp dir in cluster mode):

/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/

@gaborgsomogyi
Contributor Author

What do you think @jose-torres ?

@gaborgsomogyi
Contributor Author

@HeartSaVioR usercache is always local on each NodeManager host, so creating it on HDFS is just bad.

@gaborgsomogyi
Contributor Author

If the suggestion is to create a temp dir on the default FS (HDFS for instance), then a mechanism should be suggested to find a proper temp directory there, which I'm not aware of.

@HeartSaVioR
Contributor

HeartSaVioR commented Mar 7, 2019

My bad. Now I realize the temporary directory YARN provides for the container is on the local filesystem, not HDFS. If my understanding is right, then creating the path in HDFS as it is sounds like a bug to me, and the patch fixes it.

LGTM on the patch.

> If the suggestion is to create a temp dir on the default FS (HDFS for instance), then a mechanism should be suggested to find a proper temp directory there, which I'm not aware of.

That's my understanding as well, as of now.

@gaborgsomogyi
Contributor Author

cc @zsxwing since Jose is not responding.

@jose-torres
Contributor

My concern is that this change could break other scenarios that currently work. For example, I don't think stateful operations will work properly with a local FS checkpoint; executors won't be able to see the same checkpoint data.

@HeartSaVioR
Contributor

HeartSaVioR commented Mar 12, 2019

We're only dealing with the temporary checkpoint here, so while I can see what @jose-torres mentioned, it is unlikely to happen in the happy case, and I haven't seen a cache miss from a running query. It could still happen in a bad case, for example if some executors go bad.

Btw, it may be better to document somewhere that the checkpoint should be accessible from the executors if the query is stateful. I guess that's the only case where a non-driver reads from and writes into the checkpoint directory, but I might be missing something. Maybe it would also be worth mentioning that this means end users should point to remote storage unless they run all executors as well as the driver on a local machine.

@gaborgsomogyi
Contributor Author

> My concern is that this change could break other scenarios that currently work. For example, I don't think stateful operations will work properly with a local FS checkpoint; executors won't be able to see the same checkpoint data.

@jose-torres Thank you for your time, and good point. I hadn't considered this scenario, and I agree that it may happen when an executor goes bad. I've double checked the sinks where temporary checkpoints are created and none of them guarantee fault tolerance, so this change still fulfills that.

On the other hand, if you still have concerns, we can change the approach to create the temp checkpoint on the default FS, but then I would document that the directory configured by the java.io.tmpdir system property should be writable on the default filesystem. Please share your opinion.

@HeartSaVioR I agree with you, and I would print a warning when a checkpoint (not just a temporary one) is created on the local FS (a good example is what Jose mentioned).

@gaborgsomogyi
Contributor Author

@zsxwing what do you think?

@gaborgsomogyi
Contributor Author

ping @jose-torres

@AmplabJenkins

Can one of the admins verify this patch?

@gaborgsomogyi
Contributor Author

Having another look at this, I think it has to be solved in a different way, so I'm closing this.


9 participants