[SPARK-17346][SQL] Add Kafka source for Structured Streaming #15102
Conversation
/cc @marmbrus
Test build #65401 has finished for PR 15102 at commit
Just from a very brief look, this duplicates egregious amounts of code from the existing Kafka submodule and doesn't handle offset ordering correctly when topicpartitions change. I'd -1 on this, wholesale, if I had the right to.
Test build #65404 has started for PR 15102 at commit
@koeninger Thanks a lot for looking at this PR.
Test build #65405 has finished for PR 15102 at commit
My thoughts at this point:
I pushed for this code to be copied rather than refactored because I think this is the right direction long term. While it is nice to minimize inter-project dependencies, that is not really the motivation. While the code is very similar now, there are a bunch of things I'd like to start changing:
These are just ideas, but given that DStreams and Structured Streaming have significantly different models and user interfaces, I don't think that we want to tie ourselves to the same internals. If we identify utilities that are needed by both, then we should pull those out and share them.
I think we need to take a step back here and look at what is actually required by the
The final version of this
Agreed, if this doesn't make 2.0.1 that's fine with me.
I typically open PRs against the PR author's branch when I want to collaborate more directly.
I'd like this to work the same as other calculations in SQL, using column expressions, e.g. `df.withColumn("value", $"value".cast("string"))`. I'd also like to create expressions that work well with Kafka-specific deserializers, as well as integrations with our JSON parsing and maybe Avro too. The nice thing about this path is it fits with other SQL APIs and should play nicely with Dataset. Does that seem reasonable? Is this missing important cases?
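For illustration, a minimal sketch of the column-expression approach described above. It assumes a Kafka stream loaded with the binary key/value schema this PR proposes; the bootstrap address is a placeholder.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-cast-sketch").getOrCreate()
import spark.implicits._

// Load the Kafka stream; key and value arrive as binary columns.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port") // placeholder address
  .option("subscribe", "topic1")
  .load()

// Deserialize with ordinary column expressions instead of Kafka deserializers.
val strings = df
  .withColumn("key", $"key".cast("string"))
  .withColumn("value", $"value".cast("string"))
```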
Thanks for working on this! I only did a quick overview to identify some structural issues that we should discuss. Feel free to push back if you think some of these changes don't belong in the MVP.
 * Underlying consumer is not threadsafe, so neither is this,
 * but processing the same topicpartition and group id in multiple threads is usually bad anyway.
 */
class CachedKafkaConsumer[K, V] private(
I mentioned this in a larger comment, but we might want to consider removing these. I think that this source should probably always pass data to execution as bytes (maybe even copying them directly in tungsten rows eventually).
import org.apache.spark.SparkContext
/**
 * A [[Source]] that uses Kafka's own [[KafkaConsumer]] API to reads data from Kafka. The design
nit: "read" data
 *
 * - The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read
 *   by this source. These strategies directly correspond to the different consumption options
 *   in . This class is designed to return a configured
nit: this sentence cuts off.
Still cuts off. I'd consider removing this abstraction completely.
Fixed the format
}.toArray

// Create a RDD that reads from Kafka and get the (key, value) pair as byte arrays.
val rdd = new KafkaSourceRDD[Array[Byte], Array[Byte]](
This should be `RDD[InternalRow]`, otherwise we are copying and doing a lot of extra conversion. It doesn't have to be this PR, but we might just want to do this right since I think it'll have a significant impact on performance.
Are you talking about a difference in GenericRow vs GenericInternalRow, or are you saying this shouldn't be a map over an rdd at all? Sorry, I'm not anywhere near familiar enough with the sql internals.
@@ -0,0 +1,446 @@
/* |
Can we split this file up?
Moved KafkaSourceOffset to a new file
val strategy = caseInsensitiveParams.find(x => strategyOptionNames.contains(x._1)).get match {
  case ("subscribe", value) =>
    val topics = value.split(",").map(_.trim).filter(_.nonEmpty)
I think the answer is "no", but can topics have `,`s in them?
It's not legal to create topics with a comma, verified in code, tested with kafka 0.8 and 0.10
    SubscribeStrategy[Array[Byte], Array[Byte]](
      value.split(",").map(_.trim()).filter(_.nonEmpty),
      kafkaParamsForStrategy)
  case ("subscribepattern", value) =>
We should make sure we are correct here (in the face of topics appearing and disappearing) if we are going to include this in the MVP.
if (specifiedStrategies.isEmpty) {
  throw new IllegalArgumentException(
    "One of the following options must be specified for Kafka source: "
    + strategyOptionNames.mkString(", ") + ". See docs for more details.")
Are there any docs? :)
StructField("value", BinaryType) | ||
)) | ||
|
||
sealed trait ConsumerStrategy[K, V] { |
Why are these all inner classes? Also, do we need these interfaces?
This is the kind of thing that really weirds me out about copying and pasting code from the initial implementation - it's not at all clear why some of these changes were made. It's clear that the changes aren't necessary for minimal functionality, because I didn't make them in my fork.
I very firmly believe there needs to be a user-accessible interface for arbitrary configuration of a Kafka Consumer. The kafka project's interface for setting up consumers isn't really 100% baked yet, and we need to give people a backdoor for getting their jobs done / working around issues with the underlying consumer.
I disagree that we should expose half-baked external APIs in the MVP. Specifically, it sounds like doing so would allow users to set up configurations that would violate the correctness guarantees. This is not to say that we should never expose these, but instead that I would value stability, simplicity and correctness over extra functionality.
The underlying Consumer api is half baked (well, to be fair, more like 85% baked). We already had this exact same argument, it's already exposed in the DStream implementation. If you try to encapsulate it, you're pretty much ensuring that the spark implementation has to change when kafka changes. If you simply expose what kafka provides, the spark implementation isn't going to have to change much, if at all.
DStreams != DataFrames and we are not getting rid of DStreams. This is by design. The proposal here is to accept only static lists of topics and to support adding partitions. It doesn't sound like any of that will have to change down the road, correct?
A bigger concern for me at this point is that the code was copied, and then modified in ways that don't seem to have anything to do with the necessities of structured streaming (e.g. your "why is this a nested class" comment). Options from my point of view, from best to worst
Kafka consumers prefetch data already, that's the main reason the CachedKafkaConsumer exists. My thought here is that there isn't much gain to be had with something more than a thin shim around a Kafka rdd, or at least not for a while. Kafka's data model doesn't really allow for much in terms of pushdown optimizations (you basically get to query by offset, or maybe time). About the only idea I've heard that might have promise was Reynold suggesting scheduling straight map jobs as long-running kafka consumers in a poll loop on the executors, to avoid batching latency. But that seems to open a whole can of worms in terms of deterministic behavior, and is probably much further down the road. If we get there, what's the harm in cutting shared dependencies at that point rather than now?
At this point, the shared need is basically everything except KafkaUtils' static constructors, and the parts of the DirectStream related to the DStream interface. You still need an rdd, a cache for consumers, offset ranges, a way to configure consumers, a way to configure locality, a consumer running on the driver to get latest offsets...
We do need to handle it comparing completely different topicpartitions, because it's entirely possible to have a job with a single topicpartition A, which is deleted or unsubscribed, and then single topicpartition B is added, in the space of one batch. I have talked to companies that are actually doing this kind of thing. If all we need to do is be able to tell that one sql offset (that we already knew about) is different from another sql offset (that we just learned about), then I think it's pretty straightforward - your three cases are
That does imply that any ordering of sql Offsets is by when we learn about them in processing time, which sounds suspect, but...
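For illustration only, a minimal sketch of the equal / not-equal / error comparison being discussed, assuming offsets are modeled as a `Map[TopicPartition, Long]`; none of these names come from the PR.

```scala
import org.apache.kafka.common.TopicPartition

sealed trait OffsetComparison
case object Same extends OffsetComparison
case object Different extends OffsetComparison
case class Invalid(reason: String) extends OffsetComparison

// Compare the offsets we already knew about with the ones we just learned about.
def compare(known: Map[TopicPartition, Long],
            latest: Map[TopicPartition, Long]): OffsetComparison = {
  // A partition we already knew about should never move backwards.
  val movedBack = known.exists { case (tp, off) => latest.get(tp).exists(_ < off) }
  if (movedBack) Invalid("offset for a known partition decreased")
  else if (known == latest) Same
  else Different // partitions added/removed, or offsets advanced
}
```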
The problem I see with that approach is that doing it right may require changing the structured streaming interface, which gets progressively harder to do the more users and more implementations there are relying on it. I think it's generally best to learn about your items of highest risk first (even if you don't totally solve them in version 1.0).
Sorry, I meant this less in terms of github mechanics and more in terms of process - are you going to consider pull requests against this branch, what pieces make sense to split off, etc. For a specific initial question, is your mind made up about the submodule thing, or if I do that refactor will you at least look at it?
Yeah, that seems reasonable. Yeah, the builtin types, JSON, and maybe Avro are the ones to hit. I'd be 100% on board if we had one working example of a specific deserializer that didn't require access to private Spark classes, so people could tell how to do it for whatever serialization scheme they are using. My guess is someone's going to want an implicit API to let them just call .stringKeyValue or whatever on the DataFrame because all they care about is those two columns, but that's really straightforward for people to do themselves.
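As a hedged sketch of the kind of example being asked for (not part of this PR), an arbitrary Kafka `Deserializer` can be applied through a UDF without touching any private Spark classes. `StringDeserializer` stands in for whatever scheme a user actually has, and `df` is assumed to be a Kafka stream DataFrame as in the earlier snippet.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.functions.{col, udf}

// Wrap the deserializer in a UDF over the binary value column.
val deserializeValue = udf { bytes: Array[Byte] =>
  // Creating a deserializer per call keeps the sketch simple; a real version
  // would likely cache one per executor.
  new StringDeserializer().deserialize("ignored-topic", bytes)
}

val decoded = df.withColumn("value", deserializeValue(col("value")))
```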
Yeah, it's also possible that while a batch is running, a single topicpartition C is created and then deleted. But the Kafka source doesn't even know about C, and all data of C will be lost. The root issue is that the current approach polls metadata, and any metadata changes between two polls are missed.
I'm not concerned about people deleting partitions before messages have been processed. I am concerned about shipping an implementation that can't handle partition changes.
Test build #65636 has finished for PR 15102 at commit
Test build #65640 has finished for PR 15102 at commit
You should not be assuming 0 for a starting offset for partitions you've just learned about. You should be asking the underlying driver consumer what its position is. This is yet another argument in favor of keeping the implementations the same until they have to be different, because the existing implementation at least tries to do this correctly. My bigger concern is that it looks like you guys are continuing to hack in a particular direction, without addressing my points or answering whether you're willing to let me help work on this. Have you made up your mind?
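For reference, a sketch (under assumptions, not code from either implementation) of what asking the driver consumer for its position could look like with the 0.10 consumer API. `consumer` and `newPartitions` are hypothetical, and the partitions are assumed to already be assigned to the consumer.

```scala
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

def positionsFor(
    consumer: KafkaConsumer[Array[Byte], Array[Byte]],
    newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = {
  // Rather than assuming offset 0, let the consumer resolve the starting
  // position (committed offset or its configured reset policy).
  newPartitions.map(tp => tp -> consumer.position(tp)).toMap
}
```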
Cody, I think we have been addressing your points, though I know we are not done yet. It would be helpful if you could make specific comments on the code, preferably with pointers to what you think the correct implementation would look like. Otherwise it's hard to track which points you think have been resolved and which are still in question. I appreciate that you are concerned that some of this code is duplicated, but I'm going to have to respectfully disagree on that point. I think this is the right choice both for the stability of the DStream implementation and our ability to optimize the SQL implementation.
I'll let Ryan comment further here, but I'm not sure if this is correct. It sounds like if we rely on Kafka to manage its position there will be cases where partial failure could result in data loss. In general, I think we need to be careful about relying on Kafka internals when our end goal is to provide a much higher level abstraction.
Absent a direct answer, I'm going to read that as "Yes I've made up my mind, no I will not consider PRs to the contrary." If you want pointers to what I think the correct implementation would look like, I'd say look at the existing DStream and my prototype structured streaming fork. With the main difference that, as I said above, I don't think the current sql Offset interface makes sense if all you need is to measure equality | not equal | error. My main concern is making sure end users can do their jobs using Kafka. If your main concern is a higher level abstraction that may or may not actually work well with Kafka, I wish you luck in that endeavor, but I don't think it makes sense for me to spend any more time arguing with you about it.
Yes, there are two approaches for getting new partitions' offsets:
However, option 2 makes the query result indeterminate when
My fork is not following auto.offset.reset, it's following what the
Either way, who are you to presume that a user doesn't know what she is doing?
I feel like we are talking past each other. We are not giving the developer the option to manually configure a consumer in this way for this PR precisely because I don't think we can while still maintaining the semantics that structured streaming has promised. This is consciously a higher level API than is provided by DStreams. Neither of these is right or wrong. I do want to be able to support as many real use cases as possible in the Structured Streaming API, but not at the expense of correctness for the queries that we support. If there are low level use cases that we can't support, which require users to use DStreams instead, that is okay, as it is okay that some users still program against RDDs.
… of type parameters; Update stress tests to test adding partitions
You've got this backwards. As soon as you give someone the ability to set auto.offset.reset to largest, it opens up the can of worms as to whether resets should happen at the beginning of a stream, during a stream, and/or when a partition is added. Giving people the ability to configure a consumer doesn't cause that problem, it allows them to solve that problem until such time as the Kafka project has a unified way to solve it. Similarly, as soon as you allow pattern subscriptions, it opens up the can of worms as to adding/removing topics and whether the sql Offset interface as is makes sense for Kafka. Just saying you aren't going to handle deletions for right now doesn't solve that problem. If you really don't want to consider changing the Offset interface, and want to tell people who need the details of Kafka in order to get their work done to use the DStream, then you should probably eliminate all configuration options except brokers, a fixed list of topicpartitions, and maybe SSL. I'll try one more time, and then I'm done:
Your response, from my point of view, has been
Under those circumstances, I'm happy to answer specific directed questions you may have, but I'm not interested in continuing to argue. If you guys say you've got this and you're going to do it your way, then you've got it. Let me know if you change your mind, I'll still be around.
Test build #66285 has finished for PR 15102 at commit

Test build #66288 has finished for PR 15102 at commit

Test build #66289 has finished for PR 15102 at commit

Test build #3294 has finished for PR 15102 at commit
Thanks a lot for all the work on this. Other than one question about parameter naming this LGTM.
Structured Streaming integration for Kafka 0.10 to poll data from Kafka. It provides simple parallelism,
1:1 correspondence between Kafka partitions and Spark partitions. The source will cache the Kafka
consumer in executors and try the best to schedule the same Kafka topic partition to the same executor.
I would consider dropping all of these implementation details from the docs. I think it's possible we will change them.
uery is started, and that resuming will always pick up from where the query left off.</td>
</tr>
<tr>
<td>failOnCorruptMetadata</td>
Is the metadata really corrupt? These actually seem like valid conditions (even if they affect our ability to provide exactly-once guarantees). What about failOnDataLoss?
There may also be no data lost in some of these cases. Hard to find a reasonable name :(
Hmmm, can you enumerate the cases where setting this to true could cause the query to fail? I understand:
- falling so far behind that data gets aged out.
- deleting a topic before the stream is able to catch up.
What else?
There are 3 places calling reportCorruptMetadata:
- We find some new partitions, then try to connect to query their earliest offsets, but these partitions are gone.
  - If these partitions are empty, there is no data lost.
- We find new partitions but their earliest offsets are not 0.
- A topic is deleted and the same topic is created again. This makes the new offset less than the old one.
  - There may be no data lost if we have already processed all data in the old topic.
These all sound like possible data loss (or missing data) to me. We could clarify in the description that it's only possible. "Corrupt" sounds like it's there but inconsistent, a checksum is failing, or something like that. If others disagree though, this is minor. We should just be super clear about the conditions in the description.
+1
Rename to failOnDataLoss
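Not the PR's actual implementation, just a minimal sketch to make the semantics concrete: a failOnDataLoss-style flag typically gates between failing the query and logging a warning. The class and method names here are hypothetical.

```scala
import org.slf4j.LoggerFactory

class DataLossReporter(failOnDataLoss: Boolean) {
  private val log = LoggerFactory.getLogger(classOf[DataLossReporter])

  // Called whenever one of the conditions enumerated above is detected.
  def reportDataLoss(message: String): Unit = {
    if (failOnDataLoss) {
      throw new IllegalStateException(s"$message (set failOnDataLoss=false to continue anyway)")
    } else {
      log.warn(s"$message. Continuing because failOnDataLoss is false.")
    }
  }
}
```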
|Kafka option '${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}' is not supported.
|Instead set the source option '$STARTING_OFFSET_OPTION_KEY' to 'earliest' or 'latest' to
|specify where to start. Structured Streaming manages which offsets are consumed
|internally, rather than rely on the kafka Consumer to do it. This will ensure that no
nit: "relying"
Fixed
 * @param ds a dataframe that executes + 1 on a stream of integers, returning the result.
 * @param addData and add data action that adds the given numbers to the stream, encoding them
 * @param ds a dataframe that executes + 1 on a stream of integers, returning the result
 * @param preparedActions actions need to run before starting the stress test.
nit: it's kind of odd for this to be past tense.
fixed
@@ -530,3 +530,8 @@ object StreamExecution {

  def nextId: Long = _nextId.getAndIncrement()
}

/**
 * A special thread to run the stream query.
You might comment that we actually check for this class elsewhere. This part is not super clear to me.
Fixed
Right now, the Kafka source has the following Spark's specific options.

<table class="table">
I would split this into two tables, one that must be specified (subscribe/subscribePattern, and kafka.bootstrap...) and then optional params.
done
options can be specified for Kafka source.</td>
</tr>
<tr>
<td>kafka.consumer.poll.timeoutMs</td>
this is not a spark specific one. this can be in the must-be-specified table.
This is actually not a Kafka config. This is only used in KafkaSource
Then why is it named as kafka.something? It's confusing to have all the kafka params be named as kafka.* and this source option also having the kafka. format. Also, if you don't expect there to be more params in the subscope of poll, then why make that scope? How about kafkaConsumer.pollTimeoutMs?
<td>Number of times to retry before giving up fatch Kafka latest offsets.</td>
</tr>
<tr>
<td>fetchOffset.retry.intervalMs</td>
why another retry scope? Why not fetchOffset.retryIntervalMs?
Renamed
</tr>
<tr>
<td>kafka.consumer.poll.timeoutMs</td>
<td>long</td>
nit: can keep this as int
Since this is in milliseconds, I tried to make the type consistent with other configurations in Spark.
</tr>
<tr>
<td>fetchOffset.retry.intervalMs</td>
<td>long</td>
nit: can keep this as int.
Since this is in milliseconds, I tried to make the type consistent with other configurations in Spark.
@@ -1126,6 +1126,12 @@ aggDF \
</div>
</div>

## Advanced Sources |
Why Advanced? How about Other Sources?
Copied from http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources
Well, there it was explicitly categorized as inbuilt sources and advanced sources before this section. Here that is not there, so the term "advanced" does not make sense.
Discussed offline. Best to move it to the same location where the File and socket sources are discussed. Insert a subheading called Input Sources to mirror the subheading Output Sinks.
@tdas updated. I still left the hard-coded Spark version there. It's better to make another PR to clean up all these hard-coded versions.
Cool.
private val sc = sqlContext.sparkContext

private val pollTimeoutMs = sourceOptions.getOrElse("kafka.consumer.poll.timeoutMs", "512").toLong
Why is this option named "kafka.consumer.poll.timeoutMs"? This makes it look like it is a Kafka param, which it is not.
How about executor.poll.timeoutMs?
Renamed to kafkaConsumer.pollTimeoutMs
|Kafka option '${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}' is not supported.
|Instead set the source option '$STARTING_OFFSET_OPTION_KEY' to 'earliest' or 'latest' to
|specify where to start. Structured Streaming manages which offsets are consumed
|internally, rather than relying on the kafka Consumer to do it. This will ensure that no
nit: kafka Consumer --> KafkaConsumer
Done
Test build #66338 has finished for PR 15102 at commit

Test build #66345 has finished for PR 15102 at commit
Looks good overall. There are a couple of small issues, and quite a few nits.
}

val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
logWarning(s"$deletedPartitions are gone. Some data may have been missed")
Shouldn't this be reportDataLoss instead of logWarning?
done
}

/**
 * Fetch the earliest offsets for newly discovered partitions. The return results may not contain
nit: return results --> result
done
 * Lazy set initialPartitionOffsets to make sure only call `KafkaConsumer.poll` in
 * StreamExecutionThread. Otherwise, interrupting a thread running `KafkaConsumer.poll` may hang
 * forever (KAFKA-1894).
 */
nit:
- Lazily initialize initialPartitionOffsets to make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
- "interrupting a thread while running `KafkaConsumer.poll`"
done
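A minimal sketch (hypothetical names, not the PR's code) of the lazy-initialization pattern discussed in this thread: the fetch, and hence any `KafkaConsumer.poll` inside it, only runs when the value is first read, i.e. on the stream execution thread rather than at construction time.

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical holder; `fetchEarliestOffsets` is whatever function performs the poll.
class InitialOffsets(fetchEarliestOffsets: () => Map[TopicPartition, Long]) {
  // Deferred until first access, which avoids interrupting a thread that is
  // blocked inside KafkaConsumer.poll (KAFKA-1894).
  lazy val initialPartitionOffsets: Map[TopicPartition, Long] = fetchEarliestOffsets()
}
```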
/** Returns the maximum available offset for this source. */
override def getOffset: Option[Offset] = {
  // Make sure initialPartitionOffsets is set
nit: is set --> is initialized
done
testUnsupportedConfig("kafka.key.deserializer")
testUnsupportedConfig("kafka.value.deserializer")

// only earliest and latest is supported
This needs to be removed; it's an outdated comment after we removed support for auto.offset.reset.
done
</tr>
</table>

The following options should be set for the Kafka source.
nit: should --> must be
done
</tr>
</table>

The rest configurations are optional:
nit: the following configurations ...
done
topics/partitions are dynamically subscribed. Note that `startingOffset` only applies when a new
Streaming query is started, and that resuming will always pick up from where the query left off.
- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use
  Dataframe operations to explicitly deserialize the keys.
nit: Dataframe -> DataFrame
multiple places have this issue
done
Test build #66396 has finished for PR 15102 at commit

Test build #66397 has finished for PR 15102 at commit

Test build #66398 has finished for PR 15102 at commit
Merged this to master. @zsxwing do you have another PR for 2.0?
## What changes were proposed in this pull request?

This PR adds a new project `external/kafka-0-10-sql` for the Structured Streaming Kafka source.

It's based on the design doc: https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing

@tdas did most of the work, and part of it was inspired by @koeninger's work.

### Introduction

The Kafka source is a Structured Streaming data source that polls data from Kafka. The schema of the read data is as follows:

Column | Type
---- | ----
key | binary
value | binary
topic | string
partition | int
offset | long
timestamp | long
timestampType | int

The source can deal with deleted topics. However, the user should make sure there is no Spark job processing the data when deleting a topic.

### Configuration

The user can use `DataStreamReader.option` to set the following configurations.

Kafka Source's options | value | default | meaning
------ | ------- | ------ | -----
startingOffset | ["earliest", "latest"] | "latest" | The start point when a query is started, either "earliest", which is from the earliest offset, or "latest", which is just from the latest offset. Note: this only applies when a new Streaming query is started; resuming will always pick up from where the query left off.
failOnDataLoss | [true, false] | true | Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected.
subscribe | A comma-separated list of topics | (none) | The topic list to subscribe to. Only one of "subscribe" and "subscribePattern" options can be specified for the Kafka source.
subscribePattern | Java regex string | (none) | The pattern used to subscribe to topics. Only one of "subscribe" and "subscribePattern" options can be specified for the Kafka source.
kafkaConsumer.pollTimeoutMs | long | 512 | The timeout in milliseconds to poll data from Kafka in executors.
fetchOffset.numRetries | int | 3 | Number of times to retry before giving up fetching the latest Kafka offsets.
fetchOffset.retryIntervalMs | long | 10 | Milliseconds to wait before retrying to fetch Kafka offsets.

Kafka's own configurations can be set via `DataStreamReader.option` with the `kafka.` prefix, e.g., `stream.option("kafka.bootstrap.servers", "host:port")`.

### Usage

* Subscribe to 1 topic

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1")
  .load()
```

* Subscribe to multiple topics

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1,topic2")
  .load()
```

* Subscribe to a pattern

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribePattern", "topic.*")
  .load()
```

## How was this patch tested?

The new unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Author: cody koeninger <cody@koeninger.org>

Closes apache#15102 from zsxwing/kafka-source.