Skip to content

Commit d16e161

Browse files
aniketbhatnagarandrewor14
authored andcommitted
SPARK-3639 | Removed settings master in examples
This patch removes setting of master as local in Kinesis examples so that users can set it using submit-job. Author: aniketbhatnagar <aniket.bhatnagar@gmail.com> Closes #2536 from aniketbhatnagar/Kinesis-Examples-Master-Unset and squashes the following commits: c9723ac [aniketbhatnagar] Merge remote-tracking branch 'origin/Kinesis-Examples-Master-Unset' into Kinesis-Examples-Master-Unset fec8ead [aniketbhatnagar] SPARK-3639 | Removed settings master in examples 31cdc59 [aniketbhatnagar] SPARK-3639 | Removed settings master in examples
1 parent 1aa549b commit d16e161

File tree

2 files changed

+9
-13
lines changed

2 files changed

+9
-13
lines changed

extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@
7171
* org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \
7272
* https://kinesis.us-east-1.amazonaws.com
7373
*
74+
* Note that number of workers/threads should be 1 more than the number of receivers.
75+
* This leaves one thread available for actually processing the data.
76+
*
7477
* There is a companion helper class called KinesisWordCountProducerASL which puts dummy data
7578
* onto the Kinesis stream.
7679
* Usage instructions for KinesisWordCountProducerASL are provided in the class definition.
@@ -114,12 +117,8 @@ public static void main(String[] args) {
114117
/* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard */
115118
int numStreams = numShards;
116119

117-
/* Must add 1 more thread than the number of receivers or the output won't show properly from the driver */
118-
int numSparkThreads = numStreams + 1;
119-
120120
/* Setup the Spark config. */
121-
SparkConf sparkConfig = new SparkConf().setAppName("KinesisWordCount").setMaster(
122-
"local[" + numSparkThreads + "]");
121+
SparkConf sparkConfig = new SparkConf().setAppName("KinesisWordCount");
123122

124123
/* Kinesis checkpoint interval. Same as batchInterval for this example. */
125124
Duration checkpointInterval = batchInterval;

extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ import org.apache.log4j.Level
6565
* org.apache.spark.examples.streaming.KinesisWordCountASL mySparkStream \
6666
* https://kinesis.us-east-1.amazonaws.com
6767
*
68+
*
69+
* Note that number of workers/threads should be 1 more than the number of receivers.
70+
* This leaves one thread available for actually processing the data.
71+
*
6872
* There is a companion helper class below called KinesisWordCountProducerASL which puts
6973
* dummy data onto the Kinesis stream.
7074
* Usage instructions for KinesisWordCountProducerASL are provided in that class definition.
@@ -97,17 +101,10 @@ private object KinesisWordCountASL extends Logging {
97101
/* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
98102
val numStreams = numShards
99103

100-
/*
101-
* numSparkThreads should be 1 more thread than the number of receivers.
102-
* This leaves one thread available for actually processing the data.
103-
*/
104-
val numSparkThreads = numStreams + 1
105-
106104
/* Setup the and SparkConfig and StreamingContext */
107105
/* Spark Streaming batch interval */
108-
val batchInterval = Milliseconds(2000)
106+
val batchInterval = Milliseconds(2000)
109107
val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
110-
.setMaster(s"local[$numSparkThreads]")
111108
val ssc = new StreamingContext(sparkConfig, batchInterval)
112109

113110
/* Kinesis checkpoint interval. Same as batchInterval for this example. */

0 commit comments

Comments
 (0)