Skip to content

Commit 8351348

Browse files
xupefeiscottsand-db
authored andcommitted
Return empty CDC result when no commit is in range
This PR improves timestamp handling for CDC reads, so that a range with no commit in between can return an empty DF instead of throwing an exception: ``` version: 4 5 ---------|-------------------------------------------------|-------- ^ start timestamp ^ end timestamp ``` Before: fail with `end version 4 is older than the start version 5`. After: success and return an empty DF. GitOrigin-RevId: c048b00df27b18c7072c205481340007e8bba6f7
1 parent 53b8464 commit 8351348

File tree

3 files changed

+176
-44
lines changed

3 files changed

+176
-44
lines changed

core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ object DeltaTableUtils extends PredicateHelper
377377

378378
/**
379379
* Given a time travel node, resolve which version it is corresponding to for the given table and
380-
* return the resolved version as well as the access type, i.e. by version or timestamp.
380+
* return the resolved version as well as the access type, i.e. by `version` or `timestamp`.
381381
*/
382382
def resolveTimeTravelVersion(
383383
conf: SQLConf,

core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala

Lines changed: 68 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -156,42 +156,33 @@ trait CDCReaderImpl extends DeltaLogging {
156156
import org.apache.spark.sql.delta.commands.cdc.CDCReader._
157157

158158
/**
159-
* Given timestamp or version this method returns the corresponding version for that timestamp
160-
* or the version itself.
159+
* Given timestamp or version, this method returns the corresponding version for that timestamp
160+
* or the version itself, as well as how the return version is obtained: by `version` or
161+
* `timestamp`.
161162
*/
162-
def getVersionForCDC(
163+
private def getVersionForCDC(
163164
spark: SparkSession,
164165
deltaLog: DeltaLog,
165166
conf: SQLConf,
166167
options: CaseInsensitiveStringMap,
167168
versionKey: String,
168-
timestampKey: String): Option[Long] = {
169+
timestampKey: String): Option[ResolvedCDFVersion] = {
169170
if (options.containsKey(versionKey)) {
170-
Some(options.get(versionKey).toLong)
171+
Some(ResolvedCDFVersion(options.get(versionKey).toLong, timestamp = None))
171172
} else if (options.containsKey(timestampKey)) {
172173
val ts = options.get(timestampKey)
173174
val spec = DeltaTimeTravelSpec(Some(Literal(ts)), None, Some("cdcReader"))
175+
val timestamp = spec.getTimestamp(spark.sessionState.conf)
174176
val allowOutOfRange = conf.getConf(DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP)
175-
if (timestampKey == DeltaDataSource.CDC_START_TIMESTAMP_KEY) {
177+
val resolvedVersion = if (timestampKey == DeltaDataSource.CDC_START_TIMESTAMP_KEY) {
176178
// For the starting timestamp we need to find a version after the provided timestamp
177179
// we can use the same semantics as streaming.
178-
val resolvedVersion = DeltaSource.getStartingVersionFromTimestamp(
179-
spark,
180-
deltaLog,
181-
spec.getTimestamp(spark.sessionState.conf),
182-
allowOutOfRange
183-
)
184-
Some(resolvedVersion)
180+
DeltaSource.getStartingVersionFromTimestamp(spark, deltaLog, timestamp, allowOutOfRange)
185181
} else {
186182
// For ending timestamp the version should be before the provided timestamp.
187-
val resolvedVersion = DeltaTableUtils.resolveTimeTravelVersion(
188-
conf,
189-
deltaLog,
190-
spec,
191-
allowOutOfRange
192-
)
193-
Some(resolvedVersion._1)
183+
DeltaTableUtils.resolveTimeTravelVersion(conf, deltaLog, spec, allowOutOfRange)._1
194184
}
185+
Some(ResolvedCDFVersion(resolvedVersion, Some(timestamp)))
195186
} else {
196187
None
197188
}
@@ -240,10 +231,7 @@ trait CDCReaderImpl extends DeltaLogging {
240231
conf,
241232
options,
242233
DeltaDataSource.CDC_START_VERSION_KEY,
243-
DeltaDataSource.CDC_START_TIMESTAMP_KEY
244-
)
245-
246-
if (startingVersion.isEmpty) {
234+
DeltaDataSource.CDC_START_TIMESTAMP_KEY).getOrElse {
247235
throw DeltaErrors.noStartVersionForCDC()
248236
}
249237

@@ -259,29 +247,30 @@ trait CDCReaderImpl extends DeltaLogging {
259247
s"cannot be used with time travel options.")
260248
}
261249

250+
def emptyCDFRelation() = {
251+
new DeltaCDFRelation(
252+
SnapshotWithSchemaMode(snapshotToUse, schemaMode),
253+
spark.sqlContext,
254+
startingVersion = None,
255+
endingVersion = None) {
256+
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
257+
sqlContext.sparkSession.sparkContext.emptyRDD[Row]
258+
}
259+
}
260+
262261
// add a version check here that is cheap instead of after trying to list a large version
263262
// that doesn't exist
264-
if (startingVersion.get > snapshotToUse.version) {
263+
if (startingVersion.version > snapshotToUse.version) {
265264
val allowOutOfRange = conf.getConf(DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP)
266265
// LS-129: return an empty relation if start version passed in is beyond latest commit version
267266
if (allowOutOfRange) {
268-
return new DeltaCDFRelation(
269-
SnapshotWithSchemaMode(snapshotToUse, schemaMode),
270-
spark.sqlContext,
271-
None,
272-
None) {
273-
override def buildScan(
274-
requiredColumns: Array[String],
275-
filters: Array[Filter]): RDD[Row] = {
276-
sqlContext.sparkSession.sparkContext.emptyRDD[Row]
277-
}
278-
}
267+
return emptyCDFRelation()
279268
}
280269
throw DeltaErrors.startVersionAfterLatestVersion(
281-
startingVersion.get, snapshotToUse.version)
270+
startingVersion.version, snapshotToUse.version)
282271
}
283272

284-
val endingVersion = getVersionForCDC(
273+
val endingVersionOpt = getVersionForCDC(
285274
spark,
286275
snapshotToUse.deltaLog,
287276
conf,
@@ -290,17 +279,42 @@ trait CDCReaderImpl extends DeltaLogging {
290279
DeltaDataSource.CDC_END_TIMESTAMP_KEY
291280
)
292281

293-
if (endingVersion.exists(_ < startingVersion.get)) {
294-
throw DeltaErrors.endBeforeStartVersionInCDC(startingVersion.get, endingVersion.get)
282+
// Given two timestamps, there is a case when both of them lay closely between two versions:
283+
// version: 4 5
284+
// ---------|-------------------------------------------------|--------
285+
// ^ start timestamp ^ end timestamp
286+
// In this case the starting version will be 5 and ending version will be 4. We must not
287+
// throw `endBeforeStartVersionInCDC` but return empty result.
288+
endingVersionOpt.foreach { endingVersion =>
289+
if (startingVersion.resolvedByTimestamp && endingVersion.resolvedByTimestamp) {
290+
// The next `if` is true when end is less than start but no commit is in between.
291+
// We need to capture such a case and throw early.
292+
if (startingVersion.timestamp.get.after(endingVersion.timestamp.get)) {
293+
throw DeltaErrors.endBeforeStartVersionInCDC(
294+
startingVersion.version,
295+
endingVersion.version)
296+
}
297+
if (endingVersion.version == startingVersion.version - 1) {
298+
return emptyCDFRelation()
299+
}
300+
}
301+
}
302+
303+
if (endingVersionOpt.exists(_.version < startingVersion.version)) {
304+
throw DeltaErrors.endBeforeStartVersionInCDC(
305+
startingVersion.version,
306+
endingVersionOpt.get.version)
295307
}
296308

297-
logInfo(s"startingVersion: $startingVersion, endingVersion: $endingVersion")
309+
logInfo(
310+
s"startingVersion: ${startingVersion.version}, " +
311+
s"endingVersion: ${endingVersionOpt.map(_.version)}")
298312

299313
DeltaCDFRelation(
300314
SnapshotWithSchemaMode(snapshotToUse, schemaMode),
301315
spark.sqlContext,
302-
startingVersion,
303-
endingVersion)
316+
Some(startingVersion.version),
317+
endingVersionOpt.map(_.version))
304318
}
305319

306320
/**
@@ -720,4 +734,15 @@ trait CDCReaderImpl extends DeltaLogging {
720734
* @param numBytes the total size of the AddFile + RemoveFile + AddCDCFiles that are in the df
721735
*/
722736
case class CDCVersionDiffInfo(fileChangeDf: DataFrame, numFiles: Long, numBytes: Long)
737+
738+
/**
739+
* Represents a Delta log version, and how the version is determined.
740+
* @param version the determined version.
741+
* @param timestamp the commit timestamp of the determined version. Will be filled when the
742+
* version is determined by timestamp.
743+
*/
744+
private case class ResolvedCDFVersion(version: Long, timestamp: Option[Timestamp]) {
745+
/** Whether this version is resolved by timestamp. */
746+
def resolvedByTimestamp: Boolean = timestamp.isDefined
747+
}
723748
}

core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,113 @@ abstract class DeltaCDCSuiteBase
312312
}
313313
}
314314

315+
test("version from timestamp - before the first version") {
316+
withTempDir { tempDir =>
317+
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
318+
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
319+
320+
modifyDeltaTimestamp(deltaLog, 0, 4000)
321+
modifyDeltaTimestamp(deltaLog, 1, 8000)
322+
modifyDeltaTimestamp(deltaLog, 2, 12000)
323+
324+
val ts0 = dateFormat.format(new Date(1000))
325+
val ts1 = dateFormat.format(new Date(3000))
326+
intercept[AnalysisException] {
327+
cdcRead(
328+
new TablePath(tempDir.getAbsolutePath),
329+
StartingTimestamp(ts0),
330+
EndingTimestamp(ts1))
331+
.collect()
332+
}.getMessage.contains("before the earliest version")
333+
}
334+
}
335+
336+
test("version from timestamp - between two valid versions") {
337+
withTempDir { tempDir =>
338+
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
339+
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
340+
341+
modifyDeltaTimestamp(deltaLog, 0, 0)
342+
modifyDeltaTimestamp(deltaLog, 1, 4000)
343+
modifyDeltaTimestamp(deltaLog, 2, 8000)
344+
345+
val ts0 = dateFormat.format(new Date(1000))
346+
val ts1 = dateFormat.format(new Date(3000))
347+
val readDf = cdcRead(
348+
new TablePath(tempDir.getAbsolutePath), StartingTimestamp(ts0), EndingTimestamp(ts1))
349+
checkCDCAnswer(
350+
DeltaLog.forTable(spark, tempDir),
351+
readDf,
352+
spark.range(0)
353+
.withColumn("_change_type", lit("insert"))
354+
.withColumn("_commit_version", (col("id") / 10).cast(LongType)))
355+
}
356+
}
357+
358+
test("version from timestamp - one version in between") {
359+
withTempDir { tempDir =>
360+
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
361+
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
362+
363+
modifyDeltaTimestamp(deltaLog, 0, 0)
364+
modifyDeltaTimestamp(deltaLog, 1, 4000)
365+
modifyDeltaTimestamp(deltaLog, 2, 8000)
366+
367+
val ts0 = dateFormat.format(new Date(3000))
368+
val ts1 = dateFormat.format(new Date(5000))
369+
val readDf = cdcRead(
370+
new TablePath(tempDir.getAbsolutePath), StartingTimestamp(ts0), EndingTimestamp(ts1))
371+
checkCDCAnswer(
372+
DeltaLog.forTable(spark, tempDir),
373+
readDf,
374+
spark.range(10, 20)
375+
.withColumn("_change_type", lit("insert"))
376+
.withColumn("_commit_version", (col("id") / 10).cast(LongType)))
377+
}
378+
}
379+
380+
test("version from timestamp - end before start") {
381+
withTempDir { tempDir =>
382+
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
383+
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
384+
385+
modifyDeltaTimestamp(deltaLog, 0, 0)
386+
modifyDeltaTimestamp(deltaLog, 1, 4000)
387+
modifyDeltaTimestamp(deltaLog, 2, 8000)
388+
389+
val ts0 = dateFormat.format(new Date(3000))
390+
val ts1 = dateFormat.format(new Date(1000))
391+
intercept[DeltaIllegalArgumentException] {
392+
cdcRead(
393+
new TablePath(tempDir.getAbsolutePath),
394+
StartingTimestamp(ts0),
395+
EndingTimestamp(ts1))
396+
.collect()
397+
}.getErrorClass === "DELTA_INVALID_CDC_RANGE"
398+
}
399+
}
400+
401+
test("version from timestamp - end before start with one version in between") {
402+
withTempDir { tempDir =>
403+
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
404+
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
405+
406+
modifyDeltaTimestamp(deltaLog, 0, 0)
407+
modifyDeltaTimestamp(deltaLog, 1, 4000)
408+
modifyDeltaTimestamp(deltaLog, 2, 8000)
409+
410+
val ts0 = dateFormat.format(new Date(5000))
411+
val ts1 = dateFormat.format(new Date(3000))
412+
intercept[DeltaIllegalArgumentException] {
413+
cdcRead(
414+
new TablePath(tempDir.getAbsolutePath),
415+
StartingTimestamp(ts0),
416+
EndingTimestamp(ts1))
417+
.collect()
418+
}.getErrorClass === "DELTA_INVALID_CDC_RANGE"
419+
}
420+
}
421+
315422
test("start version and end version are the same") {
316423
val tblName = "tbl"
317424
withTable(tblName) {

0 commit comments

Comments
 (0)