
Commit d0bd28e

[Spark] Fix O(n^2) issue in find last complete checkpoint before (#3060)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR fixes an O(n^2) issue in the `findLastCompleteCheckpointBefore` method, which finds the last checkpoint before a given version. Today it does the following:

findLastCompleteCheckpointBefore(10000):
1. List from 9000
2. List from 8000
3. List from 7000
...

Each of these listings today scans all the way to the end of the log, because the code completely ignores delta files and bounds the `takeWhile` only by a version clause:

```
listFrom(..)
  .filter { file => isCheckpointFile(file) && file.getLen != 0 }
  .map{ file => CheckpointInstance(file.getPath) }
  .takeWhile(tv => (cur == 0 || tv.version <= cur) && tv < upperBoundCv)
```

This PR fixes the issue by terminating each listing early: we stop as soon as we cross a delta file for the version we are listing up to. In addition, we bound how much each iteration lists (see the sketch after this description). E.g. after this PR, findLastCompleteCheckpointBefore(10000) will need:

1. Iteration-1 lists from 9000 to 10000.
2. Iteration-2 lists from 8000 to 9000.
3. Iteration-3 lists from 7000 to 8000.
4. and so on...

## How was this patch tested?

UT
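For intuition, here is a minimal, self-contained Scala sketch of the fixed strategy (the `LogFile` and `listFrom` stand-ins are hypothetical, not the Delta code): each iteration scans at most one ~1000-version window, and a `takeWhile` over all files, deltas included, stops the listing at the window's end instead of draining it to the end of the log.

```scala
// Hypothetical stand-in for a delta log listing: one file per version,
// flagged when that version also has a checkpoint.
case class LogFile(version: Long, isCheckpoint: Boolean)

object BackwardCheckpointSearchDemo extends App {
  // Emulates LogStore.listFrom: all files at version >= v, in ascending order.
  def listFrom(log: Seq[LogFile], v: Long): Iterator[LogFile] =
    log.iterator.filter(_.version >= v)

  // Windowed backward search: each iteration scans at most one 1000-version chunk.
  def findLastCheckpointBefore(log: Seq[LogFile], version: Long): Option[Long] = {
    var listingEndVersion = version
    while (listingEndVersion >= 0) {
      val listingStartVersion = math.max(0, listingEndVersion - 1000)
      val checkpoints = listFrom(log, listingStartVersion)
        // takeWhile over *all* files (deltas included) terminates the listing
        // early, instead of draining it to the end of the log as the old code did.
        .takeWhile(_.version <= listingEndVersion)
        .filter(f => f.isCheckpoint && f.version < version)
        .toArray
      if (checkpoints.nonEmpty) return Some(checkpoints.last.version)
      listingEndVersion -= 1000
    }
    None
  }

  // Checkpoints every 10 commits up to version 8990, plain deltas elsewhere.
  val log = (0L until 10000L).map(v => LogFile(v, v < 9000 && v % 10 == 0))
  // Window [9000, 10000] has no checkpoint; window [8000, 9000] yields 8990.
  assert(findLastCheckpointBefore(log, 10000) == Some(8990L))
}
```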
1 parent 3baeb99 commit d0bd28e

File tree

4 files changed: +518 -24 lines


spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala

Lines changed: 61 additions & 24 deletions
@@ -28,8 +28,7 @@ import org.apache.spark.sql.delta.actions.{Action, CheckpointMetadata, Metadata,
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.storage.LogStore
-import org.apache.spark.sql.delta.util.DeltaFileOperations
-import org.apache.spark.sql.delta.util.FileNames
+import org.apache.spark.sql.delta.util.{DeltaFileOperations, DeltaLogGroupingIterator, FileNames}
 import org.apache.spark.sql.delta.util.FileNames._
 import org.apache.spark.sql.delta.util.JsonUtils
 import org.apache.hadoop.conf.Configuration
@@ -417,7 +416,7 @@ trait Checkpoints extends DeltaLogging {
    * Note that the returned checkpoint will always be < `version`.
    * @param version The checkpoint version to compare against
    */
-  protected def findLastCompleteCheckpointBefore(version: Long): Option[CheckpointInstance] = {
+  private[delta] def findLastCompleteCheckpointBefore(version: Long): Option[CheckpointInstance] = {
     val upperBound = CheckpointInstance(version, CheckpointInstance.Format.SINGLE, numParts = None)
     findLastCompleteCheckpointBefore(Some(upperBound))
   }
@@ -428,38 +427,76 @@ trait Checkpoints extends DeltaLogging {
    * deltalog directory.
    * @param checkpointInstance The checkpoint instance to compare against
    */
-  protected def findLastCompleteCheckpointBefore(
+  private[delta] def findLastCompleteCheckpointBefore(
       checkpointInstance: Option[CheckpointInstance] = None): Option[CheckpointInstance] = {
-    val (upperBoundCv, startVersion) = checkpointInstance
-      .collect { case cv if cv.version >= 0 => (cv, cv.version) }
-      .getOrElse((CheckpointInstance.sentinelValue(versionOpt = None), 0L))
-    var cur = startVersion
-    val hadoopConf = newDeltaHadoopConf()
-
-    logInfo(s"Try to find Delta last complete checkpoint before version $startVersion")
-    while (cur >= 0) {
-      val checkpoints = store.listFrom(
-          listingPrefix(logPath, math.max(0, cur - 1000)),
-          hadoopConf)
-        // Checkpoint files of 0 size are invalid but Spark will ignore them silently when reading
-        // such files, hence we drop them so that we never pick up such checkpoints.
-        .filter { file => isCheckpointFile(file) && file.getLen != 0 }
-        .map{ file => CheckpointInstance(file.getPath) }
-        .takeWhile(tv => (cur == 0 || tv.version <= cur) && tv < upperBoundCv)
-        .toArray
+    val upperBoundCv = checkpointInstance.filterNot(_.version < 0).getOrElse {
+      logInfo(s"Try to find Delta last complete checkpoint")
+      return findLastCompleteCheckpoint()
+    }
+    logInfo(s"Try to find Delta last complete checkpoint before version ${upperBoundCv.version}")
+    var listingEndVersion = upperBoundCv.version
+
+    // Do a backward listing from the upperBoundCv version. We list in chunks of 1000 versions.
+    // ...........................................................................................
+    //                                                                     |
+    //                                                         upper bound cv's version
+    //                                          [ iter-1 looks in this window ]
+    //                       [ iter-2 window ]
+    //    [ iter-3 window ]
+    //      |
+    // latest checkpoint
+    while (listingEndVersion >= 0) {
+      val listingStartVersion = math.max(0, listingEndVersion - 1000)
+      val checkpoints = store
+        .listFrom(listingPrefix(logPath, listingStartVersion), newDeltaHadoopConf())
+        .collect {
+          // Also collect delta files from the listing result so that the next takeWhile helps us
+          // terminate the iterator early if no checkpoint exists up to the `listingEndVersion`
+          // version.
+          case DeltaFile(file, version) => (file, FileType.DELTA, version)
+          case CheckpointFile(file, version) => (file, FileType.CHECKPOINT, version)
+        }
+        .takeWhile { case (_, _, currentFileVersion) => currentFileVersion <= listingEndVersion }
+        // Checkpoint files of 0 size are invalid but Spark will ignore them silently when
+        // reading such files, hence we drop them so that we never pick up such checkpoints.
+        .collect { case (file, FileType.CHECKPOINT, _) if file.getLen > 0 =>
+          CheckpointInstance(file.getPath)
+        }
+        // We still need to filter on `upperBoundCv` to eliminate checkpoint files which have the
+        // same version as `upperBoundCv` but a higher [[CheckpointInstance.Format]]. E.g. if
+        // upperBoundCv is a V2 checkpoint and we have a single-part checkpoint and a V2
+        // checkpoint at the same version, we should not consider the V2 checkpoint as it is
+        // not lower than the upperBoundCv.
+        .filter(_ < upperBoundCv)
+        .toArray
       val lastCheckpoint =
         getLatestCompleteCheckpointFromList(checkpoints, Some(upperBoundCv.version))
       if (lastCheckpoint.isDefined) {
         logInfo(s"Delta checkpoint is found at version ${lastCheckpoint.get.version}")
         return lastCheckpoint
-      } else {
-        cur -= 1000
       }
+      listingEndVersion = listingEndVersion - 1000
     }
-    logInfo(s"No checkpoint found for Delta table before version $startVersion")
+    logInfo(s"No checkpoint found for Delta table before version ${upperBoundCv.version}")
     None
   }
 
+  /** Returns the last complete checkpoint in the delta log directory (if any) */
+  private def findLastCompleteCheckpoint(): Option[CheckpointInstance] = {
+    val hadoopConf = newDeltaHadoopConf()
+    val listingResult = store
+      .listFrom(listingPrefix(logPath, 0L), hadoopConf)
+      // Checkpoint files of 0 size are invalid but Spark will ignore them silently when
+      // reading such files, hence we drop them so that we never pick up such checkpoints.
+      .collect { case CheckpointFile(file, _) if file.getLen != 0 => file }
+    new DeltaLogGroupingIterator(listingResult)
+      .flatMap { case (_, files) =>
+        getLatestCompleteCheckpointFromList(files.map(f => CheckpointInstance(f.getPath)).toArray)
+      }.foldLeft(Option.empty[CheckpointInstance])((_, right) => Some(right))
+    // ^The foldLeft here emulates the non-existing Iterator.lastOption method.
+  }
+
   /**
    * Given a list of checkpoint files, pick the latest complete checkpoint instance which is not
    * later than `notLaterThan`.
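A note on the `foldLeft` at the end of the added `findLastCompleteCheckpoint()`: Scala's `Iterator` has no `lastOption`, so folding with `(_, right) => Some(right)` keeps only the final element while consuming the grouped listing in one streaming pass. A tiny standalone illustration (demo code, not part of the commit):

```scala
object LastOptionDemo extends App {
  // The accumulator is discarded at each step; after the fold, the Option holds
  // the iterator's final element, if any: a streaming "lastOption".
  def lastOption[A](it: Iterator[A]): Option[A] =
    it.foldLeft(Option.empty[A])((_, right) => Some(right))

  assert(lastOption(Iterator(1, 2, 3)).contains(3))
  assert(lastOption(Iterator.empty[Int]).isEmpty)
}
```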
spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaLogGroupingIterator.scala

Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.util
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.delta.util.FileNames.{CheckpointFile, DeltaFile}
+import org.apache.hadoop.fs.FileStatus
+
+/**
+ * An iterator that groups files by version.
+ * Note that this class can handle only checkpoint and delta files.
+ * For example, for an input iterator:
+ * - 11.checkpoint.0.1.parquet
+ * - 11.checkpoint.1.1.parquet
+ * - 11.json
+ * - 12.checkpoint.parquet
+ * - 12.json
+ * - 13.json
+ * - 14.json
+ * - 15.checkpoint.0.1.parquet
+ * - 15.checkpoint.1.1.parquet
+ * - 15.checkpoint.<uuid>.parquet
+ * - 15.json
+ * This will return:
+ * - (11, Seq(11.checkpoint.0.1.parquet, 11.checkpoint.1.1.parquet, 11.json))
+ * - (12, Seq(12.checkpoint.parquet, 12.json))
+ * - (13, Seq(13.json))
+ * - (14, Seq(14.json))
+ * - (15, Seq(15.checkpoint.0.1.parquet, 15.checkpoint.1.1.parquet, 15.checkpoint.<uuid>.parquet,
+ *   15.json))
+ */
+class DeltaLogGroupingIterator(
+    checkpointAndDeltas: Iterator[FileStatus]) extends Iterator[(Long, ArrayBuffer[FileStatus])] {
+
+  private val bufferedIterator = checkpointAndDeltas.buffered
+
+  /**
+   * Validates that the underlying file is a checkpoint/delta file and returns the corresponding
+   * version.
+   */
+  private def getFileVersion(file: FileStatus): Long = {
+    file match {
+      case DeltaFile(_, version) => version
+      case CheckpointFile(_, version) => version
+      case _ =>
+        throw new IllegalStateException(
+          s"${file.getPath} is not a valid commit file / checkpoint file")
+    }
+  }
+
+  override def hasNext: Boolean = bufferedIterator.hasNext
+
+  override def next(): (Long, ArrayBuffer[FileStatus]) = {
+    val first = bufferedIterator.next()
+    val buffer = scala.collection.mutable.ArrayBuffer(first)
+    val firstFileVersion = getFileVersion(first)
+    while (bufferedIterator.headOption.exists(getFileVersion(_) == firstFileVersion)) {
+      buffer += bufferedIterator.next()
+    }
+    firstFileVersion -> buffer
+  }
+}
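To make the grouping contract concrete, here is a simplified, dependency-free sketch of the same buffered-iterator technique (the `GroupByVersionIterator` name and demo data are hypothetical; the real class consumes Hadoop `FileStatus` entries and extracts versions from checkpoint/delta file names):

```scala
import scala.collection.mutable.ArrayBuffer

// Groups consecutive entries that share a version; the input must be sorted by
// version, as a delta log listing is.
class GroupByVersionIterator[A](entries: Iterator[A], versionOf: A => Long)
    extends Iterator[(Long, ArrayBuffer[A])] {

  private val buffered = entries.buffered

  override def hasNext: Boolean = buffered.hasNext

  override def next(): (Long, ArrayBuffer[A]) = {
    val first = buffered.next()
    val version = versionOf(first)
    val group = ArrayBuffer(first)
    // Peek at the head (without consuming it) while it still has the same version.
    while (buffered.hasNext && versionOf(buffered.head) == version) {
      group += buffered.next()
    }
    version -> group
  }
}

object GroupingDemo extends App {
  val files = Iterator(
    "11.checkpoint.parquet" -> 11L, "11.json" -> 11L, "12.json" -> 12L)
  val grouped = new GroupByVersionIterator[(String, Long)](files, _._2).toList
  assert(grouped.map(_._1) == List(11L, 12L))
  assert(grouped.head._2.toList.map(_._1) == List("11.checkpoint.parquet", "11.json"))
}
```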
