
Commit 5819826

Merge branch 'master' into AE_1

2 parents: 2c55985 + b45ff02

143 files changed: +2017 -1087 lines


R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 1 addition & 1 deletion

```diff
@@ -1694,7 +1694,7 @@ test_that("column functions", {
 
   # check for unparseable
   df <- as.DataFrame(list(list("a" = "")))
-  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]]$a, NA)
+  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)
 
   # check if array type in string is correctly supported.
   jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
```

common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 4 additions & 0 deletions

```diff
@@ -126,6 +126,10 @@ private void init(String hostToBind, int portToBind) {
       bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
     }
 
+    if (conf.enableTcpKeepAlive()) {
+      bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
+    }
+
     bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
       @Override
       protected void initChannel(SocketChannel ch) {
```

common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

Lines changed: 10 additions & 0 deletions

```diff
@@ -42,6 +42,7 @@ public class TransportConf {
   private final String SPARK_NETWORK_IO_RETRYWAIT_KEY;
   private final String SPARK_NETWORK_IO_LAZYFD_KEY;
   private final String SPARK_NETWORK_VERBOSE_METRICS;
+  private final String SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY;
 
   private final ConfigProvider conf;
 
@@ -64,6 +65,7 @@ public TransportConf(String module, ConfigProvider conf) {
     SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
     SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
     SPARK_NETWORK_VERBOSE_METRICS = getConfKey("io.enableVerboseMetrics");
+    SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY = getConfKey("io.enableTcpKeepAlive");
   }
 
   public int getInt(String name, int defaultValue) {
@@ -173,6 +175,14 @@ public boolean verboseMetrics() {
     return conf.getBoolean(SPARK_NETWORK_VERBOSE_METRICS, false);
   }
 
+  /**
+   * Whether to enable TCP keep-alive. If true, the TCP keep-alives are enabled, which removes
+   * connections that are idle for too long.
+   */
+  public boolean enableTcpKeepAlive() {
+    return conf.getBoolean(SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY, false);
+  }
+
   /**
    * Maximum number of retries when binding to a port before giving up.
    */
```

common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java

Lines changed: 1 addition & 1 deletion

```diff
@@ -172,7 +172,7 @@ public void onFailure(int chunkIndex, Throwable e) {
     for (int chunkIndex : chunkIndices) {
       client.fetchChunk(STREAM_ID, chunkIndex, callback);
     }
-    if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
+    if (!sem.tryAcquire(chunkIndices.size(), 60, TimeUnit.SECONDS)) {
       fail("Timeout getting response from the server");
     }
   }
```
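The only change is the more generous timeout (5s to 60s), which makes the test tolerant of slow machines. The wait itself is a standard semaphore fan-in, sketched below in Scala with illustrative names:

```scala
import java.util.concurrent.{Semaphore, TimeUnit}

val sem = new Semaphore(0)
val numChunks = 4 // illustrative; the real test uses chunkIndices.size()

// Each asynchronous fetch callback releases one permit, whether it succeeded
// or failed; plain threads stand in for the network callbacks here.
(1 to numChunks).foreach { _ =>
  new Thread(() => sem.release()).start()
}

// The test thread blocks until every callback has fired, or fails after 60s.
if (!sem.tryAcquire(numChunks, 60, TimeUnit.SECONDS)) {
  throw new AssertionError("Timeout getting response from the server")
}
```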

common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java

Lines changed: 1 addition & 1 deletion

```diff
@@ -23,7 +23,7 @@
 /**
  * An array of long values. Compared with native JVM arrays, this:
  * <ul>
- *   <li>supports using both in-heap and off-heap memory</li>
+ *   <li>supports using both on-heap and off-heap memory</li>
  *   <li>has no bound checking, and thus can crash the JVM process when assert is turned off</li>
  * </ul>
 */
```

common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java

Lines changed: 1 addition & 1 deletion

```diff
@@ -21,7 +21,7 @@
 
 /**
  * A memory location. Tracked either by a memory address (with off-heap allocation),
- * or by an offset from a JVM object (in-heap allocation).
+ * or by an offset from a JVM object (on-heap allocation).
  */
 public class MemoryLocation {
 
```
core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java

Lines changed: 3 additions & 3 deletions

```diff
@@ -85,9 +85,9 @@ public class TaskMemoryManager {
   /**
    * Similar to an operating system's page table, this array maps page numbers into base object
    * pointers, allowing us to translate between the hashtable's internal 64-bit address
-   * representation and the baseObject+offset representation which we use to support both in- and
+   * representation and the baseObject+offset representation which we use to support both on- and
    * off-heap addresses. When using an off-heap allocator, every entry in this map will be `null`.
-   * When using an in-heap allocator, the entries in this map will point to pages' base objects.
+   * When using an on-heap allocator, the entries in this map will point to pages' base objects.
    * Entries are added to this map as new data pages are allocated.
    */
   private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];
@@ -102,7 +102,7 @@ public class TaskMemoryManager {
   private final long taskAttemptId;
 
   /**
-   * Tracks whether we're in-heap or off-heap. For off-heap, we short-circuit most of these methods
+   * Tracks whether we're on-heap or off-heap. For off-heap, we short-circuit most of these methods
    * without doing any masking or lookups. Since this branching should be well-predicted by the JIT,
    * this extra layer of indirection / abstraction hopefully shouldn't be too expensive.
    */
```
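The wording fix matters because the two modes really are addressed differently: an off-heap page decodes to a raw 64-bit address, while an on-heap page decodes to a (base object, offset) pair resolved through the page table. A simplified sketch of the 64-bit encoding, assuming TaskMemoryManager's 13-bit page number / 51-bit offset split (the helper names are illustrative):

```scala
object PageAddress {
  private val OffsetBits = 51 // low 51 bits hold the in-page offset
  private val OffsetMask = (1L << OffsetBits) - 1

  // Pack a page number and an in-page offset into one 64-bit address.
  def encode(pageNumber: Int, offsetInPage: Long): Long =
    (pageNumber.toLong << OffsetBits) | (offsetInPage & OffsetMask)

  def pageNumber(address: Long): Int = (address >>> OffsetBits).toInt
  def offsetInPage(address: Long): Long = address & OffsetMask
}

// Off-heap: the decoded offset is itself an absolute memory address (base object is null).
// On-heap: the page number indexes the page table to recover the base object, and the
// offset is relative to that object.
```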

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 22 additions & 32 deletions

```diff
@@ -29,6 +29,7 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.UI._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.sasl.SecretKeyHolder
 import org.apache.spark.util.Utils
@@ -56,17 +57,13 @@ private[spark] class SecurityManager(
   private val WILDCARD_ACL = "*"
 
   private val authOn = sparkConf.get(NETWORK_AUTH_ENABLED)
-  // keep spark.ui.acls.enable for backwards compatibility with 1.0
-  private var aclsOn =
-    sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))
+  private var aclsOn = sparkConf.get(ACLS_ENABLE)
 
   // admin acls should be set before view or modify acls
-  private var adminAcls: Set[String] =
-    stringToSet(sparkConf.get("spark.admin.acls", ""))
+  private var adminAcls: Set[String] = sparkConf.get(ADMIN_ACLS).toSet
 
   // admin group acls should be set before view or modify group acls
-  private var adminAclsGroups : Set[String] =
-    stringToSet(sparkConf.get("spark.admin.acls.groups", ""))
+  private var adminAclsGroups: Set[String] = sparkConf.get(ADMIN_ACLS_GROUPS).toSet
 
   private var viewAcls: Set[String] = _
 
@@ -82,11 +79,11 @@ private[spark] class SecurityManager(
   private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
     Utils.getCurrentUserName())
 
-  setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
-  setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))
+  setViewAcls(defaultAclUsers, sparkConf.get(UI_VIEW_ACLS))
+  setModifyAcls(defaultAclUsers, sparkConf.get(MODIFY_ACLS))
 
-  setViewAclsGroups(sparkConf.get("spark.ui.view.acls.groups", ""));
-  setModifyAclsGroups(sparkConf.get("spark.modify.acls.groups", ""));
+  setViewAclsGroups(sparkConf.get(UI_VIEW_ACLS_GROUPS))
+  setModifyAclsGroups(sparkConf.get(MODIFY_ACLS_GROUPS))
 
   private var secretKey: String = _
   logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
@@ -127,32 +124,25 @@ private[spark] class SecurityManager(
     opts
   }
 
-  /**
-   * Split a comma separated String, filter out any empty items, and return a Set of strings
-   */
-  private def stringToSet(list: String): Set[String] = {
-    list.split(',').map(_.trim).filter(!_.isEmpty).toSet
-  }
-
   /**
    * Admin acls should be set before the view or modify acls. If you modify the admin
    * acls you should also set the view and modify acls again to pick up the changes.
    */
-  def setViewAcls(defaultUsers: Set[String], allowedUsers: String) {
-    viewAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
+  def setViewAcls(defaultUsers: Set[String], allowedUsers: Seq[String]) {
+    viewAcls = adminAcls ++ defaultUsers ++ allowedUsers
     logInfo("Changing view acls to: " + viewAcls.mkString(","))
   }
 
-  def setViewAcls(defaultUser: String, allowedUsers: String) {
+  def setViewAcls(defaultUser: String, allowedUsers: Seq[String]) {
     setViewAcls(Set[String](defaultUser), allowedUsers)
   }
 
   /**
    * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
    * acls groups you should also set the view and modify acls groups again to pick up the changes.
    */
-  def setViewAclsGroups(allowedUserGroups: String) {
-    viewAclsGroups = (adminAclsGroups ++ stringToSet(allowedUserGroups));
+  def setViewAclsGroups(allowedUserGroups: Seq[String]) {
+    viewAclsGroups = adminAclsGroups ++ allowedUserGroups
     logInfo("Changing view acls groups to: " + viewAclsGroups.mkString(","))
   }
 
@@ -179,17 +169,17 @@ private[spark] class SecurityManager(
    * Admin acls should be set before the view or modify acls. If you modify the admin
    * acls you should also set the view and modify acls again to pick up the changes.
    */
-  def setModifyAcls(defaultUsers: Set[String], allowedUsers: String) {
-    modifyAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
+  def setModifyAcls(defaultUsers: Set[String], allowedUsers: Seq[String]) {
+    modifyAcls = adminAcls ++ defaultUsers ++ allowedUsers
     logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
   }
 
   /**
    * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
    * acls groups you should also set the view and modify acls groups again to pick up the changes.
    */
-  def setModifyAclsGroups(allowedUserGroups: String) {
-    modifyAclsGroups = (adminAclsGroups ++ stringToSet(allowedUserGroups));
+  def setModifyAclsGroups(allowedUserGroups: Seq[String]) {
+    modifyAclsGroups = adminAclsGroups ++ allowedUserGroups
     logInfo("Changing modify acls groups to: " + modifyAclsGroups.mkString(","))
   }
 
@@ -216,17 +206,17 @@ private[spark] class SecurityManager(
    * Admin acls should be set before the view or modify acls. If you modify the admin
    * acls you should also set the view and modify acls again to pick up the changes.
    */
-  def setAdminAcls(adminUsers: String) {
-    adminAcls = stringToSet(adminUsers)
+  def setAdminAcls(adminUsers: Seq[String]) {
+    adminAcls = adminUsers.toSet
     logInfo("Changing admin acls to: " + adminAcls.mkString(","))
   }
 
   /**
    * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
    * acls groups you should also set the view and modify acls groups again to pick up the changes.
    */
-  def setAdminAclsGroups(adminUserGroups: String) {
-    adminAclsGroups = stringToSet(adminUserGroups)
+  def setAdminAclsGroups(adminUserGroups: Seq[String]) {
+    adminAclsGroups = adminUserGroups.toSet
     logInfo("Changing admin acls groups to: " + adminAclsGroups.mkString(","))
   }
 
@@ -416,7 +406,7 @@ private[spark] object SecurityManager {
 
   val k8sRegex = "k8s.*".r
   val SPARK_AUTH_CONF = NETWORK_AUTH_ENABLED.key
-  val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret"
+  val SPARK_AUTH_SECRET_CONF = AUTH_SECRET.key
   // This is used to set auth secret to an executor's env variable. It should have the same
   // value as SPARK_AUTH_SECRET_CONF set in SparkConf
   val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
```
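The deleted stringToSet helper is subsumed by typed config entries that split the comma-separated value when the entry is read, so every call site gets a Seq[String] instead of re-parsing strings. A sketch of how such an entry could be declared with Spark's internal ConfigBuilder (these APIs are private[spark], so this only compiles inside Spark's own source tree, and the real definitions in org.apache.spark.internal.config may differ in docs and defaults):

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Sketch of a Seq[String]-valued entry in the spirit of ADMIN_ACLS.
val ADMIN_ACLS = ConfigBuilder("spark.admin.acls")
  .stringConf
  .toSequence            // parse the comma-separated value centrally
  .createWithDefault(Nil)

// sparkConf.get(ADMIN_ACLS) then yields Seq[String], so call sites write
// sparkConf.get(ADMIN_ACLS).toSet instead of stringToSet(sparkConf.get(...)).
```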

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -594,7 +594,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
     require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
-      s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be no less than the value of " +
+      s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be greater than the value of " +
       s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
   }
```
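The new wording matches the check's semantics: the require uses a strict inequality, so equal values fail as well. For example, a configuration like the sketch below would be rejected when the SparkContext validates its conf (the values are hypothetical):

```scala
import org.apache.spark.SparkConf

// Fails validation: the network timeout must be strictly greater than the
// heartbeat interval, so equal values are rejected too.
val bad = new SparkConf()
  .set("spark.network.timeout", "30s")
  .set("spark.executor.heartbeatInterval", "30s")
```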

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 2 deletions

```diff
@@ -46,6 +46,7 @@ import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream,
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests._
+import org.apache.spark.internal.config.UI._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -440,7 +441,7 @@ class SparkContext(config: SparkConf) extends Logging {
     }
 
     _ui =
-      if (conf.getBoolean("spark.ui.enabled", true)) {
+      if (conf.get(UI_ENABLED)) {
         Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
           startTime))
       } else {
@@ -510,7 +511,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _applicationId = _taskScheduler.applicationId()
     _applicationAttemptId = taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
-    if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
+    if (_conf.get(UI_REVERSE_PROXY)) {
       System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
     }
     _ui.foreach(_.setAppId(_applicationId))
```
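User-facing behavior is unchanged by the move to typed entries; the same string keys still work. A small sketch of disabling the UI from application code:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("no-ui")
  .config("spark.ui.enabled", "false") // read internally through the typed UI_ENABLED entry
  .getOrCreate()
```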
