-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-26824][SS]Fix the checkpoint location and _spark_metadata when it contains special chars #23733
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-26824][SS]Fix the checkpoint location and _spark_metadata when it contains special chars #23733
Changes from all commits
679dca2
3b357e5
3c5852a
3f55c47
7c62458
ca7bbed
8f408f0
4832ce5
6d7d1d8
2b82f3f
91542ff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,13 +20,15 @@ package org.apache.spark.sql.execution.streaming | |
import scala.util.control.NonFatal | ||
|
||
import org.apache.hadoop.conf.Configuration | ||
import org.apache.hadoop.fs.Path | ||
import org.apache.hadoop.fs.{FileSystem, Path} | ||
|
||
import org.apache.spark.SparkException | ||
import org.apache.spark.internal.Logging | ||
import org.apache.spark.internal.io.FileCommitProtocol | ||
import org.apache.spark.sql.{DataFrame, SparkSession} | ||
import org.apache.spark.sql.catalyst.expressions._ | ||
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormat, FileFormatWriter} | ||
import org.apache.spark.sql.internal.SQLConf | ||
import org.apache.spark.util.SerializableConfiguration | ||
|
||
object FileStreamSink extends Logging { | ||
|
@@ -37,23 +39,54 @@ object FileStreamSink extends Logging { | |
* Returns true if there is a single path that has a metadata log indicating which files should | ||
* be read. | ||
*/ | ||
def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = { | ||
def hasMetadata(path: Seq[String], hadoopConf: Configuration, sqlConf: SQLConf): Boolean = { | ||
path match { | ||
case Seq(singlePath) => | ||
val hdfsPath = new Path(singlePath) | ||
val fs = hdfsPath.getFileSystem(hadoopConf) | ||
if (fs.isDirectory(hdfsPath)) { | ||
val metadataPath = new Path(hdfsPath, metadataDir) | ||
checkEscapedMetadataPath(fs, metadataPath, sqlConf) | ||
fs.exists(metadataPath) | ||
} else { | ||
false | ||
} | ||
case _ => false | ||
} | ||
} | ||
|
||
def checkEscapedMetadataPath(fs: FileSystem, metadataPath: Path, sqlConf: SQLConf): Unit = { | ||
if (sqlConf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED) | ||
&& StreamExecution.containsSpecialCharsInPath(metadataPath)) { | ||
val legacyMetadataPath = new Path(metadataPath.toUri.toString) | ||
val legacyMetadataPathExists = | ||
try { | ||
val hdfsPath = new Path(singlePath) | ||
val fs = hdfsPath.getFileSystem(hadoopConf) | ||
if (fs.isDirectory(hdfsPath)) { | ||
fs.exists(new Path(hdfsPath, metadataDir)) | ||
} else { | ||
false | ||
} | ||
fs.exists(legacyMetadataPath) | ||
} catch { | ||
case NonFatal(e) => | ||
logWarning(s"Error while looking for metadata directory.") | ||
// We may not have access to this directory. Don't fail the query if that happens. | ||
logWarning(e.getMessage, e) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This means the directory may exist but some other (maybe intermittent) exception other than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's correct. The reason I chose to ignore the error here is we may not have the permission to check the directory. For example, if I have some special chars in my user name such as This is different than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a good edge case which I've not considered. I think user with name
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gaborgsomogyi Making this check best effort is safer. I don't want to force people that don't hit this issue to use the config until we remove this check in future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gaborgsomogyi By the way, the user name may be weird. Another more possible permission error case is an admin can set a S3 bucket to allow Spark accessing only paths matching some specified patterns, and a Spark user may not be able to change this. |
||
false | ||
} | ||
case _ => false | ||
if (legacyMetadataPathExists) { | ||
throw new SparkException( | ||
s"""Error: we detected a possible problem with the location of your "_spark_metadata" | ||
|directory and you likely need to move it before restarting this query. | ||
| | ||
|Earlier version of Spark incorrectly escaped paths when writing out the | ||
|"_spark_metadata" directory for structured streaming. While this was corrected in | ||
|Spark 3.0, it appears that your query was started using an earlier version that | ||
|incorrectly handled the "_spark_metadata" path. | ||
| | ||
|Correct "_spark_metadata" Directory: $metadataPath | ||
|Incorrect "_spark_metadata" Directory: $legacyMetadataPath | ||
| | ||
|Please move the data from the incorrect directory to the correct one, delete the | ||
|incorrect directory, and then restart this query. If you believe you are receiving | ||
|this message in error, you can disable it with the SQL conf | ||
|${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}.""" | ||
.stripMargin) | ||
} | ||
} | ||
} | ||
|
||
|
@@ -92,11 +125,16 @@ class FileStreamSink( | |
partitionColumnNames: Seq[String], | ||
options: Map[String, String]) extends Sink with Logging { | ||
|
||
private val hadoopConf = sparkSession.sessionState.newHadoopConf() | ||
private val basePath = new Path(path) | ||
private val logPath = new Path(basePath, FileStreamSink.metadataDir) | ||
private val logPath = { | ||
val metadataDir = new Path(basePath, FileStreamSink.metadataDir) | ||
val fs = metadataDir.getFileSystem(hadoopConf) | ||
FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf) | ||
metadataDir | ||
} | ||
private val fileLog = | ||
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString) | ||
private val hadoopConf = sparkSession.sessionState.newHadoopConf() | ||
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString) | ||
|
||
private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = { | ||
val serializableHadoopConf = new SerializableConfiguration(hadoopConf) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,8 +89,45 @@ abstract class StreamExecution( | |
val resolvedCheckpointRoot = { | ||
val checkpointPath = new Path(checkpointRoot) | ||
val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) | ||
fs.mkdirs(checkpointPath) | ||
checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString | ||
if (sparkSession.conf.get(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it worth to wrap this part in a function just like |
||
&& StreamExecution.containsSpecialCharsInPath(checkpointPath)) { | ||
// In Spark 2.4 and earlier, the checkpoint path is escaped 3 times (3 `Path.toUri.toString` | ||
// calls). If this legacy checkpoint path exists, we will throw an error to tell the user how | ||
// to migrate. | ||
val legacyCheckpointDir = | ||
new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString | ||
val legacyCheckpointDirExists = | ||
try { | ||
fs.exists(new Path(legacyCheckpointDir)) | ||
} catch { | ||
case NonFatal(e) => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question here. |
||
// We may not have access to this directory. Don't fail the query if that happens. | ||
logWarning(e.getMessage, e) | ||
false | ||
} | ||
if (legacyCheckpointDirExists) { | ||
throw new SparkException( | ||
s"""Error: we detected a possible problem with the location of your checkpoint and you | ||
|likely need to move it before restarting this query. | ||
| | ||
|Earlier version of Spark incorrectly escaped paths when writing out checkpoints for | ||
|structured streaming. While this was corrected in Spark 3.0, it appears that your | ||
|query was started using an earlier version that incorrectly handled the checkpoint | ||
|path. | ||
| | ||
|Correct Checkpoint Directory: $checkpointPath | ||
|Incorrect Checkpoint Directory: $legacyCheckpointDir | ||
| | ||
|Please move the data from the incorrect directory to the correct one, delete the | ||
|incorrect directory, and then restart this query. If you believe you are receiving | ||
|this message in error, you can disable it with the SQL conf | ||
|${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}.""" | ||
.stripMargin) | ||
} | ||
} | ||
val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory) | ||
fs.mkdirs(checkpointDir) | ||
checkpointDir.toString | ||
} | ||
|
||
def logicalPlan: LogicalPlan | ||
|
@@ -225,7 +262,7 @@ abstract class StreamExecution( | |
|
||
/** Returns the path of a file with `name` in the checkpoint directory. */ | ||
protected def checkpointFile(name: String): String = | ||
new Path(new Path(resolvedCheckpointRoot), name).toUri.toString | ||
new Path(new Path(resolvedCheckpointRoot), name).toString | ||
|
||
/** | ||
* Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]] | ||
|
@@ -568,6 +605,11 @@ object StreamExecution { | |
case _ => | ||
false | ||
} | ||
|
||
/** Whether the path contains special chars that will be escaped when converting to a `URI`. */ | ||
def containsSpecialCharsInPath(path: Path): Boolean = { | ||
path.toUri.getPath != new Path(path.toUri.toString).toUri.getPath | ||
} | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
v1 | ||
{"nextBatchWatermarkMs":0} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
{"id":"09be7fb3-49d8-48a6-840d-e9c2ad92a898"} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
v1 | ||
{"batchWatermarkMs":0,"batchTimestampMs":1549649384149,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}} | ||
0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
v1 | ||
{"path":"file://TEMPDIR/output%20%25@%23output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet","size":404,"isDir":false,"modificationTime":1549649385000,"blockReplication":1,"blockSize":33554432,"action":"add"} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the `try-catch` from `fs.exists(metadataPath)`. We should not ignore random errors when failing to access `metadataPath`, as it may make Spark ignore `_spark_metadata` and return wrong answers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1