[SPARK-13569][STREAMING][KAFKA] pattern based topic subscription #14026

Closed
wants to merge 4 commits

Changes from all commits
@@ -22,10 +22,11 @@ import java.{ lang => jl, util => ju }
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

import org.apache.spark.annotation.Experimental

import org.apache.spark.internal.Logging

/**
* :: Experimental ::
@@ -47,7 +48,9 @@ abstract class ConsumerStrategy[K, V] {

/**
* Must return a fully configured Kafka Consumer, including subscribed or assigned topics.
* See <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Kafka docs</a>.
* This consumer will be used on the driver to query for offsets only, not messages.
* The consumer must be returned in a state that it is safe to call poll(0) on.
* @param currentOffsets A map from TopicPartition to offset, indicating how far the driver
* has successfully read. Will be empty on initial start, possibly non-empty on restart from
* checkpoint.
@@ -72,15 +75,83 @@ private case class Subscribe[K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]
) extends ConsumerStrategy[K, V] {
) extends ConsumerStrategy[K, V] with Logging {

def executorKafkaParams: ju.Map[String, Object] = kafkaParams

def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.subscribe(topics)
if (currentOffsets.isEmpty) {
offsets.asScala.foreach { case (topicPartition, offset) =>
val toSeek = if (currentOffsets.isEmpty) {
offsets
} else {
currentOffsets
Contributor

This is a change from the semantics with current offsets. The earlier logic was: if there are current offsets (e.g. restart from checkpoint), don't seek. This was fine because the DStream knew the starting offsets of the next batch, so setting the current position of the consumer was not needed.

Why this change now?

Contributor Author

The consumer needs to be in a state that poll() is safe to call on it. With auto offset reset none, a consumer with no positions will throw if poll is called. You're right that the driver knows the recovered positions; this is more about working around consumer state.

Contributor

But you are calling poll before the seek? Why is that?

Contributor Author

Because without poll, for subscriptions you don't have partition assignments. It's an interesting consumer API, to say the least.

Contributor

Guess it wasn't designed for rapidly seeking back and forth. So to confirm my understanding, these are the calls made in Subscribe:

  • poll - to get the initial assignments; handles the none case of no committed offset
  • seek - set the position to whatever is known from user-provided offsets or current offsets, so that later regular polls in the DStream can proceed with as little difficulty as possible.

Right?

Contributor

I saw the comments below. That's confirmed. Thanks for adding the comments!

Contributor Author

Basically, yeah.
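
For readers following the thread, here is a minimal, self-contained sketch of the sequence discussed above: poll to obtain partition assignments, suppress the KAFKA-3370 exception when auto.offset.reset is none, then seek. It is illustrative only; the topic names, params, and helper name are assumptions, not the patch code.

import java.{ util => ju }

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, NoOffsetForPartitionException}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: prepare a driver-side consumer so that later poll calls are safe.
def prepareDriverConsumer(
    kafkaParams: ju.Map[String, Object],
    toSeek: Map[TopicPartition, Long]): KafkaConsumer[String, String] = {
  val consumer = new KafkaConsumer[String, String](kafkaParams)
  consumer.subscribe(ju.Arrays.asList("pat1", "pat2"))

  // 1. poll(0) triggers partition assignment for the subscription.
  //    With auto.offset.reset=none and no committed offsets it throws,
  //    so the exception is suppressed; the seeks below set explicit positions.
  val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
  val resetIsNone = aor != null && aor.asInstanceOf[String].equalsIgnoreCase("none")
  try {
    consumer.poll(0)
  } catch {
    case _: NoOffsetForPartitionException if resetIsNone => // see KAFKA-3370
  }

  // 2. seek to the known starting positions (user-provided or checkpointed),
  //    so that subsequent polls in the DStream start from the right place.
  toSeek.foreach { case (tp, offset) => consumer.seek(tp, offset) }

  consumer
}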

}
if (!toSeek.isEmpty) {
Contributor

@tdas tdas Jul 6, 2016

nit: toSeek.nonEmpty. Never mind, it's a Java map.

Contributor Author

Java maps don't have nonEmpty, right?
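
For context on the nit above, java.util.Map exposes isEmpty() but no nonEmpty, which is why the guard is written as !toSeek.isEmpty. A quick illustrative snippet; the keys and values here are made up:

import java.{ lang => jl, util => ju }

import org.apache.kafka.common.TopicPartition

// java.util.Map has isEmpty() but not nonEmpty, so the negated form is used.
val toSeek: ju.Map[TopicPartition, jl.Long] = new ju.HashMap[TopicPartition, jl.Long]()
toSeek.put(new TopicPartition("pat2", 0), jl.Long.valueOf(3L))
if (!toSeek.isEmpty) {
  println(s"seeking ${toSeek.size} partition(s)")
}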

// work around KAFKA-3370 when reset is none
Contributor

@tdas tdas Jul 6, 2016

Can you add a comment to explain the problem in short and the workaround?

Contributor Author

Sure, will add a comment in the code once I'm back at my workstation.

// poll will throw if no position, i.e. auto offset reset none and no explicit position
// but can't seek to a position before poll, because poll is what gets subscription partitions
// So, poll, suppress the first exception, then seek
val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
try {
consumer.poll(0)
} catch {
case x: NoOffsetForPartitionException if shouldSuppress =>
logWarning("Catching NoOffsetForPartitionException since " +
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
}
toSeek.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
Contributor

4 chars for indent?

Contributor Author

Foreach is a scope, case is a nested scope.

}
}

consumer
}
}

/**
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
private case class SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]
) extends ConsumerStrategy[K, V] with Logging {

def executorKafkaParams: ju.Map[String, Object] = kafkaParams

def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.subscribe(pattern, new NoOpConsumerRebalanceListener())
val toSeek = if (currentOffsets.isEmpty) {
offsets
} else {
currentOffsets
}
if (!toSeek.isEmpty) {
// work around KAFKA-3370 when reset is none, see explanation in Subscribe above
val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
try {
consumer.poll(0)
} catch {
case x: NoOffsetForPartitionException if shouldSuppress =>
Contributor

Same here. Log as warn.

logWarning("Catching NoOffsetForPartitionException since " +
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
}
toSeek.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}
@@ -113,8 +184,14 @@ private case class Assign[K, V](
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.assign(topicPartitions)
if (currentOffsets.isEmpty) {
offsets.asScala.foreach { case (topicPartition, offset) =>
val toSeek = if (currentOffsets.isEmpty) {
offsets
} else {
currentOffsets
}
if (!toSeek.isEmpty) {
// this doesn't need a KAFKA-3370 workaround, because partitions are known, no poll needed
toSeek.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}
@@ -215,6 +292,95 @@ object ConsumerStrategies {
new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]())
}

/** :: Experimental ::
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
def SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
new SubscribePattern[K, V](
pattern,
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}

/** :: Experimental ::
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
new SubscribePattern[K, V](
pattern,
new ju.HashMap[String, Object](kafkaParams.asJava),
ju.Collections.emptyMap[TopicPartition, jl.Long]())
}

/** :: Experimental ::
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
def SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
new SubscribePattern[K, V](pattern, kafkaParams, offsets)
}

/** :: Experimental ::
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
new SubscribePattern[K, V](
pattern,
kafkaParams,
ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
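
For reviewers who want to see the new strategy end to end, a rough usage sketch from Scala follows. The broker addresses, group id, topic regex, and starting offsets are assumptions for illustration, not part of the patch.

import java.util.regex.Pattern

import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val conf = new SparkConf().setAppName("pattern-subscribe-example")
val ssc = new StreamingContext(conf, Seconds(1))

// Assumed consumer configuration; any new-consumer config can be passed through.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:9092,host2:9092",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id" -> "example-group",
  "auto.offset.reset" -> "earliest")

// Optional explicit starting offsets for particular partitions.
val offsets = Map(new TopicPartition("pat2", 0) -> 3L)

// Subscribe to every topic matching the pattern; new matching topics are picked up over time.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("pat.*"), kafkaParams, offsets))

stream.map(record => (record.key, record.value)).print()
ssc.start()
ssc.awaitTermination()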

/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
@@ -19,6 +19,7 @@

import java.io.Serializable;
import java.util.*;
import java.util.regex.Pattern;

import scala.collection.JavaConverters;

@@ -32,6 +33,7 @@ public class JavaConsumerStrategySuite implements Serializable {
@Test
public void testConsumerStrategyConstructors() {
final String topic1 = "topic1";
final Pattern pat = Pattern.compile("top.*");
final Collection<String> topics = Arrays.asList(topic1);
final scala.collection.Iterable<String> sTopics =
JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
@@ -69,6 +71,19 @@ public Object apply(Long x) {
sub1.executorKafkaParams().get("bootstrap.servers"),
sub3.executorKafkaParams().get("bootstrap.servers"));

final ConsumerStrategy<String, String> psub1 =
ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams, sOffsets);
final ConsumerStrategy<String, String> psub2 =
ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams);
final ConsumerStrategy<String, String> psub3 =
ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams, offsets);
final ConsumerStrategy<String, String> psub4 =
ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams);

Assert.assertEquals(
psub1.executorKafkaParams().get("bootstrap.servers"),
psub3.executorKafkaParams().get("bootstrap.servers"));

final ConsumerStrategy<String, String> asn1 =
ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams, sOffsets);
final ConsumerStrategy<String, String> asn2 =
@@ -103,15 +103,17 @@ class DirectKafkaStreamSuite
kafkaTestUtils.createTopic(t)
kafkaTestUtils.sendMessages(t, data)
}
val totalSent = data.values.sum * topics.size
val offsets = Map(new TopicPartition("basic3", 0) -> 2L)
// one topic is starting 2 messages later
val expectedTotal = (data.values.sum * topics.size) - 2
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")

ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
KafkaUtils.createDirectStream[String, String](
ssc,
preferredHosts,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala))
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala, offsets))
}
val allReceived = new ConcurrentLinkedQueue[(String, String)]()

@@ -149,13 +151,78 @@ class DirectKafkaStreamSuite
}
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
assert(allReceived.size === totalSent,
assert(allReceived.size === expectedTotal,
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
}
ssc.stop()
}

test("pattern based subscription") {
val topics = List("pat1", "pat2", "advanced3")
// Should match 2 out of 3 topics
val pat = """pat\d""".r.pattern
val data = Map("a" -> 7, "b" -> 9)
topics.foreach { t =>
kafkaTestUtils.createTopic(t)
kafkaTestUtils.sendMessages(t, data)
}
val offsets = Map(new TopicPartition("pat2", 0) -> 3L)
// 2 matching topics, one of which starts 3 messages later
val expectedTotal = (data.values.sum * 2) - 3
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")

ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
KafkaUtils.createDirectStream[String, String](
ssc,
preferredHosts,
ConsumerStrategies.SubscribePattern[String, String](pat, kafkaParams.asScala, offsets))
}
val allReceived = new ConcurrentLinkedQueue[(String, String)]()

// hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()
val tf = stream.transform { rdd =>
// Get the offset ranges in the RDD
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.map(r => (r.key, r.value))
}

tf.foreachRDD { rdd =>
for (o <- offsetRanges) {
logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
// For each partition, get size of the range in the partition,
// and the number of items in the partition
val off = offsetRanges(i)
val all = iter.toSeq
val partSize = all.size
val rangeSize = off.untilOffset - off.fromOffset
Iterator((partSize, rangeSize))
}.collect

// Verify whether number of elements in each partition
// matches with the corresponding offset range
collected.foreach { case (partSize, rangeSize) =>
assert(partSize === rangeSize, "offset ranges are wrong")
}
}

stream.foreachRDD { rdd =>
allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
}
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
assert(allReceived.size === expectedTotal,
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
}
ssc.stop()
}


test("receiving from largest starting offset") {
val topic = "latest"
val topicPartition = new TopicPartition(topic, 0)
@@ -228,6 +295,7 @@ class DirectKafkaStreamSuite
kc.close()

// Setup context and kafka stream with largest offset
kafkaParams.put("auto.offset.reset", "none")
ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
val s = new DirectKafkaInputDStream[String, String](