
SPARK-1565, update examples to be used with spark-submit script. #552


Closed
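The change repeated across every file below: the master URL, SPARK_HOME, and example jar are no longer hard-wired into each example's JavaSparkContext constructor; the spark-submit script supplies them at launch time, so each example keeps only its app name, and its positional arguments shift left by one. A minimal sketch of the before/after pattern — ExamplePattern is a hypothetical class, and the jar and input paths in the launch comment are illustrative, not from this PR:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Launched externally, e.g. (paths illustrative; --class and --master are
// standard spark-submit options):
//   ./bin/spark-submit --class org.apache.spark.examples.ExamplePattern \
//     --master local[2] examples/target/spark-examples.jar <input>
public final class ExamplePattern {
  public static void main(String[] args) {
    // Before this PR: master, SPARK_HOME, and jars were constructor arguments:
    //   new JavaSparkContext(args[0], "ExamplePattern",
    //       System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(ExamplePattern.class));
    // After: only the app name is set in code; spark-submit provides the rest.
    SparkConf sparkConf = new SparkConf().setAppName("ExamplePattern");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    sc.textFile(args[0]).count();  // args[0] is now the first real input, not the master
    sc.stop();
  }
}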
1 change: 1 addition & 0 deletions .gitignore
@@ -49,6 +49,7 @@ unit-tests.log
/lib/
rat-results.txt
scalastyle.txt
+conf/*.conf

# For Hive
metastore_db/
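.gitignore now also ignores conf/*.conf, presumably so that locally generated configuration files (such as a spark-defaults-style .conf consumed by spark-submit) are not committed by accident.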
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -74,10 +74,10 @@ class SparkContext(config: SparkConf) extends Logging {
* be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
* from a list of input files or InputFormats for the application.
*/
-    @DeveloperApi
-    def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
-      this(config)
-      this.preferredNodeLocationData = preferredNodeLocationData
+  @DeveloperApi
+  def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
+    this(config)
+    this.preferredNodeLocationData = preferredNodeLocationData
}

/**
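This SparkContext change appears to be whitespace-only: the alternative @DeveloperApi constructor and its body are re-indented, with no change in behavior.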
13 changes: 7 additions & 6 deletions examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -17,6 +17,7 @@

package org.apache.spark.examples;

+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -103,16 +104,16 @@ public static void printWeights(double[] a) {

public static void main(String[] args) {

-if (args.length < 3) {
-System.err.println("Usage: JavaHdfsLR <master> <file> <iters>");
+if (args.length < 2) {
+System.err.println("Usage: JavaHdfsLR <file> <iters>");
System.exit(1);
}

-JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR",
-System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaHdfsLR.class));
-JavaRDD<String> lines = sc.textFile(args[1]);
+SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR");
+JavaSparkContext sc = new JavaSparkContext(sparkConf);
+JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
-int ITERATIONS = Integer.parseInt(args[2]);
+int ITERATIONS = Integer.parseInt(args[1]);

// Initialize w to a random value
double[] w = new double[D];
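After this change, JavaHdfsLR would be launched along the lines of ./bin/spark-submit --class org.apache.spark.examples.JavaHdfsLR <examples-jar> <file> <iters> (jar path illustrative), with the cluster selected by spark-submit's --master flag instead of a positional argument.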
examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -20,6 +20,7 @@
import com.google.common.collect.Lists;
import scala.Tuple2;
import scala.Tuple3;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -34,6 +35,8 @@

/**
* Executes a roll up-style query against Apache logs.
+ *
+ * Usage: JavaLogQuery [logFile]
*/
public final class JavaLogQuery {

@@ -97,15 +100,11 @@ public static Stats extractStats(String line) {
}

Contributor: You deleted the usage here. Could you add it back as a comment somewhere else? Right now there's no way to tell what the first parameter should be unless we dig into the code.

Member (author): Added a comment, since it was just optional.

Contributor: Great, thanks.

public static void main(String[] args) {
-if (args.length == 0) {
-System.err.println("Usage: JavaLogQuery <master> [logFile]");
-System.exit(1);
-}

-JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
-System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLogQuery.class));
+SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery");
+JavaSparkContext jsc = new JavaSparkContext(sparkConf);

-JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);
+JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs);

JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
@Override
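Note the fallback in the dataSet line: when no logFile argument is supplied, the example runs against the built-in exampleApacheLogs sample data, which is why the argument is optional and the hard usage check could be dropped in favor of the class comment requested above.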
examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -18,9 +18,12 @@
package org.apache.spark.examples;



import scala.Tuple2;

import com.google.common.collect.Iterables;

+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -54,20 +57,20 @@ public Double call(Double a, Double b) {
}

public static void main(String[] args) throws Exception {
-if (args.length < 3) {
-System.err.println("Usage: JavaPageRank <master> <file> <number_of_iterations>");
+if (args.length < 2) {
+System.err.println("Usage: JavaPageRank <file> <number_of_iterations>");
System.exit(1);
}

-JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
-System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaPageRank.class));
+SparkConf sparkConf = new SparkConf().setAppName("JavaPageRank");
+JavaSparkContext ctx = new JavaSparkContext(sparkConf);

// Loads in input file. It should be in format of:
// URL neighbor URL
// URL neighbor URL
// URL neighbor URL
// ...
-JavaRDD<String> lines = ctx.textFile(args[1], 1);
+JavaRDD<String> lines = ctx.textFile(args[0], 1);

// Loads all URLs from input file and initialize their neighbors.
JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
@@ -87,7 +90,7 @@ public Double call(Iterable<String> rs) {
});

// Calculates and updates URL ranks continuously using PageRank algorithm.
-for (int current = 0; current < Integer.parseInt(args[2]); current++) {
+for (int current = 0; current < Integer.parseInt(args[1]); current++) {
// Calculates URL contributions to the rank of other URLs.
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
.flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
@@ -17,6 +17,7 @@

package org.apache.spark.examples;

+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -25,19 +26,18 @@
import java.util.ArrayList;
import java.util.List;

-/** Computes an approximation to pi */
+/**
+ * Computes an approximation to pi
+ * Usage: JavaSparkPi [slices]
+ */
public final class JavaSparkPi {


public static void main(String[] args) throws Exception {
-if (args.length == 0) {
-System.err.println("Usage: JavaSparkPi <master> [slices]");
-System.exit(1);
-}

-JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaSparkPi",
-System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkPi.class));
+SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
+JavaSparkContext jsc = new JavaSparkContext(sparkConf);

-int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2;
+int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++) {
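With the usage check gone, JavaSparkPi can now run with no arguments at all, defaulting to 2 slices and therefore 200,000 sample points (n = 100000 * slices).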
24 changes: 11 additions & 13 deletions examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -17,19 +17,22 @@

package org.apache.spark.examples;

-import scala.Tuple2;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

+import scala.Tuple2;

+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;

/**
* Transitive closure on a graph, implemented in Java.
+ * Usage: JavaTC [slices]
*/
public final class JavaTC {

@@ -61,14 +64,9 @@ public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> t
}

public static void main(String[] args) {
-if (args.length == 0) {
-System.err.println("Usage: JavaTC <host> [<slices>]");
-System.exit(1);
-}

-JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC",
-System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaTC.class));
-Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
+SparkConf sparkConf = new SparkConf().setAppName("JavaTC");
+JavaSparkContext sc = new JavaSparkContext(sparkConf);
+Integer slices = (args.length > 0) ? Integer.parseInt(args[0]): 2;
JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();

// Linear transitive closure: each round grows paths by one edge,
examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -18,6 +18,7 @@
package org.apache.spark.examples;

import scala.Tuple2;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -33,14 +34,15 @@ public final class JavaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) throws Exception {
-if (args.length < 2) {
-System.err.println("Usage: JavaWordCount <master> <file>");
+
+if (args.length < 1) {
+System.err.println("Usage: JavaWordCount <file>");
System.exit(1);
}

-JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
-System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));
-JavaRDD<String> lines = ctx.textFile(args[1], 1);
+SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
+JavaSparkContext ctx = new JavaSparkContext(sparkConf);
+JavaRDD<String> lines = ctx.textFile(args[0], 1);

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
22 changes: 11 additions & 11 deletions examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
@@ -17,6 +17,7 @@

package org.apache.spark.examples.mllib;

+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -57,23 +58,22 @@ public String call(Tuple2<Object, double[]> element) {

public static void main(String[] args) {

-if (args.length != 5 && args.length != 6) {
+if (args.length < 4) {
System.err.println(
"Usage: JavaALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]");
"Usage: JavaALS <ratings_file> <rank> <iterations> <output_dir> [<blocks>]");
System.exit(1);
}

-int rank = Integer.parseInt(args[2]);
-int iterations = Integer.parseInt(args[3]);
-String outputDir = args[4];
+SparkConf sparkConf = new SparkConf().setAppName("JavaALS");
+int rank = Integer.parseInt(args[1]);
+int iterations = Integer.parseInt(args[2]);
+String outputDir = args[3];
int blocks = -1;
-if (args.length == 6) {
-blocks = Integer.parseInt(args[5]);
+if (args.length == 5) {
+blocks = Integer.parseInt(args[4]);
}

-JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS",
-System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaALS.class));
-JavaRDD<String> lines = sc.textFile(args[1]);
+JavaSparkContext sc = new JavaSparkContext(sparkConf);
+JavaRDD<String> lines = sc.textFile(args[0]);

JavaRDD<Rating> ratings = lines.map(new ParseRating());

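One behavioral nuance in JavaALS: the old check demanded exactly 5 or 6 arguments, while the new args.length < 4 test accepts any count of 4 or more, so arguments beyond the optional <blocks> are now silently ignored rather than rejected.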
examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
@@ -19,6 +19,7 @@

import java.util.regex.Pattern;

+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -48,24 +49,21 @@ public Vector call(String line) {
}

public static void main(String[] args) {

-if (args.length < 4) {
+if (args.length < 3) {
System.err.println(
-"Usage: JavaKMeans <master> <input_file> <k> <max_iterations> [<runs>]");
+"Usage: JavaKMeans <input_file> <k> <max_iterations> [<runs>]");
System.exit(1);
}

-String inputFile = args[1];
-int k = Integer.parseInt(args[2]);
-int iterations = Integer.parseInt(args[3]);
+String inputFile = args[0];
+int k = Integer.parseInt(args[1]);
+int iterations = Integer.parseInt(args[2]);
int runs = 1;

-if (args.length >= 5) {
-runs = Integer.parseInt(args[4]);
+if (args.length >= 4) {
+runs = Integer.parseInt(args[3]);
}

-JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
-System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
+SparkConf sparkConf = new SparkConf().setAppName("JavaKMeans");
+JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = sc.textFile(inputFile);

JavaRDD<Vector> points = lines.map(new ParsePoint());
examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java
@@ -19,6 +19,7 @@

import java.util.regex.Pattern;

+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -51,17 +52,16 @@ public LabeledPoint call(String line) {
}

public static void main(String[] args) {
-if (args.length != 4) {
-System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
+if (args.length != 3) {
+System.err.println("Usage: JavaLR <input_dir> <step_size> <niters>");
System.exit(1);
}

-JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
-System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLR.class));
-JavaRDD<String> lines = sc.textFile(args[1]);
+SparkConf sparkConf = new SparkConf().setAppName("JavaLR");
+JavaSparkContext sc = new JavaSparkContext(sparkConf);
+JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
-double stepSize = Double.parseDouble(args[1]);
+double stepSize = Double.parseDouble(args[1]);
-int iterations = Integer.parseInt(args[3]);
+int iterations = Integer.parseInt(args[2]);

// Another way to configure LogisticRegression
//
Expand All @@ -73,7 +73,7 @@ public static void main(String[] args) {
// LogisticRegressionModel model = lr.train(points.rdd());

LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
-        iterations, stepSize);
+      iterations, stepSize);

System.out.print("Final w: " + model.weights());

examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -20,6 +20,7 @@
import java.io.Serializable;
import java.util.List;

+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -51,8 +52,8 @@ public void setAge(int age) {
}

public static void main(String[] args) throws Exception {
JavaSparkContext ctx = new JavaSparkContext("local", "JavaSparkSQL",
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkSQL.class));
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

// Load a text file and convert each line to a Java Bean.
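JavaSparkSQL is a slightly different case from the other examples: it previously hardcoded the "local" master rather than reading it from args[0], so moving it to SparkConf is what first lets it run against an arbitrary master chosen via spark-submit.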