Skip to content

[SPARK-8390][Streaming][Kafka] fix docs related to HasOffsetRanges #6863

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from

Conversation

koeninger
Copy link
Contributor

No description provided.

@SparkQA
Copy link

SparkQA commented Jun 17, 2015

Test build #35065 has finished for PR 6863 at commit bb4336b.

  • This patch fails some tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Copy link
Contributor Author

I don't think a doc change caused a python test failure.
On Jun 17, 2015 5:27 PM, "UCB AMPLab" notifications@github.com wrote:

Merged build finished. Test FAILed.


Reply to this email directly or view it on GitHub
#6863 (comment).


- *Efficiency:* Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminate the problem as there is no receiver, and hence no need for Write Ahead Logs.
- *Efficiency:* Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there two spaces before "As long as"

@tdas
Copy link
Contributor

tdas commented Jun 18, 2015

If the java example is updated to the access the offsets, then the scala example should also be updated.

@koeninger
Copy link
Contributor Author

Yeah, the spacing in that document in general is a mess (mix of tabs and spaces, some 2 spaces between sentences, etc). I cleaned it up somewhat. Also further fixed the scala / java offset ranges examples, the java one is pretty much a C&P from the test at this point.

@SparkQA
Copy link

SparkQA commented Jun 18, 2015

Test build #35087 has finished for PR 6863 at commit b108c9d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


- *Exactly-once semantics:* The first approach uses Kafka's high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper and offsets tracked only by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures.
- *Exactly-once semantics:* The first approach uses Kafka's high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your save method must be either idempotent, or an atomic transaction that saves results and offsets in your own data store.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • "save method" --> "output operation that saves the data to external data stores"
  • Also refer to the fault-tolerance section in the main programming guide. This stuff is common for everything.
    *nit: Can you capitalize Write Ahead Logs

@tdas
Copy link
Contributor

tdas commented Jun 18, 2015

Could you update the example to something that accesses the offsets in the foreachRDD?

var offsetRanges: OffsetRange[] = _
...
kafkaStream.transform { rdd => 
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
}.map { ... }.reduceByKey { ... }.foreachRDD { rdd => 
   // use offsetRanges to do something, maybe print the ranges
}

Do the same for Java and Scala

@SparkQA
Copy link

SparkQA commented Jun 19, 2015

Test build #35279 has finished for PR 6863 at commit 3744492.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 19, 2015

Test build #35280 has finished for PR 6863 at commit 26a06bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor

tdas commented Jun 19, 2015

So the JIRA was about updating the examples actually. Its great that you have updated the docs AND the tests, but it would ideal if the examples DirectKafkaWordCount and JavaDirectKafkaWordCount are updated to show how the offset ranges can be accessed. Since you have updated the tests, mind updating the examples as well?

@koeninger
Copy link
Contributor Author

The word count examples don't have any need of accessing offsets.

Wouldn't it be better to have separate examples? I don't want someone
thinking they need to do all this typecast hoop jumping just to get a word
count

On Fri, Jun 19, 2015 at 3:16 PM, Tathagata Das notifications@github.com
wrote:

So the JIRA was about updating the examples actually. Its great that you
have updated the docs AND the tests, but it would ideal if the examples
DirectKafkaWordCount and JavaDirectKafkaWordCount are updated to show how
the offset ranges can be accessed. Since you have updated the tests, mind
updating the examples as well?


Reply to this email directly or view it on GitHub
#6863 (comment).

@tdas
Copy link
Contributor

tdas commented Jun 19, 2015

That is a good point. Then make two different examples one for Java and
Scala.
BTW, I would like to have this for 1.4.1, so gotta merge soon. So I think I
will merge this, and you can make another PR with the same JIRA to add the
new examples.

On Fri, Jun 19, 2015 at 1:29 PM, Cody Koeninger notifications@github.com
wrote:

The word count examples don't have any need of accessing offsets.

Wouldn't it be better to have separate examples? I don't want someone
thinking they need to do all this typecast hoop jumping just to get a word
count

On Fri, Jun 19, 2015 at 3:16 PM, Tathagata Das notifications@github.com
wrote:

So the JIRA was about updating the examples actually. Its great that you
have updated the docs AND the tests, but it would ideal if the examples
DirectKafkaWordCount and JavaDirectKafkaWordCount are updated to show how
the offset ranges can be accessed. Since you have updated the tests, mind
updating the examples as well?


Reply to this email directly or view it on GitHub
#6863 (comment).


Reply to this email directly or view it on GitHub
#6863 (comment).

@tdas
Copy link
Contributor

tdas commented Jun 19, 2015

Could you update title to get the ordering right? [JIRA][Streaming][Kafka]. Otherwise LGTM.

@koeninger koeninger changed the title [Streaming][Kafka][SPARK-8390] fix docs related to HasOffsetRanges [SPARK-8390][Streaming][Kafka] fix docs related to HasOffsetRanges Jun 19, 2015
@koeninger
Copy link
Contributor Author

Title should be right now
On Jun 19, 2015 4:26 PM, "Tathagata Das" notifications@github.com wrote:

Could you update title to get the ordering right?
[JIRA][Streaming][Kafka]. Otherwise LGTM.


Reply to this email directly or view it on GitHub
#6863 (comment).

@asfgit asfgit closed this in b305e37 Jun 20, 2015
asfgit pushed a commit that referenced this pull request Jun 20, 2015
Author: cody koeninger <cody@koeninger.org>

Closes #6863 from koeninger/SPARK-8390 and squashes the following commits:

26a06bd [cody koeninger] Merge branch 'master' into SPARK-8390
3744492 [cody koeninger] [Streaming][Kafka][SPARK-8390] doc changes per TD, test to make sure approach shown in docs actually compiles + runs
b108c9d [cody koeninger] [Streaming][Kafka][SPARK-8390] further doc fixes, clean up spacing
bb4336b [cody koeninger] [Streaming][Kafka][SPARK-8390] fix docs related to HasOffsetRanges, cleanup
3f3c57a [cody koeninger] [Streaming][Kafka][SPARK-8389] Example of getting offset ranges out of the existing java direct stream api
@tdas
Copy link
Contributor

tdas commented Jun 20, 2015

I have merged this to master and branch-1.4, thanks man!

nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 22, 2015
Author: cody koeninger <cody@koeninger.org>

Closes apache#6863 from koeninger/SPARK-8390 and squashes the following commits:

26a06bd [cody koeninger] Merge branch 'master' into SPARK-8390
3744492 [cody koeninger] [Streaming][Kafka][SPARK-8390] doc changes per TD, test to make sure approach shown in docs actually compiles + runs
b108c9d [cody koeninger] [Streaming][Kafka][SPARK-8390] further doc fixes, clean up spacing
bb4336b [cody koeninger] [Streaming][Kafka][SPARK-8390] fix docs related to HasOffsetRanges, cleanup
3f3c57a [cody koeninger] [Streaming][Kafka][SPARK-8389] Example of getting offset ranges out of the existing java direct stream api
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants