-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-47568][SS] Fix race condition between maintenance thread and load/commit for snapshot files. #45724
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@HeartSaVioR @anishshri-db PTAL, thanks ! |
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm pending nit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only nits
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
Outdated
Show resolved
Hide resolved
// do maintenance - upload any latest snapshots so far | ||
// would fail to acquire lock and no snapshots would be uploaded | ||
db.doMaintenance() | ||
db.commit() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we think of the way to verify this? Or is it not feasible as it's about race condition?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
verify that maintenance actually fails here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no snapshot being uploaded at this moment. but OK to skip if it's bound to race condition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 pending CI
Thanks! Merging to master/3.5. |
@sahnib Could you please file a new PR for 3.5? Looks like there is a merge conflict. Thanks in advance! |
…oad/commit for snapshot files ### What changes were proposed in this pull request? This PR fixes a race condition between the maintenance thread and task thread when change-log checkpointing is enabled, and ensure all snapshots are valid. 1. The maintenance thread currently relies on class variable lastSnapshot to find the latest checkpoint and uploads it to DFS. This checkpoint can be modified at commit time by Task thread if a new snapshot is created. 2. The task thread was not resetting the lastSnapshot at load time, which can result in newer snapshots (if a old version is loaded) being considered valid and uploaded to DFS. This results in VersionIdMismatch errors. ### Why are the changes needed? These are logical bugs which can cause `VersionIdMismatch` errors causing user to discard the snapshot and restart the query. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit test cases. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#45724 from sahnib/rocks-db-fix. Authored-by: Bhuwan Sahni <bhuwan.sahni@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…and load/commit for snapshot files Backports #45724 to 3.5 ### What changes were proposed in this pull request? This PR fixes a race condition between the maintenance thread and task thread when change-log checkpointing is enabled, and ensure all snapshots are valid. 1. The maintenance thread currently relies on class variable lastSnapshot to find the latest checkpoint and uploads it to DFS. This checkpoint can be modified at commit time by Task thread if a new snapshot is created. 2. The task thread was not resetting the lastSnapshot at load time, which can result in newer snapshots (if a old version is loaded) being considered valid and uploaded to DFS. This results in VersionIdMismatch errors. ### Why are the changes needed? These are logical bugs which can cause `VersionIdMismatch` errors causing user to discard the snapshot and restart the query. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit test cases. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45881 from sahnib/rocks-db-fix-3.5. Authored-by: Bhuwan Sahni <bhuwan.sahni@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…g a deep copy of file mappings in RocksDBFileManager in load() ### What changes were proposed in this pull request? When change log checkpointing is enabled, the lock of the **RocksDB** state store is acquired when uploading the snapshot inside maintenance tasks, which causes lock contention between query processing tasks and state maintenance thread. This PR fixes lock contention issue introduced by #45724. The changes include: 1. Removing lock acquisition in `doMaintenance()` 2. Adding a `copyFileMappings()` method to **RocksDBFileManager**, and using this method to deep copy the file manager state, specifically the file mappings `versionToRocksDBFiles` and `localFilesToDfsFiles`, in `load()` 3. Capture the reference to the file mappings in `commit()`. ### Why are the changes needed? We want to eliminate lock contention to decrease latency of streaming queries so lock acquisition inside maintenance tasks should be avoided. This can introduce race conditions between task and maintenance threads. By making a deep copy of `versionToRocksDBFiles` and `localFilesToDfsFiles` in **RocksDBFileManager**, we can ensure that the file manager state is not updated by task thread when background snapshot uploading tasks attempt to upload a snapshot. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit test cases. ### Was this patch authored or co-authored using generative AI tooling? No Closes #46942 from riyaverm-db/remove-lock-contention-between-maintenance-and-task. Authored-by: Riya Verma <riya.verma@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…making a deep copy of file mappings in RocksDBFileManager in load() Backports apache#46942 to 3.5 When change log checkpointing is enabled, the lock of the **RocksDB** state store is acquired when uploading the snapshot inside maintenance tasks, which causes lock contention between query processing tasks and state maintenance thread. This PR fixes lock contention issue introduced by apache#45724. The changes include: 1. Removing lock acquisition in `doMaintenance()` 2. Adding a `copyFileMappings()` method to **RocksDBFileManager**, and using this method to deep copy the file manager state, specifically the file mappings `versionToRocksDBFiles` and `localFilesToDfsFiles`, in `load()` 3. Capture the reference to the file mappings in `commit()`. We want to eliminate lock contention to decrease latency of streaming queries so lock acquisition inside maintenance tasks should be avoided. This can introduce race conditions between task and maintenance threads. By making a deep copy of `versionToRocksDBFiles` and `localFilesToDfsFiles` in **RocksDBFileManager**, we can ensure that the file manager state is not updated by task thread when background snapshot uploading tasks attempt to upload a snapshot. No Added unit test cases. No Closes apache#46942 from riyaverm-db/remove-lock-contention-between-maintenance-and-task. Authored-by: Riya Verma <riya.verma@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…making a deep copy of file mappings in RocksDBFileManager in load() Backports #46942 to 3.5 ### What changes were proposed in this pull request? When change log checkpointing is enabled, the lock of the **RocksDB** state store is acquired when uploading the snapshot inside maintenance tasks, which causes lock contention between query processing tasks and state maintenance thread. This PR fixes lock contention issue introduced by #45724. The changes include: 1. Removing lock acquisition in `doMaintenance()` 2. Adding a `copyFileMappings()` method to **RocksDBFileManager**, and using this method to deep copy the file manager state, specifically the file mappings `versionToRocksDBFiles` and `localFilesToDfsFiles`, in `load()` 3. Capture the reference to the file mappings in `commit()`. ### Why are the changes needed? We want to eliminate lock contention to decrease latency of streaming queries so lock acquisition inside maintenance tasks should be avoided. This can introduce race conditions between task and maintenance threads. By making a deep copy of `versionToRocksDBFiles` and `localFilesToDfsFiles` in **RocksDBFileManager**, we can ensure that the file manager state is not updated by task thread when background snapshot uploading tasks attempt to upload a snapshot. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit test cases. ### Was this patch authored or co-authored using generative AI tooling? No Closes #47130 from riyaverm-db/remove-lock-contention-between-maintenance-and-task-3.5. Lead-authored-by: Riya Verma <riya.verma@databricks.com> Co-authored-by: Riya Verma <170376104+riyaverm-db@users.noreply.github.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…g a deep copy of file mappings in RocksDBFileManager in load() ### What changes were proposed in this pull request? When change log checkpointing is enabled, the lock of the **RocksDB** state store is acquired when uploading the snapshot inside maintenance tasks, which causes lock contention between query processing tasks and state maintenance thread. This PR fixes lock contention issue introduced by apache#45724. The changes include: 1. Removing lock acquisition in `doMaintenance()` 2. Adding a `copyFileMappings()` method to **RocksDBFileManager**, and using this method to deep copy the file manager state, specifically the file mappings `versionToRocksDBFiles` and `localFilesToDfsFiles`, in `load()` 3. Capture the reference to the file mappings in `commit()`. ### Why are the changes needed? We want to eliminate lock contention to decrease latency of streaming queries so lock acquisition inside maintenance tasks should be avoided. This can introduce race conditions between task and maintenance threads. By making a deep copy of `versionToRocksDBFiles` and `localFilesToDfsFiles` in **RocksDBFileManager**, we can ensure that the file manager state is not updated by task thread when background snapshot uploading tasks attempt to upload a snapshot. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit test cases. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#46942 from riyaverm-db/remove-lock-contention-between-maintenance-and-task. Authored-by: Riya Verma <riya.verma@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
What changes were proposed in this pull request?
This PR fixes a race condition between the maintenance thread and task thread when change-log checkpointing is enabled, and ensure all snapshots are valid.
Why are the changes needed?
These are logical bugs which can cause
VersionIdMismatch
errors causing user to discard the snapshot and restart the query.Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit test cases.
Was this patch authored or co-authored using generative AI tooling?
No