
Commit bd8b645

Merge branch 'master' of github.com:apache/spark into SPARK-23179

2 parents: 2c8e2c7 + c36fecc

File tree: 403 files changed (+9413, -2922 lines)

.gitignore

Lines changed: 2 additions & 0 deletions

@@ -62,6 +62,8 @@ project/plugins/src_managed/
 project/plugins/target/
 python/lib/pyspark.zip
 python/deps
+python/test_coverage/coverage_data
+python/test_coverage/htmlcov
 python/pyspark/python
 reports/
 scalastyle-on-compile.generated.xml

R/pkg/R/DataFrame.R

Lines changed: 2 additions & 1 deletion

@@ -2090,7 +2090,8 @@ setMethod("selectExpr",
 #'
 #' @param x a SparkDataFrame.
 #' @param colName a column name.
-#' @param col a Column expression, or an atomic vector in the length of 1 as literal value.
+#' @param col a Column expression (which must refer only to this SparkDataFrame), or an atomic
+#' vector in the length of 1 as literal value.
 #' @return A SparkDataFrame with the new column added or the existing column replaced.
 #' @family SparkDataFrame functions
 #' @aliases withColumn,SparkDataFrame,character-method

R/pkg/R/functions.R

Lines changed: 3 additions & 1 deletion

@@ -1026,7 +1026,9 @@ setMethod("last_day",
 })

 #' @details
-#' \code{length}: Computes the length of a given string or binary column.
+#' \code{length}: Computes the character length of a string data or number of bytes
+#' of a binary data. The length of string data includes the trailing spaces.
+#' The length of binary data includes binary zeros.
 #'
 #' @rdname column_string_functions
 #' @aliases length length,Column-method

R/pkg/R/mllib_classification.R

Lines changed: 14 additions & 1 deletion

@@ -279,11 +279,24 @@ function(object, path, overwrite = FALSE) {
 #' savedModel <- read.ml(path)
 #' summary(savedModel)
 #'
-#' # multinomial logistic regression
+#' # binary logistic regression against two classes with
+#' # upperBoundsOnCoefficients and upperBoundsOnIntercepts
+#' ubc <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4)
+#' model <- spark.logit(training, Species ~ .,
+#'                      upperBoundsOnCoefficients = ubc,
+#'                      upperBoundsOnIntercepts = 1.0)
 #'
+#' # multinomial logistic regression
 #' model <- spark.logit(training, Class ~ ., regParam = 0.5)
 #' summary <- summary(model)
 #'
+#' # multinomial logistic regression with
+#' # lowerBoundsOnCoefficients and lowerBoundsOnIntercepts
+#' lbc <- matrix(c(0.0, -1.0, 0.0, -1.0, 0.0, -1.0, 0.0, -1.0), nrow = 2, ncol = 4)
+#' lbi <- as.array(c(0.0, 0.0))
+#' model <- spark.logit(training, Species ~ ., family = "multinomial",
+#'                      lowerBoundsOnCoefficients = lbc,
+#'                      lowerBoundsOnIntercepts = lbi)
 #' }
 #' @note spark.logit since 2.1.0
 setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"),

R/pkg/R/serialize.R

Lines changed: 6 additions & 5 deletions

@@ -30,14 +30,17 @@
 # POSIXct,POSIXlt -> Time
 #
 # list[T] -> Array[T], where T is one of above mentioned types
+# Multi-element vector of any of the above (except raw) -> Array[T]
 # environment -> Map[String, T], where T is a native type
 # jobj -> Object, where jobj is an object created in the backend
 # nolint end

 getSerdeType <- function(object) {
   type <- class(object)[[1]]
-  if (type != "list") {
-    type
+  if (is.atomic(object) & !is.raw(object) & length(object) > 1) {
+    "array"
+  } else if (type != "list") {
+    type
   } else {
     # Check if all elements are of same type
     elemType <- unique(sapply(object, function(elem) { getSerdeType(elem) }))
@@ -50,9 +53,7 @@ getSerdeType <- function(object) {
 }

 writeObject <- function(con, object, writeType = TRUE) {
-  # NOTE: In R vectors have same type as objects. So we don't support
-  # passing in vectors as arrays and instead require arrays to be passed
-  # as lists.
+  # NOTE: In R vectors have same type as objects
   type <- class(object)[[1]] # class of POSIXlt is c("POSIXlt", "POSIXt")
   # Checking types is needed here, since 'is.na' only handles atomic vectors,
   # lists and pairlists

R/pkg/tests/fulltests/test_Serde.R

Lines changed: 47 additions & 0 deletions

@@ -37,6 +37,53 @@ test_that("SerDe of primitive types", {
   expect_equal(class(x), "character")
 })

+test_that("SerDe of multi-element primitive vectors inside R data.frame", {
+  # vector of integers embedded in R data.frame
+  indices <- 1L:3L
+  myDf <- data.frame(indices)
+  myDf$data <- list(rep(0L, 3L))
+  mySparkDf <- as.DataFrame(myDf)
+  myResultingDf <- collect(mySparkDf)
+  myDfListedData <- data.frame(indices)
+  myDfListedData$data <- list(as.list(rep(0L, 3L)))
+  expect_equal(myResultingDf, myDfListedData)
+  expect_equal(class(myResultingDf[["data"]][[1]]), "list")
+  expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "integer")
+
+  # vector of numeric embedded in R data.frame
+  myDf <- data.frame(indices)
+  myDf$data <- list(rep(0, 3L))
+  mySparkDf <- as.DataFrame(myDf)
+  myResultingDf <- collect(mySparkDf)
+  myDfListedData <- data.frame(indices)
+  myDfListedData$data <- list(as.list(rep(0, 3L)))
+  expect_equal(myResultingDf, myDfListedData)
+  expect_equal(class(myResultingDf[["data"]][[1]]), "list")
+  expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "numeric")
+
+  # vector of logical embedded in R data.frame
+  myDf <- data.frame(indices)
+  myDf$data <- list(rep(TRUE, 3L))
+  mySparkDf <- as.DataFrame(myDf)
+  myResultingDf <- collect(mySparkDf)
+  myDfListedData <- data.frame(indices)
+  myDfListedData$data <- list(as.list(rep(TRUE, 3L)))
+  expect_equal(myResultingDf, myDfListedData)
+  expect_equal(class(myResultingDf[["data"]][[1]]), "list")
+  expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "logical")
+
+  # vector of character embedded in R data.frame
+  myDf <- data.frame(indices)
+  myDf$data <- list(rep("abc", 3L))
+  mySparkDf <- as.DataFrame(myDf)
+  myResultingDf <- collect(mySparkDf)
+  myDfListedData <- data.frame(indices)
+  myDfListedData$data <- list(as.list(rep("abc", 3L)))
+  expect_equal(myResultingDf, myDfListedData)
+  expect_equal(class(myResultingDf[["data"]][[1]]), "list")
+  expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "character")
+})
+
 test_that("SerDe of list of primitive types", {
   x <- list(1L, 2L, 3L)
   y <- callJStatic("SparkRHandler", "echo", x)

R/pkg/tests/fulltests/test_mllib_classification.R

Lines changed: 5 additions & 5 deletions

@@ -124,7 +124,7 @@ test_that("spark.logit", {
   # Petal.Width 0.42122607
   # nolint end

-  # Test multinomial logistic regression againt three classes
+  # Test multinomial logistic regression against three classes
   df <- suppressWarnings(createDataFrame(iris))
   model <- spark.logit(df, Species ~ ., regParam = 0.5)
   summary <- summary(model)
@@ -196,7 +196,7 @@ test_that("spark.logit", {
   #
   # nolint end

-  # Test multinomial logistic regression againt two classes
+  # Test multinomial logistic regression against two classes
   df <- suppressWarnings(createDataFrame(iris))
   training <- df[df$Species %in% c("versicolor", "virginica"), ]
   model <- spark.logit(training, Species ~ ., regParam = 0.5, family = "multinomial")
@@ -208,7 +208,7 @@ test_that("spark.logit", {
   expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
   expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))

-  # Test binomial logistic regression againt two classes
+  # Test binomial logistic regression against two classes
   model <- spark.logit(training, Species ~ ., regParam = 0.5)
   summary <- summary(model)
   coefsR <- c(-6.08, 0.25, 0.16, 0.48, 1.04)
@@ -239,7 +239,7 @@ test_that("spark.logit", {
   prediction2 <- collect(select(predict(model2, df2), "prediction"))
   expect_equal(sort(prediction2$prediction), c("0.0", "0.0", "0.0", "0.0", "0.0"))

-  # Test binomial logistic regression againt two classes with upperBoundsOnCoefficients
+  # Test binomial logistic regression against two classes with upperBoundsOnCoefficients
   # and upperBoundsOnIntercepts
   u <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4)
   model <- spark.logit(training, Species ~ ., upperBoundsOnCoefficients = u,
@@ -252,7 +252,7 @@ test_that("spark.logit", {
   expect_error(spark.logit(training, Species ~ ., upperBoundsOnCoefficients = as.array(c(1, 2)),
                            upperBoundsOnIntercepts = 1.0))

-  # Test binomial logistic regression againt two classes with lowerBoundsOnCoefficients
+  # Test binomial logistic regression against two classes with lowerBoundsOnCoefficients
   # and lowerBoundsOnIntercepts
   l <- matrix(c(0.0, -1.0, 0.0, -1.0), nrow = 1, ncol = 4)
   model <- spark.logit(training, Species ~ ., lowerBoundsOnCoefficients = l,

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

Lines changed: 3 additions & 1 deletion

@@ -171,7 +171,9 @@ private class DownloadCallback implements StreamCallback {

   @Override
   public void onData(String streamId, ByteBuffer buf) throws IOException {
-    channel.write(buf);
+    while (buf.hasRemaining()) {
+      channel.write(buf);
+    }
   }

   @Override
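For context on the fix above: java.nio.channels.WritableByteChannel.write() is allowed to write fewer bytes than buf.remaining(), so the old single call could silently drop the tail of the buffer; the loop guarantees the whole buffer reaches the channel. A minimal standalone sketch of the same drain-the-buffer pattern (writeFully is a hypothetical helper name for illustration, not an API in the Spark codebase):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

final class ChannelUtil {
  // A single channel.write(buf) call may perform a partial write, so loop
  // until no bytes remain in the buffer. (writeFully is a hypothetical
  // helper, not part of the Spark codebase.)
  static void writeFully(WritableByteChannel channel, ByteBuffer buf) throws IOException {
    while (buf.hasRemaining()) {
      channel.write(buf);
    }
  }
}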

core/src/main/java/org/apache/spark/SparkFirehoseListener.java

Lines changed: 12 additions & 0 deletions

@@ -118,6 +118,18 @@ public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executo
     onEvent(executorBlacklisted);
   }

+  @Override
+  public void onExecutorBlacklistedForStage(
+      SparkListenerExecutorBlacklistedForStage executorBlacklistedForStage) {
+    onEvent(executorBlacklistedForStage);
+  }
+
+  @Override
+  public void onNodeBlacklistedForStage(
+      SparkListenerNodeBlacklistedForStage nodeBlacklistedForStage) {
+    onEvent(nodeBlacklistedForStage);
+  }
+
   @Override
   public final void onExecutorUnblacklisted(
       SparkListenerExecutorUnblacklisted executorUnblacklisted) {
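SparkFirehoseListener forwards every SparkListener callback to a single onEvent(SparkListenerEvent) method, which is why the two new per-stage blacklisting callbacks above simply delegate to onEvent. A minimal hypothetical subclass sketching the intended usage:

import org.apache.spark.SparkFirehoseListener;
import org.apache.spark.scheduler.SparkListenerEvent;

// Hypothetical listener for illustration: every scheduler event, including
// the new per-stage blacklisting events, arrives through this one override.
public class EventCountingListener extends SparkFirehoseListener {
  private long eventCount = 0;

  @Override
  public void onEvent(SparkListenerEvent event) {
    eventCount += 1;
  }

  public long eventCount() {
    return eventCount;
  }
}

Such a listener can be registered with SparkContext#addSparkListener or through the spark.extraListeners configuration.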

core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java

Lines changed: 6 additions & 4 deletions

@@ -172,10 +172,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
           currentEntry = sortedConsumers.lastEntry();
         }
         List<MemoryConsumer> cList = currentEntry.getValue();
-        MemoryConsumer c = cList.remove(cList.size() - 1);
-        if (cList.isEmpty()) {
-          sortedConsumers.remove(currentEntry.getKey());
-        }
+        MemoryConsumer c = cList.get(cList.size() - 1);
         try {
           long released = c.spill(required - got, consumer);
           if (released > 0) {
@@ -185,6 +182,11 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
           if (got >= required) {
             break;
           }
+        } else {
+          cList.remove(cList.size() - 1);
+          if (cList.isEmpty()) {
+            sortedConsumers.remove(currentEntry.getKey());
+          }
         }
       } catch (ClosedByInterruptException e) {
         // This called by user to kill a task (e.g: speculative task).
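The change above fixes the candidate-selection policy: previously a consumer was popped from sortedConsumers before being asked to spill, so a consumer that released some memory but still held more could not be picked again in the same loop. Now the candidate is only evicted from its bucket once a spill attempt releases nothing. A simplified standalone sketch of that policy (Spillable and acquire are illustrative stand-ins, not Spark's actual types):

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SpillLoopSketch {
  interface Spillable {
    long spill(long needed); // bytes released; 0 when nothing is left to spill
  }

  // Consumers are bucketed by memory usage; keep spilling the current
  // candidate until it yields nothing, and only then drop it from its bucket.
  static long acquire(long required, TreeMap<Long, List<Spillable>> sortedConsumers) {
    long got = 0;
    while (got < required && !sortedConsumers.isEmpty()) {
      Map.Entry<Long, List<Spillable>> entry = sortedConsumers.lastEntry();
      List<Spillable> bucket = entry.getValue();
      Spillable candidate = bucket.get(bucket.size() - 1);  // peek, don't remove
      long released = candidate.spill(required - got);
      if (released > 0) {
        got += released;
      } else {
        bucket.remove(bucket.size() - 1);  // evict only once exhausted
        if (bucket.isEmpty()) {
          sortedConsumers.remove(entry.getKey());
        }
      }
    }
    return got;
  }
}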
