Commit 8a7a336

WenboZhao authored and Curtis Howard committed

Support simple rsync schema in the executor uri (apache#12)

* Support simple rsync schema in the executor uri

(cherry picked from commit b043fdb)
(cherry picked from commit 6d59ab1)
1 parent 32e4d65 commit 8a7a336

File tree

1 file changed: +25 -20 lines changed

cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala

Lines changed: 25 additions & 20 deletions
@@ -43,21 +43,26 @@ import org.apache.spark.rpc.RpcAddress
 import scala.collection.mutable
 
 object CoarseCookSchedulerBackend {
-  def fetchUri(uri: String): String =
-    Option(URI.create(uri).getScheme).map(_.toLowerCase) match {
-      case Some("http") => s"curl -O $uri"
-      case Some("spark-rsync") =>
-        val regex = "^spark-rsync://".r
-        val cleanURI = regex.replaceFirstIn(uri, "")
-        "RSYNC_CONNECT_PROG=" + "\"" + "knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT" + "\"" +
-          s" rsync $$SPARK_DRIVER_PULL_HOST::spark/${cleanURI} ./"
-      case Some("hdfs") => s"$$HADOOP_COMMAND fs -copyToLocal $uri ."
-      case Some("rover") =>
-        val storage = "/opt/ts/services/storage-client.ts_storage_client/bin/storage"
-        s"$storage -X-Dtwosigma.logdir=$${MESOS_SANDBOX} cp $uri ."
-      case None | Some("file") => s"cp $uri ."
-      case Some(x) => sys.error(s"$x not supported yet")
-    }
+
+  // A collection of regexes for extracting information from an URI
+  private val HTTP_URI_REGEX = """http://(.*)""".r
+  private val RSYNC_URI_REGEX = """rsync://(.*)""".r
+  private val SPARK_RSYNC_URI_REGEX = """spark-rsync://(.*)""".r
+  private val HDFS_URI_REGEX = """hdfs://(.*)""".r
+
+  private[spark] def fetchURI(uri: String): String = uri.toLowerCase match {
+    case HTTP_URI_REGEX(httpURI) =>
+      s"curl -O http://$httpURI"
+    case RSYNC_URI_REGEX(file) =>
+      s"rsync $file ./"
+    case SPARK_RSYNC_URI_REGEX(file) =>
+      "RSYNC_CONNECT_PROG=\"knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT\"" +
+        s" rsync $$SPARK_DRIVER_PULL_HOST::spark/$file ./"
+    case HDFS_URI_REGEX(file) =>
+      s"$$HADOOP_COMMAND fs -copyToLocal hdfs://$file ."
+    case _ =>
+      sys.error(s"$uri not supported yet")
+  }
 
   def apply(scheduler: TaskSchedulerImpl, sc: SparkContext, cookHost: String,
     cookPort: Int): CoarseGrainedSchedulerBackend = {
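
For reference, a minimal standalone sketch of the new dispatch (the demo object, main method, and sample URIs are hypothetical; the match logic mirrors the diff above):

```scala
// Standalone sketch of the new fetchURI dispatch. FetchURIDemo and the
// sample URIs are invented for illustration; the match arms follow the diff.
object FetchURIDemo {
  private val HTTP_URI_REGEX        = """http://(.*)""".r
  private val RSYNC_URI_REGEX       = """rsync://(.*)""".r
  private val SPARK_RSYNC_URI_REGEX = """spark-rsync://(.*)""".r
  private val HDFS_URI_REGEX        = """hdfs://(.*)""".r

  def fetchURI(uri: String): String = uri.toLowerCase match {
    case HTTP_URI_REGEX(httpURI)     => s"curl -O http://$httpURI"
    case RSYNC_URI_REGEX(file)       => s"rsync $file ./"
    case SPARK_RSYNC_URI_REGEX(file) =>
      "RSYNC_CONNECT_PROG=\"knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT\"" +
        s" rsync $$SPARK_DRIVER_PULL_HOST::spark/$file ./"
    case HDFS_URI_REGEX(file)        => s"$$HADOOP_COMMAND fs -copyToLocal hdfs://$file ."
    case _                           => sys.error(s"$uri not supported yet")
  }

  def main(args: Array[String]): Unit = {
    // Each supported scheme maps to the shell command the executor runs:
    println(fetchURI("http://repo.example.com/app.tgz"))    // curl -O http://repo.example.com/app.tgz
    println(fetchURI("rsync://mirror.example.com/app.tgz")) // rsync mirror.example.com/app.tgz ./
    println(fetchURI("hdfs://nn/apps/app.tgz"))             // $HADOOP_COMMAND fs -copyToLocal hdfs://nn/apps/app.tgz .
  }
}
```

Compared with the old fetchUri, the new version adds a plain rsync:// scheme but no longer handles the rover scheme or bare file/no-scheme paths.
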
@@ -176,7 +181,7 @@
   override def applicationAttemptId(): Option[String] = Some(applicationId())
 
   def createJob(numCores: Double): Job = {
-    import CoarseCookSchedulerBackend.fetchUri
+    import CoarseCookSchedulerBackend.fetchURI
 
     val jobId = UUID.randomUUID()
     executorUUIDWriter(jobId)
@@ -215,20 +220,20 @@
 
     val keystoreUri = conf.getOption("spark.executor.keyStoreFilename")
     val keystorePull = keystoreUri.map { uri =>
-      s"${fetchUri(uri)} && mv $$(basename $uri) spark-executor-keystore"
+      s"${fetchURI(uri)} && mv $$(basename $uri) spark-executor-keystore"
     }
 
     val urisCommand =
       uriValues.map { uri =>
-        s"[ ! -e $$(basename $uri) ] && ${fetchUri(uri)} && tar -xvzf $$(basename $uri)" +
+        s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)" +
           " || (echo \"ERROR FETCHING\" && exit 1)"
       }
 
     val shippedTarballs: Seq[String] = conf.getOption("spark.cook.shippedTarballs")
       .fold(Seq[String]()){ tgz => tgz.split(",").map(_.trim).toList }
 
     val shippedTarballsCommand = shippedTarballs.map { uri =>
-      s"[ ! -e $$(basename $uri) ] && ${fetchUri(uri)} && tar -xvzf $$(basename $uri)"
+      s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)"
     }
 
     logDebug(s"command: $commandString")
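
To make the surrounding shell plumbing concrete, a hedged sketch of what one urisCommand entry expands to (the URI is invented; the fetch command is what fetchURI returns for an hdfs:// URI):

```scala
// Hypothetical rendering of a single urisCommand entry for a made-up URI.
val uri = "hdfs://nn/apps/app.tgz"
val fetch = s"$$HADOOP_COMMAND fs -copyToLocal $uri ."  // fetchURI(uri) for this scheme
val entry =
  s"[ ! -e $$(basename $uri) ] && $fetch && tar -xvzf $$(basename $uri)" +
    " || (echo \"ERROR FETCHING\" && exit 1)"
println(entry)
// [ ! -e $(basename hdfs://nn/apps/app.tgz) ] && $HADOOP_COMMAND fs -copyToLocal hdfs://nn/apps/app.tgz .
//   && tar -xvzf $(basename hdfs://nn/apps/app.tgz) || (echo "ERROR FETCHING" && exit 1)
```
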
@@ -244,7 +249,7 @@
     val remoteConfFetch = if (remoteHdfsConf.nonEmpty) {
       val name = Paths.get(remoteHdfsConf).getFileName
       Seq(
-        fetchUri(remoteHdfsConf),
+        fetchURI(remoteHdfsConf),
         "mkdir HADOOP_CONF_DIR",
         s"tar --strip-components=1 -xvzf $name -C HADOOP_CONF_DIR",
         // This must be absolute because we cd into the spark directory
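
Similarly, a hedged sketch of the command sequence remoteConfFetch builds (the archive path is invented for illustration):

```scala
// Hypothetical command sequence for fetching a remote Hadoop conf tarball.
val remoteHdfsConf = "hdfs://nn/conf/hadoop-conf.tgz"
val name = java.nio.file.Paths.get(remoteHdfsConf).getFileName
val remoteConfFetch = Seq(
  s"$$HADOOP_COMMAND fs -copyToLocal $remoteHdfsConf .",  // fetchURI(remoteHdfsConf)
  "mkdir HADOOP_CONF_DIR",
  s"tar --strip-components=1 -xvzf $name -C HADOOP_CONF_DIR"
)
remoteConfFetch.foreach(println)
```
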
