@@ -694,7 +694,6 @@ ssc.start() <5>
694
694
<1> +EsSparkStreaming+ import
695
695
<2> Define a case class named +Trip+
696
696
<3> Create a +DStream+ around the +RDD+ of +Trip+ instances
697
- <4> Index the +RDD+ explicitly through +EsSpark+
698
697
<4> Configure the +DStream+ to be indexed explicitly through +EsSparkStreaming+
699
698
<5> Start the streaming process
700
699
@@ -848,16 +847,16 @@ Queue<JavaRDD<String>> microbatches = new LinkedList<JavaRDD<String>>(); <2
848
847
microbatches.add(stringRDD);
849
848
JavaDStream<String><3> stringDStream = jssc.queueStream(microbatches);
850
849
851
- JavaEsSparkStreaming.saveJsonToEs(stringRDD, "spark/json-trips"); <3 >
850
+ JavaEsSparkStreaming.saveJsonToEs(stringRDD, "spark/json-trips"); <4 >
852
851
853
- jssc.start() <4 >
852
+ jssc.start() <5 >
854
853
----
855
854
856
855
<1> example of an entry within the +DStream+ - the JSON is _written_ as is, without any transformation
857
856
<2> creating an +RDD+, placing it into a queue, and creating a +DStream+ out of the queued ++RDD++s, treating each as a microbatch.
858
- <2 > notice the +JavaDStream<String>+ signature
859
- <3 > configure stream to index the JSON data through the dedicated +saveJsonToEs+ method
860
- <4 > launch stream job
857
+ <3 > notice the +JavaDStream<String>+ signature
858
+ <4 > configure stream to index the JSON data through the dedicated +saveJsonToEs+ method
859
+ <5 > launch stream job
861
860
862
861
[float]
863
862
[[spark-streaming-write-dyn]]
@@ -1085,7 +1084,7 @@ jssc.start();
1085
1084
<7> Tuple associating +sfo+ and its metadata
1086
1085
<8> Create a +JavaDStream+ out of the +JavaRDD+
1087
1086
<9> Repack the +JavaDStream+ into a +JavaPairDStream+ by mapping the +Tuple2+ identity function over it.
1088
- <8 > +saveToEsWithMeta+ invoked over the +JavaPairDStream+ containing documents and their respective metadata
1087
+ <10 > +saveToEsWithMeta+ invoked over the +JavaPairDStream+ containing documents and their respective metadata
1089
1088
1090
1089
[float]
1091
1090
[[spark-streaming-type-conversion]]
0 commit comments