[SPARK-28784][SS] Use CheckpointFileManager in StreamExecution/StreamingQueryManager for checkpoint dirs #25514
@tdas Could you please take a look at this PR?
Test build #109428 has finished for PR 25514 at commit
Thank you for your first contribution, @shrutig.
Retest this please.
Updated the DataStreamReaderWriterSuite tests that relied on the resolvedCheckpointRoot variable in StreamExecution to use the checkpointFileManager method instead.
Test build #109519 has finished for PR 25514 at commit
Test build #109528 has finished for PR 25514 at commit
Please ignore this. (I removed my previous comment since it's outdated.)
IMO, it's not worth switching to CheckpointFileManager considering we just found some issues in FileContext's checksum handling. See https://issues.apache.org/jira/browse/SPARK-28025. I'm inclined to use the mature FileSystem APIs for now.
… tests to reflect changes made for CheckpointFileManager usages
@dongjoon-hyun I was able to reproduce the failing test locally, and I have pushed changes to the test files to fix it.
@zsxwing CheckpointFileManager is already used to handle checkpoint files in multiple places, such as HDFSMetadataLog, HDFSBackedStateStoreProvider and others, as part of PR #21048. As for https://issues.apache.org/jira/browse/SPARK-28025, in my opinion it should not hinder this PR. The fix for that issue has been approved and merged in PR #25565. Also, it was caused by a bug in one specific implementation of CheckpointFileManager (FileContextBasedCheckpointFileManager).
Test build #109753 has finished for PR 25514 at commit
Retest this please.
Hi, @zsxwing.
IMO, I don't feel comfortable moving to a known-broken API when the existing code works.
Test build #109960 has finished for PR 25514 at commit
The issue is not yet fixed; the fix may happen in https://issues.apache.org/jira/browse/HADOOP-16255.
I want to clear up some concerns which were mentioned here:
cc @tdas |
It also looks like the default implementation issue has been fixed in https://issues.apache.org/jira/browse/HADOOP-16255 Since the |
Jenkins ok to test. |
@dongjoon-hyun it looks like you aren't asking for any specific change anymore, is that correct? |
@@ -133,9 +134,8 @@ abstract class StreamExecution(
.stripMargin)
}
}
val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
makeQualified disappeared from here, right?
I think so. In this PR we're using the checkpointFileManager for this behavior, so we are not making FS operations directly. Looking inside the default FileSystem implementation, we don't explicitly translate this to a fully qualified path. Do you think that would be an issue?
Since this ends up as a behavior change, it may break existing queries. I think we need a clear reason for making such a change.
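To make the concern in this thread concrete, here is a minimal sketch of what the removed makeQualified call does to a checkpoint path. It assumes hadoop-common is on the classpath; the object name, the cluster URI and the working directory are hypothetical values chosen for illustration and are not taken from the PR.

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// Hypothetical helper, for illustration only.
object QualifyCheckpointPath {
  // Resolve a possibly relative, scheme-less path against a file system's
  // default URI and working directory, as the removed line did with
  // fs.getUri and fs.getWorkingDirectory.
  def qualify(raw: String, defaultUri: URI, workingDir: Path): Path =
    new Path(raw).makeQualified(defaultUri, workingDir)

  def main(args: Array[String]): Unit = {
    val defaultUri = new URI("hdfs://namenode:8020") // assumed cluster URI
    val workingDir = new Path("/user/alice")         // assumed working directory

    // A relative path gains the scheme, the authority and the working directory.
    println(qualify("checkpoints/q1", defaultUri, workingDir))
    // An absolute path gains only the scheme and the authority.
    println(qualify("/tmp/checkpoints/q1", defaultUri, workingDir))
  }
}
```

Without this step, a relative or scheme-less checkpoint location would be passed on unchanged, so anything that records or compares the checkpoint root could see a different string than before. That is the kind of behavior change the comment above is worried about.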
@@ -444,7 +444,9 @@ class StreamSuite extends StreamTest {
val inputData = MemoryStream[String]
val df = inputData.toDS().map(_ + "foo").groupBy("value").agg(count("*"))
// Test StreamingQuery.display
val dir1 = Utils.createTempDir().getCanonicalFile
val q = df.writeStream.queryName("memory_explain").outputMode("complete").format("memory")
Why do we need this (and similar changes)? The original test is running fine with temp checkpoint.
Test build #111021 has finished for PR 25514 at commit
@@ -106,7 +107,7 @@ abstract class StreamExecution(
new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
val legacyCheckpointDirExists =
try {
fs.exists(new Path(legacyCheckpointDir))
Would it make sense to keep the fs check here as well, since we want to catch previously created checkpoint directories and suggest that folks migrate?
Retest this please. |
Test build #114916 has finished for PR 25514 at commit
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
What changes were proposed in this pull request?
After SPARK-23966 (Refactoring all checkpoint file writing logic in a common CheckpointFileManager interface), the CheckpointFileManager interface was created to handle all Structured Streaming checkpointing operations and to let users choose how checkpoint files are written atomically. StreamExecution and StreamingQueryManager still use some FileSystem operations directly instead of going through CheckpointFileManager.
For instance,
Instead, StreamExecution and StreamingQueryManager should use CheckpointFileManager for these operations.
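As a rough illustration of the proposal, the sketch below checks for and creates a checkpoint directory through CheckpointFileManager rather than through FileSystem. It is not the code from this PR: the object and method names are hypothetical, and it assumes spark-sql and hadoop-common are on the classpath. CheckpointFileManager is an internal Spark API whose members may differ between versions.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.CheckpointFileManager

// Hypothetical helper, for illustration only.
object CheckpointDirSketch {
  // Ensure the checkpoint directory exists, delegating every file operation
  // to whichever CheckpointFileManager implementation the Hadoop
  // configuration selects, instead of calling FileSystem directly.
  def ensureCheckpointDir(dir: String, hadoopConf: Configuration): Path = {
    val path = new Path(dir)
    val manager = CheckpointFileManager.create(path, hadoopConf)
    if (!manager.exists(path)) {
      manager.mkdirs(path)
    }
    // Note: the path is returned as given and is NOT made fully qualified here.
    path
  }
}
```

Because the returned path is not qualified, a caller that relied on the old makeQualified behavior would have to handle qualification separately.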
Why are the changes needed?
This change lets users plug in a CheckpointFileManager for Structured Streaming checkpoint files without also having to provide a separate FileSystem implementation.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing tests