Commit 51daf9a

Merge branch 'master' into support-add-jar-ivy
2 parents: afaf7bd + 94d648d

509 files changed, with 4,307 additions and 71,305 deletions


.github/workflows/build_and_test.yml

Lines changed: 6 additions & 10 deletions
@@ -168,12 +168,10 @@ jobs:
         python3.8 -m pip list
     # SparkR
     - name: Install R 4.0
+      uses: r-lib/actions/setup-r@v1
       if: contains(matrix.modules, 'sparkr')
-      run: |
-        sudo sh -c "echo 'deb https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/' >> /etc/apt/sources.list"
-        curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0xE298A3A825C0D65DFD57CBB651716619E084DAB9" | sudo apt-key add
-        sudo apt-get update
-        sudo apt-get install -y r-base r-base-dev libcurl4-openssl-dev
+      with:
+        r-version: 4.0
     - name: Install R packages
       if: contains(matrix.modules, 'sparkr')
       run: |
@@ -232,11 +230,9 @@ jobs:
         # See also https://github.com/sphinx-doc/sphinx/issues/7551.
         pip3 install flake8 'sphinx<3.1.0' numpy pydata_sphinx_theme ipython nbsphinx
     - name: Install R 4.0
-      run: |
-        sudo sh -c "echo 'deb https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/' >> /etc/apt/sources.list"
-        curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0xE298A3A825C0D65DFD57CBB651716619E084DAB9" | sudo apt-key add
-        sudo apt-get update
-        sudo apt-get install -y r-base r-base-dev libcurl4-openssl-dev
+      uses: r-lib/actions/setup-r@v1
+      with:
+        r-version: 4.0
     - name: Install R linter dependencies and SparkR
      run: |
        sudo apt-get install -y libcurl4-openssl-dev

.github/workflows/test_report.yml

Lines changed: 9 additions & 0 deletions
@@ -15,7 +15,16 @@ jobs:
         github_token: ${{ secrets.GITHUB_TOKEN }}
         workflow: ${{ github.event.workflow_run.workflow_id }}
         commit: ${{ github.event.workflow_run.head_commit.id }}
+    - name: Check if JUnit report XML files exist
+      run: |
+        if ls **/target/test-reports/*.xml > /dev/null 2>&1; then
+          echo '::set-output name=FILE_EXISTS::true'
+        else
+          echo '::set-output name=FILE_EXISTS::false'
+        fi
+      id: check-junit-file
     - name: Publish test report
+      if: steps.check-junit-file.outputs.FILE_EXISTS == 'true'
       uses: scacap/action-surefire-report@v1
       with:
         check_name: Report test results

R/pkg/NAMESPACE

Lines changed: 2 additions & 0 deletions
@@ -348,6 +348,7 @@ exportMethods("%<=>%",
               "negate",
               "next_day",
               "not",
+              "nth_value",
               "ntile",
               "otherwise",
               "over",
@@ -426,6 +427,7 @@ exportMethods("%<=>%",
               "variance",
               "var_pop",
               "var_samp",
+              "vector_to_array",
               "weekofyear",
               "when",
               "window",

R/pkg/R/functions.R

Lines changed: 78 additions & 8 deletions
@@ -338,12 +338,29 @@ NULL
 #' tmp <- mutate(df, dist = over(cume_dist(), ws), dense_rank = over(dense_rank(), ws),
 #'               lag = over(lag(df$mpg), ws), lead = over(lead(df$mpg, 1), ws),
 #'               percent_rank = over(percent_rank(), ws),
-#'               rank = over(rank(), ws), row_number = over(row_number(), ws))
+#'               rank = over(rank(), ws), row_number = over(row_number(), ws),
+#'               nth_value = over(nth_value(df$mpg, 3), ws))
 #' # Get ntile group id (1-4) for hp
 #' tmp <- mutate(tmp, ntile = over(ntile(4), ws))
 #' head(tmp)}
 NULL
 
+#' ML functions for Column operations
+#'
+#' ML functions defined for \code{Column}.
+#'
+#' @param x Column to compute on.
+#' @param ... additional argument(s).
+#' @name column_ml_functions
+#' @rdname column_ml_functions
+#' @family ml functions
+#' @examples
+#' \dontrun{
+#' df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
+#' head(select(df, vector_to_array(df$features)))
+#' }
+NULL
+
 #' @details
 #' \code{lit}: A new Column is created to represent the literal value.
 #' If the parameter is a Column, it is returned unchanged.
@@ -3298,6 +3315,37 @@ setMethod("lead",
             column(jc)
           })
 
+#' @details
+#' \code{nth_value}: Window function: returns the value that is the \code{offset}th
+#' row of the window frame (counting from 1), and \code{null} if the size of the window
+#' frame is less than \code{offset} rows.
+#'
+#' @param offset a numeric indicating the number of the row to use as the value
+#' @param na.rm a logical indicating whether the Nth value should skip nulls when
+#'        determining which row to use
+#'
+#' @rdname column_window_functions
+#' @aliases nth_value nth_value,characterOrColumn-method
+#' @note nth_value since 3.1.0
+setMethod("nth_value",
+          signature(x = "characterOrColumn", offset = "numeric"),
+          function(x, offset, na.rm = FALSE) {
+            x <- if (is.character(x)) {
+              column(x)
+            } else {
+              x
+            }
+            offset <- as.integer(offset)
+            jc <- callJStatic(
+              "org.apache.spark.sql.functions",
+              "nth_value",
+              x@jc,
+              offset,
+              na.rm
+            )
+            column(jc)
+          })
+
 #' @details
 #' \code{ntile}: Returns the ntile group id (from 1 to n inclusive) in an ordered window
 #' partition. For example, if n is 4, the first quarter of the rows will get value 1, the second
@@ -4419,10 +4467,32 @@ setMethod("current_timestamp",
 #' @aliases timestamp_seconds timestamp_seconds,Column-method
 #' @note timestamp_seconds since 3.1.0
 setMethod("timestamp_seconds",
-    signature(x = "Column"),
-    function(x) {
-      jc <- callJStatic(
-        "org.apache.spark.sql.functions", "timestamp_seconds", x@jc
-      )
-      column(jc)
-    })
+          signature(x = "Column"),
+          function(x) {
+            jc <- callJStatic(
+              "org.apache.spark.sql.functions", "timestamp_seconds", x@jc
+            )
+            column(jc)
+          })
+
+#' @details
+#' \code{vector_to_array}: Converts a column of MLlib sparse/dense vectors into
+#' a column of dense arrays.
+#'
+#' @param dtype The data type of the output array. Valid values: "float64" or "float32".
+#'
+#' @rdname column_ml_functions
+#' @aliases vector_to_array vector_to_array,Column-method
+#' @note vector_to_array since 3.1.0
+setMethod("vector_to_array",
+          signature(x = "Column"),
+          function(x, dtype = c("float64", "float32")) {
+            dtype <- match.arg(dtype)
+            jc <- callJStatic(
+              "org.apache.spark.ml.functions",
+              "vector_to_array",
+              x@jc,
+              dtype
+            )
+            column(jc)
+          })
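
For reference, a minimal SparkR usage sketch of the two functions added above (it assumes a running SparkR session; the mtcars-based window spec is illustrative, and the libsvm path follows the roxygen example):

# nth_value over a window: the value of the 3rd row of each frame,
# or null when the frame holds fewer than 3 rows.
df <- createDataFrame(mtcars)
ws <- orderBy(windowPartitionBy("am"), "hp")
head(mutate(df, third_mpg = over(nth_value(df$mpg, 3), ws)))

# vector_to_array: convert an MLlib vector column into a dense array column.
fdf <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
head(select(fdf, vector_to_array(fdf$features, "float32")))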

R/pkg/R/generics.R

Lines changed: 8 additions & 0 deletions
@@ -1164,6 +1164,10 @@ setGeneric("months_between", function(y, x, ...) { standardGeneric("months_betwe
 #' @rdname count
 setGeneric("n", function(x) { standardGeneric("n") })
 
+#' @rdname column_window_functions
+#' @name NULL
+setGeneric("nth_value", function(x, offset, ...) { standardGeneric("nth_value") })
+
 #' @rdname column_nonaggregate_functions
 #' @name NULL
 setGeneric("nanvl", function(y, x) { standardGeneric("nanvl") })
@@ -1445,6 +1449,10 @@ setGeneric("var_pop", function(x) { standardGeneric("var_pop") })
 #' @name NULL
 setGeneric("var_samp", function(x) { standardGeneric("var_samp") })
 
+#' @rdname column_ml_functions
+#' @name NULL
+setGeneric("vector_to_array", function(x, ...) { standardGeneric("vector_to_array") })
+
 #' @rdname column_datetime_functions
 #' @name NULL
 setGeneric("weekofyear", function(x) { standardGeneric("weekofyear") })

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 4 additions & 1 deletion
@@ -1424,7 +1424,10 @@ test_that("column functions", {
     date_trunc("quarter", c) + current_date() + current_timestamp()
   c25 <- overlay(c1, c2, c3, c3) + overlay(c1, c2, c3) + overlay(c1, c2, 1) +
     overlay(c1, c2, 3, 4)
-  c26 <- timestamp_seconds(c1)
+  c26 <- timestamp_seconds(c1) + vector_to_array(c) +
+    vector_to_array(c, "float32") + vector_to_array(c, "float64")
+  c27 <- nth_value("x", 1L) + nth_value("y", 2, TRUE) +
+    nth_value(column("v"), 3) + nth_value(column("z"), 4L, FALSE)
 
   # Test if base::is.nan() is exposed
   expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE))

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 0 additions & 8 deletions
@@ -92,10 +92,6 @@ public class ExternalShuffleBlockResolver {
   @VisibleForTesting
   final DB db;
 
-  private final List<String> knownManagers = Arrays.asList(
-    "org.apache.spark.shuffle.sort.SortShuffleManager",
-    "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");
-
   public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
       throws IOException {
     this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
@@ -148,10 +144,6 @@ public void registerExecutor(
       ExecutorShuffleInfo executorInfo) {
     AppExecId fullId = new AppExecId(appId, execId);
     logger.info("Registered executor {} with {}", fullId, executorInfo);
-    if (!knownManagers.contains(executorInfo.shuffleManager)) {
-      throw new UnsupportedOperationException(
-        "Unsupported shuffle manager of executor: " + executorInfo);
-    }
     try {
       if (db != null) {
         byte[] key = dbAppExecKey(fullId);

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

Lines changed: 0 additions & 9 deletions
@@ -71,15 +71,6 @@ public void testBadRequests() throws IOException {
       assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
     }
 
-    // Invalid shuffle manager
-    try {
-      resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
-      resolver.getBlockData("app0", "exec2", 1, 1, 0);
-      fail("Should have failed");
-    } catch (UnsupportedOperationException e) {
-      // pass
-    }
-
     // Nonexistent shuffle block
     resolver.registerExecutor("app0", "exec3",
       dataContext.createExecutorInfo(SORT_MANAGER));

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java

Lines changed: 3 additions & 3 deletions
@@ -233,9 +233,9 @@ public void testFetchThreeSort() throws Exception {
     exec0Fetch.releaseBuffers();
   }
 
-  @Test (expected = RuntimeException.class)
-  public void testRegisterInvalidExecutor() throws Exception {
-    registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
+  @Test
+  public void testRegisterWithCustomShuffleManager() throws Exception {
+    registerExecutor("exec-1", dataContext0.createExecutorInfo("custom shuffle manager"));
   }
 
   @Test
