[SPARK-17813][SQL][KAFKA] Maximum data per trigger #15527
Changes to KafkaSource.scala:
@@ -96,6 +96,9 @@ private[kafka010] case class KafkaSource(
   private val offsetFetchAttemptIntervalMs =
     sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
 
+  private val maxOffsetsPerTrigger =
+    sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
   /**
    * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
    * offsets and never commits them.
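For context, this is how the new option would be set from the user side; a minimal sketch where the broker address and topic name are placeholders rather than values from this PR:

    val stream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")  // placeholder broker
      .option("subscribe", "events")                    // placeholder topic
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", 10000)            // cap on total offsets read per trigger
      .load()

The limit is a total across all subscribed topicpartitions; the rateLimit helper added below splits it proportionally to how far behind each partition is.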
@@ -121,16 +124,63 @@ private[kafka010] case class KafkaSource(
     }.partitionToOffsets
   }
 
+  private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
+
   override def schema: StructType = KafkaSource.kafkaSchema
 
   /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = {
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
-    val offset = KafkaSourceOffset(fetchLatestOffsets())
-    logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
-    Some(offset)
+    val latest = fetchLatestOffsets()
+    val offsets = maxOffsetsPerTrigger match {
+      case None =>
+        latest
+      case Some(limit) if currentPartitionOffsets.isEmpty =>
+        rateLimit(limit, initialPartitionOffsets, latest)
+      case Some(limit) =>
+        rateLimit(limit, currentPartitionOffsets.get, latest)
+    }
+
+    currentPartitionOffsets = Some(offsets)
+    logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
+    Some(KafkaSourceOffset(offsets))
   }
 
+  /** Proportionally distribute limit number of offsets among topicpartitions */
+  private def rateLimit(
+      limit: Long,
+      from: Map[TopicPartition, Long],
+      until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+    val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    val sizes = until.flatMap {
+      case (tp, end) =>
+        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+          val size = end - begin
+          logDebug(s"rateLimit $tp size is $size")
+          if (size > 0) Some(tp -> size) else None
+        }
+    }
+    val total = sizes.values.sum.toDouble
+    if (total < 1) {
+      until
+    } else {
+      until.map {
+        case (tp, end) =>
+          tp -> sizes.get(tp).map { size =>

Review comment: nit: use 2 spaces

+            val begin = from.get(tp).getOrElse(fromNew(tp))
+            val prorate = limit * (size / total)
+            logDebug(s"rateLimit $tp prorated amount is $prorate")
+            // Don't completely starve small topicpartitions
+            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            logDebug(s"rateLimit $tp new offset is $off")
+            // Paranoia, make sure not to return an offset that's past end
+            Math.min(end, off)
+          }.getOrElse(end)
+      }
+    }
+  }
 
   /**
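To make the proration concrete, here is a small standalone sketch of the same arithmetic; the partition names and record counts are illustrative, chosen to match the test added below (101, 11, and 1 unread records with a limit of 10):

    // Illustration only, not the Spark code above: split a limit of 10 offsets
    // across partitions in proportion to how many unread records each one has.
    val limit = 10L
    val sizes = Map("t-0" -> 101L, "t-1" -> 11L, "t-2" -> 1L) // unread records per partition
    val total = sizes.values.sum.toDouble                     // 113.0

    val advance = sizes.map { case (tp, size) =>
      val prorate = limit * (size / total)                    // 8.94, 0.97, 0.09
      // A partition with a tiny share still advances by at least one record (ceil);
      // larger shares are rounded down.
      tp -> (if (prorate < 1) math.ceil(prorate) else math.floor(prorate)).toLong
    }
    // advance == Map(t-0 -> 8, t-1 -> 1, t-2 -> 1): "1 from smallest, 1 from middle, 8 from biggest"

Because every lagging partition advances by at least one record, the per-trigger total can slightly exceed the limit when there are many small partitions; the Math.min(end, off) guard above only protects against overshooting a partition's end offset.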
@@ -153,11 +203,7 @@ private[kafka010] case class KafkaSource(

Review comment: Shouldn't it be set to the highest available offset in the streaming metadata log, not the highest available offset in kafka?

Reply: Ok, I see what you were saying, thought you were talking about resetting it in getOffset, not the end passed to getBatch.

     // Find the new partitions, and get their earliest offsets
     val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
-    val newPartitionOffsets = if (newPartitions.nonEmpty) {
-      fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
-    } else {
-      Map.empty[TopicPartition, Long]
-    }
+    val newPartitionOffsets = fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
     if (newPartitionOffsets.keySet != newPartitions) {
       // We cannot get from offsets for some partitions. It means they got deleted.
       val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
@@ -221,6 +267,12 @@ private[kafka010] case class KafkaSource(
     logInfo("GetBatch generating RDD of offset range: " +
       offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
 
+    // On recovery, getBatch will get called before getOffset
+    if (currentPartitionOffsets.isEmpty) {
+      currentPartitionOffsets = Some(untilPartitionOffsets)
+    }
+
     sqlContext.createDataFrame(rdd, schema)
   }
 
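The recovery comment relies on the streaming Source contract: after a restart, the engine replays the last planned batch by calling getBatch with offsets recovered from its log before it calls getOffset again, which is why currentPartitionOffsets is seeded here. A paraphrased sketch of the relevant part of that interface, abbreviated and not the complete trait:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.streaming.Offset
    import org.apache.spark.sql.types.StructType

    // Abbreviated sketch of the streaming Source interface this file implements.
    trait SourceSketch {
      def schema: StructType
      // Highest offset currently available; called once per trigger.
      def getOffset: Option[Offset]
      // Data between start (exclusive) and end (inclusive); on restart this is
      // called first to replay the last uncommitted batch, hence the seeding above.
      def getBatch(start: Option[Offset], end: Offset): DataFrame
      def stop(): Unit
    }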
@@ -305,23 +357,28 @@ private[kafka010] case class KafkaSource(
    * some partitions if they are deleted.
    */
   private def fetchNewPartitionEarliestOffsets(
-      newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
-    // Poll to get the latest assigned partitions
-    consumer.poll(0)
-    val partitions = consumer.assignment()
-    consumer.pause(partitions)
-    logDebug(s"\tPartitions assigned to consumer: $partitions")
-
-    // Get the earliest offset of each partition
-    consumer.seekToBeginning(partitions)
-    val partitionOffsets = newPartitions.filter { p =>
-      // When deleting topics happen at the same time, some partitions may not be in `partitions`.
-      // So we need to ignore them
-      partitions.contains(p)
-    }.map(p => p -> consumer.position(p)).toMap
-    logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
-    partitionOffsets
-  }
+      newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] =
+    if (newPartitions.isEmpty) {
+      Map.empty[TopicPartition, Long]
+    } else {
+      withRetriesWithoutInterrupt {
+        // Poll to get the latest assigned partitions
+        consumer.poll(0)
+        val partitions = consumer.assignment()
+        consumer.pause(partitions)
+        logDebug(s"\tPartitions assigned to consumer: $partitions")
+
+        // Get the earliest offset of each partition
+        consumer.seekToBeginning(partitions)
+        val partitionOffsets = newPartitions.filter { p =>
+          // When deleting topics happen at the same time, some partitions may not be in
+          // `partitions`. So we need to ignore them
+          partitions.contains(p)
+        }.map(p => p -> consumer.position(p)).toMap
+        logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
+        partitionOffsets
+      }
+    }
 
   /**
    * Helper function that does multiple retries on the a body of code that returns offsets.
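For readers unfamiliar with the consumer calls used above, here is a hedged, self-contained sketch of the same earliest-offset probe with a plain KafkaConsumer; the broker address, group id, and topic are placeholders, and the real source drives its consumer from sourceOptions instead:

    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // placeholder
    props.put("group.id", "offset-probe")             // placeholder
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    consumer.subscribe(java.util.Arrays.asList("events"))  // placeholder topic

    consumer.poll(0)                      // trigger partition assignment
    val assigned = consumer.assignment()
    consumer.pause(assigned)              // we only want positions, not records
    consumer.seekToBeginning(assigned)
    val earliest = assigned.asScala.map(tp => tp -> consumer.position(tp)).toMap
    consumer.close()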
Changes to KafkaSourceSuite.scala:
@@ -23,13 +23,14 @@ import scala.util.Random
 
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.streaming.{ ProcessingTime, StreamTest }
 import org.apache.spark.sql.test.SharedSQLContext
 
 
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
 
   protected var testUtils: KafkaTestUtils = _
@@ -133,6 +134,72 @@ class KafkaSourceSuite extends KafkaSourceTest {
 
   private val topicId = new AtomicInteger(0)
 
+  test("maxOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 10)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+
+    testStream(mapped)(
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // 1 from smallest, 1 from middle, 8 from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),

Review comment: There is a race condition here. The batch may be still running. I figured out the following codes to cover the recovery and fix the race condition finally.

  test("maxOffsetsPerTrigger") {
    val topic = newTopic()
    testUtils.createTopic(topic, partitions = 3)
    testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
    testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
    testUtils.sendMessages(topic, Array("1"), Some(2))
    val reader = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("kafka.metadata.max.age.ms", "1")
      .option("maxOffsetsPerTrigger", 10)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
    val kafka = reader.load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    val clock = new StreamManualClock
    val waitUntilBatchProcessed = AssertOnQuery { q =>
      eventually(Timeout(streamingTimeout)) {
        if (!q.exception.isDefined) {
          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
        }
      }
      if (q.exception.isDefined) {
        throw q.exception.get
      }
      true
    }
    testStream(mapped)(
      StartStream(ProcessingTime(100), clock),
      waitUntilBatchProcessed,
      // 1 from smallest, 1 from middle, 8 from biggest
      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
      AdvanceManualClock(100),
      waitUntilBatchProcessed,
      // smallest now empty, 1 more from middle, 9 more from biggest
      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
        11, 108, 109, 110, 111, 112, 113, 114, 115, 116
      ),
      StopStream,
      StartStream(ProcessingTime(100), clock),
      waitUntilBatchProcessed,
      AdvanceManualClock(100),
      waitUntilBatchProcessed,
      // smallest now empty, 1 more from middle, 9 more from biggest
      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
        12, 117, 118, 119, 120, 121, 122, 123, 124, 125
      ),
      AdvanceManualClock(100),
      waitUntilBatchProcessed,
      // smallest now empty, 1 more from middle, 9 more from biggest
      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
        12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
        13, 126, 127, 128, 129, 130, 131, 132, 133, 134
      )
    )
  }

This test fails now because of an issue being fixed in #14553.

Follow-up comment: FYI, #14553 got merged.

+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smallest now empty, 1 more from middle, 9 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116
+      ),
+      StopStream,
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smallest now empty, 1 more from middle, 9 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 121, 122, 123, 124, 125
+      ),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smallest now empty, 1 more from middle, 9 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
+        13, 126, 127, 128, 129, 130, 131, 132, 133, 134
+      )
+    )
+  }
 
   test("cannot stop Kafka stream") {
     val topic = newTopic()
     testUtils.createTopic(newTopic(), partitions = 5)
Review comment: nit: use 2 spaces
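For completeness, outside the manual-clock test harness the same behaviour would be driven by a real processing-time trigger. A hypothetical sketch reusing the kafka Dataset built in the test above, with the console sink and interval chosen only for illustration:

    // Hypothetical usage: with maxOffsetsPerTrigger = 10, each batch reads about
    // 10 offsets in total, split proportionally across the topic's partitions.
    val query = kafka
      .writeStream
      .format("console")
      .trigger(ProcessingTime("1 second"))
      .start()

    query.awaitTermination()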