Skip to content

[SPARK-28784][SS] Use CheckpointFileManager in StreamExecution/StreamingQueryManager for checkpoint dirs #25514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from

Conversation

shrutig
Copy link
Contributor

@shrutig shrutig commented Aug 20, 2019

What changes were proposed in this pull request?

After SPARK-23966 Refactoring all checkpoint file writing logic in a common CheckpointFileManager interface, the CheckpointFileManager interface was created to handle all structured streaming checkpointing operations and helps users to choose how they wish to write checkpointing files atomically.
StreamExecution and StreamingQueryManager still uses some FileSystem operations without using the CheckpointFileManager.
For instance,

Instead, StreamExecution and StreamingQueryManager should use CheckpointFileManager for these operations.

Why are the changes needed?

This change will allow users to use CheckpointFileManager for structured streaming checkpointing files without need for a separate FileSystem implementation for the same.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing tests

@shrutig
Copy link
Contributor Author

shrutig commented Aug 20, 2019

@tdas Could you please take a look at this PR ?

@SparkQA
Copy link

SparkQA commented Aug 20, 2019

Test build #109428 has finished for PR 25514 at commit 97bcf5b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shrutig shrutig changed the title [SPARK-28784]StreamExecution and StreamingQueryManager should utilize CheckpointFileManager to interact with checkpoint directories [SPARK-28784][SS]StreamExecution and StreamingQueryManager should utilize CheckpointFileManager to interact with checkpoint directories Aug 21, 2019
@dongjoon-hyun
Copy link
Member

Thank you for your first contribution, @shrutig .
Could you add a unit test case for your contribution? That will protect your contribution from the accidental future regressions.

@dongjoon-hyun
Copy link
Member

Retest this please.

@shrutig
Copy link
Contributor Author

shrutig commented Aug 21, 2019

Updated DataStreamReaderWriterSuite tests which were relying on resolvedCheckpointRoot variable in StreamExecution to use the checkpointFileManager method.
Plus, there is another test "should resolve the checkpoint path" already existing in StreamSuite file.

@SparkQA
Copy link

SparkQA commented Aug 21, 2019

Test build #109519 has finished for PR 25514 at commit 97bcf5b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 22, 2019

Test build #109528 has finished for PR 25514 at commit 9a92b5e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please ignore this. (I removed my previous comment since it's outdated.)

@zsxwing
Copy link
Member

zsxwing commented Aug 22, 2019

IMO, it's not worth to switch to CheckpointFileManager considering we just found some issues in FileContext's checksum. See https://issues.apache.org/jira/browse/SPARK-28025. I'm inclined to use mature FileSystem APIs for now.

… tests to reflect changes make for CheckpointFileManager usages
@shrutig
Copy link
Contributor Author

shrutig commented Aug 26, 2019

@dongjoon-hyun I was able to replicate the failing test in my local and I have submitted my changes to test files to fix that.

@shrutig
Copy link
Contributor Author

shrutig commented Aug 27, 2019

@zsxwing CheckpointFileManager is already being used to handle checkpoint files in multiple places, such as HDFSMetadataLog, HDFSBackedStateStoreProvider and others as part of PR #21048 .
This PR aims to make that change uniformly in other classes.

As far as the issue in https://issues.apache.org/jira/browse/SPARK-28025 is concerned, in my opinion that should not hinder this PR. That issue has been approved and merged as part of a PR - #25565 . Also, it was caused due to a bug in a specific implementation of CheckpointFileManager (FileContextBasedCheckpointFileManager).

@SparkQA
Copy link

SparkQA commented Aug 27, 2019

Test build #109753 has finished for PR 25514 at commit 73ff29b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member

Retest this please.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-28784][SS]StreamExecution and StreamingQueryManager should utilize CheckpointFileManager to interact with checkpoint directories [SPARK-28784][SS] Use CheckpointFileManager in StreamExecution/StreamingQueryManager for checkpoint dirs Aug 30, 2019
@dongjoon-hyun
Copy link
Member

Hi, @zsxwing .
If this PR makes these consistently, isn't it easier for us to change this together later without leaving something behind?

@zsxwing
Copy link
Member

zsxwing commented Aug 30, 2019

IMO, I don't feel comfortable to move to a known broken API when the existing codes work.

@dongjoon-hyun
Copy link
Member

I got it. Thank you for the guide, @zsxwing .

In this case, @shrutig . We had better close this PR. I know your intention, but we cannot merge this according to the above advice. Could you close this?

@holdenk
Copy link
Contributor

holdenk commented Aug 30, 2019

Isn't the issues @zsxwing refers to fixed in #25488 ? Also if this is still broken then maybe we should revert all uses of CheckPointManager? It seems that whichever we do we should do consistently.

@SparkQA
Copy link

SparkQA commented Aug 30, 2019

Test build #109960 has finished for PR 25514 at commit 73ff29b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Copy link
Contributor

The issue not yet fixed, it may happen in https://issues.apache.org/jira/browse/HADOOP-16255.

@shrutig
Copy link
Contributor Author

shrutig commented Sep 5, 2019

I want to clear up some concerns which were mentioned here:

cc @tdas

@holdenk
Copy link
Contributor

holdenk commented Sep 14, 2019

It also looks like the default implementation issue has been fixed in https://issues.apache.org/jira/browse/HADOOP-16255

Since the FileContextCheckpointFileManager doesn't have any known issues with these code paths that are being added, unifying access to the same data is probably better, and from the original PR (cbb41a0) adding the CheckpointFileManager it sounds like the goal is to make this plug-gable which only works if it is used consistently inside of Spark (although @tdas can correct me on that point).

@holdenk
Copy link
Contributor

holdenk commented Sep 19, 2019

Jenkins ok to test.

@holdenk
Copy link
Contributor

holdenk commented Sep 19, 2019

@dongjoon-hyun it looks like you aren't asking for any specific change anymore, is that correct?

@@ -133,9 +134,8 @@ abstract class StreamExecution(
.stripMargin)
}
}
val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makeQualified disappeared from here, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. We're using the checkpointFileManager for this behavior now in the PR so we are not directly making FS operations. Looking inside of the default FileSystem impl we don't explicitly translate this to a fully-qualified path, do you think that would be an issue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this ends-up in behavior change it may break existing queries. I think there must be a reason when we do such change.

@@ -444,7 +444,9 @@ class StreamSuite extends StreamTest {
val inputData = MemoryStream[String]
val df = inputData.toDS().map(_ + "foo").groupBy("value").agg(count("*"))
// Test StreamingQuery.display
val dir1 = Utils.createTempDir().getCanonicalFile
val q = df.writeStream.queryName("memory_explain").outputMode("complete").format("memory")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this (and similar changes)? The original test is running fine with temp checkpoint.

@SparkQA
Copy link

SparkQA commented Sep 19, 2019

Test build #111021 has finished for PR 25514 at commit 73ff29b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -106,7 +107,7 @@ abstract class StreamExecution(
new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
val legacyCheckpointDirExists =
try {
fs.exists(new Path(legacyCheckpointDir))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here would it make sense to keep the fs check as well since we want to catch the previously created check point directories and suggest folks migrate?

@@ -133,9 +134,8 @@ abstract class StreamExecution(
.stripMargin)
}
}
val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. We're using the checkpointFileManager for this behavior now in the PR so we are not directly making FS operations. Looking inside of the default FileSystem impl we don't explicitly translate this to a fully-qualified path, do you think that would be an issue?

@dongjoon-hyun dongjoon-hyun self-requested a review December 5, 2019 19:41
@dongjoon-hyun dongjoon-hyun dismissed their stale review December 5, 2019 19:42

Remove the outdated review.

@dongjoon-hyun
Copy link
Member

Retest this please.

@SparkQA
Copy link

SparkQA commented Dec 5, 2019

Test build #114916 has finished for PR 25514 at commit 73ff29b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Mar 15, 2020
@github-actions github-actions bot closed this Mar 16, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants