[SPARK-26824][SS]Fix the checkpoint location and _spark_metadata when it contains special chars #23733

Closed · 11 commits
@@ -1106,6 +1106,14 @@ object SQLConf {
.internal()
.stringConf

val STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED =
buildConf("spark.sql.streaming.checkpoint.escapedPathCheck.enabled")
.doc("Whether to detect a streaming query may pick up an incorrect checkpoint path due " +
"to SPARK-26824.")
.internal()
.booleanConf
.createWithDefault(true)

val PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION =
buildConf("spark.sql.statistics.parallelFileListingInStatsComputation.enabled")
.internal()
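For context on this new flag: it is internal and defaults to true, and the error messages added later in this PR tell users they may disable it if they believe the detection is a false positive. A minimal sketch of doing that (assuming an active SparkSession named spark; this snippet is an editor illustration, not part of the diff):

    // Turn off the legacy escaped-path detection added by SPARK-26824.
    // The key matches the conf defined above; it defaults to true.
    spark.conf.set("spark.sql.streaming.checkpoint.escapedPathCheck.enabled", "false")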
@@ -346,7 +346,8 @@ case class DataSource(
case (format: FileFormat, _)
if FileStreamSink.hasMetadata(
caseInsensitiveOptions.get("path").toSeq ++ paths,
- sparkSession.sessionState.newHadoopConf()) =>
+ sparkSession.sessionState.newHadoopConf(),
+ sparkSession.sessionState.conf) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
val dataSchema = userSpecifiedSchema.orElse {
@@ -20,13 +20,15 @@ package org.apache.spark.sql.execution.streaming
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.Path
+ import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormat, FileFormatWriter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.SerializableConfiguration

object FileStreamSink extends Logging {
@@ -37,23 +39,54 @@ object FileStreamSink extends Logging {
* Returns true if there is a single path that has a metadata log indicating which files should
* be read.
*/
- def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
+ def hasMetadata(path: Seq[String], hadoopConf: Configuration, sqlConf: SQLConf): Boolean = {
path match {
case Seq(singlePath) =>
val hdfsPath = new Path(singlePath)
val fs = hdfsPath.getFileSystem(hadoopConf)
if (fs.isDirectory(hdfsPath)) {
val metadataPath = new Path(hdfsPath, metadataDir)
checkEscapedMetadataPath(fs, metadataPath, sqlConf)
fs.exists(metadataPath)
Review comment (Member Author): I removed the try-catch around fs.exists(metadataPath). We should not ignore random errors when failing to access metadataPath, as that may make Spark ignore _spark_metadata and return wrong answers.

Review comment (Contributor): +1
} else {
false
}
case _ => false
}
}

def checkEscapedMetadataPath(fs: FileSystem, metadataPath: Path, sqlConf: SQLConf): Unit = {
if (sqlConf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
&& StreamExecution.containsSpecialCharsInPath(metadataPath)) {
val legacyMetadataPath = new Path(metadataPath.toUri.toString)
val legacyMetadataPathExists =
try {
fs.exists(legacyMetadataPath)
} catch {
case NonFatal(e) =>
// We may not have access to this directory. Don't fail the query if that happens.
logWarning(e.getMessage, e)
Review comment (Contributor): This means the directory may exist, but some other (maybe intermittent) exception than FileNotFoundException was thrown, doesn't it?

Review comment (Member Author): That's correct.

The reason I chose to ignore the error here is that we may not have permission to check the directory. For example, say my user name contains special chars, such as foo bar. If I then try to write into my home directory /user/foo bar/a/b/c with Spark 3.0.0, it's likely I don't have access to /user/foo%20bar/a/b/c, since that is a different user's home directory. Since checking the directory should be best effort and should not impact any users who don't hit this path issue, I prefer to ignore the error for safety.

This is different from the try-catch in hasMetadata, which checks a subdirectory of the current directory. In that case the directory is usually accessible, and an error is probably a real issue.
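To make the escaping discussed in this comment concrete, here is a small spark-shell style sketch (an editor illustration, not part of the PR; the /user/foo bar path is the hypothetical example from the comment, and the values in the trailing comments follow the Hadoop Path/URI behavior this PR is built around):

    import org.apache.hadoop.fs.Path

    val home = new Path("/user/foo bar/a/b/c")
    // Path.toString keeps the raw characters.
    println(home.toString)       // /user/foo bar/a/b/c
    // Path.toUri.toString percent-encodes the space.
    println(home.toUri.toString) // /user/foo%20bar/a/b/c
    // Re-wrapping the escaped string in a Path points at a *different* literal
    // directory ("foo%20bar"), which the current user may not be allowed to read,
    // hence the best-effort try-catch around the legacy-path check.
    val escaped = new Path(home.toUri.toString)
    println(escaped.toString)    // /user/foo%20bar/a/b/c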

Review comment (Contributor): That's a good edge case which I hadn't considered. I think a user name like foo%20bar, or anything similar that contains escaped URI parts, is junk that may point to an error in some application code. Wouldn't it be better to let the user decide what to do? For example:

  • delete/move the dir
  • add the right to read it
  • ignore the escaped-path check with the added config

Review comment (Member Author): @gaborgsomogyi Making this check best effort is safer. I don't want to force people who don't hit this issue to use the config until we remove this check in the future.

Review comment (Member Author): @gaborgsomogyi By the way, the user name may just be weird. A more likely permission-error case is that an admin can configure an S3 bucket so that Spark may access only paths matching certain patterns, and a Spark user may not be able to change that.

false
}
if (legacyMetadataPathExists) {
throw new SparkException(
s"""Error: we detected a possible problem with the location of your "_spark_metadata"
|directory and you likely need to move it before restarting this query.
|
|Earlier version of Spark incorrectly escaped paths when writing out the
|"_spark_metadata" directory for structured streaming. While this was corrected in
|Spark 3.0, it appears that your query was started using an earlier version that
|incorrectly handled the "_spark_metadata" path.
|
|Correct "_spark_metadata" Directory: $metadataPath
|Incorrect "_spark_metadata" Directory: $legacyMetadataPath
|
|Please move the data from the incorrect directory to the correct one, delete the
|incorrect directory, and then restart this query. If you believe you are receiving
|this message in error, you can disable it with the SQL conf
|${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}."""
.stripMargin)
}
}
}

@@ -92,11 +125,16 @@ class FileStreamSink(
partitionColumnNames: Seq[String],
options: Map[String, String]) extends Sink with Logging {

+ private val hadoopConf = sparkSession.sessionState.newHadoopConf()
private val basePath = new Path(path)
- private val logPath = new Path(basePath, FileStreamSink.metadataDir)
+ private val logPath = {
+ val metadataDir = new Path(basePath, FileStreamSink.metadataDir)
+ val fs = metadataDir.getFileSystem(hadoopConf)
+ FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf)
+ metadataDir
+ }
private val fileLog =
- new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
- private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)

private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
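The essential change in FileStreamSink above is that logPath.toString (the raw path) is now passed to FileStreamSinkLog instead of logPath.toUri.toString (the percent-encoded form), because the log wraps that string in another Path and would otherwise treat the escapes as literal characters. A spark-shell style sketch of the difference (editor illustration with a hypothetical output directory, not part of the diff):

    import org.apache.hadoop.fs.Path

    val basePath = new Path("/tmp/output @#output")   // hypothetical sink output dir
    val logPath = new Path(basePath, "_spark_metadata")

    println(logPath.toString)       // /tmp/output @#output/_spark_metadata
    println(logPath.toUri.toString) // /tmp/output%20@%23output/_spark_metadata
    // Pre-fix behavior: feeding the escaped string into another Path keeps the escapes
    // as literal characters, so the metadata ends up under a directory literally
    // named "output%20@%23output" instead of "output @#output".
    println(new Path(logPath.toUri.toString).toString) // /tmp/output%20@%23output/_spark_metadata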
@@ -208,7 +208,7 @@ class FileStreamSource(
var allFiles: Seq[FileStatus] = null
sourceHasMetadata match {
case None =>
- if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+ if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) {
sourceHasMetadata = Some(true)
allFiles = allFilesUsingMetadataLogFileIndex()
} else {
@@ -220,7 +220,7 @@
// double check whether source has metadata, preventing the extreme corner case that
// metadata log and data files are only generated after the previous
// `FileStreamSink.hasMetadata` check
- if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+ if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) {
sourceHasMetadata = Some(true)
allFiles = allFilesUsingMetadataLogFileIndex()
} else {
@@ -39,10 +39,16 @@ class MetadataLogFileIndex(
userSpecifiedSchema: Option[StructType])
extends PartitioningAwareFileIndex(sparkSession, Map.empty, userSpecifiedSchema) {

- private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
+ private val metadataDirectory = {
+ val metadataDir = new Path(path, FileStreamSink.metadataDir)
+ val fs = metadataDir.getFileSystem(sparkSession.sessionState.newHadoopConf())
+ FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf)
+ metadataDir
+ }

logInfo(s"Reading streaming file log from $metadataDirectory")
private val metadataLog =
- new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toUri.toString)
+ new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toString)
private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory)
private var cachedPartitionSpec: PartitionSpec = _

@@ -89,8 +89,45 @@ abstract class StreamExecution(
val resolvedCheckpointRoot = {
val checkpointPath = new Path(checkpointRoot)
val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
- fs.mkdirs(checkpointPath)
- checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
if (sparkSession.conf.get(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
Review comment (Contributor): Is it worth wrapping this part in a function, just like FileStreamSink.checkEscapedMetadataPath?
&& StreamExecution.containsSpecialCharsInPath(checkpointPath)) {
// In Spark 2.4 and earlier, the checkpoint path is escaped 3 times (3 `Path.toUri.toString`
// calls). If this legacy checkpoint path exists, we will throw an error to tell the user how
// to migrate.
val legacyCheckpointDir =
new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
val legacyCheckpointDirExists =
try {
fs.exists(new Path(legacyCheckpointDir))
} catch {
case NonFatal(e) =>
Review comment (Contributor): Same question here.
// We may not have access to this directory. Don't fail the query if that happens.
logWarning(e.getMessage, e)
false
}
if (legacyCheckpointDirExists) {
throw new SparkException(
s"""Error: we detected a possible problem with the location of your checkpoint and you
|likely need to move it before restarting this query.
|
|Earlier version of Spark incorrectly escaped paths when writing out checkpoints for
|structured streaming. While this was corrected in Spark 3.0, it appears that your
|query was started using an earlier version that incorrectly handled the checkpoint
|path.
|
|Correct Checkpoint Directory: $checkpointPath
|Incorrect Checkpoint Directory: $legacyCheckpointDir
|
|Please move the data from the incorrect directory to the correct one, delete the
|incorrect directory, and then restart this query. If you believe you are receiving
|this message in error, you can disable it with the SQL conf
|${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}."""
.stripMargin)
}
}
val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
fs.mkdirs(checkpointDir)
checkpointDir.toString
}

def logicalPlan: LogicalPlan
@@ -225,7 +262,7 @@ abstract class StreamExecution(

/** Returns the path of a file with `name` in the checkpoint directory. */
protected def checkpointFile(name: String): String =
- new Path(new Path(resolvedCheckpointRoot), name).toUri.toString
+ new Path(new Path(resolvedCheckpointRoot), name).toString

/**
* Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
@@ -568,6 +605,11 @@ object StreamExecution {
case _ =>
false
}

/** Whether the path contains special chars that will be escaped when converting to a `URI`. */
def containsSpecialCharsInPath(path: Path): Boolean = {
path.toUri.getPath != new Path(path.toUri.toString).toUri.getPath
}
}

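To illustrate what containsSpecialCharsInPath detects and what the "escaped 3 times" comment in resolvedCheckpointRoot refers to, a spark-shell style sketch (editor illustration, not part of the diff; the checkpoint path is hypothetical and the expected values follow the escaping behavior described in this PR):

    import org.apache.hadoop.fs.Path

    val checkpointPath = new Path("/tmp/chk dir")   // checkpoint root containing a space

    // Local copy of the helper above: the raw path and the re-parsed escaped form
    // differ only when escaping actually changes something.
    def containsSpecialCharsInPath(path: Path): Boolean =
      path.toUri.getPath != new Path(path.toUri.toString).toUri.getPath

    println(containsSpecialCharsInPath(checkpointPath))          // true
    println(containsSpecialCharsInPath(new Path("/tmp/chkdir"))) // false

    // The legacy Spark 2.4 location is reproduced by applying Path -> toUri.toString
    // three times, exactly as in resolvedCheckpointRoot above.
    val legacyCheckpointDir =
      new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
    println(legacyCheckpointDir)                                 // /tmp/chk%252520dir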
@@ -214,10 +214,10 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
triggerClock: Clock): StreamingQueryWrapper = {
var deleteCheckpointOnStop = false
val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
- new Path(userSpecified).toUri.toString
+ new Path(userSpecified).toString
}.orElse {
df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
- new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
+ new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString
}
}.getOrElse {
if (useTempCheckpointLocation) {
New test resource (appears to be a Spark 2.4 streaming commit log from the test checkpoint):
@@ -0,0 +1,2 @@
v1
{"nextBatchWatermarkMs":0}
New test resource (appears to be the streaming query metadata file holding the query id):
@@ -0,0 +1 @@
{"id":"09be7fb3-49d8-48a6-840d-e9c2ad92a898"}
New test resource (appears to be a Spark 2.4 offsets log entry):
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1549649384149,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
0
Binary file not shown (a Parquet data file referenced by the sink metadata below).
New test resource (appears to be the 2.4-era _spark_metadata sink log containing the escaped output path):
@@ -0,0 +1,2 @@
v1
{"path":"file://TEMPDIR/output%20%25@%23output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet","size":404,"isDir":false,"modificationTime":1549649385000,"blockReplication":1,"blockSize":33554432,"action":"add"}
Review comment (Member Author): TEMPDIR in this file will be replaced with the real test path in the unit test.
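For readers decoding the entry above: output%20%25@%23output is the URI-escaped form of a directory name that contains a space, a percent sign, and a hash sign. A quick sketch (editor illustration; a hypothetical absolute path stands in for TEMPDIR):

    // %20 -> ' ', %25 -> '%', %23 -> '#'; '@' needs no escaping in a URI path.
    val uri = new java.net.URI("file:///tmp/output%20%25@%23output/part-00000-c000.snappy.parquet")
    println(uri.getPath) // /tmp/output %@#output/part-00000-c000.snappy.parquet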

@@ -17,6 +17,7 @@

package org.apache.spark.sql.streaming

import java.io.File
import java.util.Locale

import org.apache.hadoop.fs.Path
@@ -454,4 +455,27 @@ class FileStreamSinkSuite extends StreamTest {
}
}
}

test("special characters in output path") {
withTempDir { tempDir =>
val checkpointDir = new File(tempDir, "chk")
val outputDir = new File(tempDir, "output @#output")
val inputData = MemoryStream[Int]
inputData.addData(1, 2, 3)
val q = inputData.toDF()
.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("parquet")
.start(outputDir.getCanonicalPath)
try {
q.processAllAvailable()
} finally {
q.stop()
}
// The "_spark_metadata" directory should be in "outputDir"
assert(outputDir.listFiles.map(_.getName).contains(FileStreamSink.metadataDir))
val outputDf = spark.read.parquet(outputDir.getCanonicalPath).as[Int]
checkDatasetUnorderly(outputDf, 1, 2, 3)
}
}
}