[SPARK-17346][SQL] Add Kafka source for Structured Streaming #15102
Conversation

zsxwing
Member

@zsxwing zsxwing commented Sep 14, 2016

What changes were proposed in this pull request?

This PR adds a new project external/kafka-0-10-sql for Structured Streaming Kafka source.

It's based on the design doc: https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing

@tdas did most of the work, and parts of it were inspired by @koeninger's work.

Introduction

The Kafka source is a Structured Streaming data source that polls data from Kafka. The schema of the returned data is as follows:

| Column | Type |
| --- | --- |
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | long |
| timestampType | int |

The source can deal with topic deletion. However, the user should make sure no Spark job is processing the data when a topic is deleted.

Configuration

The user can use DataStreamReader.option to set the following configurations.

| Kafka Source's options | value | default | meaning |
| --- | --- | --- | --- |
| startingOffset | ["earliest", "latest"] | "latest" | The start point when a query is started, either "earliest", which starts from the earliest offsets, or "latest", which starts from the latest offsets. Note: this only applies when a new streaming query is started; resuming always picks up from where the query left off. |
| failOnDataLost | [true, false] | true | Whether to fail the query when it is possible that data was lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm; you can disable it if it does not behave as you expect. |
| subscribe | A comma-separated list of topics | (none) | The topic list to subscribe to. Only one of "subscribe" and "subscribePattern" can be specified for the Kafka source. |
| subscribePattern | Java regex string | (none) | The pattern used to subscribe to topics. Only one of "subscribe" and "subscribePattern" can be specified for the Kafka source. |
| kafka.consumer.poll.timeoutMs | long | 512 | The timeout in milliseconds to poll data from Kafka in executors. |
| fetchOffset.numRetries | int | 3 | Number of times to retry before giving up fetching the latest Kafka offsets. |
| fetchOffset.retryIntervalMs | long | 10 | Milliseconds to wait before retrying to fetch Kafka offsets. |

Kafka's own configurations can be set via DataStreamReader.option with the kafka. prefix, e.g., stream.option("kafka.bootstrap.servers", "host:port")

Usage

  • Subscribe to 1 topic
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1")
  .load()
  • Subscribe to multiple topics
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1,topic2")
  .load()
  • Subscribe to a pattern
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribePattern", "topic.*")
  .load()
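
Since the source always returns key and value as binary (see the schema above), a first step in most queries is to convert them. The following is a minimal sketch of that conversion, not part of this PR; the server address and topic name are placeholders:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-source-example").getOrCreate()
import spark.implicits._

// Load the stream with the binary key/value schema described above.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1")
  .load()

// Cast the binary columns to strings; topic, partition, offset, timestamp and
// timestampType remain available for filtering or bookkeeping.
val keyValue = df
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .as[(String, String)]
```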

How was this patch tested?

The new unit tests.

@zsxwing
Member Author

zsxwing commented Sep 14, 2016

/cc @marmbrus

@SparkQA

SparkQA commented Sep 14, 2016

Test build #65401 has finished for PR 15102 at commit c4db857.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Contributor

Just from a very brief look, this duplicates egregious amounts of code from the existing Kafka submodule and doesn't handle offset ordering correctly when topicpartitions change.

I'd -1 on this, wholesale, if I had the right to.

@SparkQA

SparkQA commented Sep 14, 2016

Test build #65404 has started for PR 15102 at commit 51d251f.

@zsxwing
Member Author

zsxwing commented Sep 14, 2016

@koeninger Thanks a lot for looking at this PR.

  • The duplicated code is mostly because we don't want to depend on Spark DStream in the Structured Streaming source. Anyway, we can create a common project for this code if that's really a big deal.
  • This PR doesn't support topicpartition changes for now. I'm not sure how to implement the strict ordering for Kafka offsets when topicpartitions change on the fly. The strict ordering is important to StreamExecution, otherwise it may skip some data. Do you have any thoughts? I looked at your PR, but it seems you have not yet resolved this problem?

@SparkQA

SparkQA commented Sep 15, 2016

Test build #65405 has finished for PR 15102 at commit d3c0754.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Contributor

  • This already does depend on most of the existing Kafka DStream implementation. The fact that most of it was copied wholesale proves that. If you're just saying that you don't want it to have a transitive dependency on the spark-streaming-X module, I can refactor that. I would much rather take the time to do that work personally now than deal with the maintenance problems later.
  • Users are going to change topicpartitions whether you want them to or not, and this PR ostensibly supports SubscribePattern, so it must handle changing topicpartitions. Kafka does not have a global order (Kafka doesn't even have a per-partition order that's guaranteed contiguous, see SPARK-17147, but that's another can of worms). Given an offset from topicpartition A and an offset from topicpartition B, it is impossible to determine ordering without some other information. I brought some of these concerns up during the structured streaming design document, and it was handwaved as we'll just make structured offset an interface and try to figure it out later. I believe my PR does the least bad thing possible given this interface, in that it does allow changing partitions, does not violate hashcode, and does have a stable order (if we really want to deal with hashcode collisions, we can sort and concat all topicpartitions or something). If you see something else wrong there, by all means let me know. Yes, if someone publishes data to a topicpartition right before deleting it, the stream may not consume that data, but... really, what did they expect to happen in that case? We can talk about whether it might be possible to use time indexing in kafka as an alternative ordering, but what's there right now doesn't cut it.

My thoughts at this point:

  • It's clear from the jira this shouldn't get rushed into 2.0.1, let's do this as right as possible given the circumstances.
  • How can we collaborate on a shared branch? You guys manually copying stuff from my fork doesn't make any sense.
  • @marmbrus Can you give some specific technical direction as to how users can communicate the type for key and value, without having to map over the stream as is done in this PR?

@marmbrus
Contributor

marmbrus commented Sep 16, 2016

This already does depend on most of the existing Kafka DStream implementation....

I pushed for this code to be copied rather than refactored because I think this is the right direction long term. While it is nice to minimize inter-project dependencies, that is not really the motivation. While the code is very similar now, there are a bunch of things I'd like to start changing:

  • I don't think that all the classes need to be type parameterized. SQL has its own type system, analyzer, and interface to the type system of the JVM (encoders). We should be using that. Operators in SQL do not type parameterize in general.
  • To optimize performance, there are several tricks we might want to play eventually (maybe prefetching data during execution, etc).

These are just ideas, but given that DStreams and Structured Streaming have significantly different models and user interfaces, I don't think that we want to tie ourselves to the same internals. If we identify utilities that are needed by both, then we should pull those out and share them.

Users are going to change topicpartitions whether you want them to or not...

I think we need to take a step back here and look at what is actually required by the Offset interface. We don't need to handle the general problem of whether Kafka offset A from topic 1 is before or after Kafka offset B from topic 2. We'll only ever look at full KafkaOffsets that represent slices in time across a set of topicpartitions and were returned by the Source (possibly with other annotations the Source wants to add). The only questions that the StreamExecution will ask today are (see the sketch after this list):

  • Does x: KafkaOffset == y: KafkaOffset (i.e. is there new data since the last time I checked)?
  • Give me the data between KafkaOffset x and KafkaOffset y for all included topicpartitions.
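
For reference, a rough sketch of how those two questions could be answered if a Kafka offset is modeled as a snapshot of per-partition offsets. This is illustrative only, not the PR's KafkaSourceOffset implementation, and the names are made up:

```scala
import org.apache.kafka.common.TopicPartition

// Illustrative only: model a Kafka source offset as a snapshot of
// per-partition offsets taken on the driver.
case class ExampleKafkaOffset(partitionToOffsets: Map[TopicPartition, Long])

// 1. "Is there new data since the last time I checked?"
def hasNewData(last: ExampleKafkaOffset, latest: ExampleKafkaOffset): Boolean =
  last.partitionToOffsets != latest.partitionToOffsets

// 2. "Give me the data between offsets x and y": for each partition in the end
// snapshot, read from the start snapshot's offset (or a chosen starting point
// for newly discovered partitions) up to the end offset.
def rangesToRead(
    start: ExampleKafkaOffset,
    end: ExampleKafkaOffset,
    newPartitionStart: TopicPartition => Long): Map[TopicPartition, (Long, Long)] = {
  end.partitionToOffsets.map { case (tp, untilOffset) =>
    val fromOffset = start.partitionToOffsets.getOrElse(tp, newPartitionStart(tp))
    tp -> (fromOffset, untilOffset)
  }
}
```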

The final version of this Source should almost certainly support wildcards with topicpartitions that change on the fly. Since it seems this is one of the harder problems to solve, as a strawman, I'd propose that we only support static lists of topics in this PR and possibly even static partitions. I want to get users to kick the tires on structured streaming in general and report what's missing so we can all prioritize our engineering effort.

It's clear from the jira this shouldn't get rushed into 2.0.1, let's do this as right as possible given the circumstances.

Agreed, if this doesn't make 2.0.1 that's fine with me.

How can we collaborate on a shared branch? You guys manually copying stuff from my fork doesn't make any sense.

I typically open PRs against the PR author's branch when I want to collaborate more directly.

Can you give some specific technical direction as to how users can communicate the type for key and value, without having to map over the stream as is done in this PR?

I'd like this to work the same as other calculations in SQL, using column expressions.

df.withColumn("value", $"value".cast("string"))

I'd also like to create expressions that work well with kafka specific deserializers, as well as integrations with our json parsing and maybe Avro too. The nice thing about this path is it fits with other SQL APIs and should play nicely with Dataset Encoders, UDFs, our expression library, etc.
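
To make that last point concrete, here is a hedged sketch, not taken from this PR, of how an arbitrary Kafka Deserializer could be exposed as a SQL expression today by wrapping it in a UDF. StringDeserializer stands in for any deserializer, and df is assumed to be the DataFrame returned by the source's load():

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

// `df` is assumed to be the DataFrame returned by the Kafka source's load().
def deserializeKeyValue(df: DataFrame): DataFrame = {
  // StringDeserializer is only an example; any Kafka Deserializer can be
  // wrapped the same way. Constructing it inside the lambda keeps the UDF
  // serializable, at the cost of some per-record overhead.
  val deserialize = udf { (bytes: Array[Byte]) =>
    if (bytes == null) null else new StringDeserializer().deserialize("", bytes)
  }
  df.withColumn("key", deserialize(col("key")))
    .withColumn("value", deserialize(col("value")))
}
```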

Does that seem reasonable? Is this missing important cases?

Contributor

@marmbrus marmbrus left a comment

Thanks for working on this! I only did a quick overview to identify some structural issues that we should discuss. Feel free to push back if you think some of these changes don't belong in the MVP.

* Underlying consumer is not threadsafe, so neither is this,
* but processing the same topicpartition and group id in multiple threads is usually bad anyway.
*/
class CachedKafkaConsumer[K, V] private(
Contributor

I mentioned this in a larger comment, but we might want to consider removing these. I think that this source should probably always pass data to execution as bytes (maybe even copying them directly in tungsten rows eventually).

import org.apache.spark.SparkContext

/**
* A [[Source]] that uses Kafka's own [[KafkaConsumer]] API to reads data from Kafka. The design
Contributor

nit: "read" data

*
* - The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read
* by this source. These strategies directly correspond to the different consumption options
* in . This class is designed to return a configured
Contributor

nit: this sentence cuts off.

Contributor

Still cuts off. I'd consider removing this abstraction completely.

Member Author

Still cuts off. I'd consider removing this abstraction completely.

Fixed the format

}.toArray

// Create a RDD that reads from Kafka and get the (key, value) pair as byte arrays.
val rdd = new KafkaSourceRDD[Array[Byte], Array[Byte]](
Contributor

This should be RDD[InternalRow], otherwise we are copying and doing a lot of extra conversion. It doesn't have to be this PR, but we might just want to do this right since I think it'll have a significant impact on performance.

Contributor

Are you talking about a difference in GenericRow vs GenericInternalRow, or are you saying this shouldn't be a map over an rdd at all? Sorry, I'm not anywhere near familiar enough with the sql internals.

@@ -0,0 +1,446 @@
/*
Contributor

Can we split this file up?

Member Author

Moved KafkaSourceOffset to a new file


val strategy = caseInsensitiveParams.find(x => strategyOptionNames.contains(x._1)).get match {
case ("subscribe", value) =>
val topics = value.split(",").map(_.trim).filter(_.nonEmpty)
Contributor

I think the answer is "no", but can topics have ,s in them?

Contributor

It's not legal to create topics with a comma, verified in code, tested with kafka 0.8 and 0.10

SubscribeStrategy[Array[Byte], Array[Byte]](
value.split(",").map(_.trim()).filter(_.nonEmpty),
kafkaParamsForStrategy)
case ("subscribepattern", value) =>
Contributor

We should make sure we are correct here (in the face of topics appearing and disappearing) if we are going to include this in the MVP.

if (specifiedStrategies.isEmpty) {
throw new IllegalArgumentException(
"One of the following options must be specified for Kafka source: "
+ strategyOptionNames.mkString(", ") + ". See docs for more details.")
Contributor

Are there any docs? :)

StructField("value", BinaryType)
))

sealed trait ConsumerStrategy[K, V] {
Contributor

Why are these all inner classes? Also, do we need these interfaces?

Contributor

This is the kind of thing that really weirds me out about copying and pasting code from the initial implementation - it's not at all clear why some of these changes were made. It's clear that the changes aren't necessary for minimal functionality, because I didn't make them in my fork.

I very firmly believe there needs to be a user-accessible interface for arbitrary configuration of a Kafka Consumer. The kafka project's interface for setting up consumers isn't really 100% baked yet, and we need to give people a backdoor for getting their jobs done / working around issues with the underlying consumer.

Contributor

I disagree that we should expose half-baked external APIs in the MVP. Specifically, it sounds like doing so would allow users to set up configurations that would violate the correctness guarantees. This is not to say that we should never expose these, but instead that I would value stability, simplicity and correctness over extra functionality.

Contributor

The underlying Consumer api is half baked (well, to be fair, more like 85% baked). We already had this exact same argument, it's already exposed in the DStream implementation. If you try to encapsulate it, you're pretty much ensuring that the spark implementation has to change when kafka changes. If you simply expose what kafka provides, the spark implementation isn't going to have to change much, if at all.

Contributor

DStreams != DataFrames and we are not getting rid of DStreams. This is by design. The proposal here is to accept only static lists of topics and to support adding partitions. It doesn't sound like any of that will have to change down the road, correct?

@koeninger
Contributor

I pushed for this code to be copied rather than refactored because I think this is the right direction long term. While it is nice to minimize inter-project dependencies, that is not really the motivation. While the code is very similar now, there are a bunch of things I'd like to start changing:

A bigger concern for me at this point is that the code was copied, and then modified in ways that don't seem to have anything to do with the necessities of structured streaming (e.g. your "why is this a nested class" comment).

Options from my point of view, from best to worst

  1. Refactor to a common submodule. Based on how little I had to change to get the common functionality in my branch, I think this is going to initially leave most things untouched. If things change in the future, they can be refactored / copied as necessary. I think this minimizes the chance that someone fixes a bug in Dstream cached consumer, and forgets to fix in sql cached consumer, or vice versa.
  2. Copy without changes, make only minimal changes necessary at first. This is going to make what happened more obvious, and make it easier to maintain changes across both pieces of code
  3. Copy and make unnecessary changes (what seems to have been done currently). This seems like a maintenance nightmare for no gain.

I don't think that all the classes need to be type parameterized. SQL has its own type system, analyzer, and interface to the type system of the JVM (encoders). We should be using that. Operators in SQL do not type parameterize in general.
To optimize performance, there are several tricks we might want to play eventually (maybe prefetching data during execution, etc).

Kafka consumers prefetch data already, that's the main reason the CachedKafkaConsumer exists. My thought here is that there isn't much gain to be had with something more than a thin shim around a Kafka rdd, or at least not for a while. Kafka's data model doesn't really allow for much in terms of pushdown optimizations (you basically get to query by offset, or maybe time). About the only idea I've heard that might have promise was Reynold suggesting scheduling straight map jobs as long-running kafka consumers in a poll loop on the executors, to avoid batching latency. But that seems to open a whole can of worms in terms of deterministic behavior, and is probably much further down the road. If we get there, what's the harm in cutting shared dependencies at that point rather than now?

These are just ideas, but given that DStreams and Structured Streaming have significantly different models and user interfaces, I don't think that we want to tie ourselves to the same internals. If we identify utilities that are needed by both, then we should pull those out and share them.

At this point, the shared need is basically everything except KafkaUtils' static constructors, and the parts of the DirectStream related to the DStream interface. You still need an rdd, a cache for consumers, offset ranges, a way to configure consumers, a way to configure locality, a consumer running on the driver to get latest offsets...

We don't need to handle the general problem of is kafka Offset A from Topic 1 before or after kafka Offset B from Topic 2.

Does x: KafkaOffset == y: KafkaOffset (i.e. is there new data since the last time I checked)?

We do need to handle comparing completely different topicpartitions, because it's entirely possible to have a job with a single topicpartition A, which is deleted or unsubscribed, and then a single topicpartition B is added, in the space of one batch. I have talked to companies that are actually doing this kind of thing. If all we need to do is be able to tell that one sql offset (that we already knew about) is different from another sql offset (that we just learned about), then I think it's pretty straightforward - your three cases (sketched below) are

  • Error: ahead in some common topicpartitions, and behind in others.
  • Equal: same Kafka offsets for the same topicpartitions.
  • Not equal: different offsets for the same topicpartitions, and/or different topicpartitions.

That does imply that any ordering of sql Offsets is by when we learn about them in processing time, which sounds suspect, but...
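
A minimal sketch of that three-way comparison over per-partition offset maps, purely illustrative rather than a proposal for the actual Offset interface:

```scala
import org.apache.kafka.common.TopicPartition

sealed trait OffsetComparison
case object Equal extends OffsetComparison
case object NotEqual extends OffsetComparison
case object Error extends OffsetComparison

// Ahead on some shared topicpartitions while behind on others is an error;
// identical maps are equal; anything else (different offsets, or added/removed
// topicpartitions) is simply "not equal".
def compare(
    known: Map[TopicPartition, Long],
    latest: Map[TopicPartition, Long]): OffsetComparison = {
  val common = known.keySet.intersect(latest.keySet)
  val ahead = common.exists(tp => latest(tp) > known(tp))
  val behind = common.exists(tp => latest(tp) < known(tp))
  if (ahead && behind) Error
  else if (known == latest) Equal
  else NotEqual
}
```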

The final version of this Source should almost certainly support wildcards with topicpartitions that change on the fly. Since it seems this is one of the harder problems to solve, as a strawman, I'd propose that we only support static lists of topics in this PR and possibly even static partitions. I want to get users to kick the tires on structured streaming in general and report whats missing so we can all prioritize our engineering effort.

The problem I see with that approach is that doing it right may require changing the structured streaming interface, which gets progressively harder to do the more users and more implementations there are relying on it. I think it's generally best to learn about your items of highest risk first (even if you don't totally solve them in version 1.0).

I typically open PRs against the PR author's branch when I want to collaborate more directly.

Sorry, I meant this less in terms of github mechanics and more in terms of process - are you going to consider pull requests against this branch, what pieces make sense to split off, etc. For a specific initial question, is your mind made up about the submodule thing, or if I do that refactor will you at least look at it?

df.withColumn("value", $"value".cast("string"))
I'd also like to create expressions that work well with kafka specific deserializers, as well as integrations with our json parsing and maybe Avro too. The nice thing about this path is it fits with other SQL APIs and should play nicely with Dataset Encoders, UDFs, our expression library, etc.

Does that seem reasonable? Is this missing important cases?

Yeah, that seems reasonable. Yeah, the built-in types, JSON, and maybe Avro are the ones to hit. I'd be 100% on board if we had one working example of a specific deserializer, that didn't require access to private Spark classes, so people could tell how to do it for whatever serialization scheme they are using.

My guess is someone's going to want an implicit api to let them just call .stringKeyValue or whatever on the dataframe because all they care about is those two columns, but that's really straightforward for people to do themselves.

@zsxwing
Member Author

zsxwing commented Sep 19, 2016

We do need to handle it comparing completely different topicpartitions, because it's entirely possible to have a job with a single topicpartition A, which is deleted or unsubscribed, and then single topicpartition B is added, in the space of one batch.

Yeah, it's also possible that when a batch is running, a single topicpartition C is created and then deleted. But the Kafka source doesn't even know about C, and all data in C will be lost. The root issue is that the current approach polls metadata, and any metadata changes between two polls are missed.

@koeninger
Contributor

I'm not concerned about people deleting partitions before messages have been processed, because they can take care of that problem themselves, by not deleting things until consuming has caught up. This is fairly intuitive IMHO.

I am concerned about shipping an implementation that can't handle partition changes at all, because that's a big step backwards.


@SparkQA

SparkQA commented Sep 20, 2016

Test build #65636 has finished for PR 15102 at commit f5c57f5.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 20, 2016

Test build #65640 has finished for PR 15102 at commit b64d104.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Contributor

You should not be assuming 0 for a starting offset for partitions you've just learned about. You should be asking the underlying driver consumer what its position is. This is yet another argument in favor of keeping the implementations the same until they have to be different, because the existing implementation at least tries to do this correctly.

My bigger concern is that it looks like you guys are continuing to hack in a particular direction, without addressing my points or answering whether you're willing to let me help work on this.

Have you made up your mind?

@marmbrus
Contributor

My bigger concern is that it looks like you guys are continuing to hack in a particular direction, without addressing my points or answering whether you're willing to let me help work on this.
Have you made up your mind?

Cody, I think we have been addressing your points, though I know we are not done yet. It would be helpful if you could make specific comments on the code, preferably with pointers to what you think the correct implementation would look like. Otherwise it's hard to track which points you think have been resolved and which are still in question.

I appreciate that you are concerned that some of this code is duplicated, but I'm going to have to respectfully disagree on that point. I think this is the right choice both for the stability of the DStream implementation and our ability to optimize the SQL implementation.

You should not be assuming 0 for a starting offset for partitions you've just learned about. You should be asking the underlying driver consumer what its position is.

I'll let Ryan comment further here, but I'm not sure if this is correct. It sounds like if we rely on Kafka to manage its position there will be cases where partial failure could result in data loss. In general, I think we need to be careful about relying on Kafka internals when our end goal is to provide a much higher level abstraction.

@koeninger
Contributor

Absent a direct answer, I'm going to read that as "Yes I've made up my mind, no I will not consider PRs to the contrary."

If you want pointers to what I think the correct implementation would look like, I'd say look at the existing DStream and my prototype structured streaming fork. With the main difference that, as I said above, I don't think the current sql Offset interface makes sense if all you need is to measure equality | not equal | error.

My main concern is making sure end users can do their jobs using Kafka. If your main concern is a higher level abstraction that may or may not actually work well with Kafka, I wish you luck in that endeavor, but I don't think it makes sense for me to spend any more time arguing with you about it.

@zsxwing
Member Author

zsxwing commented Sep 20, 2016

You should not be assuming 0 for a starting offset for partitions you've just learned about. You should be asking the underlying driver consumer what its position is.

Yes, there are two approaches for getting new partitions' offsets:

  1. From a user perspective, if I set “auto.offset.reset” to latest, I would like to process new data after the query starts. So we should start from the earliest offset for a new partition.
  2. Follow auto.offset.reset as what you did in your fork.

However, option 2 makes the query result indeterminate when auto.offset.reset is latest, depending on how the query runs. E.g., if you add new partitions and push new data very quickly, the new data may be lost; if you add new partitions and push new data when the app is being recovered, the new data may be lost as well.

@koeninger
Contributor

My fork is not following auto.offset.reset, it's following what the (potentially user-provided) consumer does when it sees a new partition. Maybe that's auto.offset.reset, maybe it's something else.

Either way, who are you to presume that a user doesn't know what she is doing when she configured a consumer to start at a particular position for an added partition? You have no guarantee that offset 0L even exists at that time.


@marmbrus
Contributor

Either way, who are you to presume that a user doesn't know what she is doing when she configured a consumer to start at a particular position for an added partition?

I feel like we are talking past each other. We are not giving the developer the option to manually configure a consumer in this way for this PR precisely because I don't think we can while still maintaining the semantics that structured streaming has promised. This is consciously a higher level API than is provided by DStreams. Neither of these is right or wrong. I do want to be able to support as many real use cases as possible in the Structured Streaming API, but not at the expense of correctness for the queries that we support. If there are low level use cases that we can't support, which require users to use DStreams instead, that is okay, as it is okay that some users still program against RDDs.

@koeninger
Contributor

koeninger commented Sep 20, 2016

We are not giving the developer the option to manually configure a consumer in this way for this PR precisely because I don't think we can while still maintaining the semantics that structured streaming has promised.

You've got this backwards. As soon as you give someone the ability to set auto.offset.reset to largest, it opens up the can of worms as to whether resets should happen at the beginning of a stream, during a stream, and/or when a partition is added. Giving people the ability to configure a consumer doesn't cause that problem, it allows them to solve that problem until such time as the Kafka project has a unified way to solve it. Similarly, as soon as you allow pattern subscriptions, it opens up the can of worms as to adding/removing topics and whether the sql Offset interface as is makes sense for Kafka. Just saying you aren't going to handle deletions for right now doesn't solve that problem.

If you really don't want to consider changing the Offset interface, and want to tell people who need the details of Kafka in order to get their work done to use the DStream, then you should probably eliminate all configuration options except brokers, a fixed list of topicpartitions, and maybe SSL.

I'll try one more time, and then I'm done:

  • Months ago you came up with an interface that realistically will only work with Kafka / Kinesis / lookalikes, yet had no implementation for any of those.
  • Actually attempting an implementation raised some notable differences between what the interface allowed for and what the implementation needed.
  • I offered some specific suggestions, including considering changes to the interface
  • I offered to help with implementation

Your response, from my point of view, has been

  • Decline to consider changes to the interface
  • Decline any assistance with actual implementation
  • Only (re)implement a subset of Kafka functionality that you can see is "safe", regardless of whether it's congruent with the way Kafka is already being used by users.

Under those circumstances, I'm happy to answer specific directed questions you may have, but I'm not interested in continuing to argue. If you guys say you've got this and you're going to do it your way, then you've got it.

Let me know if you change your mind, I'll still be around.

@SparkQA

SparkQA commented Oct 4, 2016

Test build #66285 has finished for PR 15102 at commit ccadd81.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 4, 2016

Test build #66288 has finished for PR 15102 at commit 7ff1059.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 4, 2016

Test build #66289 has finished for PR 15102 at commit a6c4970.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 4, 2016

Test build #3294 has finished for PR 15102 at commit a6c4970.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@marmbrus marmbrus left a comment

Thanks a lot for all the work on this. Other than one question about parameter naming this LGTM.


Structured Streaming integration for Kafka 0.10 to poll data from Kafka. It provides simple parallelism,
1:1 correspondence between Kafka partitions and Spark partitions. The source will cache the Kafka
consumer in executors and try the best to schedule the same Kafka topic partition to the same executor.
Contributor

I would consider dropping all of these implementation details from the docs. I think it's possible we will change them.

uery is started, and that resuming will always pick up from where the query left off.</td>
</tr>
<tr>
<td>failOnCorruptMetadata</td>
Contributor

@marmbrus marmbrus Oct 4, 2016

Is the metadata really corrupt? These actually seem like valid conditions (even if they affect our ability to provide exactly-once guarantees). What about failOnDataLoss?

Member Author

Is the metadata really corrupt? These actually seem like valid conditions (even if they affect our ability to provide exactly-once guarantees). What about failOnDataLoss?

There may be no data lost as well. Hard to find a reasonable name :(

Contributor

Hmmm, can you enumerate the cases where setting this to true could cause the query to fail? I understand:

  • falling so far behind that data gets aged out.
  • deleting a topic before the stream is able to catch up.

What else?

Member Author

There are 3 places calling reportCorruptMetadata:

  • We find some new partitions, then try to connect to query their earliest offsets but these partitions are gone
    • If these partitions are empty, there is no data lost.
  • Find new partitions but their earliest offsets are not 0.
  • Deleting a topic and recreating the same topic. This makes the new offsets less than the old ones.
    • There may be no data lost if we have already processed all data in the old topic.

Contributor

These all sound like possible data loss (or missing data) to me. We could clarify in the description that it's only possible. "Corrupt" sounds like it's there but inconsistent, a checksum is failing, or something like that. If others disagree though, this is minor. We should just be super clear about the conditions in the description.

Contributor

+1

Member Author

Rename to failOnDataLoss

|Kafka option '${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}' is not supported.
|Instead set the source option '$STARTING_OFFSET_OPTION_KEY' to 'earliest' or 'latest' to
|specify where to start. Structured Streaming manages which offsets are consumed
|internally, rather than rely on the kafka Consumer to do it. This will ensure that no
Contributor

nit: "relying"

Member Author

nit: "relying"

Fixed

* @param ds a dataframe that executes + 1 on a stream of integers, returning the result.
* @param addData and add data action that adds the given numbers to the stream, encoding them
* @param ds a dataframe that executes + 1 on a stream of integers, returning the result
* @param preparedActions actions need to run before starting the stress test.
Contributor

nit: its kind of odd for this to be past tense.

Member Author

nit: its kind of odd for this to be past tense.

fixed

@@ -530,3 +530,8 @@ object StreamExecution {

def nextId: Long = _nextId.getAndIncrement()
}

/**
* A special thread to run the stream query.
Contributor

You might comment that we actually check for this class elsewhere. This part is not super clear to me.

Member Author

You might comment that we actually check for this class elsewhere. This part is not super clear to me.

Fixed


Right now, the Kafka source has the following Spark's specific options.

<table class="table">
Contributor

I would split this into two tables, one that must be specified (subscribe/subscribePattern, and kafka.bootstrap...) and then optional params.

Member Author

I would split this into two tables, one that must be specified (subscribe/subscribePattern, and kafka.bootstrap...) and then optional params.

done

options can be specified for Kafka source.</td>
</tr>
<tr>
<td>kafka.consumer.poll.timeoutMs</td>
Contributor

this is not a spark specific one. this can be in the must-be-specified table.

Member Author

this is not a spark specific one. this can be in the must-be-specified table.

This is actually not a Kafka config. This is only used in KafkaSource

Contributor

Then why is it named kafka.something? It's confusing to have all the Kafka params be named kafka.* and this source option also having the kafka. format.
Also, if you don't expect there to be more params in the subscope of poll, then why make that scope?
How about kafkaConsumer.pollTimeoutMs?

<td>Number of times to retry before giving up fatch Kafka latest offsets.</td>
</tr>
<tr>
<td>fetchOffset.retry.intervalMs</td>
Contributor

why another retry scope? Why not fetchOffset.retryIntervalMs?

Member Author

why another retry scope? Why not fetchOffset.retryIntervalMs?

Renamed

</tr>
<tr>
<td>kafka.consumer.poll.timeoutMs</td>
<td>long</td>
Contributor

nit: can keep this as int

Member Author

nit: can keep this as int

Since this is in milliseconds, I tried to make the type consistent with other millisecond configurations in Spark.

</tr>
<tr>
<td>fetchOffset.retry.intervalMs</td>
<td>long</td>
Contributor

nit: can keep this as int.

Member Author

nit: can keep this as int.

Since this is in milliseconds, I tried to make the type consistent with other millisecond configurations in Spark.

@@ -1126,6 +1126,12 @@ aggDF \
</div>
</div>

## Advanced Sources
Contributor

Why Advanced? How about Other Sources?

Member Author

Contributor

Well, there it was explicitly categorized as built-in sources and advanced sources before this section. Here that is not there, so the term "advanced" does not make sense.

Contributor

Discussed offline. Best to move it to the same location where the File and socket sources are discussed. Insert a subheading called Input Sources to mirror the subheading Output Sinks.

Member Author

@tdas updated. I still left the hard-coded Spark version there. It's better to make another PR to clean up all these hard-coded versions.

Contributor

Cool.


private val sc = sqlContext.sparkContext

private val pollTimeoutMs = sourceOptions.getOrElse("kafka.consumer.poll.timeoutMs", "512").toLong
Contributor

Why is this option named "kafka.consumer.poll.timeoutMs"? This makes it look like it is a Kafka param, which it is not.

Member Author

Why is this option named "kafka.consumer.poll.timeoutMs"? This makes it look like it is a Kafka param, which it is not.

How about executor.poll.timeoutMs?

Member Author

Renamed to kafkaConsumer.pollTimeoutMs

|Kafka option '${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}' is not supported.
|Instead set the source option '$STARTING_OFFSET_OPTION_KEY' to 'earliest' or 'latest' to
|specify where to start. Structured Streaming manages which offsets are consumed
|internally, rather than relying on the kafka Consumer to do it. This will ensure that no
Contributor

nit: kafka Consumer --> KafkaConsumer

Member Author

Done

@SparkQA

SparkQA commented Oct 5, 2016

Test build #66338 has finished for PR 15102 at commit d50a05e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 5, 2016

Test build #66345 has finished for PR 15102 at commit 4316906.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@tdas tdas left a comment

Looks good overall. There are a couple of small issues, and quite a few nits.

}

val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
logWarning(s"$deletedPartitions are gone. Some data may have been missed")
Contributor

Shouldn't this be reportDataLoss instead of logWarning?

Member Author

done

}

/**
* Fetch the earliest offsets for newly discovered partitions. The return results may not contain
Contributor

nit: return results --> result

Member Author

done

* Lazy set initialPartitionOffsets to make sure only call `KafkaConsumer.poll` in
* StreamExecutionThread. Otherwise, interrupting a thread running `KafkaConsumer.poll` may hang
* forever (KAFKA-1894).
*/
Contributor

nit:

  • Lazily initialize initialPartitionOffsets to make sure that KafkaConsumer.poll is only called in StreamExecutionThread.
  • interrupting a thread while running

Member Author

done


/** Returns the maximum available offset for this source. */
override def getOffset: Option[Offset] = {
// Make sure initialPartitionOffsets is set
Contributor

nit: is set --> is initialized

Member Author

done

testUnsupportedConfig("kafka.key.deserializer")
testUnsupportedConfig("kafka.value.deserializer")

// only earliest and latest is supported
Contributor

This needs to be removed; this comment is outdated now that we removed support for auto.offset.reset.

Member Author

done

</tr>
</table>

The following options should be set for the Kafka source.
Contributor

nit: should --> must be

Member Author

done

</tr>
</table>

The rest configurations are optional:
Contributor

nit: the following configurations ...

Member Author

done

topics/partitions are dynamically subscribed. Note that `startingOffset` only applies when a new
Streaming query is started, and that resuming will always pick up from where the query left off.
- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use
Dataframe operations to explicitly deserialize the keys.
Contributor

nit: Dataframe -> DataFrame
multiple places have this issue

Member Author

done

@SparkQA

SparkQA commented Oct 5, 2016

Test build #66396 has finished for PR 15102 at commit d9d848c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 5, 2016

Test build #66397 has finished for PR 15102 at commit 7d658f1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 5, 2016

Test build #66398 has finished for PR 15102 at commit 4754125.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 9293734 Oct 5, 2016
@tdas
Contributor

tdas commented Oct 5, 2016

Merged this to master. @zsxwing, do you have another PR for 2.0?

@zsxwing zsxwing deleted the kafka-source branch October 6, 2016 00:05
@zsxwing
Member Author

zsxwing commented Oct 6, 2016

Merged this to master. @zsxwing, do you have another PR for 2.0?

See #15367

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017

Author: Shixiong Zhu <shixiong@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Author: cody koeninger <cody@koeninger.org>

Closes apache#15102 from zsxwing/kafka-source.