Branch 2.3 merge #261

Merged: 39 commits, Jan 8, 2019
39 commits (changes from all commits):
49e1eb8
[SPARK-25837][CORE] Fix potential slowdown in AppStatusListener when …
patrickbrownsync Nov 1, 2018
0c7d82b
[SPARK-25933][DOCUMENTATION] Fix pstats.Stats() reference in configur…
Nov 3, 2018
7a59618
[SPARK-26011][SPARK-SUBMIT] Yarn mode pyspark app without python main…
shanyu Nov 15, 2018
550408e
[SPARK-25934][MESOS] Don't propagate SPARK_CONF_DIR from spark submit
mpmolek Nov 16, 2018
90e4dd1
[MINOR][SQL] Fix typo in CTAS plan database string
dongjoon-hyun Nov 17, 2018
0fb830c
[SPARK-26084][SQL] Fixes unresolved AggregateExpression.references ex…
ssimeonov Nov 20, 2018
8b6504e
[SPARK-26109][WEBUI] Duration in the task summary metrics table and t…
shahidki31 Nov 21, 2018
62010d6
[SPARK-26118][BACKPORT-2.3][WEB UI] Introducing spark.ui.requestHeade…
attilapiros Nov 22, 2018
de5f489
[SPARK-25786][CORE] If the ByteBuffer.hasArray is false , it will thr…
10110346 Nov 24, 2018
96a5a12
[SPARK-26137][CORE] Use Java system property "file.separator" inste…
Nov 28, 2018
e96ba84
[SPARK-26211][SQL] Fix InSet for binary, and struct and array with null.
ueshin Nov 29, 2018
4ee463a
[SPARK-26201] Fix python broadcast with encryption
Nov 30, 2018
0058986
[MINOR][DOC] Correct some document description errors
10110346 Dec 1, 2018
8236f64
[SPARK-26198][SQL] Fix Metadata serialize null values throw NPE
wangyum Dec 2, 2018
1899dd2
[SPARK-26233][SQL][BACKPORT-2.3] CheckOverflow when encoding a decima…
mgaido91 Dec 5, 2018
3772d93
[SPARK-26307][SQL] Fix CTAS when INSERT a partitioned table using Hiv…
gatorsmile Dec 10, 2018
7930fbd
[SPARK-26327][SQL][BACKPORT-2.3] Bug fix for `FileSourceScanExec` met…
xuanyuanking Dec 14, 2018
20558f7
[SPARK-26315][PYSPARK] auto cast threshold from Integer to Float in a…
Dec 15, 2018
1576bd7
[SPARK-26352][SQL] join reorder should not change the order of output…
rednaxelafx Dec 17, 2018
bccefa5
[SPARK-26352][SQL][FOLLOWUP-2.3] Fix missing sameOutput in branch-2.3
rednaxelafx Dec 17, 2018
35c4235
[SPARK-26316][SPARK-21052][BRANCH-2.3] Revert hash join metrics in th…
JkSelf Dec 18, 2018
832812e
[SPARK-26394][CORE] Fix annotation error for Utils.timeStringAsMs
Dec 18, 2018
a22a11b
[SPARK-24687][CORE] Avoid job hanging when generate task binary cause…
caneGuy Dec 20, 2018
b4aeb81
[SPARK-26422][R] Support to disable Hive support in SparkR even for H…
HyukjinKwon Dec 21, 2018
a7d50ae
[SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consi…
mgaido91 Dec 21, 2018
d9d3bea
Revert "[SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter shou…
dongjoon-hyun Dec 22, 2018
acf20d2
[SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consi…
mgaido91 Dec 23, 2018
acbfb31
[SPARK-26444][WEBUI] Stage color doesn't change with it's status
seancxmao Dec 28, 2018
c3d759f
[SPARK-26496][SS][TEST] Avoid to use Random.nextString in StreamingIn…
HyukjinKwon Dec 29, 2018
70a99ba
[SPARK-25591][PYSPARK][SQL][BRANCH-2.3] Avoid overwriting deserialize…
viirya Jan 3, 2019
30a811b
[SPARK-26019][PYSPARK] Allow insecure py4j gateways
squito Jan 3, 2019
30b82a3
[MINOR][NETWORK][TEST] Fix TransportFrameDecoderSuite to use ByteBuf …
dongjoon-hyun Jan 4, 2019
3501e3b
Merge branch 'branch-2.3' of github.com:apache/spark into csd-2.3
markhamstra Jan 5, 2019
d618d27
[SPARK-26078][SQL][BACKPORT-2.3] Dedup self-join attributes on IN sub…
mgaido91 Jan 5, 2019
64fce5c
[SPARK-26545] Fix typo in EqualNullSafe's truth table comment
rednaxelafx Jan 5, 2019
bb52170
[SPARK-26537][BUILD][BRANCH-2.3] change git-wip-us to gitbox
shaneknapp Jan 6, 2019
38fe12b
[SPARK-25253][PYSPARK][FOLLOWUP] Undefined name: from pyspark.util im…
Aug 30, 2018
9052a5e
[MINOR][BUILD] Fix script name in `release-tag.sh` usage message
dongjoon-hyun Jan 7, 2019
80ed747
Merge branch 'branch-2.3' of github.com:apache/spark into branch-2.3-…
markhamstra Jan 7, 2019

@@ -17,7 +17,6 @@

package org.apache.spark.network.util;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -69,7 +68,7 @@ public void testInterception() throws Exception {
decoder.channelRead(ctx, len);
decoder.channelRead(ctx, dataBuf);
verify(interceptor, times(interceptedReads)).handle(any(ByteBuf.class));
verify(ctx).fireChannelRead(any(ByteBuffer.class));
verify(ctx).fireChannelRead(any(ByteBuf.class));
assertEquals(0, len.refCnt());
assertEquals(0, dataBuf.refCnt());
} finally {
@@ -98,12 +98,12 @@ rect.getting-result-time-proportion {
cursor: pointer;
}

.vis-timeline .vis-item.stage.succeeded {
.vis-timeline .vis-item.stage.complete {
background-color: #A0DFFF;
border-color: #3EC0FF;
}

.vis-timeline .vis-item.stage.succeeded.vis-selected {
.vis-timeline .vis-item.stage.complete.vis-selected {
background-color: #A0DFFF;
border-color: #3EC0FF;
z-index: auto;
@@ -130,12 +130,12 @@ rect.getting-result-time-proportion {
stroke: #FF4D6D;
}

.vis-timeline .vis-item.stage.running {
.vis-timeline .vis-item.stage.active {
background-color: #A2FCC0;
border-color: #36F572;
}

.vis-timeline .vis-item.stage.running.vis-selected {
.vis-timeline .vis-item.stage.active.vis-selected {
background-color: #A2FCC0;
border-color: #36F572;
z-index: auto;
@@ -43,12 +43,17 @@ private[spark] object PythonGatewayServer extends Logging {
// with the same secret, in case the app needs callbacks from the JVM to the underlying
// python processes.
val localhost = InetAddress.getLoopbackAddress()
val gatewayServer: GatewayServer = new GatewayServer.GatewayServerBuilder()
.authToken(secret)
val builder = new GatewayServer.GatewayServerBuilder()
.javaPort(0)
.javaAddress(localhost)
.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
.build()
if (sys.env.getOrElse("_PYSPARK_CREATE_INSECURE_GATEWAY", "0") != "1") {
builder.authToken(secret)
} else {
assert(sys.env.getOrElse("SPARK_TESTING", "0") == "1",
"Creating insecure Java gateways only allowed for testing")
}
val gatewayServer: GatewayServer = builder.build()

gatewayServer.start()
val boundPort: Int = gatewayServer.getListeningPort
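Side note on this hunk: the guard reduces to two environment checks. A minimal Scala sketch (env variable names taken from the diff above; the standalone snippet itself is illustrative, not part of the patch):

// An auth token is attached unless the insecure flag is set, and the insecure
// flag is only tolerated inside test runs (SPARK_TESTING=1).
val insecure = sys.env.getOrElse("_PYSPARK_CREATE_INSECURE_GATEWAY", "0") == "1"
val testing = sys.env.getOrElse("SPARK_TESTING", "0") == "1"
require(!insecure || testing, "Creating insecure Java gateways only allowed for testing")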
35 changes: 29 additions & 6 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -595,8 +595,10 @@ private[spark] class PythonAccumulatorV2(
if (socket == null || socket.isClosed) {
socket = new Socket(serverHost, serverPort)
logInfo(s"Connected to AccumulatorServer at host: $serverHost port: $serverPort")
// send the secret just for the initial authentication when opening a new connection
socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8))
if (secretToken != null) {
// send the secret just for the initial authentication when opening a new connection
socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8))
}
}
socket
}
@@ -639,6 +641,7 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
with Logging {

private var encryptionServer: PythonServer[Unit] = null
private var decryptionServer: PythonServer[Unit] = null

/**
* Read data from disks, then copy it to `out`
@@ -687,16 +690,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
override def handleConnection(sock: Socket): Unit = {
val env = SparkEnv.get
val in = sock.getInputStream()
val dir = new File(Utils.getLocalDir(env.conf))
val file = File.createTempFile("broadcast", "", dir)
path = file.getAbsolutePath
val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
val abspath = new File(path).getAbsolutePath
val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
DechunkedInputStream.dechunkAndCopyToOutput(in, out)
}
}
Array(encryptionServer.port, encryptionServer.secret)
}

def setupDecryptionServer(): Array[Any] = {
decryptionServer = new PythonServer[Unit]("broadcast-decrypt-server-for-driver") {
override def handleConnection(sock: Socket): Unit = {
val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()))
Utils.tryWithSafeFinally {
val in = SparkEnv.get.serializerManager.wrapForEncryption(new FileInputStream(path))
Utils.tryWithSafeFinally {
Utils.copyStream(in, out, false)
} {
in.close()
}
out.flush()
} {
JavaUtils.closeQuietly(out)
}
}
}
Array(decryptionServer.port, decryptionServer.secret)
}

def waitTillBroadcastDataSent(): Unit = decryptionServer.getResult()

def waitTillDataReceived(): Unit = encryptionServer.getResult()
}
// scalastyle:on no.finalize
@@ -60,11 +60,12 @@ private[deploy] object DependencyUtils {
hadoopConf: Configuration,
secMgr: SecurityManager): String = {
val targetDir = Utils.createTempDir()
val userJarName = userJar.split(File.separatorChar).last
Option(jars)
.map {
resolveGlobPaths(_, hadoopConf)
.split(",")
.filterNot(_.contains(userJar.split("/").last))
.filterNot(_.contains(userJarName))
.mkString(",")
}
.filterNot(_ == "")
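Side note: the point of switching from a hard-coded "/" to File.separatorChar is that the application jar's file name must be isolated with the platform separator. A small illustrative sketch (the path below is made up):

import java.io.File

// Extract the jar's file name using the platform separator, as the patched line does.
def jarName(userJar: String): String = userJar.split(File.separatorChar).last

// Prints "myApp.jar" on any platform, because the path is built with File.separator;
// the old split("/") only worked when the separator happened to be '/'.
println(jarName(s"${File.separator}path${File.separator}to${File.separator}myApp.jar"))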
@@ -367,7 +367,7 @@ object SparkSubmit extends CommandLineUtils with Logging {

if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython) {
if (args.isPython || isInternal(args.primaryResource)) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
}
}
@@ -394,6 +394,10 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
}

private[spark] object RestSubmissionClient {

// SPARK_HOME and SPARK_CONF_DIR are filtered out because they are usually wrong
// on the remote machine (SPARK-12345) (SPARK-25934)
private val BLACKLISTED_SPARK_ENV_VARS = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR")
private val REPORT_DRIVER_STATUS_INTERVAL = 1000
private val REPORT_DRIVER_STATUS_MAX_TRIES = 10
val PROTOCOL_VERSION = "v1"
@@ -403,9 +407,7 @@ private[spark] object RestSubmissionClient {
*/
private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
env.filterKeys { k =>
// SPARK_HOME is filtered out because it is usually wrong on the remote machine (SPARK-12345)
(k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED" && k != "SPARK_HOME") ||
k.startsWith("MESOS_")
(k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_")
}
}
}
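Side note: the behaviour of the widened filter, checked against a few example variable names (the keys below are illustrative, not an exhaustive list):

// Same predicate as above, applied to sample keys.
val blacklistedSparkEnvVars = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR")
def keep(k: String): Boolean =
  (k.startsWith("SPARK_") && !blacklistedSparkEnvVars.contains(k)) || k.startsWith("MESOS_")

assert(keep("SPARK_EXECUTOR_MEMORY"))     // other SPARK_* vars are still forwarded
assert(keep("MESOS_NATIVE_JAVA_LIBRARY")) // MESOS_* vars are still forwarded
assert(!keep("SPARK_CONF_DIR"))           // newly dropped (SPARK-25934)
assert(!keep("SPARK_HOME"))               // dropped (SPARK-12345)
assert(!keep("JAVA_HOME"))                // non-SPARK/MESOS vars were never forwarded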
14 changes: 11 additions & 3 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -225,7 +225,7 @@ package object config {
private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
.intConf
.checkValue(_ > 0, "The capacity of listener bus event queue must not be negative")
.checkValue(_ > 0, "The capacity of listener bus event queue must be positive")
.createWithDefault(10000)

private[spark] val LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED =
@@ -436,7 +436,8 @@
"made in creating intermediate shuffle files.")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
"The file buffer size must be greater than 0 and less than or equal to " +
s"${Int.MaxValue / 1024}.")
.createWithDefaultString("32k")

private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
@@ -445,7 +446,8 @@
"is written in unsafe shuffle writer. In KiB unless otherwise specified.")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
"The buffer size must be greater than 0 and less than or equal to " +
s"${Int.MaxValue / 1024}.")
.createWithDefaultString("32k")

private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
@@ -499,6 +501,12 @@
.stringConf
.createOptional

private[spark] val UI_REQUEST_HEADER_SIZE =
ConfigBuilder("spark.ui.requestHeaderSize")
.doc("Value for HTTP request header size in bytes.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("8k")

private[spark] val EXTRA_LISTENERS = ConfigBuilder("spark.extraListeners")
.doc("Class names of listeners to add to SparkContext during initialization.")
.stringConf
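Side note: a hedged usage sketch for the new spark.ui.requestHeaderSize key, for deployments where UI request headers (for example large authentication tokens) exceed the 8k default. The 16k value and the app name are illustrative choices, not recommendations from this PR:

import org.apache.spark.SparkConf

// bytesConf accepts suffixed strings, so "16k" parses to 16384 bytes.
val conf = new SparkConf()
  .setAppName("request-header-size-demo") // hypothetical app name
  .set("spark.ui.requestHeaderSize", "16k")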
@@ -1059,9 +1059,11 @@ class DAGScheduler(

// Abort execution
return
case NonFatal(e) =>
case e: Throwable =>
abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage

// Abort execution
return
}

@@ -41,7 +41,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
import org.apache.spark.util.{BoundedPriorityQueue, ByteBufferInputStream, SerializableConfiguration, SerializableJobConf, Utils}
import org.apache.spark.util.collection.CompactBuffer

/**
@@ -358,7 +358,12 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boole
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
val kryo = borrowKryo()
try {
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
if (bytes.hasArray) {
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
} else {
input.setBuffer(new Array[Byte](4096))
input.setInputStream(new ByteBufferInputStream(bytes))
}
kryo.readClassAndObject(input).asInstanceOf[T]
} finally {
releaseKryo(kryo)
@@ -370,7 +375,12 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boole
val oldClassLoader = kryo.getClassLoader
try {
kryo.setClassLoader(loader)
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
if (bytes.hasArray) {
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
} else {
input.setBuffer(new Array[Byte](4096))
input.setInputStream(new ByteBufferInputStream(bytes))
}
kryo.readClassAndObject(input).asInstanceOf[T]
} finally {
kryo.setClassLoader(oldClassLoader)
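Side note: the failure mode being guarded against is that a direct (off-heap) ByteBuffer has no backing array, so the old unconditional bytes.array() call threw. A minimal illustration (not part of the patch):

import java.nio.ByteBuffer

val heap = ByteBuffer.wrap(Array[Byte](1, 2, 3))
val direct = ByteBuffer.allocateDirect(3)

assert(heap.hasArray)    // array-backed: Kryo's input.setBuffer(bytes.array(), ...) works
assert(!direct.hasArray) // direct.array() would throw UnsupportedOperationException,
                         // hence the fallback to streaming via ByteBufferInputStream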
@@ -950,16 +950,6 @@ private[spark] class AppStatusListener(
kvstore.delete(e.getClass(), e.id)
}

val tasks = kvstore.view(classOf[TaskDataWrapper])
.index("stage")
.first(key)
.last(key)
.asScala

tasks.foreach { t =>
kvstore.delete(t.getClass(), t.taskId)
}

// Check whether there are remaining attempts for the same stage. If there aren't, then
// also delete the RDD graph data.
val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
@@ -982,6 +972,15 @@

cleanupCachedQuantiles(key)
}

// Delete tasks for all stages in one pass, as deleting them for each stage individually is slow
val tasks = kvstore.view(classOf[TaskDataWrapper]).asScala
val keys = stages.map { s => (s.info.stageId, s.info.attemptId) }.toSet
tasks.foreach { t =>
if (keys.contains((t.stageId, t.stageAttemptId))) {
kvstore.delete(t.getClass(), t.taskId)
}
}
}

private def cleanupTasks(stage: LiveStage): Unit = {
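Side note: the change trades one indexed task scan per expired stage for a single pass over the task store against a set of expired (stageId, attemptId) keys. A toy sketch of that shape (types and data are made up):

case class TaskRow(stageId: Int, stageAttemptId: Int, taskId: Long)

val expiredStageKeys: Set[(Int, Int)] = Set((1, 0), (3, 1)) // hypothetical expired stage attempts
val allTasks = Seq(TaskRow(1, 0, 10L), TaskRow(2, 0, 11L), TaskRow(3, 1, 12L))

// One pass over tasks, O(1) membership check per task.
val toDelete = allTasks.filter(t => expiredStageKeys.contains((t.stageId, t.stageAttemptId)))
// toDelete holds tasks 10 and 12; task 11 belongs to a live stage and is kept.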
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -355,13 +355,15 @@ private[spark] object JettyUtils extends Logging {

(connector, connector.getLocalPort())
}
val httpConfig = new HttpConfiguration()
httpConfig.setRequestHeaderSize(conf.get(UI_REQUEST_HEADER_SIZE).toInt)

// If SSL is configured, create the secure connector first.
val securePort = sslOptions.createJettySslContextFactory().map { factory =>
val securePort = sslOptions.port.getOrElse(if (port > 0) Utils.userPort(port, 400) else 0)
val secureServerName = if (serverName.nonEmpty) s"$serverName (HTTPS)" else serverName
val connectionFactories = AbstractConnectionFactory.getFactories(factory,
new HttpConnectionFactory())
new HttpConnectionFactory(httpConfig))

def sslConnect(currentPort: Int): (ServerConnector, Int) = {
newConnector(connectionFactories, currentPort)
@@ -376,7 +378,7 @@

// Bind the HTTP port.
def httpConnect(currentPort: Int): (ServerConnector, Int) = {
newConnector(Array(new HttpConnectionFactory()), currentPort)
newConnector(Array(new HttpConnectionFactory(httpConfig)), currentPort)
}

val (httpConnector, httpPort) = Utils.startServiceOnPort[ServerConnector](port, httpConnect,
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -62,7 +62,7 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
val stageId = stage.stageId
val attemptId = stage.attemptId
val name = stage.name
val status = stage.status.toString
val status = stage.status.toString.toLowerCase(Locale.ROOT)
val submissionTime = stage.submissionTime.get.getTime()
val completionTime = stage.completionTime.map(_.getTime())
.getOrElse(System.currentTimeMillis())
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -840,7 +840,7 @@ private[ui] class TaskPagedTable(
</div>
</td>
<td>{UIUtils.formatDate(task.launchTime)}</td>
<td>{formatDuration(task.duration)}</td>
<td>{formatDuration(task.taskMetrics.map(_.executorRunTime))}</td>
<td class={TaskDetailsClassNames.SCHEDULER_DELAY}>
{UIUtils.formatDuration(AppStatusUtils.schedulerDelay(task))}
</td>
@@ -993,7 +993,9 @@ private[ui] object ApiHelper {
HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR,
HEADER_HOST -> TaskIndexNames.HOST,
HEADER_LAUNCH_TIME -> TaskIndexNames.LAUNCH_TIME,
HEADER_DURATION -> TaskIndexNames.DURATION,
// SPARK-26109: Duration of task as executorRunTime to make it consistent with the
// aggregated tasks summary metrics table and the previous versions of Spark.
HEADER_DURATION -> TaskIndexNames.EXEC_RUN_TIME,
HEADER_SCHEDULER_DELAY -> TaskIndexNames.SCHEDULER_DELAY,
HEADER_DESER_TIME -> TaskIndexNames.DESER_TIME,
HEADER_GC_TIME -> TaskIndexNames.GC_TIME,
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1106,7 +1106,7 @@ private[spark] object Utils extends Logging {
}

/**
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
* Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in ms.
*/
def timeStringAsMs(str: String): Long = {
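Side note illustrating the corrected comment: the result is in milliseconds, and a bare number is already treated as ms. Utils is private[spark], so the sketch below uses JavaUtils.timeStringAsMs, the public helper it delegates to:

import org.apache.spark.network.util.JavaUtils

JavaUtils.timeStringAsMs("50s")   // 50000
JavaUtils.timeStringAsMs("100ms") // 100
JavaUtils.timeStringAsMs("250us") // 0 (sub-millisecond values truncate to ms)
JavaUtils.timeStringAsMs("5")     // 5 (no suffix: interpreted as ms)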
19 changes: 19 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -936,6 +936,25 @@ class SparkSubmitSuite
}
}

test("remove copies of application jar from classpath") {
val fs = File.separator
val sparkConf = new SparkConf(false)
val hadoopConf = new Configuration()
val secMgr = new SecurityManager(sparkConf)

val appJarName = "myApp.jar"
val jar1Name = "myJar1.jar"
val jar2Name = "myJar2.jar"
val userJar = s"file:/path${fs}to${fs}app${fs}jar$fs$appJarName"
val jars = s"file:/$jar1Name,file:/$appJarName,file:/$jar2Name"

val resolvedJars = DependencyUtils
.resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf, secMgr)

assert(!resolvedJars.contains(appJarName))
assert(resolvedJars.contains(jar1Name) && resolvedJars.contains(jar2Name))
}

test("Avoid re-upload remote resources in yarn client mode") {
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)