[SPARK-26824][SS]Fix the checkpoint location and _spark_metadata when it contains special chars #23733
Conversation
cc @brkyvz

Test build #102038 has finished for PR 23733 at commit
Marking this WIP as I'm also working on improving errors to include the migration instructions for people who hit this issue.

Test build #102206 has finished for PR 23733 at commit
```
@@ -0,0 +1,2 @@
v1
{"path":"file://TEMPDIR/output%20%25@%23output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet","size":404,"isDir":false,"modificationTime":1549649385000,"blockReplication":1,"blockSize":33554432,"action":"add"}
```
`TEMPDIR` in this file will be replaced with the real test path in the unit test.
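A minimal sketch of how a test can perform that substitution (the helper name and wiring are hypothetical; the actual test code in this PR may differ):

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Hypothetical helper: copy the checked-in metadata template into the test's
// temp directory, replacing the TEMPDIR placeholder so the recorded file://
// URIs point at real files on the local machine.
def materializeMetadata(template: File, tempDir: File, target: File): Unit = {
  val raw = new String(Files.readAllBytes(template.toPath), StandardCharsets.UTF_8)
  val patched = raw.replace("TEMPDIR", tempDir.getCanonicalPath)
  Files.write(target.toPath, patched.getBytes(StandardCharsets.UTF_8))
}
```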
```scala
if (fs.isDirectory(hdfsPath)) {
  val metadataPath = new Path(hdfsPath, metadataDir)
  checkEscapedMetadataPath(fs, metadataPath, sqlConf)
  fs.exists(metadataPath)
```
I removed the try-catch around `fs.exists(metadataPath)`. We should not ignore random errors when failing to access `metadataPath`, as it may make Spark ignore `_spark_metadata` and return wrong answers.
+1
Test build #102215 has finished for PR 23733 at commit

Test build #102209 has finished for PR 23733 at commit

Test build #102213 has finished for PR 23733 at commit

Test build #102214 has finished for PR 23733 at commit

Test build #102217 has finished for PR 23733 at commit

Test build #102218 has finished for PR 23733 at commit

Test build #102220 has finished for PR 23733 at commit
```scala
} catch {
  case NonFatal(e) =>
    logWarning(s"Error while looking for metadata directory.")
    // We may not have access to this directory. Don't fail the query if that happens.
    logWarning(e.getMessage, e)
```
This means the directory may exist but some other (maybe intermittent) exception other than `FileNotFoundException` was thrown, isn't it?
That's correct.
The reason I chose to ignore the error here is that we may not have permission to check the directory. For example, say I have special chars in my user name, such as `foo bar`, and I try to write into my home directory `/user/foo bar/a/b/c` in Spark 3.0.0; it's likely I don't have access to `/user/foo%20bar/a/b/c`, since that's in a different user's home directory. Since checking the directory should be best effort and should not impact any users that don't hit this path issue, I prefer to ignore the error for safety.
This is different from the try-catch in `hasMetadata`, which checks a subdirectory of the current directory. In that case, the directory is usually accessible and the error is probably a real issue.
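For illustration, the compounding escape is easy to reproduce with Hadoop's `Path` (a minimal sketch, assuming a spark-shell or any JVM with the Hadoop client on the classpath; the expected values in the comments match the triple-escaped checkpoint path described in this PR):

```scala
import org.apache.hadoop.fs.Path

// Each Path -> URI string -> Path round trip escapes the path once more,
// because Path's string constructor re-quotes '%' as "%25".
val p0 = "/user/foo bar/a/b/c"
val p1 = new Path(p0).toUri.toString  // /user/foo%20bar/a/b/c     (escaped once)
val p2 = new Path(p1).toUri.toString  // /user/foo%2520bar/a/b/c   (escaped twice)
val p3 = new Path(p2).toUri.toString  // /user/foo%252520bar/a/b/c (escaped three times)
```

This is exactly how a checkpoint location like `.../chk %@#chk` ends up stored under `.../chk%252520%252525@%252523chk`.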
That's a good edge case which I hadn't considered. I think a user name like `foo%20bar`, or anything similar that contains escaped URI parts, is junk that may highlight an error in some application code. Wouldn't it be better to let the user decide what to do? For example:
- delete/move the directory
- add the right to read it
- ignore the escaped path check with the added config
@gaborgsomogyi Making this check best effort is safer. I don't want to force people who don't hit this issue to use the config until we remove this check in the future.
@gaborgsomogyi By the way, the user name example may be contrived. A more plausible permission error case is that an admin can configure an S3 bucket to allow Spark to access only paths matching some specified patterns, and a Spark user may not be able to change this.
```scala
@@ -89,8 +89,38 @@ abstract class StreamExecution(
  val resolvedCheckpointRoot = {
    val checkpointPath = new Path(checkpointRoot)
    val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
    fs.mkdirs(checkpointPath)
    checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
    if (sparkSession.conf.get(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
```
Is it worth wrapping this part in a function, just like `FileStreamSink.checkEscapedMetadataPath`?
```scala
try {
  fs.exists(new Path(legacyCheckpointDir))
} catch {
  case NonFatal(e) =>
```
Same question here.
```scala
  }
}

withTempDir { tempDir =>
```
Wouldn't it be better to make `withSQLConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key -> "true")` explicit (I know that's the default now)?
Explicitly setting a config in a test is usually done because we may change the default value in the future and don't want to break any tests.

However, for this special config, I prefer not to set it in the test, to make sure the default behavior is correct. If we decide to turn it off by default in the future, we will probably just remove this check instead. I don't see a reason to turn this off by default and ask the user to turn it on in some cases.
Yeah, good argument and I agree.
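(For reference, pinning it explicitly would mirror the `withSQLConf` usage already in this PR's tests; a sketch, assuming a suite that mixes in Spark's `SQLTestUtils`:)

```scala
// Sketch only: pin the conf explicitly, as suggested above. The PR keeps the
// default instead, for the reason given in the previous comment.
withSQLConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key -> "true") {
  // run the streaming query against a special-chars path and expect the
  // migration error to be raised
}
```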
```scala
if (fs.isDirectory(hdfsPath)) {
  val metadataPath = new Path(hdfsPath, metadataDir)
  checkEscapedMetadataPath(fs, metadataPath, sqlConf)
  fs.exists(metadataPath)
```
+1
This is great, but I found the error a bit hard to read. What do you think about something like:
Test build #102516 has started for PR 23733 at commit
```scala
withSQLConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key -> "false") {
  // Verify that the batch query ignores the legacy "_spark_metadata"
  val df = spark.read.load(outputDir.getCanonicalPath)
  assert(!(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex"))
```
These are amazing tests 🥇 💯
```scala
  // path issue.
  "file://foo:b ar@bar/foo/bar",
  "file://foo:bar@b ar/foo/bar",
  "file://f oo:bar@bar/foo/bar").foreach { p =>
```
does the LocalFileSystem take an authority? Do we need to use a different file system?
@brkyvz The APIs called in `containsSpecialCharsInPath` don't involve `FileSystem`s. The scheme `file` here can be replaced with random strings.
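For context, a sketch of how such a pure `Path`/URI check can work (assumed to mirror the PR's `containsSpecialCharsInPath` rather than copied verbatim): a path contains special chars iff its path component does not survive a round trip through its escaped URI string form. Since only the path component is compared, special chars in the authority do not trigger it, and no `FileSystem` is ever touched.

```scala
import org.apache.hadoop.fs.Path

// True iff re-parsing the escaped URI string changes the path component,
// i.e. the original path contained characters that URI escaping rewrites.
def containsSpecialCharsInPath(path: Path): Boolean =
  path.toUri.getPath != new Path(path.toUri.toString).toUri.getPath

containsSpecialCharsInPath(new Path("file:///foo/b ar"))            // true: space in the path
containsSpecialCharsInPath(new Path("file://f oo:bar@bar/foo/bar")) // false: space only in the authority
```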
This is amazing. LGTM!
Thanks! Merging to master.

retest this please. Triggering a new build before merging.
Test build #4560 has finished for PR 23733 at commit
[SPARK-26824][SS]Fix the checkpoint location and _spark_metadata when it contains special chars

## What changes were proposed in this pull request?

When a user specifies a checkpoint location or a file sink output using a path containing special chars that need to be escaped in a path, the streaming query will store checkpoint and file sink metadata in a wrong place. In this PR, I uploaded a checkpoint that was generated by the following codes using Spark 2.4.0 to show this issue:

```
implicit val s = spark.sqlContext
val input = org.apache.spark.sql.execution.streaming.MemoryStream[Int]
input.addData(1, 2, 3)
val q = input.toDF.writeStream.format("parquet").option("checkpointLocation", ".../chk %@#chk").start(".../output %@#output")
q.stop()
```

Here is the structure of the directory:

```
sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0
├── chk%252520%252525@%252523chk
│   ├── commits
│   │   └── 0
│   ├── metadata
│   └── offsets
│       └── 0
├── output %@#output
│   └── part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet
└── output%20%25@%23output
    └── _spark_metadata
        └── 0
```

In this checkpoint, the user specified checkpoint location is `.../chk %@#chk` but the real path to store the checkpoint is `.../chk%252520%252525@%252523chk` (this is generated by escaping the original path three times). The user specified output path is `.../output %@#output` but the path to store `_spark_metadata` is `.../output%20%25@%23output/_spark_metadata` (this is generated by escaping the original path once). The data files are still in the correct path (such as `.../output %@#output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet`). This checkpoint will be used in unit tests in this PR.

The fix is just simply removing improper `Path.toUri` calls to fix the issue.

However, as the user may not read the release note and is not aware of this checkpoint location change, if they upgrade Spark without moving checkpoint to the new location, their query will just start from the scratch. In order to not surprise the users, this PR also adds a check to **detect the impacted paths and throws an error** to include the migration guide. This check can be turned off by an internal sql conf `spark.sql.streaming.checkpoint.escapedPathCheck.enabled`. Here are examples of errors that will be reported:

- Streaming checkpoint error:

```
Error: we detected a possible problem with the location of your checkpoint and you
likely need to move it before restarting this query.

Earlier version of Spark incorrectly escaped paths when writing out checkpoints for
structured streaming. While this was corrected in Spark 3.0, it appears that your
query was started using an earlier version that incorrectly handled the checkpoint
path.

Correct Checkpoint Directory: /.../chk %@#chk
Incorrect Checkpoint Directory: /.../chk%252520%252525@%252523chk

Please move the data from the incorrect directory to the correct one, delete the
incorrect directory, and then restart this query. If you believe you are receiving
this message in error, you can disable it with the SQL conf
spark.sql.streaming.checkpoint.escapedPathCheck.enabled.
```

- File sink error (`_spark_metadata`):

```
Error: we detected a possible problem with the location of your "_spark_metadata"
directory and you likely need to move it before restarting this query.

Earlier version of Spark incorrectly escaped paths when writing out the
"_spark_metadata" directory for structured streaming. While this was corrected in
Spark 3.0, it appears that your query was started using an earlier version that
incorrectly handled the "_spark_metadata" path.

Correct "_spark_metadata" Directory: /.../output %@#output/_spark_metadata
Incorrect "_spark_metadata" Directory: /.../output%20%25@%23output/_spark_metadata

Please move the data from the incorrect directory to the correct one, delete the
incorrect directory, and then restart this query. If you believe you are receiving
this message in error, you can disable it with the SQL conf
spark.sql.streaming.checkpoint.escapedPathCheck.enabled.
```

## How was this patch tested?

The new unit tests.

Closes apache#23733 from zsxwing/path-fix.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
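(For anyone who has already moved their checkpoint to the correct location, the check can be disabled with the conf named in the error text; a one-line sketch:)

```scala
// Disable the escaped-path check. Only do this once you are sure the
// checkpoint and _spark_metadata directories are in the correct locations.
spark.conf.set("spark.sql.streaming.checkpoint.escapedPathCheck.enabled", "false")
```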