@@ -156,42 +156,33 @@ trait CDCReaderImpl extends DeltaLogging {
   import org.apache.spark.sql.delta.commands.cdc.CDCReader._
 
   /**
-   * Given timestamp or version this method returns the corresponding version for that timestamp
-   * or the version itself.
+   * Given timestamp or version, this method returns the corresponding version for that timestamp
+   * or the version itself, as well as how the returned version is obtained: by `version` or
+   * `timestamp`.
    */
-  def getVersionForCDC(
+  private def getVersionForCDC(
       spark: SparkSession,
       deltaLog: DeltaLog,
       conf: SQLConf,
       options: CaseInsensitiveStringMap,
       versionKey: String,
-      timestampKey: String): Option[Long] = {
+      timestampKey: String): Option[ResolvedCDFVersion] = {
     if (options.containsKey(versionKey)) {
-      Some(options.get(versionKey).toLong)
+      Some(ResolvedCDFVersion(options.get(versionKey).toLong, timestamp = None))
     } else if (options.containsKey(timestampKey)) {
       val ts = options.get(timestampKey)
       val spec = DeltaTimeTravelSpec(Some(Literal(ts)), None, Some("cdcReader"))
+      val timestamp = spec.getTimestamp(spark.sessionState.conf)
       val allowOutOfRange = conf.getConf(DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP)
-      if (timestampKey == DeltaDataSource.CDC_START_TIMESTAMP_KEY) {
+      val resolvedVersion = if (timestampKey == DeltaDataSource.CDC_START_TIMESTAMP_KEY) {
         // For the starting timestamp we need to find a version after the provided timestamp;
         // we can use the same semantics as streaming.
-        val resolvedVersion = DeltaSource.getStartingVersionFromTimestamp(
-          spark,
-          deltaLog,
-          spec.getTimestamp(spark.sessionState.conf),
-          allowOutOfRange
-        )
-        Some(resolvedVersion)
+        DeltaSource.getStartingVersionFromTimestamp(spark, deltaLog, timestamp, allowOutOfRange)
       } else {
         // For ending timestamp the version should be before the provided timestamp.
-        val resolvedVersion = DeltaTableUtils.resolveTimeTravelVersion(
-          conf,
-          deltaLog,
-          spec,
-          allowOutOfRange
-        )
-        Some(resolvedVersion._1)
+        DeltaTableUtils.resolveTimeTravelVersion(conf, deltaLog, spec, allowOutOfRange)._1
       }
+      Some(ResolvedCDFVersion(resolvedVersion, Some(timestamp)))
     } else {
       None
     }
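
For orientation, the sketch below shows how a caller consumes the new `Option[ResolvedCDFVersion]` result. It mirrors the `getOrElse` pattern the next hunk applies at the start-boundary call site; `spark`, `snapshotToUse`, `conf`, and `options` are assumed to be in scope.

```scala
// A minimal sketch of resolving the CDF start boundary with the new return type.
val startingVersion: ResolvedCDFVersion = getVersionForCDC(
    spark,
    snapshotToUse.deltaLog,
    conf,
    options,
    DeltaDataSource.CDC_START_VERSION_KEY,
    DeltaDataSource.CDC_START_TIMESTAMP_KEY
  ).getOrElse {
    throw DeltaErrors.noStartVersionForCDC()
  }
// startingVersion.version              -> the resolved commit version
// startingVersion.resolvedByTimestamp  -> true iff the timestamp key supplied it
```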
@@ -240,10 +231,7 @@ trait CDCReaderImpl extends DeltaLogging {
       conf,
       options,
       DeltaDataSource.CDC_START_VERSION_KEY,
-      DeltaDataSource.CDC_START_TIMESTAMP_KEY
-    )
-
-    if (startingVersion.isEmpty) {
+      DeltaDataSource.CDC_START_TIMESTAMP_KEY).getOrElse {
       throw DeltaErrors.noStartVersionForCDC()
     }
 
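
For context, these option keys are the ones Delta's change data feed reader exposes to users. A minimal read that exercises both boundaries might look like this; the option names are Delta's documented CDF options, while the table path and version numbers are illustrative:

```scala
// Illustrative CDF read using the option keys resolved above.
val changes = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "1")
  .option("endingVersion", "10")
  .load("/tmp/delta/events")
```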
@@ -259,29 +247,30 @@ trait CDCReaderImpl extends DeltaLogging {
         s"cannot be used with time travel options.")
     }
 
+    def emptyCDFRelation() = {
+      new DeltaCDFRelation(
+        SnapshotWithSchemaMode(snapshotToUse, schemaMode),
+        spark.sqlContext,
+        startingVersion = None,
+        endingVersion = None) {
+        override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
+          sqlContext.sparkSession.sparkContext.emptyRDD[Row]
+      }
+    }
+
     // add a version check here that is cheap instead of after trying to list a large version
     // that doesn't exist
-    if (startingVersion.get > snapshotToUse.version) {
+    if (startingVersion.version > snapshotToUse.version) {
       val allowOutOfRange = conf.getConf(DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP)
       // LS-129: return an empty relation if start version passed in is beyond latest commit version
       if (allowOutOfRange) {
-        return new DeltaCDFRelation(
-          SnapshotWithSchemaMode(snapshotToUse, schemaMode),
-          spark.sqlContext,
-          None,
-          None) {
-          override def buildScan(
-              requiredColumns: Array[String],
-              filters: Array[Filter]): RDD[Row] = {
-            sqlContext.sparkSession.sparkContext.emptyRDD[Row]
-          }
-        }
+        return emptyCDFRelation()
       }
       throw DeltaErrors.startVersionAfterLatestVersion(
-        startingVersion.get, snapshotToUse.version)
+        startingVersion.version, snapshotToUse.version)
     }
 
-    val endingVersion = getVersionForCDC(
+    val endingVersionOpt = getVersionForCDC(
       spark,
       snapshotToUse.deltaLog,
       conf,
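
The `emptyCDFRelation` helper above works by overriding `buildScan` to yield an empty RDD while keeping the relation's schema intact. Here is the same trick in isolation, written against Spark's plain data source API rather than Delta's classes; this is a standalone sketch, not code from this PR:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType

// A relation that advertises a schema but never yields rows: Spark plans the scan
// as usual, and the empty RDD makes every query over the relation return nothing.
class EmptyRelation(
    override val sqlContext: SQLContext,
    override val schema: StructType)
  extends BaseRelation with TableScan {
  override def buildScan(): RDD[Row] = sqlContext.sparkContext.emptyRDD[Row]
}
```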
@@ -290,17 +279,42 @@ trait CDCReaderImpl extends DeltaLogging {
       DeltaDataSource.CDC_END_TIMESTAMP_KEY
     )
 
-    if (endingVersion.exists(_ < startingVersion.get)) {
-      throw DeltaErrors.endBeforeStartVersionInCDC(startingVersion.get, endingVersion.get)
+    // Given two timestamps, there is a case when both of them lie closely between two versions:
+    //   version:  4                                                 5
+    //   ----------|-------------------------------------------------|--------
+    //                ^ start timestamp                ^ end timestamp
+    // In this case the starting version will be 5 and the ending version will be 4. We must not
+    // throw `endBeforeStartVersionInCDC` but should return an empty result.
+    endingVersionOpt.foreach { endingVersion =>
+      if (startingVersion.resolvedByTimestamp && endingVersion.resolvedByTimestamp) {
+        // The next `if` is true when the end timestamp is earlier than the start timestamp but
+        // no commit is in between. We need to capture such a case and throw early.
+        if (startingVersion.timestamp.get.after(endingVersion.timestamp.get)) {
+          throw DeltaErrors.endBeforeStartVersionInCDC(
+            startingVersion.version,
+            endingVersion.version)
+        }
+        if (endingVersion.version == startingVersion.version - 1) {
+          return emptyCDFRelation()
+        }
+      }
+    }
+
+    if (endingVersionOpt.exists(_.version < startingVersion.version)) {
+      throw DeltaErrors.endBeforeStartVersionInCDC(
+        startingVersion.version,
+        endingVersionOpt.get.version)
     }
 
-    logInfo(s"startingVersion: $startingVersion, endingVersion: $endingVersion")
+    logInfo(
+      s"startingVersion: ${startingVersion.version}, " +
+      s"endingVersion: ${endingVersionOpt.map(_.version)}")
 
     DeltaCDFRelation(
       SnapshotWithSchemaMode(snapshotToUse, schemaMode),
       spark.sqlContext,
-      startingVersion,
-      endingVersion)
+      Some(startingVersion.version),
+      endingVersionOpt.map(_.version))
   }
 
   /**
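
To make the diagrammed edge case concrete, here is a worked instance of the two checks above; the commit times and user timestamps are illustrative:

```scala
import java.sql.Timestamp

// Commits 4 and 5 exist at 10:00 and 12:00; both user timestamps fall in between.
// The start resolves forward to version 5, the end resolves backward to version 4.
val start = ResolvedCDFVersion(5, Some(Timestamp.valueOf("2024-01-01 10:15:00")))
val end = ResolvedCDFVersion(4, Some(Timestamp.valueOf("2024-01-01 11:30:00")))

// startTs is not after endTs, so the early throw does not fire...
assert(!start.timestamp.get.after(end.timestamp.get))
// ...and end.version == start.version - 1, so the reader returns an empty relation
// instead of raising endBeforeStartVersionInCDC.
assert(end.version == start.version - 1)
```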
@@ -720,4 +734,15 @@ trait CDCReaderImpl extends DeltaLogging {
    * @param numBytes the total size of the AddFile + RemoveFile + AddCDCFiles that are in the df
    */
   case class CDCVersionDiffInfo(fileChangeDf: DataFrame, numFiles: Long, numBytes: Long)
+
+  /**
+   * Represents a Delta log version, and how the version is determined.
+   * @param version the determined version.
+   * @param timestamp the commit timestamp of the determined version. Will be filled when the
+   *                  version is determined by timestamp.
+   */
+  private case class ResolvedCDFVersion(version: Long, timestamp: Option[Timestamp]) {
+    /** Whether this version is resolved by timestamp. */
+    def resolvedByTimestamp: Boolean = timestamp.isDefined
+  }
 }
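
Finally, a small illustration of the two ways `getVersionForCDC` builds a `ResolvedCDFVersion`, and what `resolvedByTimestamp` reports for each; the values are illustrative:

```scala
import java.sql.Timestamp

// Resolved from an explicit version option: no timestamp is recorded.
val byVersion = ResolvedCDFVersion(7, timestamp = None)
// Resolved from a timestamp option: the resolving timestamp is carried along.
val byTimestamp = ResolvedCDFVersion(7, Some(Timestamp.valueOf("2024-01-01 00:00:00")))

assert(!byVersion.resolvedByTimestamp)
assert(byTimestamp.resolvedByTimestamp)
```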