
Commit d8c60cb

Merge branch 'master' into issues/SPARK-25525/not_update_existing_conf

2 parents: ac0243a + c3c45cb

58 files changed, +552 −684 lines


R/pkg/R/DataFrame.R

Lines changed: 15 additions & 2 deletions
@@ -2954,6 +2954,9 @@ setMethod("exceptAll",
 #' @param source a name for external data source.
 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
 #'             save mode (it is 'error' by default)
+#' @param partitionBy a name or a list of names of columns to partition the output by on the file
+#'                    system. If specified, the output is laid out on the file system similar
+#'                    to Hive's partitioning scheme.
 #' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
@@ -2965,13 +2968,13 @@ setMethod("exceptAll",
 #' sparkR.session()
 #' path <- "path/to/file.json"
 #' df <- read.json(path)
-#' write.df(df, "myfile", "parquet", "overwrite")
+#' write.df(df, "myfile", "parquet", "overwrite", partitionBy = c("col1", "col2"))
 #' saveDF(df, parquetPath2, "parquet", mode = "append", mergeSchema = TRUE)
 #' }
 #' @note write.df since 1.4.0
 setMethod("write.df",
           signature(df = "SparkDataFrame"),
-          function(df, path = NULL, source = NULL, mode = "error", ...) {
+          function(df, path = NULL, source = NULL, mode = "error", partitionBy = NULL, ...) {
             if (!is.null(path) && !is.character(path)) {
               stop("path should be character, NULL or omitted.")
             }
@@ -2985,8 +2988,18 @@ setMethod("write.df",
             if (is.null(source)) {
               source <- getDefaultSqlSource()
             }
+            cols <- NULL
+            if (!is.null(partitionBy)) {
+              if (!all(sapply(partitionBy, function(c) is.character(c)))) {
+                stop("All partitionBy column names should be characters.")
+              }
+              cols <- as.list(partitionBy)
+            }
             write <- callJMethod(df@sdf, "write")
             write <- callJMethod(write, "format", source)
+            if (!is.null(cols)) {
+              write <- callJMethod(write, "partitionBy", cols)
+            }
             write <- setWriteOptions(write, path = path, mode = mode, ...)
             write <- handledCallJMethod(write, "save")
           })
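
For reference, a minimal SparkR sketch (not part of the diff) of how the partitionBy argument added above might be used; the data frame, column name, and output path are illustrative:

# assumes an active SparkR session (sparkR.session())
df <- createDataFrame(data.frame(key = c(1L, 2L, 1L), value = c("a", "b", "c")))
outPath <- tempfile(pattern = "partitioned-out")
# writes one subdirectory per distinct value of "key", similar to Hive-style partitioning
write.df(df, outPath, source = "parquet", mode = "overwrite", partitionBy = "key")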

R/pkg/R/functions.R

Lines changed: 3 additions & 2 deletions
@@ -198,8 +198,9 @@ NULL
 #' }
 #' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains
 #'            additional named properties to control how it is converted, accepts the same
-#'            options as the JSON data source. In \code{arrays_zip}, this contains additional
-#'            Columns of arrays to be merged.
+#'            options as the JSON data source. Additionally \code{to_json} supports the "pretty"
+#'            option which enables pretty JSON generation. In \code{arrays_zip}, this contains
+#'            additional Columns of arrays to be merged.
 #' @name column_collection_functions
 #' @rdname column_collection_functions
 #' @family collection functions
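
A small usage sketch of the "pretty" option documented above (df and its struct column "address" are hypothetical, not from this commit):

# assumes df contains a struct (or map) column named "address"
prettyJson <- select(df, to_json(df$address, pretty = "true"))
head(prettyJson)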

R/pkg/R/sparkR.R

Lines changed: 2 additions & 0 deletions
@@ -626,6 +626,8 @@ sparkConfToSubmitOps[["spark.driver.extraLibraryPath"]] <- "--driver-library-pat
 sparkConfToSubmitOps[["spark.master"]] <- "--master"
 sparkConfToSubmitOps[["spark.yarn.keytab"]] <- "--keytab"
 sparkConfToSubmitOps[["spark.yarn.principal"]] <- "--principal"
+sparkConfToSubmitOps[["spark.kerberos.keytab"]] <- "--keytab"
+sparkConfToSubmitOps[["spark.kerberos.principal"]] <- "--principal"
 
 
 # Utility function that returns Spark Submit arguments as a string
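
A hedged sketch of how these new mappings could be exercised from SparkR: Kerberos properties passed via sparkConfig are forwarded to spark-submit as the listed flags (the keytab path and principal below are placeholders):

sparkR.session(
  master = "yarn",
  sparkConfig = list(
    spark.kerberos.keytab = "/path/to/user.keytab",   # forwarded as --keytab
    spark.kerberos.principal = "user@EXAMPLE.COM"     # forwarded as --principal
  )
)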

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 8 additions & 0 deletions
@@ -2701,8 +2701,16 @@ test_that("read/write text files", {
   expect_equal(colnames(df2), c("value"))
   expect_equal(count(df2), count(df) * 2)
 
+  df3 <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
+                         schema = c("key", "value"))
+  textPath3 <- tempfile(pattern = "textPath3", fileext = ".txt")
+  write.df(df3, textPath3, "text", mode = "overwrite", partitionBy = "key")
+  df4 <- read.df(textPath3, "text")
+  expect_equal(count(df3), count(df4))
+
   unlink(textPath)
   unlink(textPath2)
+  unlink(textPath3)
 })
 
 test_that("read/write text files - compression option", {

R/pkg/vignettes/sparkr-vignettes.Rmd

Lines changed: 2 additions & 2 deletions
@@ -157,8 +157,8 @@ Property Name | Property group | spark-submit equivalent
 `spark.driver.extraClassPath` | Runtime Environment | `--driver-class-path`
 `spark.driver.extraJavaOptions` | Runtime Environment | `--driver-java-options`
 `spark.driver.extraLibraryPath` | Runtime Environment | `--driver-library-path`
-`spark.yarn.keytab` | Application Properties | `--keytab`
-`spark.yarn.principal` | Application Properties | `--principal`
+`spark.kerberos.keytab` | Application Properties | `--keytab`
+`spark.kerberos.principal` | Application Properties | `--principal`
 
 **For Windows users**: Due to different file prefixes across operating systems, to avoid the issue of potential wrong prefix, a current workaround is to specify `spark.sql.warehouse.dir` when starting the `SparkSession`.
 
core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 5 additions & 1 deletion
@@ -726,7 +726,11 @@ private[spark] object SparkConf extends Logging {
     DRIVER_MEMORY_OVERHEAD.key -> Seq(
       AlternateConfig("spark.yarn.driver.memoryOverhead", "2.3")),
     EXECUTOR_MEMORY_OVERHEAD.key -> Seq(
-      AlternateConfig("spark.yarn.executor.memoryOverhead", "2.3"))
+      AlternateConfig("spark.yarn.executor.memoryOverhead", "2.3")),
+    KEYTAB.key -> Seq(
+      AlternateConfig("spark.yarn.keytab", "2.5")),
+    PRINCIPAL.key -> Seq(
+      AlternateConfig("spark.yarn.principal", "2.5"))
   )
 
   /**

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 4 additions & 2 deletions
@@ -520,6 +520,10 @@ private[spark] class SparkSubmit extends Logging {
         confKey = "spark.driver.extraJavaOptions"),
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         confKey = "spark.driver.extraLibraryPath"),
+      OptionAssigner(args.principal, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+        confKey = PRINCIPAL.key),
+      OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+        confKey = KEYTAB.key),
 
       // Propagate attributes for dependency resolution at the driver side
       OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
@@ -537,8 +541,6 @@
       OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"),
       OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"),
       OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives"),
-      OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
-      OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),
 
       // Other options
       OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 8 additions & 2 deletions
@@ -199,8 +199,14 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     numExecutors = Option(numExecutors)
       .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
     queue = Option(queue).orElse(sparkProperties.get("spark.yarn.queue")).orNull
-    keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
-    principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
+    keytab = Option(keytab)
+      .orElse(sparkProperties.get("spark.kerberos.keytab"))
+      .orElse(sparkProperties.get("spark.yarn.keytab"))
+      .orNull
+    principal = Option(principal)
+      .orElse(sparkProperties.get("spark.kerberos.principal"))
+      .orElse(sparkProperties.get("spark.yarn.principal"))
+      .orNull
     dynamicAllocationEnabled =
       sparkProperties.get("spark.dynamicAllocation.enabled").exists("true".equalsIgnoreCase)
 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 2 additions & 4 deletions
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.history
 
 import java.io.{File, FileNotFoundException, IOException}
 import java.nio.file.Files
-import java.nio.file.attribute.PosixFilePermissions
 import java.util.{Date, ServiceLoader}
 import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
@@ -133,9 +132,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   // Visible for testing.
   private[history] val listing: KVStore = storePath.map { path =>
-    val perms = PosixFilePermissions.fromString("rwx------")
-    val dbPath = Files.createDirectories(new File(path, "listing.ldb").toPath(),
-      PosixFilePermissions.asFileAttribute(perms)).toFile()
+    val dbPath = Files.createDirectories(new File(path, "listing.ldb").toPath()).toFile()
+    Utils.chmod700(dbPath)
 
     val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION,
       AppStatusStore.CURRENT_VERSION, logDir.toString())

core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala

Lines changed: 2 additions & 5 deletions
@@ -18,8 +18,6 @@
 package org.apache.spark.deploy.history
 
 import java.io.File
-import java.nio.file.Files
-import java.nio.file.attribute.PosixFilePermissions
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
@@ -107,9 +105,8 @@ private class HistoryServerDiskManager(
     val needed = approximateSize(eventLogSize, isCompressed)
     makeRoom(needed)
 
-    val perms = PosixFilePermissions.fromString("rwx------")
-    val tmp = Files.createTempDirectory(tmpStoreDir.toPath(), "appstore",
-      PosixFilePermissions.asFileAttribute(perms)).toFile()
+    val tmp = Utils.createTempDir(tmpStoreDir.getPath(), "appstore")
+    Utils.chmod700(tmp)
 
     updateUsage(needed)
     val current = currentUsage.get()
