[SPARK-26824][SS]Fix the checkpoint location and _spark_metadata when it contains special chars #23733


Closed
zsxwing wants to merge 11 commits into master from zsxwing:path-fix

Conversation

@zsxwing (Member) commented Feb 4, 2019

What changes were proposed in this pull request?

When a user specifies a checkpoint location or a file sink output using a path containing special chars that need to be escaped, the streaming query will store its checkpoint and file sink metadata in the wrong place. To demonstrate the issue, this PR includes a checkpoint generated by the following code on Spark 2.4.0:

// Assumes a spark-shell session where `spark` is the active SparkSession.
implicit val s = spark.sqlContext
val input = org.apache.spark.sql.execution.streaming.MemoryStream[Int]
input.addData(1, 2, 3)
// Both the checkpoint location and the output path contain special chars
// (space, %, @, #) that need to be escaped in a path.
val q = input.toDF.writeStream
  .format("parquet")
  .option("checkpointLocation", ".../chk %@#chk")
  .start(".../output %@#output")
q.stop()

Here is the structure of the directory:

sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0
├── chk%252520%252525@%252523chk
│   ├── commits
│   │   └── 0
│   ├── metadata
│   └── offsets
│       └── 0
├── output %@#output
│   └── part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet
└── output%20%25@%23output
    └── _spark_metadata
        └── 0

In this checkpoint, the user-specified checkpoint location is `.../chk %@#chk`, but the real path used to store the checkpoint is `.../chk%252520%252525@%252523chk` (the original path escaped three times). The user-specified output path is `.../output %@#output`, but the path used to store `_spark_metadata` is `.../output%20%25@%23output/_spark_metadata` (the original path escaped once). The data files are still in the correct place (such as `.../output %@#output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet`).

This checkpoint will be used in unit tests in this PR.

The fix simply removes the improper `Path.toUri` calls.
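
To make the escaping behavior concrete, here is a minimal sketch (the paths below are illustrative, not the PR's test paths) showing how each improper round trip through `Path.toUri` adds one more escaping level:

```scala
import org.apache.hadoop.fs.Path

// Each `new Path(p.toUri.toString)` round trip escapes the path once more,
// because the Path constructor percent-encodes the '%' of existing escapes.
val p0 = new Path("/tmp/chk %@#chk")
p0.toUri.toString // /tmp/chk%20%25@%23chk          (escaped once)
val p1 = new Path(p0.toUri.toString)
p1.toUri.toString // /tmp/chk%2520%2525@%2523chk    (escaped twice)
val p2 = new Path(p1.toUri.toString)
p2.toUri.toString // /tmp/chk%252520%252525@%252523chk (escaped three times, matching the listing above)
```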

However, users may not read the release notes and may be unaware of this checkpoint location change; if they upgrade Spark without moving the checkpoint to the new location, their query will silently start from scratch. To avoid surprising users, this PR also adds a check that detects the impacted paths and throws an error containing the migration guide. The check can be turned off via the internal SQL conf spark.sql.streaming.checkpoint.escapedPathCheck.enabled (see the snippet after the examples below). Here are examples of the errors that will be reported:

  • Streaming checkpoint error:
Error: we detected a possible problem with the location of your checkpoint and you
likely need to move it before restarting this query.

Earlier version of Spark incorrectly escaped paths when writing out checkpoints for
structured streaming. While this was corrected in Spark 3.0, it appears that your
query was started using an earlier version that incorrectly handled the checkpoint
path.

Correct Checkpoint Directory: /.../chk %@#chk
Incorrect Checkpoint Directory: /.../chk%252520%252525@%252523chk

Please move the data from the incorrect directory to the correct one, delete the
incorrect directory, and then restart this query. If you believe you are receiving
this message in error, you can disable it with the SQL conf
spark.sql.streaming.checkpoint.escapedPathCheck.enabled.
  • File sink error (_spark_metadata):
Error: we detected a possible problem with the location of your "_spark_metadata"
directory and you likely need to move it before restarting this query.

Earlier version of Spark incorrectly escaped paths when writing out the
"_spark_metadata" directory for structured streaming. While this was corrected in
Spark 3.0, it appears that your query was started using an earlier version that
incorrectly handled the "_spark_metadata" path.

Correct "_spark_metadata" Directory: /.../output %@#output/_spark_metadata
Incorrect "_spark_metadata" Directory: /.../output%20%25@%23output/_spark_metadata

Please move the data from the incorrect directory to the correct one, delete the
incorrect directory, and then restart this query. If you believe you are receiving
this message in error, you can disable it with the SQL conf
spark.sql.streaming.checkpoint.escapedPathCheck.enabled.
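
For users who deliberately want to skip this check, disabling it is a one-liner; a minimal sketch (the conf name comes from this PR, the session setup is assumed):

```scala
// Disable the escaped-path check introduced by this PR (internal conf).
spark.conf.set("spark.sql.streaming.checkpoint.escapedPathCheck.enabled", "false")
```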

How was this patch tested?

The new unit tests.

@zsxwing (Member, Author) commented Feb 4, 2019

cc @brkyvz

@SparkQA commented Feb 4, 2019

Test build #102038 has finished for PR 23733 at commit 3b357e5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing zsxwing changed the title [SPARK-26824][SS]Fix the checkpoint location and _spark_metadata when it contains special chars [SPARK-26824][SS][WIP]Fix the checkpoint location and _spark_metadata when it contains special chars Feb 4, 2019
@zsxwing (Member, Author) commented Feb 4, 2019

Marking this WIP as I'm also working on improving the errors to include migration instructions for people who hit this issue.

@SparkQA commented Feb 11, 2019

Test build #102206 has finished for PR 23733 at commit 3c5852a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -0,0 +1,2 @@
v1
{"path":"file://TEMPDIR/output%20%25@%23output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet","size":404,"isDir":false,"modificationTime":1549649385000,"blockReplication":1,"blockSize":33554432,"action":"add"}
zsxwing (Member, Author):

TEMPDIR in this file will be replaced with the real test path in the unit test.
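
For illustration, a hypothetical sketch of that substitution (the resource path is the one listed above; `tempDir` is assumed to be the test's temporary directory, a `java.io.File`):

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Paths}

// Hypothetical sketch: read the checked-in metadata log template and replace
// the TEMPDIR placeholder with the directory the test actually uses.
val resource = Paths.get(
  "sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/" +
  "output%20%25@%23output/_spark_metadata/0")
val template = new String(Files.readAllBytes(resource), UTF_8)
val materialized = template.replace("TEMPDIR", tempDir.getCanonicalPath)
```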

@zsxwing zsxwing changed the title [SPARK-26824][SS][WIP]Fix the checkpoint location and _spark_metadata when it contains special chars [SPARK-26824][SS]Fix the checkpoint location and _spark_metadata when it contains special chars Feb 11, 2019
if (fs.isDirectory(hdfsPath)) {
  val metadataPath = new Path(hdfsPath, metadataDir)
  checkEscapedMetadataPath(fs, metadataPath, sqlConf)
  fs.exists(metadataPath)
zsxwing (Member, Author):

I removed the try-catch around `fs.exists(metadataPath)`. We should not swallow arbitrary errors when we fail to access `metadataPath`, as doing so may make Spark ignore `_spark_metadata` and return wrong answers.

Contributor:

+1

@SparkQA commented Feb 11, 2019

Test build #102215 has finished for PR 23733 at commit 8f408f0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 11, 2019

Test build #102209 has finished for PR 23733 at commit 3f55c47.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 12, 2019

Test build #102213 has finished for PR 23733 at commit 7c62458.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 12, 2019

Test build #102214 has finished for PR 23733 at commit ca7bbed.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 12, 2019

Test build #102217 has finished for PR 23733 at commit 4832ce5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 12, 2019

Test build #102218 has finished for PR 23733 at commit 6d7d1d8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 12, 2019

Test build #102220 has finished for PR 23733 at commit 2b82f3f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

} catch {
  case NonFatal(e) =>
    logWarning(s"Error while looking for metadata directory.")
    // We may not have access to this directory. Don't fail the query if that happens.
    logWarning(e.getMessage, e)
gaborgsomogyi (Contributor):

This means the directory may exist, but some other (maybe intermittent) exception, other than FileNotFoundException, was thrown, right?

zsxwing (Member, Author):

That's correct.

The reason I chose to ignore the error here is that we may not have permission to check the directory. For example, suppose my user name contains special chars, such as `foo bar`. When I try to write into my home directory `/user/foo bar/a/b/c` in Spark 3.0.0, I likely don't have access to `/user/foo%20bar/a/b/c`, since that is a different user's home directory. Since this check should be best effort and should not impact users who don't hit the path issue, I prefer to ignore the error for safety.

This is different from the try-catch in `hasMetadata`, which checks a subdirectory of the current directory. In that case, the directory is usually accessible and an error probably indicates a real issue.

gaborgsomogyi (Contributor):

That's a good edge case which I hadn't considered. I think a user name like `foo%20bar`, or anything similar containing escaped URI parts, is junk that may point to a bug in some application code. Wouldn't it be better to let the user decide what to do? For example:

  • delete/move the dir
  • grant read access
  • ignore the escaped path check with the added config

zsxwing (Member, Author):

@gaborgsomogyi Making this check best effort is safer. I don't want to force people who don't hit this issue to use the config before we eventually remove this check.

zsxwing (Member, Author):

@gaborgsomogyi By the way, the user name example may be far-fetched. A more plausible permission-error case is that an admin configures an S3 bucket to allow Spark to access only paths matching certain patterns, and a Spark user may not be able to change this.

@@ -89,8 +89,38 @@ abstract class StreamExecution(
val resolvedCheckpointRoot = {
  val checkpointPath = new Path(checkpointRoot)
  val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
  fs.mkdirs(checkpointPath)
  checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
  if (sparkSession.conf.get(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
Contributor:

Is it worth wrapping this part in a function, just like `FileStreamSink.checkEscapedMetadataPath`? (See the sketch after the next snippet.)

try {
  fs.exists(new Path(legacyCheckpointDir))
} catch {
  case NonFatal(e) =>
Contributor:

Same question here.
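
For what it's worth, a rough sketch of what such an extraction could look like; the name, signature, and error type below are illustrative guesses, not the PR's actual code:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.util.control.NonFatal

// Hypothetical helper mirroring FileStreamSink.checkEscapedMetadataPath.
def checkEscapedCheckpointPath(fs: FileSystem, correctCheckpointDir: String): Unit = {
  // Per the PR description, the buggy code escaped the original path three
  // times, so the legacy directory is the thrice-escaped form.
  def escapeOnce(s: String): String = new Path(s).toUri.toString
  val legacyCheckpointDir = escapeOnce(escapeOnce(escapeOnce(correctCheckpointDir)))
  // Only relevant when the path actually contains chars that get escaped;
  // otherwise the "legacy" dir is just the checkpoint dir itself.
  if (legacyCheckpointDir != correctCheckpointDir) {
    val legacyExists = try {
      fs.exists(new Path(legacyCheckpointDir))
    } catch {
      case NonFatal(_) => false // best effort: we may lack permission to look
    }
    if (legacyExists) {
      throw new IllegalStateException(
        s"""Error: we detected a possible problem with the location of your checkpoint.
           |Correct Checkpoint Directory: $correctCheckpointDir
           |Incorrect Checkpoint Directory: $legacyCheckpointDir""".stripMargin)
    }
  }
}
```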

}
}

withTempDir { tempDir =>
Contributor:

Wouldn't it be better to make `withSQLConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key -> "true")` explicit (I know that's the default now)?

@zsxwing (Member, Author) commented Feb 12, 2019

Explicitly setting a config in a test is usually done because we may change the default value in the future and we don't want to break any tests.

However, for this particular config I prefer not to set it in the test, to make sure the default behavior is correct. If we ever decide to turn it off by default, we will probably just remove the check instead. I don't see a reason to turn this off by default and ask users to turn it on in some cases.

Contributor:

Yeah, good argument and I agree.

if (fs.isDirectory(hdfsPath)) {
  val metadataPath = new Path(hdfsPath, metadataDir)
  checkEscapedMetadataPath(fs, metadataPath, sqlConf)
  fs.exists(metadataPath)
Contributor:

+1

@marmbrus (Contributor):

This is great, but I found the error a bit hard to read. What do you think about something like:

Error: we detected a possible problem with the location of your checkpoint 
and you likely need to move it before restarting this query.

Earlier version of Spark incorrectly escaped paths when writing out 
checkpoints for structured streaming.  While this was corrected in 
Spark 3.0, it appears that your query was started using an earlier 
version that incorrectly handled the checkpoint path.

Correct Checkpoint Directory: ....
Incorrect Checkpoint Directory: ...

Please move the data from the incorrect directory to the 
correct one and then restart this query. If you believe you are 
receiving this message in error, you can disable it with (...)

@SparkQA commented Feb 19, 2019

Test build #102516 has started for PR 23733 at commit 91542ff.

withSQLConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key -> "false") {
  // Verify that the batch query ignores the legacy "_spark_metadata"
  val df = spark.read.load(outputDir.getCanonicalPath)
  assert(!(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex"))
Contributor:

These are amazing tests 🥇 💯

// path issue.
"file://foo:b ar@bar/foo/bar",
"file://foo:bar@b ar/foo/bar",
"file://f oo:bar@bar/foo/bar").foreach { p =>
brkyvz (Contributor):

Does the LocalFileSystem take an authority? Do we need to use a different file system?

zsxwing (Member, Author):

@brkyvz The APIs called in `containsSpecialCharsInPath` don't involve FileSystems. The `file` scheme here can be replaced with any arbitrary string.
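
To illustrate why no FileSystem is involved: a check like this can be written purely in terms of `Path`/`URI` parsing. The body below is a plausible sketch consistent with the comment above, not necessarily the PR's exact code:

```scala
import org.apache.hadoop.fs.Path

// Returns true iff converting the path to a URI escapes something in the
// path component. Only Path/URI parsing is exercised: no FileSystem call is
// made, and the scheme/authority (e.g. "file://...") is never resolved.
def containsSpecialCharsInPath(path: Path): Boolean = {
  path.toUri.getPath != new Path(path.toUri.toString).toUri.getPath
}

containsSpecialCharsInPath(new Path("/chk %@#chk"))                 // true: space, % and # get escaped
containsSpecialCharsInPath(new Path("file://f oo:bar@bar/foo/bar")) // false: the space is in the authority, not the path
```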

@brkyvz (Contributor) left a comment:

This is amazing. LGTM!

@zsxwing (Member, Author) commented Feb 20, 2019

Thanks! Merging to master.

@zsxwing (Member, Author) commented Feb 20, 2019

retest this please.

Triggering a new build before merging.

@SparkQA commented Feb 20, 2019

Test build #4560 has finished for PR 23733 at commit 91542ff.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 77b99af Feb 20, 2019
@zsxwing zsxwing deleted the path-fix branch February 20, 2019 23:48
mccheah pushed a commit to palantir/spark that referenced this pull request May 15, 2019
[SPARK-26824][SS]Fix the checkpoint location and _spark_metadata when it contains special chars

Closes apache#23733 from zsxwing/path-fix.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>