
Commit 3744492

[Streaming][Kafka][SPARK-8390] doc changes per TD, test to make sure approach shown in docs actually compiles + runs
1 parent b108c9d commit 3744492

3 files changed: +62, -19 lines


docs/streaming-kafka-integration.md

Lines changed: 40 additions & 14 deletions
@@ -82,7 +82,7 @@ This approach has the following advantages over the receiver-based approach (i.e
 
 - *Efficiency:* Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.
 
-- *Exactly-once semantics:* The first approach uses Kafka's high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your save method must be either idempotent, or an atomic transaction that saves results and offsets in your own data store.
+- *Exactly-once semantics:* The first approach uses Kafka's high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use the simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see [Semantics of output operations](streaming-programming-guide.html#semantics-of-output-operations) in the main programming guide for further information).
 
 Note that one disadvantage of this approach is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself (see below).
 
@@ -135,23 +135,49 @@ Next, we discuss how to use this approach in your streaming application.
 
 	<div class="codetabs">
 	<div data-lang="scala" markdown="1">
-		directKafkaStream.foreachRDD { rdd =>
-		    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
-		    // offsetRanges.length = # of Kafka partitions being consumed
-		    ...
+		// hold a reference to the current offset ranges, so it can be used downstream
+		var offsetRanges = Array[OffsetRange]()
+
+		directKafkaStream.transform { rdd =>
+		  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+		  rdd
+		}.map {
+		  ...
+		}.foreachRDD { rdd =>
+		  for (o <- offsetRanges) {
+		    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+		  }
+		  ...
 		}
 	</div>
 	<div data-lang="java" markdown="1">
-		directKafkaStream.foreachRDD(
-		  new Function<JavaPairRDD<String, String>, Void>() {
-		    @Override
-		    public Void call(JavaPairRDD<String, String> rdd) throws IOException {
-		      OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
-		      // offsetRanges.length() = # of Kafka partitions being consumed
-		      ...
-		      return null;
-		    }
+		// hold a reference to the current offset ranges, so it can be used downstream
+		final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference();
+
+		directKafkaStream.transformToPair(
+		  new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
+		    @Override
+		    public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
+		      OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+		      offsetRanges.set(offsets);
+		      return rdd;
 		    }
+		  }
+		).map(
+		  ...
+		).foreachRDD(
+		  new Function<JavaPairRDD<String, String>, Void>() {
+		    @Override
+		    public Void call(JavaPairRDD<String, String> rdd) throws IOException {
+		      for (OffsetRange o : offsetRanges.get()) {
+		        System.out.println(
+		          o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
+		        );
+		      }
+		      ...
+		      return null;
+		    }
+		  }
 		);
 	</div>
 	<div data-lang="python" markdown="1">

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java

Lines changed: 10 additions & 1 deletion
@@ -21,6 +21,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReference;
 
 import scala.Tuple2;
 
@@ -70,6 +71,8 @@ public void tearDown() {
   public void testKafkaStream() throws InterruptedException {
     final String topic1 = "topic1";
     final String topic2 = "topic2";
+    // hold a reference to the current offset ranges, so it can be used downstream
+    final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference();
 
     String[] topic1data = createTopicAndSendData(topic1);
     String[] topic2data = createTopicAndSendData(topic2);
@@ -95,7 +98,8 @@ public void testKafkaStream() throws InterruptedException {
       new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
         @Override
         public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
-          OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
+          OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+          offsetRanges.set(offsets);
           Assert.assertEquals(offsets[0].topic(), topic1);
           return rdd;
         }
@@ -133,6 +137,11 @@ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception
         @Override
         public Void call(JavaRDD<String> rdd) throws Exception {
           result.addAll(rdd.collect());
+          for (OffsetRange o : offsetRanges.get()) {
+            System.out.println(
+              o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
+            );
+          }
           return null;
         }
       }

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala

Lines changed: 12 additions & 4 deletions
@@ -101,13 +101,21 @@ class DirectKafkaStreamSuite
 
     val allReceived = new ArrayBuffer[(String, String)]
 
-    stream.foreachRDD { rdd =>
-      // Get the offset ranges in the RDD
-      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+    // hold a reference to the current offset ranges, so it can be used downstream
+    var offsetRanges = Array[OffsetRange]()
+
+    stream.transform { rdd =>
+      // Get the offset ranges in the RDD
+      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+      rdd
+    }.foreachRDD { rdd =>
+      for (o <- offsetRanges) {
+        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+      }
       val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
         // For each partition, get size of the range in the partition,
         // and the number of items in the partition
-        val off = offsets(i)
+        val off = offsetRanges(i)
         val all = iter.toSeq
         val partSize = all.size
         val rangeSize = off.untilOffset - off.fromOffset
