SPY-875: backported SPARK-11863 and merged Apache branch-1.5 #127

Merged 37 commits on Dec 9, 2015

Commits
b767cee
[SPARK-11191][SPARK-11311][SQL] Backports #9664 and #9277 to branch-1.5
liancheng Nov 15, 2015
bf79a17
[SPARK-11752] [SQL] fix timezone problem for DateTimeUtils.getSeconds
cloud-fan Nov 16, 2015
51fc152
[SPARK-10181][SQL] Do kerberos login for credentials during hive clie…
yolandagao Nov 15, 2015
cf36cdb
[SPARK-11191][SQL][FOLLOW-UP] Cleans up unnecessary anonymous HiveFun…
liancheng Nov 17, 2015
bdcbbda
[SPARK-11786][CORE] Tone down messages from akka error monitor.
Nov 17, 2015
e26dc96
[SPARK-11740][STREAMING] Fix the race condition of two checkpoints in…
zsxwing Nov 17, 2015
f33e277
[HOTFIX][STREAMING] Add mockito to fix the compilation error
zsxwing Nov 18, 2015
f7a7230
[SPARK-11737] [SQL] Fix serialization of UTF8String with Kyro
Nov 18, 2015
0ed6d9c
[SPARK-11652][CORE] Remote code execution with InvokerTransformer
srowen Nov 18, 2015
4b8dc25
rmse was wrongly calculated
Nov 18, 2015
f802b07
[SPARK-11195][CORE] Use correct classloader for TaskResultGetter
Nov 18, 2015
0439e32
[SPARK-11813][MLLIB] Avoid serialization of vocab in Word2Vec
hhbyyh Nov 18, 2015
9957925
[SPARK-11649] Properly set Akka frame size in SparkListenerSuite test
JoshRosen Nov 18, 2015
001c446
[SPARK-11812][PYSPARK] invFunc=None works properly with python's redu…
dtolpin Nov 19, 2015
6fe1ce6
[SPARK-11831][CORE][TESTS] Use port 0 to avoid port conflicts in test…
zsxwing Nov 20, 2015
9a906c1
[SPARK-11817][SQL] Truncating the fractional seconds to prevent inser…
viirya Nov 20, 2015
e9ae1fd
[SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in Transform…
zsxwing Nov 20, 2015
27b5f31
[SPARK-11836][SQL] use existing SQLContext for udf/cast (1.5 branch)
Nov 23, 2015
4139a4e
[SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associa…
markhamstra Nov 25, 2015
b1fcefc
[SPARK-11974][CORE] Not all the temp dirs had been deleted when the J…
pzzs Nov 25, 2015
7900d19
[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThr…
zsxwing Nov 26, 2015
9b62161
[SPARK-12053][CORE] EventLoggingListener.getLogPath needs 4 parameters
chutium Nov 30, 2015
d78f1bc
[SPARK-12049][CORE] User JVM shutdown hook can cause deadlock at shut…
srowen Dec 1, 2015
80dac0b
Set SPARK_EC2_VERSION to 1.5.2
apivovarov Dec 1, 2015
f28399e
[SPARK-11328][SQL] Improve error message when hitting this issue
nongli Dec 1, 2015
fc3fb84
[SPARK-11788][SQL] surround timestamp/date value with quotes in JDBC …
Dec 1, 2015
7460e43
[SPARK-12030] Fix Platform.copyMemory to handle overlapping regions.
nongli Dec 1, 2015
4f07a59
[SPARK-11352][SQL][BRANCH-1.5] Escape */ in the generated comments.
yhuai Dec 2, 2015
0d57a4a
[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAs…
tdas Dec 2, 2015
ed7264b
[SPARK-12090] [PYSPARK] consider shuffle in coalesce()
Dec 2, 2015
8bbb3cd
[SPARK-12048][SQL] Prevent to close JDBC resources twice
Dec 6, 2015
93a0510
[SPARK-12138][SQL] Escape \u in the generated comments of codegen
gatorsmile Dec 6, 2015
3868ab6
[SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worke…
zsxwing Dec 7, 2015
2f30927
[SPARK-12160][MLLIB] Use SQLContext.getOrCreate in MLlib - 1.5 backport
jkbradley Dec 8, 2015
4b99f72
[SPARK-11652][CORE] Remote code execution with InvokerTransformer
srowen Dec 8, 2015
df8fd5a
Merge branch 'branch-1.5' of github.com:apache/spark into csd-1.5
markhamstra Dec 9, 2015
2c0c771
[SPARK-11863][SQL] Unable to resolve order by if it contains mixture …
dilipbiswal Nov 26, 2015
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark

import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.net.{URI, URL}
import java.nio.file.Paths
import java.util.jar.{JarEntry, JarOutputStream}

import scala.collection.JavaConversions._
@@ -78,15 +79,15 @@ private[spark] object TestUtils {
}

/**
* Create a jar file that contains this set of files. All files will be located at the root
* of the jar.
* Create a jar file that contains this set of files. All files will be located in the specified
* directory or at the root of the jar.
*/
def createJar(files: Seq[File], jarFile: File): URL = {
def createJar(files: Seq[File], jarFile: File, directoryPrefix: Option[String] = None): URL = {
val jarFileStream = new FileOutputStream(jarFile)
val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())

for (file <- files) {
val jarEntry = new JarEntry(file.getName)
val jarEntry = new JarEntry(Paths.get(directoryPrefix.getOrElse(""), file.getName).toString)
jarStream.putNextEntry(jarEntry)

val in = new FileInputStream(file)
@@ -118,7 +119,7 @@ private[spark] object TestUtils {
classpathUrls: Seq[URL]): File = {
val compiler = ToolProvider.getSystemJavaCompiler

// Calling this outputs a class file in pwd. It's easier to just rename the file than
// Calling this outputs a class file in pwd. It's easier to just rename the files than
// build a custom FileManager that controls the output location.
val options = if (classpathUrls.nonEmpty) {
Seq("-classpath", classpathUrls.map { _.getFile }.mkString(File.pathSeparator))
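Note: `createJar` now takes an optional `directoryPrefix`, so jar entries can be placed under a directory instead of at the jar root. A minimal hypothetical usage (paths are placeholders; `TestUtils` is `private[spark]`, so the caller must live in that package):

```scala
package org.apache.spark  // required because TestUtils is private[spark]

import java.io.File

object CreateJarExample extends App {
  // With directoryPrefix = Some("lib"), A.class is written as the jar entry
  // "lib/A.class" rather than sitting at the jar root.
  val jarUrl = TestUtils.createJar(
    files = Seq(new File("/tmp/A.class")),
    jarFile = new File("/tmp/test.jar"),
    directoryPrefix = Some("lib"))
  println(jarUrl)
}
```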
17 changes: 14 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -39,7 +39,7 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}

import org.apache.spark.{SparkUserAppException, SPARK_VERSION}
import org.apache.spark.{SparkException, SparkUserAppException, SPARK_VERSION}
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -521,8 +521,19 @@ object SparkSubmit {
sysProps.put("spark.yarn.isPython", "true")
}
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when the keytab is specified")
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
require(args.keytab != null, "Keytab must be specified when principal is specified")
if (!new File(args.keytab).exists()) {
throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
} else {
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sysProps.put("spark.yarn.keytab", args.keytab)
sysProps.put("spark.yarn.principal", args.principal)

UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}
}

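Note: per the in-diff comment, the two new entries are later exported as Java system properties, so any `SparkConf` built afterwards in the same JVM (for example behind the isolated Hive client class loader) can see them. A minimal sketch of that downstream read, illustrative only:

```scala
import org.apache.spark.SparkConf

object KerberosConfSketch extends App {
  // A SparkConf created later in the same JVM loads spark.* Java system properties
  // by default, so downstream code can find the keytab and principal that
  // SparkSubmit stored and re-login when delegation tokens expire.
  val conf = new SparkConf()
  val keytab = conf.get("spark.yarn.keytab", "")
  val principal = conf.get("spark.yarn.principal", "")
  println(s"keytab=$keytab principal=$principal")
}
```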
@@ -66,12 +66,10 @@ private[spark] class AppClient(
// A thread pool for registering with masters. Because registering with a master is a blocking
// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
// time so that we can register with all masters.
private val registerMasterThreadPool = new ThreadPoolExecutor(
0,
masterRpcAddresses.size, // Make sure we can register with all masters at the same time
60L, TimeUnit.SECONDS,
new SynchronousQueue[Runnable](),
ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
"appclient-register-master-threadpool",
masterRpcAddresses.length // Make sure we can register with all masters at the same time
)

// A scheduled executor for scheduling the registration actions
private val registrationRetryThread =
@@ -925,7 +925,7 @@ private[deploy] class Master(
}

val eventLogFilePrefix = EventLoggingListener.getLogPath(
eventLogDir, app.id, app.desc.eventLogCodec)
eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
EventLoggingListener.IN_PROGRESS))
10 changes: 4 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -147,12 +147,10 @@ private[deploy] class Worker(
// A thread pool for registering with masters. Because registering with a master is a blocking
// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
// time so that we can register with all masters.
private val registerMasterThreadPool = new ThreadPoolExecutor(
0,
masterRpcAddresses.size, // Make sure we can register with all masters at the same time
60L, TimeUnit.SECONDS,
new SynchronousQueue[Runnable](),
ThreadUtils.namedThreadFactory("worker-register-master-threadpool"))
private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
"worker-register-master-threadpool",
masterRpcAddresses.size // Make sure we can register with all masters at the same time
)

var coresUsed = 0
var memoryUsed = 0
@@ -267,7 +267,7 @@ private[akka] class ErrorMonitor extends Actor with ActorLogReceive with Logging
}

override def receiveWithLogging: Actor.Receive = {
case Error(cause: Throwable, _, _, message: String) => logError(message, cause)
case Error(cause: Throwable, _, _, message: String) => logDebug(message, cause)
}
}

@@ -818,7 +818,9 @@ class DAGScheduler(
stage.resetInternalAccumulators()
}

val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
val properties = jobIdToActiveJob(jobId).properties

runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -913,7 +915,7 @@ class DAGScheduler(
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
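Note: the properties now come from the ActiveJob that submitted this stage (`jobId`) rather than from `firstJobId`. Those properties are the caller's local properties; an illustrative snippet of how they typically get set (pool and group names are placeholders, not part of the patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object JobPropertiesDemo extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("props-demo"))
  // Local properties such as the fair-scheduler pool and job group travel with the job
  // and are what the DAGScheduler passes on to the TaskScheduler here.
  sc.setLocalProperty("spark.scheduler.pool", "production")
  sc.setJobGroup("nightly-etl", "nightly ETL", interruptOnCancel = true)
  sc.parallelize(1 to 10).count()  // stages for this job carry the properties above
  sc.stop()
}
```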
@@ -103,16 +103,16 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
try {
getTaskResultExecutor.execute(new Runnable {
override def run(): Unit = Utils.logUncaughtExceptions {
val loader = Utils.getContextOrSparkClassLoader
try {
if (serializedData != null && serializedData.limit() > 0) {
reason = serializer.get().deserialize[TaskEndReason](
serializedData, Utils.getSparkClassLoader)
serializedData, loader)
}
} catch {
case cnd: ClassNotFoundException =>
// Log an error but keep going here -- the task failed, so not catastrophic
// if we can't deserialize the reason.
val loader = Utils.getContextOrSparkClassLoader
logError(
"Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
case ex: Exception => {}
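Note: the deserializer now uses the context-or-Spark class loader for the whole block, so a `TaskEndReason` defined in a user jar can be resolved. A minimal sketch of that loader choice (assumed helper name, not Spark's `Utils` itself):

```scala
object ClassLoaderSketch {
  // Prefer the thread's context class loader, which can see user jars added via
  // --jars, and fall back to the loader that loaded this class.
  def contextOrSparkClassLoader: ClassLoader =
    Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
}
```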
37 changes: 19 additions & 18 deletions core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -57,7 +57,9 @@ private[spark] object ShutdownHookManager extends Logging {
// Add a shutdown hook to delete the temp dirs when the JVM exits
addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
logInfo("Shutdown hook called")
shutdownDeletePaths.foreach { dirPath =>
// we need to materialize the paths to delete because deleteRecursively removes items from
// shutdownDeletePaths as we are traversing through it.
shutdownDeletePaths.toArray.foreach { dirPath =>
try {
logInfo("Deleting directory " + dirPath)
Utils.deleteRecursively(new File(dirPath))
@@ -204,7 +206,7 @@ private[spark] object ShutdownHookManager extends Logging {
private [util] class SparkShutdownHookManager {

private val hooks = new PriorityQueue[SparkShutdownHook]()
private var shuttingDown = false
@volatile private var shuttingDown = false

/**
* Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
@@ -228,28 +230,27 @@ private [util] class SparkShutdownHookManager {
}
}

def runAll(): Unit = synchronized {
def runAll(): Unit = {
shuttingDown = true
while (!hooks.isEmpty()) {
Try(Utils.logUncaughtExceptions(hooks.poll().run()))
var nextHook: SparkShutdownHook = null
while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
Try(Utils.logUncaughtExceptions(nextHook.run()))
}
}

def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
checkState()
val hookRef = new SparkShutdownHook(priority, hook)
hooks.add(hookRef)
hookRef
}

def remove(ref: AnyRef): Boolean = synchronized {
hooks.remove(ref)
def add(priority: Int, hook: () => Unit): AnyRef = {
hooks.synchronized {
if (shuttingDown) {
throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
}
val hookRef = new SparkShutdownHook(priority, hook)
hooks.add(hookRef)
hookRef
}
}

private def checkState(): Unit = {
if (shuttingDown) {
throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
}
def remove(ref: AnyRef): Boolean = {
hooks.synchronized { hooks.remove(ref) }
}

}
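Note: the rewrite narrows the locking from whole synchronized methods to the `hooks` queue itself, so a hook that runs during shutdown can no longer deadlock against `runAll()`, and `shutdownDeletePaths` is copied to an array before deletion because `deleteRecursively` mutates it while it is being traversed. A condensed sketch of the resulting locking pattern (illustrative names, not the Spark class):

```scala
import java.util.{Comparator, PriorityQueue}
import scala.util.Try

class ShutdownHookSketch {
  private case class Hook(priority: Int, body: () => Unit)
  private val hooks = new PriorityQueue[Hook](
    11, new Comparator[Hook] { def compare(a: Hook, b: Hook): Int = b.priority - a.priority })
  @volatile private var shuttingDown = false

  def runAll(): Unit = {
    shuttingDown = true
    var next: Hook = null
    // Only the poll() holds the lock; each hook body runs outside it, so a hook that
    // itself calls add()/remove() cannot deadlock against runAll().
    while ({ next = hooks.synchronized { hooks.poll() }; next != null }) {
      Try(next.body())
    }
  }

  def add(priority: Int, body: () => Unit): AnyRef = hooks.synchronized {
    if (shuttingDown) {
      throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
    }
    val h = Hook(priority, body)
    hooks.add(h)
    h
  }

  def remove(ref: AnyRef): Boolean = hooks.synchronized { hooks.remove(ref) }
}
```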
14 changes: 11 additions & 3 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -57,10 +57,18 @@ private[spark] object ThreadUtils {
* Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
* are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
*/
def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
def newDaemonCachedThreadPool(
prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
new ThreadPoolExecutor(
0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
val threadPool = new ThreadPoolExecutor(
maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
keepAliveSeconds,
TimeUnit.SECONDS,
new LinkedBlockingQueue[Runnable],
threadFactory)
threadPool.allowCoreThreadTimeOut(true)
threadPool
}

/**
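Note: the helper switches from a `SynchronousQueue` to an unbounded `LinkedBlockingQueue`, so submissions are queued instead of rejected once `maxThreadNumber` threads are busy. A standalone sketch of the same construction (Guava's `ThreadFactoryBuilder` stands in for Spark's `namedThreadFactory`):

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import com.google.common.util.concurrent.ThreadFactoryBuilder

object CachedPoolSketch {
  def newDaemonCachedThreadPool(
      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
    val threadFactory =
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
    val pool = new ThreadPoolExecutor(
      maxThreadNumber,                  // corePoolSize: threads created before queueing kicks in
      maxThreadNumber,                  // maximumPoolSize: never consulted with an unbounded queue
      keepAliveSeconds, TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable](),
      threadFactory)
    pool.allowCoreThreadTimeOut(true)   // keep the "cached" behaviour: idle threads die off
    pool
  }
}
```

With an unbounded queue the executor never grows past `corePoolSize`, which is why the core size itself is set to the desired maximum and core threads are allowed to time out.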
16 changes: 8 additions & 8 deletions core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -38,7 +38,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {

override def beforeAll(): Unit = {
val conf = new SparkConf()
env = createRpcEnv(conf, "local", 12345)
env = createRpcEnv(conf, "local", 0)
}

override def afterAll(): Unit = {
@@ -75,7 +75,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})

val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "send-remotely")
try {
@@ -130,7 +130,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})

val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-remotely")
try {
@@ -158,7 +158,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val shortProp = "spark.rpc.short.timeout"
conf.set("spark.rpc.retry.wait", "0")
conf.set("spark.rpc.numRetries", "1")
val anotherEnv = createRpcEnv(conf, "remote", 13345)
val anotherEnv = createRpcEnv(conf, "remote", 0)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")
try {
@@ -420,7 +420,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})

val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "sendWithReply-remotely")
try {
@@ -460,7 +460,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})

val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef(
"local", env.address, "sendWithReply-remotely-error")
@@ -500,7 +500,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {

})

val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef(
"local", env.address, "network-events")
@@ -529,7 +529,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})

val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef(
"local", env.address, "sendWithReply-unserializable-error")
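Note: the suite now binds to port 0 so the OS assigns a free ephemeral port and concurrent test runs cannot collide on hard-coded ports like 12345 or 13345. A standalone illustration of the mechanism:

```scala
import java.net.ServerSocket

object EphemeralPortSketch extends App {
  // Binding to port 0 lets the OS pick any free ephemeral port; the caller then
  // reads back the port that was actually assigned.
  val socket = new ServerSocket(0)
  println(s"bound to port ${socket.getLocalPort}")
  socket.close()
}
```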
@@ -37,7 +37,7 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
})
val conf = new SparkConf()
val newRpcEnv = new AkkaRpcEnvFactory().create(
RpcEnvConfig(conf, "test", "localhost", 12346, new SecurityManager(conf)))
RpcEnvConfig(conf, "test", "localhost", 0, new SecurityManager(conf)))
try {
val newRef = newRpcEnv.setupEndpointRef("local", ref.address, "test_endpoint")
assert(s"akka.tcp://local@${env.address}/user/test_endpoint" ===
@@ -56,7 +56,7 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
val conf = SSLSampleConfigs.sparkSSLConfig()
val securityManager = new SecurityManager(conf)
val rpcEnv = new AkkaRpcEnvFactory().create(
RpcEnvConfig(conf, "test", "localhost", 12346, securityManager))
RpcEnvConfig(conf, "test", "localhost", 0, securityManager))
try {
val uri = rpcEnv.uriOf("local", RpcAddress("1.2.3.4", 12345), "test_endpoint")
assert("akka.ssl.tcp://local@1.2.3.4:12345/user/test_endpoint" === uri)