[SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] Add java code snippet for Kafka 0.10 integration doc #15679
Conversation
@koeninger @zsxwing it'd be great if you could take a look, thanks!
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

// some time later, after outputs have completed
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
I personally feel it'd be strange that while we can write `stream.asInstanceOf[CanCommitOffsets].commitAsync(...)` in Scala, we must write `((CanCommitOffsets) stream.inputDStream()).commitAsync(...)` in Java. I can open a PR to fix this if needed. @koeninger @zsxwing, thoughts please?
I think it's far too late to fix those issues at this point. DStreams return an RDD, not a parameterized type. KafkaUtils methods return DStreams and RDDs, not an implementation specific type.
Thanks Cody. Sorry for not being clear, but my point was that the Java Kafka input stream does not implement `CanCommitOffsets`, so it has to delegate `commitAsync(...)` explicitly to `stream.inputDStream()`, which is a Scala input stream that does implement `CanCommitOffsets`.
Should `createDirectStream()` return a Java Kafka input DStream that also implements `CanCommitOffsets`? Then people could write:

((CanCommitOffsets) stream).commitAsync(offsetRanges);

rather than

((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
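For readers following along, the delegation being discussed can be sketched with plain-Java stand-ins (no Spark dependencies; `InnerStream` and `JavaStreamWrapper` are hypothetical names, not Spark classes):

```java
class DelegationDemo {
  interface CanCommitOffsets {
    void commitAsync(long[] offsets);
  }

  // Stand-in for the Scala DirectKafkaInputDStream, which implements CanCommitOffsets.
  static class InnerStream implements CanCommitOffsets {
    long lastCommitted = -1L;
    public void commitAsync(long[] offsets) {
      lastCommitted = offsets[offsets.length - 1];
    }
  }

  // Stand-in for the Java wrapper stream: it does NOT implement CanCommitOffsets
  // itself, so callers must reach through inputDStream() to commit.
  static class JavaStreamWrapper {
    private final InnerStream inner = new InnerStream();
    InnerStream inputDStream() {
      return inner;
    }
  }
}
```

The cast compiles only because the inner stream implements the interface; the wrapper itself does not, which is exactly the asymmetry raised above.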
I understood your point. My point is that you have to do the same kind of delegation to get access to HasOffsetRanges on a java rdd, and you're unlikely to be able to fix that kind of thing at this point without either changing the interfaces for dstream, or exposing implementation classes, which Spark is historically very much against.
Test build #67735 has finished for PR 15679 at commit
Oh, there's an ongoing 2.0.2 RC cut. It'd be great if we could make this into 2.0.2, so also cc @srowen to take a look, thanks!
Minor style things
@@ -85,6 +121,20 @@ If you have a use case that is better suited to batch processing, you can create

</div>
<div data-lang="java" markdown="1">
// Import dependencies and create kafka params as in Create Direct Stream above

OffsetRange[] offsetRanges = new OffsetRange[]{
Nit: you can just write `... = { ... }` in Java for a static array declaration.
done -- thanks!
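As a runnable illustration of the nit (plain Java; `SimpleRange` is a made-up stand-in for `OffsetRange`, not a Spark class):

```java
class ArrayInitDemo {
  static class SimpleRange {
    final String topic;
    final int partition;
    final long fromOffset;
    final long untilOffset;
    SimpleRange(String topic, int partition, long fromOffset, long untilOffset) {
      this.topic = topic;
      this.partition = partition;
      this.fromOffset = fromOffset;
      this.untilOffset = untilOffset;
    }
  }

  // Verbose form: works anywhere an expression is allowed.
  static SimpleRange[] verbose() {
    return new SimpleRange[]{ new SimpleRange("test", 0, 0, 100) };
  }

  // Shorthand form: legal only in a declaration, per the review nit.
  static final SimpleRange[] SHORTHAND = {
    new SimpleRange("test", 0, 0, 100),
    new SimpleRange("test", 1, 0, 100)
  };
}
```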
  OffsetRange.create("test", 1, 0, 100)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.<String, String>createRDD(
I don't think `<String, String>` is needed after `KafkaUtils` in Java 7?
done
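The inference point can be checked with a self-contained sketch; `createList` below is a hypothetical generic factory shaped loosely like `KafkaUtils.createRDD`, not a Spark API:

```java
import java.util.Arrays;
import java.util.List;

class TypeWitnessDemo {
  // Hypothetical generic factory method with two type parameters.
  static <K, V> List<V> createList(K key, V value) {
    return Arrays.asList(value);
  }

  // Explicit type witness -- legal, but redundant when the compiler can infer:
  static List<String> explicitForm() {
    return TypeWitnessDemo.<String, String>createList("key", "value");
  }

  // The compiler infers <String, String> from the arguments alone:
  static List<String> inferredForm() {
    return createList("key", "value");
  }
}
```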
final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
  @Override
  public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) throws Exception {
Can this actually throw `Exception`?
done -- my bad
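A minimal demonstration of the `throws` point, using a hand-rolled `VoidFunction` stand-in rather than Spark's:

```java
import java.util.Iterator;

class NarrowThrowsDemo {
  // Minimal stand-in for Spark's VoidFunction; the interface method declares
  // throws Exception, as the real one does.
  interface VoidFunction<T> {
    void call(T t) throws Exception;
  }

  static int drained;

  // An implementation is free to narrow (here: drop) the throws clause when its
  // body throws no checked exception -- the point of the review comment above.
  static int drain(Iterator<String> records) {
    drained = 0;
    VoidFunction<Iterator<String>> f = new VoidFunction<Iterator<String>>() {
      @Override
      public void call(Iterator<String> it) {  // no "throws Exception" needed
        while (it.hasNext()) {
          it.next();
          drained++;
        }
      }
    };
    try {
      f.call(records);  // the interface still declares Exception, so handle it here
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return drained;
  }
}
```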
// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet: selectOffsetsFromYourDatabase)
Space before colon
done
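A dependency-free sketch of building `fromOffsets` from database rows; `OffsetRow` is hypothetical, and string keys stand in for Kafka's `TopicPartition` to keep it self-contained:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FromOffsetsDemo {
  // Hypothetical row type standing in for one row of selectOffsetsFromYourDatabase.
  static class OffsetRow {
    final String topic;
    final int partition;
    final long offset;
    OffsetRow(String topic, int partition, long offset) {
      this.topic = topic;
      this.partition = partition;
      this.offset = offset;
    }
  }

  // Begin from the offsets committed to the database.
  static Map<String, Long> fromOffsets(List<OffsetRow> rows) {
    Map<String, Long> fromOffsets = new HashMap<>();
    for (OffsetRow row : rows) {  // note the space on each side of the colon
      fromOffsets.put(row.topic + "-" + row.partition, row.offset);
    }
    return fromOffsets;
  }
}
```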
Object results = yourCalculation(rdd);

yourTransactionBlock {
What is this syntax? I don't think this is Java.
oh, just to keep it consistent with the scala counterpart
I agree with Sean, this would probably be clearer if it was changed to a comment like
// begin your transaction
...
// end your transaction
well received; thanks!
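The comment-style transaction markers the thread settled on might look like this in a runnable sketch; the in-memory maps are illustrative stand-ins for a real transactional store (a real job would use JDBC or similar):

```java
import java.util.HashMap;
import java.util.Map;

class TransactionSketch {
  // In-memory stand-ins for external tables; names here are hypothetical.
  static final Map<String, String> resultsTable = new HashMap<>();
  static final Map<String, Long> offsetsTable = new HashMap<>();

  static void save(String topicPartition, String result, long untilOffset) {
    // begin your transaction
    synchronized (TransactionSketch.class) {
      resultsTable.put(topicPartition, result);        // update results
      offsetsTable.put(topicPartition, untilOffset);   // update offsets
    }
    // end your transaction
  }
}
```

The point of committing results and offsets in one transaction is that a crash between the two writes cannot leave them inconsistent.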
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

Map<String, Object> kafkaParams = new HashMap<String, Object>();
Q: is the map value type really Object for the Kafka API?
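On Sean's question: as far as I can tell the value type is indeed `Object`, because consumer config values mix types (strings, booleans, class literals). A minimal, Spark-free sketch, with config keys written as string literals instead of `ConsumerConfig` constants:

```java
import java.util.HashMap;
import java.util.Map;

class KafkaParamsDemo {
  // The map value type is Object because the values are heterogeneous.
  static Map<String, Object> kafkaParams() {
    Map<String, Object> p = new HashMap<>();
    p.put("bootstrap.servers", "localhost:9092");  // String
    p.put("group.id", "example-group");            // String
    p.put("enable.auto.commit", false);            // Boolean, not the string "false"
    return p;
  }
}
```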
Test build #67760 has finished for PR 15679 at commit
Were these extracted from compiled example projects, or just written up?
@koeninger thanks. These were extracted from my local compiled example projects -- there are no compiled kafka010 examples yet in the Spark repo, right?
}.toMap

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign
This import is in an awkward place, omit it or at least move it to the top of this example.
done
Thanks for working on this; a couple of minor things to fix, but otherwise it looks good.
Test build #67766 has finished for PR 15679 at commit
Test build #67767 has finished for PR 15679 at commit
LGTM
merged to master/2.0
… for Kafka 0.10 integration doc

## What changes were proposed in this pull request?

added java code snippet for Kafka 0.10 integration doc

## How was this patch tested?

SKIP_API=1 jekyll build

## Screenshot



Author: Liwei Lin <lwlin7@gmail.com>

Closes #15679 from lw-lin/kafka-010-examples.

(cherry picked from commit 505b927)
Signed-off-by: Sean Owen <sowen@cloudera.com>
@srowen @koeninger thanks for the review!