[SPARK-13569][STREAMING][KAFKA] pattern based topic subscription #14026
Changes from all commits
796045f
87b5490
723bf1c
f287722
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,10 +22,11 @@ import java.{ lang => jl, util => ju } | |
import scala.collection.JavaConverters._ | ||
|
||
import org.apache.kafka.clients.consumer._ | ||
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener | ||
import org.apache.kafka.common.TopicPartition | ||
|
||
import org.apache.spark.annotation.Experimental | ||
|
||
import org.apache.spark.internal.Logging | ||
|
||
/** | ||
* :: Experimental :: | ||
|
@@ -47,7 +48,9 @@ abstract class ConsumerStrategy[K, V] { | |
|
||
/** | ||
* Must return a fully configured Kafka Consumer, including subscribed or assigned topics. | ||
* See <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Kafka docs</a>. | ||
* This consumer will be used on the driver to query for offsets only, not messages. | ||
* The consumer must be returned in a state that it is safe to call poll(0) on. | ||
* @param currentOffsets A map from TopicPartition to offset, indicating how far the driver | ||
* has successfully read. Will be empty on initial start, possibly non-empty on restart from | ||
* checkpoint. | ||
|
@@ -72,15 +75,83 @@ private case class Subscribe[K, V]( | |
topics: ju.Collection[jl.String], | ||
kafkaParams: ju.Map[String, Object], | ||
offsets: ju.Map[TopicPartition, jl.Long] | ||
) extends ConsumerStrategy[K, V] { | ||
) extends ConsumerStrategy[K, V] with Logging { | ||
|
||
def executorKafkaParams: ju.Map[String, Object] = kafkaParams | ||
|
||
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = { | ||
val consumer = new KafkaConsumer[K, V](kafkaParams) | ||
consumer.subscribe(topics) | ||
if (currentOffsets.isEmpty) { | ||
offsets.asScala.foreach { case (topicPartition, offset) => | ||
val toSeek = if (currentOffsets.isEmpty) { | ||
offsets | ||
} else { | ||
currentOffsets | ||
} | ||
if (!toSeek.isEmpty) { | ||
Contributor
Java maps don't have nonempty, right?
||
// work around KAFKA-3370 when reset is none | ||
Contributor
Can you add a comment that briefly explains the problem and the workaround?
Sure, will add a comment in the code once I'm back at my workstation.
||
// poll will throw if no position, i.e. auto offset reset none and no explicit position | ||
// but cant seek to a position before poll, because poll is what gets subscription partitions | ||
// So, poll, suppress the first exception, then seek | ||
val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) | ||
val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE" | ||
try { | ||
consumer.poll(0) | ||
} catch { | ||
case x: NoOffsetForPartitionException if shouldSuppress => | ||
logWarning("Catching NoOffsetForPartitionException since " + | ||
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370") | ||
} | ||
toSeek.asScala.foreach { case (topicPartition, offset) => | ||
consumer.seek(topicPartition, offset) | ||
4 chars for indent?
Foreach is a scope, case is a nested scope.
||
} | ||
} | ||
|
||
consumer | ||
} | ||
} | ||
|
||
/** | ||
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions. | ||
* The pattern matching will be done periodically against topics existing at the time of check. | ||
* @param pattern pattern to subscribe to | ||
* @param kafkaParams Kafka | ||
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs"> | ||
* configuration parameters</a> to be used on driver. The same params will be used on executors, | ||
* with minor automatic modifications applied. | ||
* Requires "bootstrap.servers" to be set | ||
* with Kafka broker(s) specified in host1:port1,host2:port2 form. | ||
* @param offsets: offsets to begin at on initial startup. If no offset is given for a | ||
* TopicPartition, the committed offset (if applicable) or kafka param | ||
* auto.offset.reset will be used. | ||
*/ | ||
private case class SubscribePattern[K, V]( | ||
pattern: ju.regex.Pattern, | ||
kafkaParams: ju.Map[String, Object], | ||
offsets: ju.Map[TopicPartition, jl.Long] | ||
) extends ConsumerStrategy[K, V] with Logging { | ||
|
||
def executorKafkaParams: ju.Map[String, Object] = kafkaParams | ||
|
||
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = { | ||
val consumer = new KafkaConsumer[K, V](kafkaParams) | ||
consumer.subscribe(pattern, new NoOpConsumerRebalanceListener()) | ||
val toSeek = if (currentOffsets.isEmpty) { | ||
offsets | ||
} else { | ||
currentOffsets | ||
} | ||
if (!toSeek.isEmpty) { | ||
// work around KAFKA-3370 when reset is none, see explanation in Subscribe above | ||
val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) | ||
val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE" | ||
try { | ||
consumer.poll(0) | ||
} catch { | ||
case x: NoOffsetForPartitionException if shouldSuppress => | ||
Same here. Log as warn.
||
logWarning("Catching NoOffsetForPartitionException since " + | ||
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370") | ||
} | ||
toSeek.asScala.foreach { case (topicPartition, offset) => | ||
consumer.seek(topicPartition, offset) | ||
} | ||
} | ||
|
@@ -113,8 +184,14 @@ private case class Assign[K, V]( | |
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = { | ||
val consumer = new KafkaConsumer[K, V](kafkaParams) | ||
consumer.assign(topicPartitions) | ||
if (currentOffsets.isEmpty) { | ||
offsets.asScala.foreach { case (topicPartition, offset) => | ||
val toSeek = if (currentOffsets.isEmpty) { | ||
offsets | ||
} else { | ||
currentOffsets | ||
} | ||
if (!toSeek.isEmpty) { | ||
// this doesn't need a KAFKA-3370 workaround, because partitions are known, no poll needed | ||
toSeek.asScala.foreach { case (topicPartition, offset) => | ||
consumer.seek(topicPartition, offset) | ||
} | ||
} | ||
|
@@ -215,6 +292,95 @@ object ConsumerStrategies { | |
new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]()) | ||
} | ||
|
||
/** :: Experimental :: | ||
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions. | ||
* The pattern matching will be done periodically against topics existing at the time of check. | ||
* @param pattern pattern to subscribe to | ||
* @param kafkaParams Kafka | ||
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs"> | ||
* configuration parameters</a> to be used on driver. The same params will be used on executors, | ||
* with minor automatic modifications applied. | ||
* Requires "bootstrap.servers" to be set | ||
* with Kafka broker(s) specified in host1:port1,host2:port2 form. | ||
* @param offsets: offsets to begin at on initial startup. If no offset is given for a | ||
* TopicPartition, the committed offset (if applicable) or kafka param | ||
* auto.offset.reset will be used. | ||
*/ | ||
@Experimental | ||
def SubscribePattern[K, V]( | ||
pattern: ju.regex.Pattern, | ||
kafkaParams: collection.Map[String, Object], | ||
offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { | ||
new SubscribePattern[K, V]( | ||
pattern, | ||
new ju.HashMap[String, Object](kafkaParams.asJava), | ||
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava)) | ||
} | ||
|
||
/** :: Experimental :: | ||
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions. | ||
* The pattern matching will be done periodically against topics existing at the time of check. | ||
* @param pattern pattern to subscribe to | ||
* @param kafkaParams Kafka | ||
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs"> | ||
* configuration parameters</a> to be used on driver. The same params will be used on executors, | ||
* with minor automatic modifications applied. | ||
* Requires "bootstrap.servers" to be set | ||
* with Kafka broker(s) specified in host1:port1,host2:port2 form. | ||
*/ | ||
@Experimental | ||
def SubscribePattern[K, V]( | ||
pattern: ju.regex.Pattern, | ||
kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = { | ||
new SubscribePattern[K, V]( | ||
pattern, | ||
new ju.HashMap[String, Object](kafkaParams.asJava), | ||
ju.Collections.emptyMap[TopicPartition, jl.Long]()) | ||
} | ||
|
||
/** :: Experimental :: | ||
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions. | ||
* The pattern matching will be done periodically against topics existing at the time of check. | ||
* @param pattern pattern to subscribe to | ||
* @param kafkaParams Kafka | ||
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs"> | ||
* configuration parameters</a> to be used on driver. The same params will be used on executors, | ||
* with minor automatic modifications applied. | ||
* Requires "bootstrap.servers" to be set | ||
* with Kafka broker(s) specified in host1:port1,host2:port2 form. | ||
* @param offsets: offsets to begin at on initial startup. If no offset is given for a | ||
* TopicPartition, the committed offset (if applicable) or kafka param | ||
* auto.offset.reset will be used. | ||
*/ | ||
@Experimental | ||
def SubscribePattern[K, V]( | ||
pattern: ju.regex.Pattern, | ||
kafkaParams: ju.Map[String, Object], | ||
offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = { | ||
new SubscribePattern[K, V](pattern, kafkaParams, offsets) | ||
} | ||
|
||
/** :: Experimental :: | ||
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions. | ||
* The pattern matching will be done periodically against topics existing at the time of check. | ||
* @param pattern pattern to subscribe to | ||
* @param kafkaParams Kafka | ||
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs"> | ||
* configuration parameters</a> to be used on driver. The same params will be used on executors, | ||
* with minor automatic modifications applied. | ||
* Requires "bootstrap.servers" to be set | ||
* with Kafka broker(s) specified in host1:port1,host2:port2 form. | ||
*/ | ||
@Experimental | ||
def SubscribePattern[K, V]( | ||
pattern: ju.regex.Pattern, | ||
kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = { | ||
new SubscribePattern[K, V]( | ||
pattern, | ||
kafkaParams, | ||
ju.Collections.emptyMap[TopicPartition, jl.Long]()) | ||
} | ||
|
||
/** | ||
* :: Experimental :: | ||
* Assign a fixed collection of TopicPartitions | ||
|
This is a change in the semantics around current offsets. The earlier logic was: if there are current offsets (e.g. on restart from checkpoint), don't seek. That was fine because the DStream knew the starting offsets of the next batch, so setting the current position of the consumer was not needed.
Why this change now?
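A minimal sketch of the semantic change being questioned here, using plain `java.util` maps and no Kafka dependency. The names `SeekTargets`, `seekTargetsBefore` and `seekTargetsAfter`, and the stand-in `TopicPartition` case class, are hypothetical and exist only for illustration; the diff inlines this logic in `onStart`.

```scala
import java.{ lang => jl, util => ju }

object SeekTargets {
  // Stand-in for org.apache.kafka.common.TopicPartition (assumption: used only as a map key).
  final case class TopicPartition(topic: String, partition: Int)

  type Offsets = ju.Map[TopicPartition, jl.Long]

  // Earlier logic: seek to the user-supplied offsets only on a fresh start.
  // On restart from checkpoint (currentOffsets non-empty) nothing is sought.
  def seekTargetsBefore(offsets: Offsets, currentOffsets: Offsets): Offsets =
    if (currentOffsets.isEmpty) offsets
    else ju.Collections.emptyMap[TopicPartition, jl.Long]()

  // Logic in this diff: recovered offsets take precedence when present,
  // otherwise the user-supplied starting offsets are used.
  def seekTargetsAfter(offsets: Offsets, currentOffsets: Offsets): Offsets =
    if (currentOffsets.isEmpty) offsets else currentOffsets
}
```

The two functions differ only in the restart case: the earlier version returns nothing to seek to, while the new one returns the recovered offsets.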
The consumer needs to be in a state where it is safe to call poll() on it. With auto offset reset none, a consumer with no positions will throw if poll is called. You're right that the driver knows the recovered positions; this is more about working around consumer state.
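As a rough illustration of the "auto offset reset none" check, the sketch below decides whether the parameter is set to none, ignoring case. `ResetPolicy` and `resetIsNone` are hypothetical names, and the key is written as a string literal here, whereas the diff uses `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`. Unlike the `asInstanceOf[String]` cast in the diff, this version returns false for a non-string value instead of throwing; that difference is a choice made for the sketch, not something the PR does.

```scala
import java.{ util => ju }

object ResetPolicy {
  // Consumer config key for the reset policy; the real code refers to it
  // through ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
  val AutoOffsetReset = "auto.offset.reset"

  // True only when the parameter is present, is a String, and equals "none"
  // ignoring case. A missing key (null) or a non-String value yields false.
  def resetIsNone(kafkaParams: ju.Map[String, Object]): Boolean =
    kafkaParams.get(AutoOffsetReset) match {
      case s: String => s.equalsIgnoreCase("none")
      case _ => false
    }
}
```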
But you are calling poll before the seek? Why is that?
Because without poll, for subscriptions you don't have partition assignments. It's an interesting consumer api, to say the least.
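To make the ordering concrete, here is a toy model of the poll, suppress, then seek sequence. Everything in it is a hypothetical stand-in: `StubConsumer`, its `poll()` and `seek()` methods, and the local `TopicPartition` and `NoOffsetForPartitionException` types are simplified assumptions that model only the behaviour described in this thread, not the real Kafka client.

```scala
import scala.collection.mutable

object PollThenSeek {
  // Hypothetical stand-ins for the Kafka types; not the real client API.
  final case class TopicPartition(topic: String, partition: Int)
  final class NoOffsetForPartitionException(msg: String) extends RuntimeException(msg)

  // Toy consumer: partitions are assigned only inside poll, and poll throws
  // when a partition has no position and the reset policy is none.
  final class StubConsumer(subscribed: Set[TopicPartition], resetIsNone: Boolean) {
    private var assigned = Set.empty[TopicPartition]
    val positions = mutable.Map.empty[TopicPartition, Long]

    def poll(): Unit = {
      assigned = subscribed // assignment happens as a side effect of poll
      val missing = assigned.filterNot(positions.contains)
      if (missing.nonEmpty) {
        if (resetIsNone) throw new NoOffsetForPartitionException(s"no position for $missing")
        else missing.foreach(tp => positions(tp) = 0L) // pretend the reset goes to offset 0
      }
    }

    def seek(tp: TopicPartition, offset: Long): Unit = {
      if (!assigned.contains(tp)) throw new IllegalStateException(s"$tp is not assigned")
      positions(tp) = offset
    }
  }

  // Same order of calls as in the diff: poll, swallow the first exception
  // only when reset is none, then seek to each requested offset.
  def start(consumer: StubConsumer, toSeek: Map[TopicPartition, Long], resetIsNone: Boolean): Unit = {
    if (toSeek.nonEmpty) {
      try consumer.poll()
      catch { case _: NoOffsetForPartitionException if resetIsNone => () }
      toSeek.foreach { case (tp, offset) => consumer.seek(tp, offset) }
    }
  }
}
```

In this model, seeking before the first poll fails because nothing is assigned yet, which is why the seek cannot simply be moved ahead of the poll.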
Guess it wasn't designed for rapidly seeking back and forth. So to confirm my understanding, these are the following calls in Subscribe
right?
I saw the comments below. That's confirmed. Thanks for adding the comments!
Basically, yeah.
On Jul 8, 2016 7:38 PM, "Tathagata Das" notifications@github.com wrote: