
Commit 684e087

Author: Venkata krishnan Sowrirajan (committed)
1. Use the spark.lambda.s3.bucket setting to pick up the Spark binaries from S3.
2. Moved AWSLambdaClient creation to the top and reuse the same client everywhere.
3. Pass the Lambda function's memory size as the executor JVM -Xmx.
4. Cleaned up Qubole occurrences and other redundant code.
1 parent 03f93e6 commit 684e087
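In short, this commit makes the Lambda scheduler backend read the S3 bucket holding the Spark/Hadoop/Hive binaries from spark.lambda.s3.bucket instead of a hard-coded bucket, builds a single AWSLambdaClient up front and reuses it for every call, and sizes the executor JVM heap from the Lambda function's configured memory. A condensed, hedged sketch of that pattern (identifiers mirror the LambdaSchedulerBackend diff below; the standalone object wrapper and the printed output are only for illustration):

import com.amazonaws.ClientConfiguration
import com.amazonaws.services.lambda.AWSLambdaClientBuilder
import com.amazonaws.services.lambda.model.GetFunctionConfigurationRequest
import org.apache.spark.SparkConf

object LambdaBackendSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val lambdaFunctionName = conf.get("spark.lambda.function.name", "get_spark_from_s3")

    // 1. Spark binaries are picked from a user-supplied bucket; fail fast if it is not set.
    val lambdaBucket = Option(conf.get("spark.lambda.s3.bucket", null))
    if (!lambdaBucket.isDefined) {
      throw new Exception("spark.lambda.s3.bucket should have a valid S3 bucket name having Spark binaries")
    }

    // 2. Build the AWSLambdaClient once and reuse it for every Lambda API call.
    val clientConfig = new ClientConfiguration()
    val lambdaClient = AWSLambdaClientBuilder
      .standard()
      .withClientConfiguration(clientConfig)
      .build()

    // 3. Derive the executor -Xmx from the function's memory size (in MB), instead of a fixed 1400m.
    val request = new GetFunctionConfigurationRequest()
    request.setFunctionName(lambdaFunctionName)
    val lambdaContainerMemory = lambdaClient.getFunctionConfiguration(request).getMemorySize
    println(s"executor JVM heap flag: -Xmx${lambdaContainerMemory}m")
  }
}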

File tree

10 files changed, +66 −149 lines changed


common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java

Lines changed: 0 additions & 8 deletions
@@ -345,12 +345,4 @@ public static byte[] bufferToArray(ByteBuffer buffer) {
     }
   }
 
-  public static Path localFileToS3(String s3PrefixLocation, File path) throws IOException {
-    return new Path(s3PrefixLocation, path.getCanonicalPath());
-  }
-
-  public static File s3ToLocalFile(String s3PrefixLocation, Path path) {
-    return new File(path.toString().replace(s3PrefixLocation, ""));
-  }
-
 }

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 4 additions & 4 deletions
@@ -119,7 +119,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
 
   private SparkConf conf;
   private boolean shuffleOverS3 = false;
-  private String s3PrefixLocation = "";
+  private String shuffleS3Bucket = "";
 
   private Configuration hadoopConf;
   private FileSystem hadoopFileSystem;
@@ -144,8 +144,8 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     this.serializer = dep.serializer();
     this.shuffleBlockResolver = shuffleBlockResolver;
     this.conf = conf;
-    this.shuffleOverS3 = conf.getBoolean("spark.shuffle.s3.enabled", shuffleOverS3);
-    this.s3PrefixLocation = conf.get("spark.qubole.s3PrefixLocation", "s3://dev.canopydata.com/vsowrira/");
+    this.shuffleOverS3 = blockManager.shuffleOverS3Enabled();
+    this.shuffleS3Bucket = BlockManager.getS3Bucket(conf);
     this.hadoopConf = BlockManager.getHadoopConf(conf);
     this.hadoopFileSystem = BlockManager.getHadoopFileSystem(conf);
   }
@@ -219,7 +219,7 @@ long[] getPartitionLengths() {
   */
  private long[] writePartitionedFile(File outputFile) throws IOException {
    if(shuffleOverS3) {
-      Path outputPath = Utils.localFileToS3(s3PrefixLocation, outputFile);
+      Path outputPath = Utils.localFileToS3(shuffleS3Bucket, outputFile);
      return writePartitionedFileToS3(outputPath);
    }
 
core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 2 additions & 2 deletions
@@ -273,7 +273,7 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
 
     if (shuffleOverS3Enabled) {
       Path outputPath = Utils.localFileToS3(
-        blockManager.getS3PrefixLocation(sparkConf), outputFile);
+        blockManager.getS3Bucket(sparkConf), outputFile);
       FileSystem fileSystem = outputPath.getFileSystem(
         BlockManager.getHadoopConf(sparkConf));
       FSDataOutputStream outputStream;
@@ -379,7 +379,7 @@ private long[] mergeSpillsWithFileStream(
 
     if (blockManager.shuffleOverS3Enabled()) {
       Path outputPath = Utils.localFileToS3(
-        blockManager.getS3PrefixLocation(sparkConf), outputFile);
+        blockManager.getS3Bucket(sparkConf), outputFile);
       FileSystem fileSystem = outputPath.getFileSystem(
         BlockManager.getHadoopConf(sparkConf));
       FSDataOutputStream outputStream = fileSystem.create(outputPath);

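Both shuffle writers above share the same write path: map the local shuffle output file to an S3 location and stream it out through the Hadoop FileSystem API. A minimal standalone sketch of that pattern (bucket name and local path are made up; the Path construction mirrors the JavaUtils helper removed earlier in this commit, and is an assumption about what the Scala-side Utils.localFileToS3 does):

import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object S3ShuffleWriteSketch {
  def main(args: Array[String]): Unit = {
    val shuffleS3Bucket = "s3://my-spark-lambda-bucket"            // hypothetical bucket
    val outputFile = new File("/tmp/blockmgr/shuffle_0_0_0.data")  // hypothetical local shuffle file

    // Equivalent of Utils.localFileToS3: prefix the canonical local path with the S3 location.
    val outputPath = new Path(shuffleS3Bucket, outputFile.getCanonicalPath)

    // Resolve the FileSystem for that path and write through it, as the writers above do.
    val fileSystem = outputPath.getFileSystem(new Configuration())
    val outputStream = fileSystem.create(outputPath)
    try {
      // ... copy partition bytes into outputStream ...
    } finally {
      outputStream.close()
    }
  }
}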
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 3 additions & 3 deletions
@@ -68,9 +68,9 @@ private[spark] class CoarseGrainedExecutorBackend(
       exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
     }(ThreadUtils.sameThread)
 
-    val requestId = env.conf.get("spark.qubole.lambda.awsRequestId")
-    val logGroupName = env.conf.get("spark.qubole.lambda.logGroupName")
-    val logStreamName = env.conf.get("spark.qubole.lambda.logStreamName")
+    val requestId = env.conf.get("spark.lambda.awsRequestId")
+    val logGroupName = env.conf.get("spark.lambda.logGroupName")
+    val logStreamName = env.conf.get("spark.lambda.logStreamName")
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)

core/src/main/scala/org/apache/spark/scheduler/cluster/LambdaSchedulerBackend.scala

Lines changed: 30 additions & 106 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
 import java.util.concurrent.atomic.AtomicInteger
 
 import com.amazonaws.ClientConfiguration
+import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
 import com.amazonaws.services.lambda.AWSLambdaClientBuilder
 import com.amazonaws.services.lambda.invoke.{LambdaFunction, LambdaInvokerFactory}
 import com.amazonaws.services.lambda.model.InvokeRequest
@@ -126,8 +127,8 @@ private[spark] class LambdaSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
   with Logging {
 
-  val lambdaFunctionName = sc.conf.get("spark.qubole.lambda.function.name", "get_spark_from_s3")
-  val s3SparkVersion = sc.conf.get("spark.qubole.lambda.spark.software.version", "LATEST")
+  val lambdaFunctionName = sc.conf.get("spark.lambda.function.name", "get_spark_from_s3")
+  val s3SparkVersion = sc.conf.get("spark.lambda.spark.software.version", "LATEST")
   var numExecutorsExpected = 0
   var numExecutorsRegistered = new AtomicInteger(0)
   var executorId = new AtomicInteger(0)
@@ -137,7 +138,7 @@ private[spark] class LambdaSchedulerBackend(
   // Set of executorIds which are currently alive
   val liveExecutors = new HashSet[String]
 
-  var lambdaContainerMemoryBytes: Int = 0
+  var lambdaContainerMemory: Int = 0
   var lambdaContainerTimeoutSecs: Int = 0
 
   val clientConfig = new ClientConfiguration()
@@ -146,22 +147,32 @@ private[spark] class LambdaSchedulerBackend(
   clientConfig.setRequestTimeout(345680)
   clientConfig.setSocketTimeout(345681)
 
+  val lambdaBucket = Option(sc.getConf.get("spark.lambda.s3.bucket"))
+
+  if (!lambdaBucket.isDefined) {
+    throw new Exception(s"spark.lambda.s3.bucket should" +
+      s" have a valid S3 bucket name having Spark binaries")
+  }
+
+  val lambdaClient = AWSLambdaClientBuilder
+    .standard()
+    .withClientConfiguration(clientConfig)
+    .build()
+
   final val lambdaExecutorService: LambdaExecutorService =
     LambdaInvokerFactory.builder()
-      .lambdaClient(AWSLambdaClientBuilder.standard().withClientConfiguration(clientConfig).build())
+      .lambdaClient(lambdaClient)
       .build(classOf[LambdaExecutorService])
   logInfo(s"Created LambdaExecutorService: $lambdaExecutorService")
 
-  val maxConcurrentRequests = sc.conf.getInt("spark.qubole.lambda.concurrent.requests.max", 100)
+  val maxConcurrentRequests = sc.conf.getInt("spark.lambda.concurrent.requests.max", 100)
   val limiter = RateLimiter.create(maxConcurrentRequests)
 
   override def start() {
     super.start()
     logInfo("start")
     numExecutorsExpected = getInitialTargetExecutorNumber(conf)
 
-    val lambdaClient = AWSLambdaClientBuilder.defaultClient()
-
     val request = new com.amazonaws.services.lambda.model.GetFunctionRequest
     request.setFunctionName(lambdaFunctionName)
     val result = lambdaClient.getFunction(request)
@@ -170,7 +181,7 @@ private[spark] class LambdaSchedulerBackend(
     val request2 = new com.amazonaws.services.lambda.model.GetFunctionConfigurationRequest
     request2.setFunctionName(lambdaFunctionName)
     val result2 = lambdaClient.getFunctionConfiguration(request2)
-    lambdaContainerMemoryBytes = result2.getMemorySize * 1024 * 1024
+    lambdaContainerMemory = result2.getMemorySize
     lambdaContainerTimeoutSecs = result2.getTimeout
     logDebug(s"LAMBDA: 16001: Function configuration: ${result2.toString}")
 
@@ -191,97 +202,11 @@ private[spark] class LambdaSchedulerBackend(
 
   override def applicationId(): String = {
     val appId = super.applicationId()
-    logInfo(s"applicationId: $appId")
+    logDebug(s"applicationId: $appId")
     return appId
   }
 
   private def launchExecutorsOnLambda(newExecutorsNeeded: Int) : Future[Boolean] = {
-    Future {
-      // TODO: Can we launch in parallel?
-      // TODO: Can we track each thread separately and audit
-      (1 to newExecutorsNeeded).foreach { x =>
-        val request = new Request
-        request.setSparkS3Bucket("bharatb")
-        request.setSparkS3Key(s"lambda/spark-small-${s3SparkVersion}.zip")
-        request.setHadoop2S3Bucket("bharatb")
-        request.setHadoop2S3Key(s"lambda/hadoop2-small-${s3SparkVersion}.zip")
-        request.setHive12S3Bucket("bharatb")
-        request.setHive12S3Key(s"lambda/hive1.2-small-${s3SparkVersion}.zip")
-        val hostname = sc.env.rpcEnv.address.host
-        val port = sc.env.rpcEnv.address.port.toString
-        request.setSparkDriverHostname(hostname)
-        request.setSparkDriverPort(port)
-
-        val classpathSeq = Seq("spark/assembly/target/scala-2.11/jars/*",
-          "spark/conf",
-          "hadoop2/share/hadoop/*",
-          "hadoop2/share/hadoop/common/lib/*",
-          "hadoop2/share/hadoop/common/*",
-          "hadoop2/share/hadoop/hdfs",
-          "hadoop2/share/hadoop/hdfs/lib/*",
-          "hadoop2/share/hadoop/hdfs/*",
-          "hadoop2/share/hadoop/yarn/lib/*",
-          "hadoop2/share/hadoop/yarn/*",
-          "hadoop2/share/hadoop/mapreduce/*",
-          "hadoop2/share/hadoop/tools/lib/*",
-          "hadoop2/share/hadoop/tools/*",
-          "hadoop2/share/hadoop/qubole/lib/*",
-          "hadoop2/share/hadoop/qubole/*",
-          "hadoop2/etc/hadoop/*",
-          "hive1.2/lib/*"
-        )
-        val classpaths = classpathSeq.map(x => s"/tmp/lambda/$x").mkString(":")
-        val currentExecutorId = executorId.addAndGet(1)
-        val containerId = applicationId() + "_%08d".format(currentExecutorId)
-        request.setSparkCommandLine(
-          s"java -cp ${classpaths} " +
-            "-server -Xmx1400m " +
-            "-Djava.net.preferIPv4Stack=true " +
-            s"-Dspark.driver.port=${port} " +
-            // "-Dspark.blockManager.port=12345 " +
-            "-Dspark.dynamicAllocation.enabled=true " +
-            "-Dspark.shuffle.service.enabled=false " +
-            "org.apache.spark.executor.CoarseGrainedExecutorBackend " +
-            s"--driver-url spark://CoarseGrainedScheduler@${hostname}:${port} " +
-            s"--executor-id ${currentExecutorId} " +
-            "--hostname LAMBDA " +
-            "--cores 1 " +
-            s"--app-id ${applicationId()} " +
-            s"--container-id ${containerId} " +
-            s"--container-size ${lambdaContainerMemoryBytes} " +
-            "--user-class-path file:/tmp/lambda/* "
-        )
-
-        val lambdaRequesterThread = new Thread() {
-          override def run() {
-            val executorId = currentExecutorId.toString
-            logDebug(s"LAMBDA: 9002: Invoking lambda for $executorId: $request")
-            numLambdaCallsPending.addAndGet(1)
-            try {
-              val response = lambdaExecutorService.runExecutor(request)
-              logDebug(s"LAMBDA: 9003: Returned from lambda $executorId: $response")
-            } catch {
-              case t: Throwable => logError(s"Exception in Lambda invocation: $t")
-            } finally {
-              logDebug(s"LAMBDA: 9003: Returned from lambda $executorId: finally block")
-              numLambdaCallsPending.addAndGet(-1)
-              pendingLambdaRequests.remove(executorId)
-            }
-          }
-        }
-        lambdaRequesterThread.setDaemon(true)
-        lambdaRequesterThread.setName(s"Lambda Requester Thread for $currentExecutorId")
-        pendingLambdaRequests(currentExecutorId.toString) = lambdaRequesterThread
-        logDebug(s"LAMBDA: 9004: starting lambda requester thread for $currentExecutorId")
-        lambdaRequesterThread.start()
-
-        logDebug(s"LAMBDA: 9005: returning from launchExecutorsOnLambda for $currentExecutorId")
-      }
-      true // TODO: Return true/false properly
-    }
-  }
-
-  private def launchExecutorsOnLambda2(newExecutorsNeeded: Int) : Future[Boolean] = {
     Future {
       // TODO: Can we launch in parallel?
       // TODO: Can we track each thread separately and audit
@@ -311,29 +236,30 @@ private[spark] class LambdaSchedulerBackend(
         val containerId = applicationId() + "_%08d".format(currentExecutorId)
 
         val javaPartialCommandLine = s"java -cp ${classpaths} " +
-          "-server -Xmx1400m " +
+          s"-server -Xmx${lambdaContainerMemory}m " +
           "-Djava.net.preferIPv4Stack=true " +
           s"-Dspark.driver.port=${port} " +
-          // "-Dspark.blockManager.port=12345 " +
           "-Dspark.dynamicAllocation.enabled=true " +
           "-Dspark.shuffle.service.enabled=false "
+
         val executorPartialCommandLine = "org.apache.spark.executor.CoarseGrainedExecutorBackend " +
           s"--driver-url spark://CoarseGrainedScheduler@${hostname}:${port} " +
           s"--executor-id ${currentExecutorId} " +
           "--hostname LAMBDA " +
           "--cores 1 " +
           s"--app-id ${applicationId()} " +
           s"--container-id ${containerId} " +
-          s"--container-size ${lambdaContainerMemoryBytes} " +
+          s"--container-size ${lambdaContainerMemory} " +
           "--user-class-path file:/tmp/lambda/* "
+
         val commandLine = javaPartialCommandLine + executorPartialCommandLine
 
         val request = new LambdaRequestPayload(
-          sparkS3Bucket = "bharatb",
+          sparkS3Bucket = lambdaBucket.get,
           sparkS3Key = s"lambda/spark-small-${s3SparkVersion}.zip",
-          hadoop2S3Bucket = "bharatb",
+          hadoop2S3Bucket = lambdaBucket.get,
           hadoop2S3Key = s"lambda/hadoop2-small-${s3SparkVersion}.zip",
-          hive12S3Bucket = "bharatb",
+          hive12S3Bucket = lambdaBucket.get,
           hive12S3Key = s"lambda/hive1.2-small-${s3SparkVersion}.zip",
           sparkDriverHostname = hostname,
           sparkDriverPort = port,
@@ -349,9 +275,7 @@ private[spark] class LambdaSchedulerBackend(
           limiter.acquire()
           logDebug(s"LAMBDA: 9050.1: LambdaRequesterThread started $executorId")
           numLambdaCallsPending.addAndGet(1)
-          // TODO: Can we reuse the same client across calls?
-          val lambdaClient = AWSLambdaClientBuilder.standard()
-            .withClientConfiguration(clientConfig).build()
+
           val invokeRequest = new InvokeRequest
           try {
             invokeRequest.setFunctionName(lambdaFunctionName)
@@ -396,11 +320,11 @@ private[spark] class LambdaSchedulerBackend(
     if (newExecutorsNeeded <= 0) {
       return Future { true }
     }
-    return launchExecutorsOnLambda2(newExecutorsNeeded)
+    return launchExecutorsOnLambda(newExecutorsNeeded)
   }
 
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
-    // TODO: Fill this function
+    // TODO: Right now not implemented
     logDebug(s"LAMBDA: 10200: doKillExecutors: $executorIds")
     Future {
       executorIds.foreach { x =>

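With the client now created once as a field, each requester thread's per-executor invocation reduces to building an InvokeRequest and calling invoke on the shared lambdaClient. A rough fragment of that flow, assuming the surrounding LambdaSchedulerBackend members (lambdaClient, lambdaFunctionName, limiter, numLambdaCallsPending) and a requestJson string holding the serialized LambdaRequestPayload, which this diff does not show:

// Fragment only; runs inside the lambda requester thread.
limiter.acquire()
numLambdaCallsPending.addAndGet(1)
val invokeRequest = new InvokeRequest
invokeRequest.setFunctionName(lambdaFunctionName)
invokeRequest.setPayload(requestJson) // hypothetical JSON form of LambdaRequestPayload
try {
  val invokeResult = lambdaClient.invoke(invokeRequest)
  logDebug(s"Lambda invoke returned status ${invokeResult.getStatusCode}")
} finally {
  numLambdaCallsPending.addAndGet(-1)
}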
core/src/main/scala/org/apache/spark/shuffle/S3ShuffleBlockResolver.scala

Lines changed: 10 additions & 12 deletions
@@ -40,12 +40,10 @@ private[spark] class S3ShuffleBlockResolver(
     _blockManager: BlockManager = null)
   extends IndexShuffleBlockResolver(conf, _blockManager = null)
   with Logging {
-  val shuffleOverS3 = conf.getBoolean("spark.shuffle.s3.enabled", false)
-  val s3PrefixLocation = conf.get("spark.qubole.s3PrefixLocation",
-    "s3://dev.canopydata.com/vsowrira/")
-
   private lazy val blockManager = Option(_blockManager).getOrElse(SparkEnv.get.blockManager)
 
+  val shuffleS3Bucket = BlockManager.getS3Bucket(conf)
+
   private lazy val hadoopConf = BlockManager.getHadoopConf(conf)
 
   private lazy val hadoopFileSystem = BlockManager.getHadoopFileSystem(conf)
@@ -84,7 +82,7 @@ private[spark] class S3ShuffleBlockResolver(
    * If so, return the partition lengths in the data file. Otherwise return null.
    */
   private def checkIndexAndDataFile(index: File, data: File, blocks: Int): Array[Long] = {
-    val indexFilePath = Utils.localFileToS3(s3PrefixLocation, index)
+    val indexFilePath = Utils.localFileToS3(shuffleS3Bucket, index)
     try {
       if (hadoopFileSystem.getFileStatus(indexFilePath).getLen != (blocks + 1) * 8) {
         return null
@@ -124,7 +122,7 @@ private[spark] class S3ShuffleBlockResolver(
       in.close()
     }
 
-    val dataPath = Utils.localFileToS3(s3PrefixLocation, data)
+    val dataPath = Utils.localFileToS3(shuffleS3Bucket, data)
 
     if (hadoopFileSystem.getFileStatus(dataPath).getLen == lengths.sum) {
       logInfo(s"${dataPath} lengths (${lengths.sum}) match with index file length")
@@ -153,8 +151,8 @@ private[spark] class S3ShuffleBlockResolver(
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
 
-    val indexFilePath = Utils.localFileToS3(s3PrefixLocation, indexFile)
-    val indexTmpPath = Utils.localFileToS3(s3PrefixLocation, indexTmp)
+    val indexFilePath = Utils.localFileToS3(shuffleS3Bucket, indexFile)
+    val indexTmpPath = Utils.localFileToS3(shuffleS3Bucket, indexTmp)
 
     try {
       val outputStream = hadoopFileSystem.create(indexTmpPath)
@@ -171,8 +169,8 @@ private[spark] class S3ShuffleBlockResolver(
     }
 
     val dataFile = getDataFile(shuffleId, mapId)
-    val dataFilePath = Utils.localFileToS3(s3PrefixLocation, dataFile)
-    val dataTmpPath = Utils.localFileToS3(s3PrefixLocation, dataTmp)
+    val dataFilePath = Utils.localFileToS3(shuffleS3Bucket, dataFile)
+    val dataTmpPath = Utils.localFileToS3(shuffleS3Bucket, dataTmp)
     // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
     // the following check and rename are atomic.
     synchronized {
@@ -238,15 +236,15 @@ private[spark] class S3ShuffleBlockResolver(
     }
 
     val indexFile = getFile(executorLocalDirs, subDirs, shuffleIndexFile)
-    val indexFilePath = Utils.localFileToS3(s3PrefixLocation, indexFile)
+    val indexFilePath = Utils.localFileToS3(shuffleS3Bucket, indexFile)
     val in = hadoopFileSystem.open(indexFilePath)
 
     try {
       ByteStreams.skipFully(in, reduceId * 8)
       val offset = in.readLong()
       val nextOffset = in.readLong()
       val dataFile = getFile(executorLocalDirs, subDirs, shuffleDataFile)
-      val dataFilePath = Utils.localFileToS3(s3PrefixLocation, dataFile)
+      val dataFilePath = Utils.localFileToS3(shuffleS3Bucket, dataFile)
 
       logDebug("S3Segment managed buffer created for path - " + dataFilePath.toString)
 
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala

Lines changed: 2 additions & 1 deletion
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.shuffle._
+import org.apache.spark.storage.BlockManager
 
 /**
  * In sort-based shuffle, incoming records are sorted according to their target partition ids, then
@@ -79,7 +80,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
   */
  private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()
 
-  val shuffleOverS3 = conf.getBoolean("spark.shuffle.s3.enabled", false)
+  val shuffleOverS3 = BlockManager.shuffleOverS3Enabled(conf)
 
  override val shuffleBlockResolver = if (shuffleOverS3) {
    new S3ShuffleBlockResolver(conf)

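Taken together, routing shuffle through S3 in this fork appears to hinge on the configuration keys that show up in this commit; presumably something like the following on the driver (values are placeholders, and whether every key must be set explicitly is an assumption):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.s3.enabled", "true")                  // route shuffle files through S3ShuffleBlockResolver
  .set("spark.lambda.s3.bucket", "my-spark-lambda-bucket")  // hypothetical bucket holding the Spark binaries
  .set("spark.lambda.function.name", "get_spark_from_s3")   // default function name from the diff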