Commit bab1e12

Author: Arthur Rand
[SPARK-529] Allow application args to have arguments without an equals sign, test (apache#182)

* allow application args to have arguments without an equals sign, test
* change setup and teardown back, oops
1 parent 5e22577 commit bab1e12

File tree: 5 files changed, +82 −19 lines

cli/dcos-spark/submit_builder.go

Lines changed: 13 additions & 1 deletion
@@ -315,12 +315,24 @@ ARGLOOP:
 		// flush the rest and exit.
 		for i < len(args) {
 			arg = args[i]
+			// if we have a --flag going to the application we need to take the arg (flag) and the value ONLY
+			// if it's not of the format --flag=val which scopt allows
 			if strings.HasPrefix(arg, "-") {
 				appFlags = append(appFlags, arg)
+				if strings.Contains(arg, "=") || (i + 1) >= len(args) {
+					i += 1
+				} else {
+					// if there's a value with this flag, add it
+					if !strings.HasPrefix(args[i + 1], "-") {
+						appFlags = append(appFlags, args[i + 1])
+						i += 1
+					}
+					i += 1
+				}
 			} else {
 				argsEquals = append(argsEquals, arg)
+				i += 1
 			}
-			i += 1
 		}
 		break
 	}
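
To make the new trailing-argument handling concrete, here is a minimal standalone Go sketch of the same splitting logic. The function name splitTrailingArgs, the "positional" return value, and the sample arguments are illustrative only, not part of the actual CLI code:

package main

import (
	"fmt"
	"strings"
)

// splitTrailingArgs mirrors the loop above: every remaining token that starts
// with "-" is passed through as an application flag; if the token is not of
// the form --flag=val, the next bare token (if any) is taken as that flag's
// value; everything else is kept as a plain positional argument.
func splitTrailingArgs(args []string) (appFlags, positional []string) {
	i := 0
	for i < len(args) {
		arg := args[i]
		if strings.HasPrefix(arg, "-") {
			appFlags = append(appFlags, arg)
			if strings.Contains(arg, "=") || i+1 >= len(args) {
				i++
			} else {
				// if there's a value with this flag, take it too
				if !strings.HasPrefix(args[i+1], "-") {
					appFlags = append(appFlags, args[i+1])
					i++
				}
				i++
			}
		} else {
			positional = append(positional, arg)
			i++
		}
	}
	return appFlags, positional
}

func main() {
	flags, rest := splitTrailingArgs([]string{"30", "--readUrl", "s3n://bucket/in", "--countOnly"})
	fmt.Println(flags) // [--readUrl s3n://bucket/in --countOnly]
	fmt.Println(rest)  // [30]
}

Under this scheme, both --readUrl=s3n://bucket/in and --readUrl s3n://bucket/in reach the application, where an scopt-style parser can accept either form.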

docs/run-job.md

Lines changed: 4 additions & 5 deletions
@@ -8,15 +8,14 @@ enterprise: 'no'
    to a location visible to the cluster (e.g., HTTP, S3, or HDFS). [Learn more][13].

 1. Run the job.
+   Include all configuration flags before the jar URL and the arguments for your Spark job after the jar URL. In general, follow the template `dcos spark run --submit-args="<flags> URL [args]"`, where `<flags>` can be options such as `--conf spark.cores.max=16` and `--class my.spark.App`, `URL` is the location of the application, and `[args]` are any arguments for the application.
+
+       dcos spark run --submit-args="--class MySampleClass http://external.website/mysparkapp.jar"

-       dcos spark run --submit-args=`--class MySampleClass http://external.website/mysparkapp.jar 30`
-
-       dcos spark run --submit-args="--py-files mydependency.py http://external.website/mysparkapp.py 30"
+       dcos spark run --submit-args="--py-files mydependency.py http://external.website/mysparkapp.py"

        dcos spark run --submit-args="http://external.website/mysparkapp.R"

-   You can submit arbitrary pass-through options to this script via the `--submit-args` options.
-
    If your job runs successfully, you will get a message with the job’s submission ID:

        Run job succeeded. Submission id: driver-20160126183319-0001

tests/jobs/scala/build.sbt

Lines changed: 2 additions & 1 deletion
@@ -4,7 +4,8 @@ lazy val root = (project in file("."))
     scalaVersion := "2.11.8",
     libraryDependencies ++= Seq(
       "org.apache.spark" %% "spark-core" % "2.1.0" % "provided",
-      "org.apache.hadoop" % "hadoop-aws" % "2.6.0"
+      "org.apache.hadoop" % "hadoop-aws" % "2.6.0",
+      "com.github.scopt" %% "scopt" % "3.7.0"
     )
   )

Lines changed: 30 additions & 10 deletions
@@ -1,22 +1,42 @@
+import scopt.OptionParser
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.spark.SparkConf

 object S3Job {
   def main(args: Array[String]): Unit = {
-    val conf = new SparkConf().setAppName("S3 Test")
-    val sc = new SparkContext(conf)
+    object config {
+      var readurl: String = null
+      var writeurl: String = null
+      var countonly: Boolean = false
+    }

-    val readURL = args(0)
-    val writeURL = args(1)
-    println(s"Reading from ${readURL}. Writing to ${writeURL}.")
+    val parser = new OptionParser[Unit]("S3 Job") {
+      opt[String]("readUrl").action((x, _) => config.readurl = x)
+      opt[String]("writeUrl").action((x, _) => config.writeurl = x)
+      opt[Unit]("countOnly").action((_, _) => config.countonly = true)
+    }

-    val textRDD = sc.textFile(readURL)
-    println(s"Read ${textRDD.count()} lines from${readURL}.")
+    if (parser.parse(args)) {
+      println("RUNNING S3 JOB")
+      val conf = new SparkConf().setAppName("S3 Test")
+      val sc = new SparkContext(conf)

-    textRDD.map(_.length).saveAsTextFile(writeURL)
-    println(s"Wrote ${textRDD.count()} lines to ${writeURL}.")
+      val readURL = config.readurl
+      val writeURL = config.writeurl

-    sc.stop()
+      println(s"Reading from ${readURL}. Writing to ${writeURL}.")
+
+      val textRDD = sc.textFile(readURL)
+      println(s"Read ${textRDD.count()} lines from ${readURL}.")
+
+      textRDD.map(_.length).saveAsTextFile(writeURL)
+      println(s"Wrote ${textRDD.count()} lines to ${writeURL}.")
+
+      sc.stop()
+    } else {
+      println("Error bad arguments")
+      System.exit(1)
+    }
   }
 }

tests/test_spark.py

Lines changed: 33 additions & 2 deletions
@@ -231,11 +231,12 @@ def _check_task_network_info(task):


 @pytest.mark.sanity
+@pytest.mark.runnow
 def test_s3():
     linecount_path = os.path.join(THIS_DIR, 'resources', 'linecount.txt')
     s3.upload_file(linecount_path)

-    app_args = "{} {}".format(
+    app_args = "--readUrl {} --writeUrl {}".format(
         s3.s3n_url('linecount.txt'),
         s3.s3n_url("linecount-out"))

@@ -248,12 +249,42 @@ def test_s3():
             "--class", "S3Job"]
     utils.run_tests(app_url=_scala_test_jar_url(),
                     app_args=app_args,
-                    expected_output="",
+                    expected_output="Read 3 lines",
                     app_name="/spark",
                     args=args)

     assert len(list(s3.list("linecount-out"))) > 0

+    app_args = "--readUrl {} --countOnly".format(s3.s3n_url('linecount.txt'))
+
+    args = ["--conf",
+            "spark.mesos.driverEnv.AWS_ACCESS_KEY_ID={}".format(
+                os.environ["AWS_ACCESS_KEY_ID"]),
+            "--conf",
+            "spark.mesos.driverEnv.AWS_SECRET_ACCESS_KEY={}".format(
+                os.environ["AWS_SECRET_ACCESS_KEY"]),
+            "--class", "S3Job"]
+    utils.run_tests(app_url=_scala_test_jar_url(),
+                    app_args=app_args,
+                    expected_output="Read 3 lines",
+                    app_name="/spark",
+                    args=args)
+
+    app_args = "--countOnly --readUrl {}".format(s3.s3n_url('linecount.txt'))
+
+    args = ["--conf",
+            "spark.mesos.driverEnv.AWS_ACCESS_KEY_ID={}".format(
+                os.environ["AWS_ACCESS_KEY_ID"]),
+            "--conf",
+            "spark.mesos.driverEnv.AWS_SECRET_ACCESS_KEY={}".format(
+                os.environ["AWS_SECRET_ACCESS_KEY"]),
+            "--class", "S3Job"]
+    utils.run_tests(app_url=_scala_test_jar_url(),
+                    app_args=app_args,
+                    expected_output="Read 3 lines",
+                    app_name="/spark",
+                    args=args)
+

 # Skip DC/OS < 1.10, because it doesn't have adminrouter support for service groups.
 @pytest.mark.skipif('shakedown.dcos_version_less_than("1.10")')
