Skip to content

Commit afd6c11

Browse files
committed
Reflect review comments
1 parent ec1f662 commit afd6c11

File tree

3 files changed

+42
-31
lines changed

3 files changed

+42
-31
lines changed

docs/structured-streaming-kafka-integration.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ The following configurations are optional:
368368
<td>none (next preference is <code>startingOffsetsByTimestamp</code>)</td>
369369
<td>streaming and batch</td>
370370
<td>The start point of timestamp when a query is started, a string specifying a starting timestamp for
371-
all partitions in topics being subscribed. Please refer the details on timestamp offset options below. If the matched offset doesn't exist,
371+
all partitions in topics being subscribed. Please refer the details on timestamp offset options below. If Kafka doesn't return the matched offset,
372372
the query will fail immediately to prevent unintended read from such partition. (This is a kind of limitation as of now, and will be addressed in near future.)<p/>
373373
<p/>
374374
Note1: <code>startingTimestamp</code> takes precedence over <code>startingOffsetsByTimestamp</code> and <code>startingOffsets</code>.<p/>
@@ -381,10 +381,10 @@ The following configurations are optional:
381381
<td>json string
382382
""" {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
383383
</td>
384-
<td>none (the value of <code>startingOffsets</code> will apply)</td>
384+
<td>none (next preference is <code>startingOffsets</code>)</td>
385385
<td>streaming and batch</td>
386386
<td>The start point of timestamp when a query is started, a json string specifying a starting timestamp for
387-
each TopicPartition. Please refer the details on timestamp offset options below. If the matched offset doesn't exist,
387+
each TopicPartition. Please refer the details on timestamp offset options below. If Kafka doesn't return the matched offset,
388388
the query will fail immediately to prevent unintended read from such partition. (This is a kind of limitation as of now, and will be addressed in near future.)<p/>
389389
<p/>
390390
Note1: <code>startingOffsetsByTimestamp</code> takes precedence over <code>startingOffsets</code>.<p/>
@@ -413,8 +413,8 @@ The following configurations are optional:
413413
<td>none (next preference is <code>endingOffsetsByTimestamp</code>)</td>
414414
<td>batch query</td>
415415
<td>The end point when a batch query is ended, a json string specifying an ending timestamp for
416-
all partitions in topics being subscribed. Please refer the details on timestamp offset options below. If the matched offset doesn't exist, the offset will
417-
be set to latest.<p/>
416+
all partitions in topics being subscribed. Please refer the details on timestamp offset options below.
417+
If Kafka doesn't return the matched offset, the offset will be set to latest.<p/>
418418
Note: <code>endingTimestamp</code> takes precedence over <code>endingOffsetsByTimestamp</code> and <code>endingOffsets</code>.<p/>
419419
</td>
420420
</tr>
@@ -426,8 +426,8 @@ The following configurations are optional:
426426
<td>none (next preference is <code>endingOffsets</code>)</td>
427427
<td>batch query</td>
428428
<td>The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition.
429-
Please refer the details on timestamp offset options below. If the matched offset doesn't exist, the offset will
430-
be set to latest.<p/>
429+
Please refer the details on timestamp offset options below. If Kafka doesn't return the matched offset,
430+
the offset will be set to latest.<p/>
431431
Note: <code>endingOffsetsByTimestamp</code> takes precedence over <code>endingOffsets</code>.
432432
</td>
433433
</tr>
@@ -529,7 +529,7 @@ The following configurations are optional:
529529
### Details on timestamp offset options
530530

531531
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
532-
The behavior varies across options if the matched offset doesn't exist - check the description of each option.
532+
The behavior varies across options if Kafka doesn't return the matched offset - check the description of each option.
533533

534534
Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value.
535535
For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer <a href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-">javadoc</a> for details.

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -581,32 +581,30 @@ private[kafka010] object KafkaSourceProvider extends Logging {
581581
defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit = {
582582
// The order below represents "preferences"
583583

584-
// 1. global timestamp
585584
if (params.contains(globalOffsetTimestampOptionKey)) {
585+
// 1. global timestamp
586586
val tsStr = params(globalOffsetTimestampOptionKey).trim
587587
try {
588588
val ts = tsStr.toLong
589-
return GlobalTimestampRangeLimit(ts)
589+
GlobalTimestampRangeLimit(ts)
590590
} catch {
591591
case _: NumberFormatException =>
592592
throw new IllegalArgumentException(s"Expected a single long value, got $tsStr")
593593
}
594-
}
595-
596-
// 2. timestamp per topic partition
597-
if (params.contains(offsetByTimestampOptionKey)) {
594+
} else if (params.contains(offsetByTimestampOptionKey)) {
595+
// 2. timestamp per topic partition
598596
val json = params(offsetByTimestampOptionKey).trim
599-
return SpecificTimestampRangeLimit(JsonUtils.partitionTimestamps(json))
600-
}
601-
602-
// 3. latest/earliest/offset
603-
params.get(offsetOptionKey).map(_.trim) match {
604-
case Some(offset) if offset.toLowerCase(Locale.ROOT) == "latest" =>
605-
LatestOffsetRangeLimit
606-
case Some(offset) if offset.toLowerCase(Locale.ROOT) == "earliest" =>
607-
EarliestOffsetRangeLimit
608-
case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
609-
case None => defaultOffsets
597+
SpecificTimestampRangeLimit(JsonUtils.partitionTimestamps(json))
598+
} else {
599+
// 3. latest/earliest/offset
600+
params.get(offsetOptionKey).map(_.trim) match {
601+
case Some(offset) if offset.toLowerCase(Locale.ROOT) == "latest" =>
602+
LatestOffsetRangeLimit
603+
case Some(offset) if offset.toLowerCase(Locale.ROOT) == "earliest" =>
604+
EarliestOffsetRangeLimit
605+
case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
606+
case None => defaultOffsets
607+
}
610608
}
611609
}
612610

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession
293293
TestUtils.assertExceptionMsg(e, "No offset matched from request")
294294
}
295295

296-
test("specifying both global timestamp and specific timestamp for partition") {
296+
test("preferences on offset related options") {
297297
val (topic, timestamps) = prepareTimestampRelatedUnitTest
298298

299299
/*
@@ -305,17 +305,30 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession
305305
* specific timestamp for partition
306306
starting only presented as "second", and ending not presented
307307
308-
Here we expect global timestamp will take effect.
308+
* offsets
309+
starting only presented as "earliest", and ending not presented
310+
311+
The preference goes to global timestamp -> timestamp for partition -> offsets
309312
*/
310-
verifyTimestampRelatedQueryResult({ df =>
311-
val startTopicTimestamps = Map(
312-
(0 to 2).map(new TopicPartition(topic, _) -> timestamps(1)): _*)
313-
val startingTimestamps = JsonUtils.partitionTimestamps(startTopicTimestamps)
314313

314+
val startTopicTimestamps = Map(
315+
(0 to 2).map(new TopicPartition(topic, _) -> timestamps(1)): _*)
316+
val startingTimestamps = JsonUtils.partitionTimestamps(startTopicTimestamps)
317+
318+
// all options are specified: global timestamp
319+
verifyTimestampRelatedQueryResult({ df =>
315320
df
316321
.option("startingTimestamp", timestamps(2))
317322
.option("startingOffsetsByTimestamp", startingTimestamps)
323+
.option("startingOffsets", "earliest")
318324
}, topic, 20 to 29)
325+
326+
// timestamp for partition and offsets are specified: timestamp for partition
327+
verifyTimestampRelatedQueryResult({ df =>
328+
df
329+
.option("startingOffsetsByTimestamp", startingTimestamps)
330+
.option("startingOffsets", "earliest")
331+
}, topic, 10 to 29)
319332
}
320333

321334
test("no matched offset for timestamp - endingOffsets") {

0 commit comments

Comments
 (0)