-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-26825][SS] Fix temp checkpoint creation in cluster mode when default filesystem is not local. #23764
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-26825][SS] Fix temp checkpoint creation in cluster mode when default filesystem is not local. #23764
Changes from all commits
9d40037
e2697bf
3e7a0df
eccf55a
c6a283d
54f86b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.execution.streaming | ||
|
||
import java.net.URI | ||
|
||
import org.apache.hadoop.fs.{LocalFileSystem, Path, RawLocalFileSystem} | ||
import org.apache.hadoop.fs.permission.FsPermission | ||
|
||
import org.apache.spark.{SparkConf, SparkFunSuite} | ||
import org.apache.spark.sql.{Encoder, LocalSparkSession, SparkSession, SQLContext} | ||
|
||
class StreamingCheckpointSuite extends SparkFunSuite with LocalSparkSession { | ||
|
||
test("temp checkpoint dir should stay local even if default filesystem is not local") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess having SPARK-26825 as prefix helps to the future readers why the change was necessary. Please ignore if test name is too long to add it. |
||
val conf = new SparkConf() | ||
.set("spark.hadoop.fs.file.impl", classOf[LocalFileSystem].getName) | ||
.set("spark.hadoop.fs.mockfs.impl", classOf[MkdirRecordingFileSystem].getName) | ||
.set("spark.hadoop.fs.defaultFS", "mockfs:///") | ||
|
||
spark = SparkSession.builder().master("local").appName("test").config(conf).getOrCreate() | ||
|
||
implicit val intEncoder: Encoder[Int] = spark.implicits.newIntEncoder | ||
implicit val sqlContext: SQLContext = spark.sqlContext | ||
|
||
MkdirRecordingFileSystem.reset() | ||
val query = MemoryStream[Int].toDF().writeStream.format("console").start() | ||
try { | ||
val checkpointDir = new Path( | ||
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot) | ||
val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf()) | ||
assert(fs.getScheme === "file") | ||
assert(fs.exists(checkpointDir)) | ||
assert(MkdirRecordingFileSystem.requests === 0, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice to have verifying with checkpointDir! Btw, if we know the path of checkpointDir, we can record actual path of request in MkdirRecordingFileSystem (like before) and let them just created, and find whether recorded directories have checkpointDir. This would make test still pass when Spark will be changed to create multiple directories including checkpointDir. This change may disable the impact of |
||
"Unexpected mkdir happens in mocked filesystem") | ||
} finally { | ||
query.stop() | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* FileSystem to record requests for mkdir. | ||
* All tests relying on this should call reset() first. | ||
*/ | ||
class MkdirRecordingFileSystem extends RawLocalFileSystem { | ||
override def getScheme: String = "mockfs" | ||
|
||
override def getUri: URI = URI.create(s"$getScheme:///") | ||
|
||
override def mkdirs(f: Path): Boolean = mkdirs(f, null) | ||
|
||
override def mkdirs(f: Path, permission: FsPermission): Boolean = { | ||
MkdirRecordingFileSystem.requests += 1 | ||
true | ||
} | ||
} | ||
|
||
object MkdirRecordingFileSystem { | ||
var requests = 0 | ||
|
||
def reset(): Unit = requests = 0 | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've intentionally not used
toUri
here because of #23733.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont think this handles cleaning up the tmp directory on shutdown which is already handled in
Utils.createTempDir
. I assume here we want the checkpoint location to be on the local file system and not in HDFS, so why not just re-use the Utils.createTempDir and just prefix "file://" to the canonical path to fix the issues with resolution.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to agree with @arunmahadevan for leveraging
Utils.createTempDir
, butfile://
should be added as prefix ofroot
parameter toUtils.createTempDir
to prevent choosing default schema of filesystem and finally shutdown hook to delete directory from wrong filesystem.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure adding "file://" prefix to root parameter would cause the canonical path to return the final path with the same prefix. The attempted resolution with HDFS Filesystem happens later so we may be ok to just prepend the "file://" to canonical path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That actually sounds serious to me and even sounds one reason to avoid.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Utils.createTempDir
would work but an optional fix not to do it twice. The main problem is filesystem changes all of a sudden when default fs is not local. Please see the added test.I know it can happen rarely but since it's discovered and we know the solution it's better to prepare the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what you mean here (agree that manual URI creation is not the most appropriate way but considering the mentioned URI resolution this seemed the most reasonable way).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There have been bugs in the past caused by the host part (the "//") being included in file URIs. It's generally better to not add it since I don't believe it's even part of the respective RFC (although I didn't bother to go look).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your point and agree. Let's wait to see the approach and together with that I'll modify this as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gaborgsomogyi , me confused , how to overwrite this Utils.createTempDir in spark-streaming code which uses spark-sql.2.4.1 version ?