
Commit 4ea4f31

Merge branch 'master' into json-ignore-no-tokens

2 parents 32ec9ba + a927c76
397 files changed: +6499 −3625 lines


R/README.md

Lines changed: 1 addition & 9 deletions

@@ -39,15 +39,7 @@ To set other options like driver memory, executor memory etc. you can pass in th
 
 #### Using SparkR from RStudio
 
-If you wish to use SparkR from RStudio or other R frontends you will need to set some environment variables which point SparkR to your Spark installation. For example
-```R
-# Set this to where Spark is installed
-Sys.setenv(SPARK_HOME="/Users/username/spark")
-# This line loads SparkR from the installed directory
-.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
-library(SparkR)
-sparkR.session()
-```
+If you wish to use SparkR from RStudio, please refer [SparkR documentation](https://spark.apache.org/docs/latest/sparkr.html#starting-up-from-rstudio).
 
 #### Making changes to SparkR
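For reference, the removed quick-start snippet amounted to the following R setup, which the linked SparkR documentation now covers (the SPARK_HOME path is only a placeholder):

```R
# Point SparkR at a local Spark installation (placeholder path)
Sys.setenv(SPARK_HOME = "/Users/username/spark")
# Load the SparkR package shipped inside that installation
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session()
```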

R/pkg/R/functions.R

Lines changed: 44 additions & 24 deletions

@@ -202,8 +202,9 @@ NULL
 #' \itemize{
 #' \item \code{from_json}: a structType object to use as the schema to use
 #' when parsing the JSON string. Since Spark 2.3, the DDL-formatted string is
-#' also supported for the schema.
-#' \item \code{from_csv}: a DDL-formatted string
+#' also supported for the schema. Since Spark 3.0, \code{schema_of_json} or
+#' the DDL-formatted string literal can also be accepted.
+#' \item \code{from_csv}: a structType object, DDL-formatted string or \code{schema_of_csv}
 #' }
 #' @param ... additional argument(s).
 #' \itemize{
@@ -1723,7 +1724,7 @@ setMethod("radians",
 #' @details
 #' \code{to_date}: Converts the column into a DateType. You may optionally specify
 #' a format according to the rules in:
-#' \url{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}.
+#' \url{https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html}.
 #' If the string cannot be parsed according to the specified format (or default),
 #' the value of the column will be null.
 #' By default, it follows casting rules to a DateType if the format is omitted
@@ -1819,7 +1820,7 @@ setMethod("to_csv", signature(x = "Column"),
 #' @details
 #' \code{to_timestamp}: Converts the column into a TimestampType. You may optionally specify
 #' a format according to the rules in:
-#' \url{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}.
+#' \url{https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html}.
 #' If the string cannot be parsed according to the specified format (or default),
 #' the value of the column will be null.
 #' By default, it follows casting rules to a TimestampType if the format is omitted
@@ -2240,7 +2241,7 @@ setMethod("n", signature(x = "Column"),
 #' \code{date_format}: Converts a date/timestamp/string to a value of string in the format
 #' specified by the date format given by the second argument. A pattern could be for instance
 #' \code{dd.MM.yyyy} and could return a string like '18.03.1993'. All
-#' pattern letters of \code{java.text.SimpleDateFormat} can be used.
+#' pattern letters of \code{java.time.format.DateTimeFormatter} can be used.
 #' Note: Use when ever possible specialized functions like \code{year}. These benefit from a
 #' specialized implementation.
 #'
@@ -2254,40 +2255,54 @@ setMethod("date_format", signature(y = "Column", x = "character"),
             column(jc)
           })
 
+setClassUnion("characterOrstructTypeOrColumn", c("character", "structType", "Column"))
+
 #' @details
 #' \code{from_json}: Parses a column containing a JSON string into a Column of \code{structType}
 #' with the specified \code{schema} or array of \code{structType} if \code{as.json.array} is set
 #' to \code{TRUE}. If the string is unparseable, the Column will contain the value NA.
 #'
 #' @rdname column_collection_functions
 #' @param as.json.array indicating if input string is JSON array of objects or a single object.
-#' @aliases from_json from_json,Column,characterOrstructType-method
+#' @aliases from_json from_json,Column,characterOrstructTypeOrColumn-method
 #' @examples
 #'
 #' \dontrun{
 #' df2 <- sql("SELECT named_struct('date', cast('2000-01-01' as date)) as d")
 #' df2 <- mutate(df2, d2 = to_json(df2$d, dateFormat = 'dd/MM/yyyy'))
 #' schema <- structType(structField("date", "string"))
 #' head(select(df2, from_json(df2$d2, schema, dateFormat = 'dd/MM/yyyy')))
-
 #' df2 <- sql("SELECT named_struct('name', 'Bob') as people")
 #' df2 <- mutate(df2, people_json = to_json(df2$people))
 #' schema <- structType(structField("name", "string"))
 #' head(select(df2, from_json(df2$people_json, schema)))
-#' head(select(df2, from_json(df2$people_json, "name STRING")))}
+#' head(select(df2, from_json(df2$people_json, "name STRING")))
+#' head(select(df2, from_json(df2$people_json, schema_of_json(head(df2)$people_json))))}
 #' @note from_json since 2.2.0
-setMethod("from_json", signature(x = "Column", schema = "characterOrstructType"),
+setMethod("from_json", signature(x = "Column", schema = "characterOrstructTypeOrColumn"),
           function(x, schema, as.json.array = FALSE, ...) {
            if (is.character(schema)) {
-              schema <- structType(schema)
+              jschema <- structType(schema)$jobj
+            } else if (class(schema) == "structType") {
+              jschema <- schema$jobj
+            } else {
+              jschema <- schema@jc
            }
 
            if (as.json.array) {
-              jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
-                                     "createArrayType",
-                                     schema$jobj)
-            } else {
-              jschema <- schema$jobj
+              # This case is R-specifically different. Unlike Scala and Python side,
+              # R side has 'as.json.array' option to indicate if the schema should be
+              # treated as struct or element type of array in order to make it more
+              # R-friendly.
+              if (class(schema) == "Column") {
+                jschema <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+                                       "createArrayType",
+                                       jschema)
+              } else {
+                jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
+                                       "createArrayType",
+                                       jschema)
+              }
            }
            options <- varargsToStrEnv(...)
            jc <- callJStatic("org.apache.spark.sql.functions",
@@ -2328,22 +2343,27 @@ setMethod("schema_of_json", signature(x = "characterOrColumn"),
 #' If the string is unparseable, the Column will contain the value NA.
 #'
 #' @rdname column_collection_functions
-#' @aliases from_csv from_csv,Column,character-method
+#' @aliases from_csv from_csv,Column,characterOrstructTypeOrColumn-method
 #' @examples
 #'
 #' \dontrun{
-#' df <- sql("SELECT 'Amsterdam,2018' as csv")
+#' csv <- "Amsterdam,2018"
+#' df <- sql(paste0("SELECT '", csv, "' as csv"))
 #' schema <- "city STRING, year INT"
-#' head(select(df, from_csv(df$csv, schema)))}
+#' head(select(df, from_csv(df$csv, schema)))
+#' head(select(df, from_csv(df$csv, structType(schema))))
+#' head(select(df, from_csv(df$csv, schema_of_csv(csv))))}
 #' @note from_csv since 3.0.0
-setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"),
+setMethod("from_csv", signature(x = "Column", schema = "characterOrstructTypeOrColumn"),
          function(x, schema, ...) {
-            if (class(schema) == "Column") {
-              jschema <- schema@jc
-            } else if (is.character(schema)) {
+            if (class(schema) == "structType") {
+              schema <- callJMethod(schema$jobj, "toDDL")
+            }
+
+            if (is.character(schema)) {
              jschema <- callJStatic("org.apache.spark.sql.functions", "lit", schema)
            } else {
-              stop("schema argument should be a column or character")
+              jschema <- schema@jc
            }
            options <- varargsToStrEnv(...)
            jc <- callJStatic("org.apache.spark.sql.functions",
@@ -2666,7 +2686,7 @@ setMethod("format_string", signature(format = "character", x = "Column"),
 #' \code{from_unixtime}: Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC)
 #' to a string representing the timestamp of that moment in the current system time zone in the JVM
 #' in the given format.
-#' See \href{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}{
+#' See \href{https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html}{
 #' Customizing Formats} for available options.
 #'
 #' @rdname column_datetime_functions
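The net effect of the from_json changes above is that the schema argument now accepts three interchangeable forms. A minimal usage sketch, adapted from the @examples in the diff (it assumes an active SparkR session):

```R
library(SparkR)
sparkR.session()

df <- sql("SELECT named_struct('name', 'Bob') as people")
df <- mutate(df, people_json = to_json(df$people))

# 1. A structType object
head(select(df, from_json(df$people_json, structType(structField("name", "string")))))
# 2. A DDL-formatted string
head(select(df, from_json(df$people_json, "name STRING")))
# 3. Since Spark 3.0, a Column produced by schema_of_json
head(select(df, from_json(df$people_json, schema_of_json(head(df)$people_json))))
```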

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 14 additions & 2 deletions

@@ -1626,6 +1626,12 @@ test_that("column functions", {
   expect_equal(c[[1]][[1]]$a, 1)
   c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv")))
   expect_equal(c[[1]][[1]]$a, 1)
+  c <- collect(select(df, alias(from_csv(df$col, structType("a INT")), "csv")))
+  expect_equal(c[[1]][[1]]$a, 1)
+  c <- collect(select(df, alias(from_csv(df$col, schema_of_csv("1")), "csv")))
+  expect_equal(c[[1]][[1]]$`_c0`, 1)
+  c <- collect(select(df, alias(from_csv(df$col, schema_of_csv(lit("1"))), "csv")))
+  expect_equal(c[[1]][[1]]$`_c0`, 1)
 
   df <- as.DataFrame(list(list("col" = "1")))
   c <- collect(select(df, schema_of_csv("Amsterdam,2018")))
@@ -1651,7 +1657,9 @@ test_that("column functions", {
   expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
   df <- as.DataFrame(j)
   schemas <- list(structType(structField("age", "integer"), structField("height", "double")),
-                  "age INT, height DOUBLE")
+                  "age INT, height DOUBLE",
+                  schema_of_json("{\"age\":16,\"height\":176.5}"),
+                  schema_of_json(lit("{\"age\":16,\"height\":176.5}")))
   for (schema in schemas) {
     s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
     expect_equal(ncol(s), 1)
@@ -1691,7 +1699,11 @@ test_that("column functions", {
   # check if array type in string is correctly supported.
   jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
   df <- as.DataFrame(list(list("people" = jsonArr)))
-  for (schema in list(structType(structField("name", "string")), "name STRING")) {
+  schemas <- list(structType(structField("name", "string")),
+                  "name STRING",
+                  schema_of_json("{\"name\":\"Alice\"}"),
+                  schema_of_json(lit("{\"name\":\"Bob\"}")))
+  for (schema in schemas) {
     arr <- collect(select(df, alias(from_json(df$people, schema, as.json.array = TRUE), "arrcol")))
     expect_equal(ncol(arr), 1)
     expect_equal(nrow(arr), 1)
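The `_c0` expectations above come from schema inference: when the schema is produced by schema_of_csv rather than spelled out, the inferred struct uses default column names _c0, _c1, and so on. A small sketch of the same behaviour outside the test harness (assuming an active SparkR session):

```R
df <- sql("SELECT 'Amsterdam,2018' as csv")
parsed <- collect(select(df, alias(from_csv(df$csv, schema_of_csv("Amsterdam,2018")), "csv")))
parsed[[1]][[1]]$`_c0`  # "Amsterdam"
parsed[[1]][[1]]$`_c1`  # 2018
```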

common/network-common/src/main/java/org/apache/spark/network/TransportContext.java

Lines changed: 8 additions & 1 deletion

@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import com.codahale.metrics.Counter;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
@@ -66,6 +67,8 @@ public class TransportContext {
   private final RpcHandler rpcHandler;
   private final boolean closeIdleConnections;
   private final boolean isClientOnly;
+  // Number of registered connections to the shuffle service
+  private Counter registeredConnections = new Counter();
 
   /**
    * Force to create MessageEncoder and MessageDecoder so that we can make sure they will be created
@@ -221,7 +224,7 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler
     TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
       rpcHandler, conf.maxChunksBeingTransferred());
     return new TransportChannelHandler(client, responseHandler, requestHandler,
-      conf.connectionTimeoutMs(), closeIdleConnections);
+      conf.connectionTimeoutMs(), closeIdleConnections, this);
   }
 
   /**
@@ -234,4 +237,8 @@ private ChunkFetchRequestHandler createChunkFetchHandler(TransportChannelHandler
   }
 
   public TransportConf getConf() { return conf; }
+
+  public Counter getRegisteredConnections() {
+    return registeredConnections;
+  }
 }

common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java

Lines changed: 17 additions & 1 deletion

@@ -21,6 +21,7 @@
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
+import org.apache.spark.network.TransportContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,18 +58,21 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
   private final TransportRequestHandler requestHandler;
   private final long requestTimeoutNs;
   private final boolean closeIdleConnections;
+  private final TransportContext transportContext;
 
   public TransportChannelHandler(
       TransportClient client,
      TransportResponseHandler responseHandler,
      TransportRequestHandler requestHandler,
      long requestTimeoutMs,
-      boolean closeIdleConnections) {
+      boolean closeIdleConnections,
+      TransportContext transportContext) {
     this.client = client;
     this.responseHandler = responseHandler;
     this.requestHandler = requestHandler;
     this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
     this.closeIdleConnections = closeIdleConnections;
+    this.transportContext = transportContext;
   }
 
   public TransportClient getClient() {
@@ -176,4 +180,16 @@ public TransportResponseHandler getResponseHandler() {
     return responseHandler;
   }
 
+  @Override
+  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+    transportContext.getRegisteredConnections().inc();
+    super.channelRegistered(ctx);
+  }
+
+  @Override
+  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+    transportContext.getRegisteredConnections().dec();
+    super.channelUnregistered(ctx);
+  }
+
 }

common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 5 additions & 0 deletions

@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.MetricSet;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -159,4 +160,8 @@ public void close() {
     }
     bootstrap = null;
   }
+
+  public Counter getRegisteredConnections() {
+    return context.getRegisteredConnections();
+  }
 }
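Taken together, the three network-common changes above thread a connection-count metric through the server side: TransportContext owns a Codahale Counter, TransportChannelHandler increments it when a channel is registered and decrements it when the channel is unregistered, and TransportServer exposes the counter via getRegisteredConnections(), presumably so callers such as the external shuffle service can publish it alongside their existing metrics.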

common/network-common/src/main/java/org/apache/spark/network/util/ByteUnit.java

Lines changed: 6 additions & 6 deletions

@@ -18,11 +18,11 @@
 
 public enum ByteUnit {
   BYTE(1),
-  KiB(1024L),
-  MiB((long) Math.pow(1024L, 2L)),
-  GiB((long) Math.pow(1024L, 3L)),
-  TiB((long) Math.pow(1024L, 4L)),
-  PiB((long) Math.pow(1024L, 5L));
+  KiB(1L << 10),
+  MiB(1L << 20),
+  GiB(1L << 30),
+  TiB(1L << 40),
+  PiB(1L << 50);
 
   ByteUnit(long multiplier) {
     this.multiplier = multiplier;
@@ -50,7 +50,7 @@ public long convertTo(long d, ByteUnit u) {
     }
   }
 
-  public double toBytes(long d) {
+  public long toBytes(long d) {
     if (d < 0) {
       throw new IllegalArgumentException("Negative size value. Size must be positive: " + d);
     }
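The shifted constants are the same exact powers of two as before (1L << 20 is 1,048,576, i.e. one MiB) but avoid the round-trip through Math.pow and double arithmetic, and with toBytes returning long a call such as ByteUnit.MiB.toBytes(5) now yields exactly 5,242,880 as an integer value rather than a double (assuming toBytes simply multiplies by the unit's multiplier, as the enum constructor suggests).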

common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

Lines changed: 3 additions & 3 deletions

@@ -309,8 +309,8 @@ public int chunkFetchHandlerThreads() {
     }
     int chunkFetchHandlerThreadsPercent =
       conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 100);
-    return (int)Math.ceil(
-      (this.serverThreads() > 0 ? this.serverThreads() : 2 * NettyRuntime.availableProcessors()) *
-      chunkFetchHandlerThreadsPercent/(double)100);
+    int threads =
+      this.serverThreads() > 0 ? this.serverThreads() : 2 * NettyRuntime.availableProcessors();
+    return (int) Math.ceil(threads * (chunkFetchHandlerThreadsPercent / 100.0));
   }
 }
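As a worked example of the refactored expression: if serverThreads() is unset and Netty reports 8 available processors (an illustrative assumption), the base is 2 * 8 = 16 threads, and with spark.shuffle.server.chunkFetchHandlerThreadsPercent left at its default of 100 the method returns ceil(16 * 1.0) = 16; setting the percentage to 75 would instead give ceil(16 * 0.75) = 12, the same results the old one-liner produced but without the mixed integer/double division.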

common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java

Lines changed: 2 additions & 2 deletions

@@ -347,10 +347,10 @@ public void testRpcHandlerDelegate() throws Exception {
     verify(handler).getStreamManager();
 
     saslHandler.channelInactive(null);
-    verify(handler).channelInactive(any(TransportClient.class));
+    verify(handler).channelInactive(isNull());
 
     saslHandler.exceptionCaught(null, null);
-    verify(handler).exceptionCaught(any(Throwable.class), any(TransportClient.class));
+    verify(handler).exceptionCaught(isNull(), isNull());
   }
 
   @Test
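Since the test invokes the SASL wrapper with explicit null arguments, isNull() is the matcher that actually describes the delegated calls; under Mockito 2, any(SomeType.class) no longer matches null, which is presumably why the looser matchers had to be replaced.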

common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java

Lines changed: 1 addition & 2 deletions

@@ -17,7 +17,6 @@
 
 package org.apache.spark.network.util;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -69,7 +68,7 @@ public void testInterception() throws Exception {
      decoder.channelRead(ctx, len);
      decoder.channelRead(ctx, dataBuf);
      verify(interceptor, times(interceptedReads)).handle(any(ByteBuf.class));
-      verify(ctx).fireChannelRead(any(ByteBuffer.class));
+      verify(ctx).fireChannelRead(any(ByteBuf.class));
      assertEquals(0, len.refCnt());
      assertEquals(0, dataBuf.refCnt());
    } finally {
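The corrected expectation matches what the decoder actually emits: the frame decoder forwards Netty ByteBuf frames to fireChannelRead, not java.nio.ByteBuffer objects, so the old any(ByteBuffer.class) matcher presumably only passed because earlier Mockito versions ignored the class passed to any(); the now-unused ByteBuffer import is removed along with it.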
