
[SPARK-26825][SS] Fix temp checkpoint creation in cluster mode when default filesystem is not local. #23764

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
@@ -127,7 +127,9 @@ abstract class StreamExecution(
     }
   }
   val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-  fs.mkdirs(checkpointDir)
+  if (!fs.isDirectory(checkpointDir) && !fs.mkdirs(checkpointDir)) {
+    throw new SparkException(s"Failed to create checkpoint path $checkpointDir")
+  }
   checkpointDir.toString
 }
 logInfo(s"Checkpoint root $checkpointRoot resolved to $resolvedCheckpointRoot.")
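For context: makeQualified only attaches the scheme and authority of the filesystem the path was resolved against; it does not create anything, which is why the mkdirs result now has to be checked. A rough standalone sketch of its behavior (hypothetical snippet, not part of the diff, assuming Hadoop's local filesystem):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.getLocal(new Configuration())  // the file:/// filesystem
val qualified = new Path("checkpoints/query-1")
  .makeQualified(fs.getUri, fs.getWorkingDirectory)
// The result carries an explicit scheme, e.g. file:/home/user/checkpoints/query-1,
// so later FileSystem lookups cannot silently fall back to fs.defaultFS.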
@@ -225,12 +225,14 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
 }.getOrElse {
   if (useTempCheckpointLocation) {
     deleteCheckpointOnStop = true
-    val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+    val tempDir = System.getProperty("java.io.tmpdir")
+    val cpTempDir = new Path("file://" + tempDir + "/temporary-"
+      + UUID.randomUUID.toString).toString
Contributor Author (gaborgsomogyi):
I've intentionally not used toUri here because of #23733.

Contributor:
I don't think this handles cleaning up the tmp directory on shutdown, which Utils.createTempDir already handles. I assume we want the checkpoint location to be on the local file system and not in HDFS, so why not just reuse Utils.createTempDir and prefix "file://" to the canonical path to fix the resolution issues?

Contributor:
I tend to agree with @arunmahadevan about leveraging Utils.createTempDir, but "file://" should be added as a prefix to the root parameter of Utils.createTempDir, both to prevent the default filesystem scheme from being chosen and to keep the shutdown hook from deleting the directory on the wrong filesystem.

Contributor:
I am not sure adding a "file://" prefix to the root parameter would cause the canonical path to come back with the same prefix. The attempted resolution against the HDFS filesystem happens later, so we may be OK just prepending "file://" to the canonical path.

Contributor:
> I am not sure adding a "file://" prefix to the root parameter would cause the canonical path to come back with the same prefix.

That actually sounds serious to me, and even sounds like a reason to avoid that approach.

Contributor Author (gaborgsomogyi), Mar 18, 2019:
> I'm not sure why Utils.createTempDir doesn't work here? What is the problem with mkdir being called twice?

Utils.createTempDir would work; avoiding it is only an optional fix so mkdir isn't called twice. The main problem is that the filesystem changes all of a sudden when the default fs is not local. Please see the added test.

> Also I'm not sure I understand why you're not using .toUri.

Because toUri percent-encodes the path, as this shows:

scala> import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.Path

scala> val f = new java.io.File("/tmp/chk %#chk")
f: java.io.File = /tmp/chk %#chk

scala> new Path(new Path(f.toURI()), "foo").toString
res0: String = file:/tmp/chk %#chk/foo

scala> new Path(new Path(f.toURI()), "foo").toUri.toString
res1: String = file:/tmp/chk%20%25%23chk/foo

I know this can happen only rarely, but since it's been discovered and we know the solution, it's better to prepare the code.

Contributor Author (gaborgsomogyi):
> Also the "file:" scheme doesn't generally like "//".

Not sure what you mean here (I agree that manual URI creation is not the most appropriate way, but considering the URI resolution issue mentioned above, this seemed the most reasonable way):

scala> val cpTempDir = new Path("file://" + tempDir + "/temporary1")
cpTempDir: org.apache.hadoop.fs.Path = file:/var/folders/t_/w90m85fn2gjb1v0c24wrfrxm0000gp/T/temporary1

Contributor:
There have been bugs in the past caused by the host part (the "//") being included in file URIs. It's generally better to not add it since I don't believe it's even part of the respective RFC (although I didn't bother to go look).

Contributor Author (gaborgsomogyi):
I see your point and agree. Let's wait to see the chosen approach, and together with that I'll modify this as well.

@gaborgsomogyi, I'm confused: how can this Utils.createTempDir behavior be overridden in spark-streaming code that uses the spark-sql 2.4.1 version?

logWarning("Temporary checkpoint location created which is deleted normally when" +
s" the query didn't fail: $tempDir. If it's required to delete it under any" +
s" the query didn't fail: $cpTempDir. If it's required to delete it under any" +
s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
s" true. Important to know deleting temp checkpoint folder is best effort.")
tempDir
cpTempDir
} else {
throw new AnalysisException(
"checkpointLocation must be specified either " +
Expand Down
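The crux of the fix is pinning the scheme: a scheme-less path is resolved against fs.defaultFS, which in cluster mode is typically HDFS, while an explicit file: prefix always selects the local filesystem. A minimal sketch of the difference (hypothetical snippet, not part of the diff):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val hadoopConf = new Configuration()

// An explicit file: scheme resolves to the local filesystem regardless
// of what fs.defaultFS is set to.
val local = new Path("file:///tmp/temporary-1234")
assert(local.getFileSystem(hadoopConf).getScheme == "file")

// By contrast, new Path("/tmp/temporary-1234").getFileSystem(hadoopConf)
// returns whatever filesystem fs.defaultFS names (e.g. HDFS in cluster
// mode), which is exactly how the temp checkpoint ended up remote.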
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import java.net.URI

import org.apache.hadoop.fs.{LocalFileSystem, Path, RawLocalFileSystem}
import org.apache.hadoop.fs.permission.FsPermission

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.sql.{Encoder, LocalSparkSession, SparkSession, SQLContext}

class StreamingCheckpointSuite extends SparkFunSuite with LocalSparkSession {

test("temp checkpoint dir should stay local even if default filesystem is not local") {
Contributor (HeartSaVioR), Feb 26, 2019:
I guess having SPARK-26825 as a prefix would help future readers see why the change was necessary. Please ignore this if it would make the test name too long.

    val conf = new SparkConf()
      .set("spark.hadoop.fs.file.impl", classOf[LocalFileSystem].getName)
      .set("spark.hadoop.fs.mockfs.impl", classOf[MkdirRecordingFileSystem].getName)
      .set("spark.hadoop.fs.defaultFS", "mockfs:///")

    spark = SparkSession.builder().master("local").appName("test").config(conf).getOrCreate()

    implicit val intEncoder: Encoder[Int] = spark.implicits.newIntEncoder
    implicit val sqlContext: SQLContext = spark.sqlContext

    MkdirRecordingFileSystem.reset()
    val query = MemoryStream[Int].toDF().writeStream.format("console").start()
    try {
      val checkpointDir = new Path(
        query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot)
      val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
      assert(fs.getScheme === "file")
      assert(fs.exists(checkpointDir))
      assert(MkdirRecordingFileSystem.requests === 0,
Contributor:
Nice to have the verification with checkpointDir!

Btw, if we know the path of checkpointDir, we could record the actual path of each mkdirs request in MkdirRecordingFileSystem (like before), let the directories actually be created, and then check whether the recorded paths include checkpointDir. That would keep the test passing even if Spark is later changed to create multiple directories including checkpointDir. It may weaken the fs.exists(checkpointDir) check, but since we can still determine which filesystem created the directory, IMHO it's not a big deal. (A sketch of this variant follows the listing below.)

"Unexpected mkdir happens in mocked filesystem")
} finally {
query.stop()
}
}
}

/**
* FileSystem to record requests for mkdir.
* All tests relying on this should call reset() first.
*/
class MkdirRecordingFileSystem extends RawLocalFileSystem {
  override def getScheme: String = "mockfs"

  override def getUri: URI = URI.create(s"$getScheme:///")

  override def mkdirs(f: Path): Boolean = mkdirs(f, null)

  override def mkdirs(f: Path, permission: FsPermission): Boolean = {
    MkdirRecordingFileSystem.requests += 1
    true
  }
}

object MkdirRecordingFileSystem {
  var requests = 0

  def reset(): Unit = requests = 0
}
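A rough sketch of the variant suggested in the review above (hypothetical code, not part of the PR; the PathRecordingFileSystem name and its delegation to RawLocalFileSystem are assumptions): record each requested path and still create the directory, so the test can check membership instead of a raw counter.

import java.net.URI

import scala.collection.mutable

import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
import org.apache.hadoop.fs.permission.FsPermission

class PathRecordingFileSystem extends RawLocalFileSystem {
  override def getScheme: String = "mockfs"

  override def getUri: URI = URI.create(s"$getScheme:///")

  override def mkdirs(f: Path): Boolean = mkdirs(f, FsPermission.getDirDefault)

  override def mkdirs(f: Path, permission: FsPermission): Boolean = {
    PathRecordingFileSystem.requestedPaths += f  // record the requested path
    super.mkdirs(f, permission)                  // still create the directory locally
  }
}

object PathRecordingFileSystem {
  val requestedPaths = mutable.ArrayBuffer.empty[Path]

  def reset(): Unit = requestedPaths.clear()
}

The test would then replace the requests === 0 assertion with something like assert(!PathRecordingFileSystem.requestedPaths.contains(checkpointDir)), which keeps passing even if Spark later creates additional directories through the mocked filesystem.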