
Commit da4856e

Author/committer: Marcelo Vanzin
Message: Merge branch 'master' into SPARK-25875
Parents: 4152c76, e9d3ca0

File tree: 100 files changed, +2348 additions, -1262 deletions

Note: large commits have some content hidden by default, so only a subset of the 100 changed files is shown below.


R/pkg/tests/fulltests/test_sparkSQL_eager.R

Lines changed: 8 additions & 8 deletions
(All changed lines in this file are blank; the edits are whitespace-only.)

@@ -22,30 +22,30 @@ context("test show SparkDataFrame when eager execution is enabled.")
 test_that("eager execution is not enabled", {
   # Start Spark session without eager execution enabled
   sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
-
+
   df <- createDataFrame(faithful)
   expect_is(df, "SparkDataFrame")
   expected <- "eruptions:double, waiting:double"
   expect_output(show(df), expected)
-
+
   # Stop Spark session
   sparkR.session.stop()
 })
 
 test_that("eager execution is enabled", {
   # Start Spark session with eager execution enabled
   sparkConfig <- list(spark.sql.repl.eagerEval.enabled = "true")
-
+
   sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE, sparkConfig = sparkConfig)
-
+
   df <- createDataFrame(faithful)
   expect_is(df, "SparkDataFrame")
   expected <- paste0("(+---------+-------+\n",
                      "|eruptions|waiting|\n",
                      "+---------+-------+\n)*",
                      "(only showing top 20 rows)")
   expect_output(show(df), expected)
-
+
   # Stop Spark session
   sparkR.session.stop()
 })
@@ -55,9 +55,9 @@ test_that("eager execution is enabled with maxNumRows and truncate set", {
   sparkConfig <- list(spark.sql.repl.eagerEval.enabled = "true",
                       spark.sql.repl.eagerEval.maxNumRows = as.integer(5),
                       spark.sql.repl.eagerEval.truncate = as.integer(2))
-
+
   sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE, sparkConfig = sparkConfig)
-
+
   df <- arrange(createDataFrame(faithful), "waiting")
   expect_is(df, "SparkDataFrame")
   expected <- paste0("(+---------+-------+\n",
@@ -66,7 +66,7 @@ test_that("eager execution is enabled with maxNumRows and truncate set", {
                      "| 1.| 43|\n)*",
                      "(only showing top 5 rows)")
   expect_output(show(df), expected)
-
+
   # Stop Spark session
   sparkR.session.stop()
 })
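
The three tests above toggle eager evaluation purely through session configuration. For reference, a minimal Scala sketch of a standalone session carrying the same keys; the builder setup and local master are assumptions, not part of this commit, and the spark.sql.repl.eagerEval.* keys are consumed by REPL display code (such as SparkR's show()), so this only illustrates the configuration surface:

import org.apache.spark.sql.SparkSession

// Hypothetical standalone app that sets the same eager-evaluation keys the R test uses.
val spark = SparkSession.builder()
  .master("local[2]")                                    // assumed local master, for illustration only
  .config("spark.sql.repl.eagerEval.enabled", "true")    // enable eager rendering in REPL frontends
  .config("spark.sql.repl.eagerEval.maxNumRows", "5")    // cap the rows shown, as in the test
  .config("spark.sql.repl.eagerEval.truncate", "2")      // truncate cell values to 2 characters
  .getOrCreate()

val df = spark.range(5).toDF("waiting")
df.show()      // explicit show(); the eager rendering itself is a REPL-frontend behaviour
spark.stop()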

bin/pyspark

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ export PYSPARK_PYTHON
 
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.7-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.8.1-src.zip:$PYTHONPATH"
 
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"

bin/pyspark2.cmd

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
 )
 
 set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.7-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.8.1-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py

core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -350,7 +350,7 @@
     <dependency>
       <groupId>net.sf.py4j</groupId>
       <artifactId>py4j</artifactId>
-      <version>0.10.7</version>
+      <version>0.10.8.1</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 3 additions & 1 deletion
@@ -731,7 +731,9 @@ private[spark] object SparkConf extends Logging {
     KEYTAB.key -> Seq(
       AlternateConfig("spark.yarn.keytab", "3.0")),
     PRINCIPAL.key -> Seq(
-      AlternateConfig("spark.yarn.principal", "3.0"))
+      AlternateConfig("spark.yarn.principal", "3.0")),
+    KERBEROS_RELOGIN_PERIOD.key -> Seq(
+      AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0"))
   )
 
   /**
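
The new entry maps the old YARN-era name onto the current config constant, so applications that still set the deprecated key keep working (with a deprecation warning). A minimal sketch of how that fallback behaves; the new key name spark.kerberos.relogin.period is an assumption, as the diff only shows the constant KERBEROS_RELOGIN_PERIOD.key:

import org.apache.spark.SparkConf

// Only the deprecated key is set; reading the current key is expected to fall back
// to it through the AlternateConfig table shown above.
val conf = new SparkConf()
  .set("spark.yarn.kerberos.relogin.period", "2m")                    // deprecated name from the diff
val period = conf.get("spark.kerberos.relogin.period", "not set")     // assumed new key name
println(period)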

core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala

Lines changed: 2 additions & 1 deletion
@@ -32,7 +32,8 @@ private[spark] object PythonUtils {
     val pythonPath = new ArrayBuffer[String]
     for (sparkHome <- sys.env.get("SPARK_HOME")) {
       pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
-      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.10.7-src.zip").mkString(File.separator)
+      pythonPath +=
+        Seq(sparkHome, "python", "lib", "py4j-0.10.8.1-src.zip").mkString(File.separator)
     }
     pythonPath ++= SparkContext.jarOfObject(this)
     pythonPath.mkString(File.pathSeparator)
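
This Scala helper mirrors what bin/pyspark and bin/pyspark2.cmd do for the shell: it appends the pyspark and py4j zips to the Python path. A small standalone sketch of the same joining pattern; the SPARK_HOME value is illustrative, not taken from the commit:

import java.io.File

val sparkHome = "/opt/spark"   // illustrative; the real code reads SPARK_HOME from the environment
val entries = Seq(
  Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator),
  Seq(sparkHome, "python", "lib", "py4j-0.10.8.1-src.zip").mkString(File.separator))
// On Linux this prints:
// /opt/spark/python/lib/pyspark.zip:/opt/spark/python/lib/py4j-0.10.8.1-src.zip
println(entries.mkString(File.pathSeparator))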

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 31 additions & 15 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
+import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
@@ -30,7 +31,7 @@ import scala.util.control.NonFatal
 
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -412,20 +413,6 @@ object SparkHadoopUtil {
 
   def get: SparkHadoopUtil = instance
 
-  /**
-   * Given an expiration date for the current set of credentials, calculate the time when new
-   * credentials should be created.
-   *
-   * @param expirationDate Drop-dead expiration date
-   * @param conf Spark configuration
-   * @return Timestamp when new credentials should be created.
-   */
-  private[spark] def nextCredentialRenewalTime(expirationDate: Long, conf: SparkConf): Long = {
-    val ct = System.currentTimeMillis
-    val ratio = conf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
-    (ct + (ratio * (expirationDate - ct))).toLong
-  }
-
   /**
    * Returns a Configuration object with Spark configuration applied on top. Unlike
    * the instance method, this will always return a Configuration instance, and not a
@@ -471,4 +458,33 @@ object SparkHadoopUtil {
       hadoopConf.set(key.substring("spark.hadoop.".length), value)
     }
   }
+
+  // scalastyle:off line.size.limit
+  /**
+   * Create a path that uses replication instead of erasure coding (ec), regardless of the default
+   * configuration in hdfs for the given path. This can be helpful as hdfs ec doesn't support
+   * hflush(), hsync(), or append().
+   * https://hadoop.apache.org/docs/r3.0.0/hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html#Limitations
+   */
+  // scalastyle:on line.size.limit
+  def createNonECFile(fs: FileSystem, path: Path): FSDataOutputStream = {
+    try {
+      // Use reflection as this uses APIs only available in Hadoop 3.
+      val builderMethod = fs.getClass().getMethod("createFile", classOf[Path])
+      val builder = builderMethod.invoke(fs, path)
+      val builderCls = builder.getClass()
+      // This may throw a NoSuchMethodException if the path is not on HDFS.
+      val replicateMethod = builderCls.getMethod("replicate")
+      val buildMethod = builderCls.getMethod("build")
+      val b2 = replicateMethod.invoke(builder)
+      buildMethod.invoke(b2).asInstanceOf[FSDataOutputStream]
+    } catch {
+      case _: NoSuchMethodException =>
+        // No createFile() method, so we're on an older HDFS client, which doesn't give us control
+        // over EC vs. replication. Older HDFS doesn't have EC anyway, so just create the file with
+        // the old APIs.
+        fs.create(path)
+    }
+  }
+
 }
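
The new createNonECFile helper returns an ordinary FSDataOutputStream, so a caller can use it as a drop-in for fs.create(path) wherever hflush()/hsync() must keep working on directories whose HDFS default is erasure coding; the limitation called out in the comment matters mostly for long-lived, incrementally flushed files such as event logs. A hedged usage sketch: the object name, path, and payload are illustrative, and since SparkHadoopUtil is Spark-internal the caller is assumed to live inside the org.apache.spark tree:

package org.apache.spark.deploy

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object NonECFileExample {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())   // whichever filesystem the default conf points at
    // Force replication for this file even if the parent directory defaults to erasure coding.
    val out = SparkHadoopUtil.createNonECFile(fs, new Path("/tmp/spark-events/app.inprogress"))
    out.writeUTF("event")   // FSDataOutputStream extends DataOutputStream
    out.hflush()            // works on replicated files; erasure-coded files cannot support this
    out.close()
  }
}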
