[SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] Add java code snippet for Kafka 0.10 integration doc #15679


Closed · 4 commits

Conversation

@lw-lin (Contributor) commented Oct 29, 2016

What changes were proposed in this pull request?

Added a Java code snippet for the Kafka 0.10 integration doc.

How was this patch tested?

SKIP_API=1 jekyll build

Screenshot

![kafka-doc](https://cloud.githubusercontent.com/assets/15843379/19826272/bf0d8a4c-9db8-11e6-9e40-1396723df4bc.png)

@lw-lin (Contributor, Author) commented Oct 29, 2016

@koeninger @zsxwing it'd be great if you can take a look, thanks!

OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

// some time later, after outputs have completed
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
@lw-lin (Contributor, Author) commented on the diff:

I personally feel it's strange that while we can write stream.asInstanceOf[CanCommitOffsets].commitAsync(...) in Scala, we must write ((CanCommitOffsets) stream.inputDStream()).commitAsync(...) in Java. I can open a PR to fix this if needed. @koeninger @zsxwing opinions please?

@koeninger (Contributor) commented:

I think it's far too late to fix those issues at this point. DStreams return an RDD, not a parameterized type. KafkaUtils methods return DStreams and RDDs, not an implementation specific type.

@lw-lin (Contributor, Author) commented Oct 29, 2016:

Thanks Cody. Sorry for not being clear, but my point was that the Java Kafka input stream does not implement CanCommitOffsets, so it has to delegate commitAsync(...) explicitly to stream.inputDStream(), which is a Scala input stream that does implement CanCommitOffsets.

Should createDirectStream() return a Java Kafka input DStream that also implements CanCommitOffsets, so that people can write:

((CanCommitOffsets) stream).commitAsync(offsetRanges);

rather than

((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);

@koeninger (Contributor) commented:

I understood your point. My point is that you have to do the same kind of delegation to get access to HasOffsetRanges on a java rdd, and you're unlikely to be able to fix that kind of thing at this point without either changing the interfaces for dstream, or exposing implementation classes, which Spark is historically very much against.
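The unwrap-and-cast pattern Cody describes can be sketched with stand-in types. Note: DStream, ScalaInputDStream, and JavaInputDStream below are simplified placeholders written for this illustration, not the real Spark classes; only the shape of the delegation matches the doc.

```java
import java.util.ArrayList;
import java.util.List;

public class DelegationSketch {
    // Stand-in for the Scala CanCommitOffsets trait.
    interface CanCommitOffsets {
        void commitAsync(long[] offsetRanges);
    }

    // Stand-in for the generic DStream type that inputDStream() is declared to return.
    static class DStream {
    }

    // Stand-in for the Scala direct stream, which carries the capability.
    static class ScalaInputDStream extends DStream implements CanCommitOffsets {
        final List<long[]> committed = new ArrayList<>();

        @Override
        public void commitAsync(long[] offsetRanges) {
            committed.add(offsetRanges);
        }
    }

    // Stand-in for the Java wrapper: it holds the Scala stream but does not
    // implement CanCommitOffsets itself, hence the unwrap-and-cast below.
    static class JavaInputDStream {
        private final ScalaInputDStream underlying = new ScalaInputDStream();

        public DStream inputDStream() {
            return underlying;
        }
    }

    public static void main(String[] args) {
        JavaInputDStream stream = new JavaInputDStream();
        // The pattern from the doc: unwrap the Java wrapper, then cast the
        // underlying stream to the capability interface.
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(new long[]{0, 100});
        System.out.println("commits recorded: "
                + ((ScalaInputDStream) stream.inputDStream()).committed.size());
        // prints "commits recorded: 1"
    }
}
```

The same unwrap is needed for HasOffsetRanges on a Java RDD, via rdd.rdd(), which is Cody's point about why fixing one without the other buys little.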

@SparkQA commented Oct 29, 2016

Test build #67735 has finished for PR 15679 at commit 9dc8e0f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lw-lin (Contributor, Author) commented Oct 29, 2016

Oh, there's an ongoing 2.0.2 RC cut. It'd be great if we could get this into 2.0.2, so also cc @srowen to take a look, thanks!

@srowen (Member) left a review:

Minor style things

@@ -85,6 +121,20 @@ If you have a use case that is better suited to batch processing, you can create

</div>
<div data-lang="java" markdown="1">
// Import dependencies and create kafka params as in Create Direct Stream above

OffsetRange[] offsetRanges = new OffsetRange[]{
@srowen (Member) commented:

Nit: you can just write ... = { ... } in Java for a static array declaration.
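The shorthand Sean mentions works for any array type when the initializer appears in the declaration itself; a plain int[] is used here only to keep the illustration dependency-free:

```java
public class ArrayInitDemo {
    public static void main(String[] args) {
        // Shorthand form: allowed only where the array is declared.
        int[] a = {1, 2, 3};
        // Equivalent explicit form: also valid in a later assignment.
        int[] b = new int[]{1, 2, 3};
        System.out.println(a.length + " " + b.length); // prints "3 3"
    }
}
```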

@lw-lin (Contributor, Author) commented:

done -- thanks!

OffsetRange.create("test", 1, 0, 100)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.<String, String>createRDD(
@srowen (Member) commented:

I don't think <String, String> is needed after KafkaUtils in Java 7?
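Sean is right for assignment contexts: Java infers a generic method's type arguments from the assignment target, so the explicit type witness can usually be dropped. Illustrated here with Collections.emptyList() standing in for the KafkaUtils call:

```java
import java.util.Collections;
import java.util.List;

public class TypeWitnessDemo {
    public static void main(String[] args) {
        // Explicit type witness: always legal, rarely needed.
        List<String> a = Collections.<String>emptyList();
        // Inferred from the assignment target; equivalent to the line above.
        List<String> b = Collections.emptyList();
        System.out.println(a.equals(b)); // prints "true"
    }
}
```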

@lw-lin (Contributor, Author) commented:

done

final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
@Override
public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) throws Exception {
@srowen (Member) commented:

Can this actually throw Exception?

@lw-lin (Contributor, Author) commented:

done -- my bad


// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet: selectOffsetsFromYourDatabase)
@srowen (Member) commented:

Space before colon

@lw-lin (Contributor, Author) commented:

done


Object results = yourCalculation(rdd);

yourTransactionBlock {
@srowen (Member) commented:

What is this syntax? I don't think this is Java.

@lw-lin (Contributor, Author) commented:

Oh, just to keep it consistent with the Scala counterpart.

@koeninger (Contributor) commented:

I agree with Sean, this would probably be clearer if it was changed to a comment like
// begin your transaction
...
// end your transaction
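Cody's comment-delimited shape, expanded into a self-contained sketch. The in-memory maps stand in for database tables, and yourCalculation is replaced by a literal; a real job would do this inside an actual database transaction.

```java
import java.util.HashMap;
import java.util.Map;

public class TransactionSketch {
    // In-memory stand-ins for database tables (hypothetical, for illustration only).
    static final Map<String, Long> offsetsTable = new HashMap<>();
    static final Map<String, String> resultsTable = new HashMap<>();

    public static void main(String[] args) {
        offsetsTable.put("test-0", 0L); // previously committed offset

        String results = "wordcounts for this batch"; // stand-in for yourCalculation(rdd)
        long batchFrom = 0L;
        long batchUntil = 100L;

        // begin your transaction
        // update results
        resultsTable.put("test-0", results);
        // update offsets where the end of existing offsets matches the
        // beginning of this batch of offsets
        if (offsetsTable.get("test-0") == batchFrom) {
            offsetsTable.put("test-0", batchUntil);
        }
        // assert that offsets were updated correctly
        if (offsetsTable.get("test-0") != batchUntil) {
            throw new IllegalStateException("offset update failed; roll back");
        }
        // end your transaction

        System.out.println(offsetsTable.get("test-0")); // prints 100
    }
}
```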

@lw-lin (Contributor, Author) commented:

well received; thanks!

import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

Map<String, Object> kafkaParams = new HashMap<String, Object>();
@srowen (Member) commented:

Q: is the map value type really Object for the Kafka API?
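To Sean's question: yes, the new Kafka consumer API takes its configuration as Map&lt;String, Object&gt;, because the values mix strings, booleans/numbers, and Class objects. A dependency-free illustration of such a heterogeneous map; String.class stands in for StringDeserializer.class only to keep the snippet free of the Kafka dependency:

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaParamsSketch {
    public static void main(String[] args) {
        // Heterogeneous values are why the map is typed Map<String, Object>.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092"); // a String
        kafkaParams.put("enable.auto.commit", false);           // a Boolean
        kafkaParams.put("key.deserializer", String.class);      // a Class
        System.out.println(kafkaParams.size()); // prints 3
    }
}
```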


@SparkQA commented Oct 29, 2016

Test build #67760 has finished for PR 15679 at commit 400ae12.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger (Contributor) commented:

Were these extracted from compiled example projects, or just written up?

@lw-lin (Contributor, Author) commented Oct 29, 2016

@koeninger thanks. These were extracted from my locally compiled example projects -- there are no compiled kafka010 examples in the Spark repo yet, right?

}.toMap

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign
@koeninger (Contributor) commented:

This import is in an awkward place, omit it or at least move it to the top of this example.

@lw-lin (Contributor, Author) commented:

done

@koeninger (Contributor) commented:

Thanks for working on this; a couple of minor things to fix, but otherwise it looks good.

@SparkQA commented Oct 29, 2016

Test build #67766 has finished for PR 15679 at commit bd622e1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 29, 2016

Test build #67767 has finished for PR 15679 at commit 8be3c8b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger (Contributor) commented:

LGTM

@srowen (Member) commented Oct 30, 2016

Merged to master/2.0.

asfgit pushed a commit that referenced this pull request Oct 30, 2016
… for Kafka 0.10 integration doc

## What changes were proposed in this pull request?

added java code snippet for Kafka 0.10 integration doc

## How was this patch tested?

SKIP_API=1 jekyll build

## Screenshot

![kafka-doc](https://cloud.githubusercontent.com/assets/15843379/19826272/bf0d8a4c-9db8-11e6-9e40-1396723df4bc.png)

Author: Liwei Lin <lwlin7@gmail.com>

Closes #15679 from lw-lin/kafka-010-examples.

(cherry picked from commit 505b927)
Signed-off-by: Sean Owen <sowen@cloudera.com>
@asfgit asfgit closed this in 505b927 Oct 30, 2016
@lw-lin lw-lin deleted the kafka-010-examples branch October 30, 2016 11:05
@lw-lin (Contributor, Author) commented Oct 30, 2016

@srowen @koeninger thanks for the review!

robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
… for Kafka 0.10 integration doc

Closes apache#15679 from lw-lin/kafka-010-examples.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
… for Kafka 0.10 integration doc

Closes apache#15679 from lw-lin/kafka-010-examples.