[SPARK-48589][SQL][SS] Add option snapshotStartBatchId and snapshotPartitionId to state data source #46944
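For context on what the new options enable, here is a minimal usage sketch of the state data source with the two options named in the title. The checkpoint path and the batch/partition values are placeholders, and the exact interplay of the options is inferred from the PR title rather than released documentation, so treat the details as assumptions:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("state-snapshot-read").getOrCreate()

    // Rebuild the state for partition 0 by loading the snapshot written at batch 5
    // and replaying changelog files forward, instead of starting from the latest snapshot.
    val stateDf = spark.read
      .format("statestore")
      .option("snapshotStartBatchId", 5)
      .option("snapshotPartitionId", 0)
      .load("/tmp/streaming-checkpoint")   // placeholder checkpoint location

    stateDf.show()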
@@ -226,6 +226,80 @@ class RocksDB(
    this
  }

  /**
   * Load from the start snapshot version and apply all the changelog records to reach the
   * end version. Note that this will copy all the necessary files from DFS to local disk as
   * needed, and possibly restart the native RocksDB instance.
   *
   * @param snapshotVersion version of the snapshot to start with
   * @param endVersion end version
   * @return A RocksDB instance loaded with the state at endVersion, replayed from snapshotVersion.
   *         Note that the instance will be read-only since this method is only used in the State
   *         Data Source.
   */
  def loadFromSnapshot(snapshotVersion: Long, endVersion: Long): RocksDB = {
    assert(snapshotVersion >= 0 && endVersion >= snapshotVersion)
    acquire(LoadStore)
Review thread:
- Lock release path is still the same, right? I assume we release on an abort?
- I think I am copying the existing implementation. Any changes needed here?
- Yeah, I'm guessing the unlock happens at the end, as part of an abort within the state data source reader.
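To make the thread above concrete, here is a hypothetical caller-side sketch of the lock lifecycle being discussed. The reader-side flow is an assumption based on the thread itself; rollback() is the existing abort path on RocksDB:

    // Hypothetical sketch, not part of this diff: the state data source reader would
    // roughly do the following.
    //   db.loadFromSnapshot(snapshotVersion, endVersion)  // acquire(LoadStore) happens inside
    //   ... iterate over the read-only state ...
    //   db.rollback()                                     // abort path releases the acquired lock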
    recordedMetrics = None
    logInfo(
      log"Loading snapshot at version ${MDC(LogKeys.VERSION_NUM, snapshotVersion)} and apply " +
      log"changelog files to version ${MDC(LogKeys.VERSION_NUM, endVersion)}.")
    try {
      replayFromCheckpoint(snapshotVersion, endVersion)

      logInfo(
        log"Loaded snapshot at version ${MDC(LogKeys.VERSION_NUM, snapshotVersion)} and apply " +
        log"changelog files to version ${MDC(LogKeys.VERSION_NUM, endVersion)}.")

    Review comment on the logInfo above: ditto.

    } catch {
      case t: Throwable =>
        loadedVersion = -1  // invalidate loaded data
        throw t
    }
    this
  }

  /**
   * Load from the start checkpoint version and apply all the changelog records to reach the
   * end version.
   * If the start version does not exist, it will throw an exception.
   *
   * @param snapshotVersion start checkpoint version
   * @param endVersion end version
   */
  private def replayFromCheckpoint(snapshotVersion: Long, endVersion: Long): Any = {
    closeDB()
    val metadata = fileManager.loadCheckpointFromDfs(snapshotVersion, workingDir)
    loadedVersion = snapshotVersion

    // reset last snapshot version
    if (lastSnapshotVersion > snapshotVersion) {
      // discard any newer snapshots
      lastSnapshotVersion = 0L
      latestSnapshot = None
    }
    openDB()

    numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
      // we don't track the total number of rows - discard the number being tracked
      -1L
    } else if (metadata.numKeys < 0) {
      // we track the total number of rows, but the snapshot doesn't have the tracked number,
      // so we need to count keys now
      countKeys()
    } else {
      metadata.numKeys
    }
    if (loadedVersion != endVersion) replayChangelog(endVersion)
Review thread:
- I'd like to see a user-friendly error message when the changelog file does not exist. Users may actually not be using changelog checkpointing and could be misled into thinking it is supported. Surfacing a raw FileNotFoundException does not hint at what might be wrong; a savvy user may figure it out, but it is better to be user-friendly and to go through the error class framework.
- The error users will get now when the changelog does not exist is: [...] It does not have its own error class, so I think we should defer this to a follow-up task: give this error its own error class and catch it here to remind the user of the possible cause.
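The follow-up proposed above could look roughly like the sketch below. The error class name, the message parameters, and catching FileNotFoundException at this call site are illustrative assumptions, not part of this PR; SparkException with an error class and message parameters is the existing entry point into the error framework:

    // Illustrative follow-up sketch (not part of this PR): surface a missing changelog
    // file as a dedicated error instead of a bare FileNotFoundException.
    if (loadedVersion != endVersion) {
      try {
        replayChangelog(endVersion)
      } catch {
        case e: java.io.FileNotFoundException =>
          throw new org.apache.spark.SparkException(
            errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_MISSING_CHANGELOG", // hypothetical name
            messageParameters = Map("version" -> endVersion.toString),
            cause = e)
      }
    }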

    // After changelog replay the numKeysOnWritingVersion will be updated to
    // the correct number of keys in the loaded version.
    numKeysOnLoadedVersion = numKeysOnWritingVersion
    fileManagerMetrics = fileManager.latestLoadCheckpointMetrics

    if (conf.resetStatsOnLoad) {
      nativeStats.reset
    }
  }

  /**
   * Replay change log from the loaded version to the target version.
   */