Commit 2e9b09c

Merge branch 'master' into issues/SPARK-26060/rename_config
2 parents 15000a2 + 5a140b7

File tree

28 files changed: +595 −194 lines changed


core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 7 additions & 26 deletions
```diff
@@ -159,11 +159,9 @@ public final class BytesToBytesMap extends MemoryConsumer {
    */
   private final Location loc;
 
-  private final boolean enablePerfMetrics;
+  private long numProbes = 0L;
 
-  private long numProbes = 0;
-
-  private long numKeyLookups = 0;
+  private long numKeyLookups = 0L;
 
   private long peakMemoryUsedBytes = 0L;
 
@@ -180,16 +178,14 @@ public BytesToBytesMap(
       SerializerManager serializerManager,
       int initialCapacity,
       double loadFactor,
-      long pageSizeBytes,
-      boolean enablePerfMetrics) {
+      long pageSizeBytes) {
     super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
     this.taskMemoryManager = taskMemoryManager;
     this.blockManager = blockManager;
     this.serializerManager = serializerManager;
     this.loadFactor = loadFactor;
     this.loc = new Location();
     this.pageSizeBytes = pageSizeBytes;
-    this.enablePerfMetrics = enablePerfMetrics;
     if (initialCapacity <= 0) {
       throw new IllegalArgumentException("Initial capacity must be greater than 0");
     }
@@ -209,23 +205,14 @@ public BytesToBytesMap(
       TaskMemoryManager taskMemoryManager,
       int initialCapacity,
       long pageSizeBytes) {
-    this(taskMemoryManager, initialCapacity, pageSizeBytes, false);
-  }
-
-  public BytesToBytesMap(
-      TaskMemoryManager taskMemoryManager,
-      int initialCapacity,
-      long pageSizeBytes,
-      boolean enablePerfMetrics) {
     this(
       taskMemoryManager,
       SparkEnv.get() != null ? SparkEnv.get().blockManager() : null,
       SparkEnv.get() != null ? SparkEnv.get().serializerManager() : null,
       initialCapacity,
       // In order to re-use the longArray for sorting, the load factor cannot be larger than 0.5.
       0.5,
-      pageSizeBytes,
-      enablePerfMetrics);
+      pageSizeBytes);
   }
 
   /**
@@ -462,15 +449,12 @@ public Location lookup(Object keyBase, long keyOffset, int keyLength, int hash)
   public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc, int hash) {
     assert(longArray != null);
 
-    if (enablePerfMetrics) {
-      numKeyLookups++;
-    }
+    numKeyLookups++;
+
     int pos = hash & mask;
     int step = 1;
     while (true) {
-      if (enablePerfMetrics) {
-        numProbes++;
-      }
+      numProbes++;
       if (longArray.get(pos * 2) == 0) {
         // This is a new key.
         loc.with(pos, hash, false);
@@ -860,9 +844,6 @@ public long getPeakMemoryUsedBytes() {
    * Returns the average number of probes per key lookup.
    */
   public double getAverageProbesPerLookup() {
-    if (!enablePerfMetrics) {
-      throw new IllegalStateException();
-    }
     return (1.0 * numProbes) / numKeyLookups;
   }
 
```
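With the `enablePerfMetrics` flag gone, the probe and lookup counters are always maintained and `getAverageProbesPerLookup()` no longer throws. A minimal Scala sketch (illustrative only, not code from this commit) of a caller adapting to the trimmed constructor, assuming a `TaskMemoryManager` is supplied by the surrounding task:

```scala
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.unsafe.map.BytesToBytesMap

// Hypothetical helper; the memory manager is assumed to come from the task context.
def averageProbes(taskMemoryManager: TaskMemoryManager): Double = {
  // No trailing `enablePerfMetrics` argument any more.
  val map = new BytesToBytesMap(taskMemoryManager, 64, 4 * 1024 * 1024)
  try {
    // ... perform inserts and lookups via map.lookup(...) ...
    map.getAverageProbesPerLookup()  // numProbes / numKeyLookups, tracked unconditionally
  } finally {
    map.free()
  }
}
```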

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 16 additions & 5 deletions
```diff
@@ -348,25 +348,36 @@ private[spark] class SecurityManager(
    */
   def initializeAuth(): Unit = {
     import SparkMasterRegex._
+    val k8sRegex = "k8s.*".r
 
     if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
       return
     }
 
+    // TODO: this really should be abstracted somewhere else.
     val master = sparkConf.get(SparkLauncher.SPARK_MASTER, "")
-    master match {
+    val storeInUgi = master match {
       case "yarn" | "local" | LOCAL_N_REGEX(_) | LOCAL_N_FAILURES_REGEX(_, _) =>
-        // Secret generation allowed here
+        true
+
+      case k8sRegex() =>
+        // Don't propagate the secret through the user's credentials in kubernetes. That conflicts
+        // with the way k8s handles propagation of delegation tokens.
+        false
+
       case _ =>
         require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
           s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.")
         return
     }
 
     secretKey = Utils.createSecret(sparkConf)
-    val creds = new Credentials()
-    creds.addSecretKey(SECRET_LOOKUP_KEY, secretKey.getBytes(UTF_8))
-    UserGroupInformation.getCurrentUser().addCredentials(creds)
+
+    if (storeInUgi) {
+      val creds = new Credentials()
+      creds.addSecretKey(SECRET_LOOKUP_KEY, secretKey.getBytes(UTF_8))
+      UserGroupInformation.getCurrentUser().addCredentials(creds)
+    }
   }
 
   // Default SecurityManager only has a single secret key, so ignore appId.
```
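The new `storeInUgi` flag keeps the auto-generated secret out of UGI credentials on Kubernetes. A standalone sketch of that master-URL dispatch, with hypothetical names (only the regex and the yarn/local/k8s outcomes mirror the change):

```scala
// Hypothetical, self-contained illustration of the dispatch above.
object SecretPlacementSketch {
  private val k8sRegex = "k8s.*".r

  // Some(true)  -> generate a secret and store it in the user's UGI credentials
  // Some(false) -> generate a secret but do not touch UGI (the Kubernetes case)
  // None        -> a secret must be provided manually via configuration
  def storeInUgi(master: String): Option[Boolean] = master match {
    case "yarn" | "local" => Some(true)
    case k8sRegex()       => Some(false) // conflicts with k8s delegation-token propagation
    case _                => None
  }
}

// SecretPlacementSketch.storeInUgi("k8s://127.0.0.1") == Some(false)
```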

core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -530,7 +530,7 @@ public void failureToGrow() {
   @Test
   public void spillInIterator() throws IOException {
     BytesToBytesMap map = new BytesToBytesMap(
-      taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024, false);
+      taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024);
     try {
       int i;
       for (i = 0; i < 1024; i++) {
@@ -569,7 +569,7 @@ public void spillInIterator() throws IOException {
   @Test
   public void multipleValuesForSameKey() {
     BytesToBytesMap map =
-      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024, false);
+      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024);
     try {
       int i;
       for (i = 0; i < 1024; i++) {
```

core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala

Lines changed: 36 additions & 21 deletions
```diff
@@ -395,15 +395,23 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
     assert(keyFromEnv === new SecurityManager(conf2).getSecretKey())
   }
 
-  test("secret key generation") {
-    Seq(
-      ("yarn", true),
-      ("local", true),
-      ("local[*]", true),
-      ("local[1, 2]", true),
-      ("local-cluster[2, 1, 1024]", false),
-      ("invalid", false)
-    ).foreach { case (master, shouldGenerateSecret) =>
+  // How is the secret expected to be generated and stored.
+  object SecretTestType extends Enumeration {
+    val MANUAL, AUTO, UGI = Value
+  }
+
+  import SecretTestType._
+
+  Seq(
+    ("yarn", UGI),
+    ("local", UGI),
+    ("local[*]", UGI),
+    ("local[1, 2]", UGI),
+    ("k8s://127.0.0.1", AUTO),
+    ("local-cluster[2, 1, 1024]", MANUAL),
+    ("invalid", MANUAL)
+  ).foreach { case (master, secretType) =>
+    test(s"secret key generation: master '$master'") {
       val conf = new SparkConf()
         .set(NETWORK_AUTH_ENABLED, true)
         .set(SparkLauncher.SPARK_MASTER, master)
@@ -412,19 +420,26 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
       UserGroupInformation.createUserForTesting("authTest", Array()).doAs(
         new PrivilegedExceptionAction[Unit]() {
           override def run(): Unit = {
-            if (shouldGenerateSecret) {
-              mgr.initializeAuth()
-              val creds = UserGroupInformation.getCurrentUser().getCredentials()
-              val secret = creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY)
-              assert(secret != null)
-              assert(new String(secret, UTF_8) === mgr.getSecretKey())
-            } else {
-              intercept[IllegalArgumentException] {
+            secretType match {
+              case UGI =>
+                mgr.initializeAuth()
+                val creds = UserGroupInformation.getCurrentUser().getCredentials()
+                val secret = creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY)
+                assert(secret != null)
+                assert(new String(secret, UTF_8) === mgr.getSecretKey())
+
+              case AUTO =>
                 mgr.initializeAuth()
-              }
-              intercept[IllegalArgumentException] {
-                mgr.getSecretKey()
-              }
+                val creds = UserGroupInformation.getCurrentUser().getCredentials()
+                assert(creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY) === null)
+
+              case MANUAL =>
+                intercept[IllegalArgumentException] {
+                  mgr.initializeAuth()
+                }
+                intercept[IllegalArgumentException] {
+                  mgr.getSecretKey()
+                }
             }
           }
         }
```

docs/security.md

Lines changed: 21 additions & 13 deletions
```diff
@@ -26,21 +26,29 @@ not documented, Spark does not support.
 Spark currently supports authentication for RPC channels using a shared secret. Authentication can
 be turned on by setting the `spark.authenticate` configuration parameter.
 
-The exact mechanism used to generate and distribute the shared secret is deployment-specific.
+The exact mechanism used to generate and distribute the shared secret is deployment-specific. Unless
+specified below, the secret must be defined by setting the `spark.authenticate.secret` config
+option. The same secret is shared by all Spark applications and daemons in that case, which limits
+the security of these deployments, especially on multi-tenant clusters.
 
-For Spark on [YARN](running-on-yarn.html) and local deployments, Spark will automatically handle
-generating and distributing the shared secret. Each application will use a unique shared secret. In
+The REST Submission Server and the MesosClusterDispatcher do not support authentication. You should
+ensure that all network access to the REST API & MesosClusterDispatcher (port 6066 and 7077
+respectively by default) are restricted to hosts that are trusted to submit jobs.
+
+### YARN
+
+For Spark on [YARN](running-on-yarn.html), Spark will automatically handle generating and
+distributing the shared secret. Each application will use a unique shared secret. In
 the case of YARN, this feature relies on YARN RPC encryption being enabled for the distribution of
 secrets to be secure.
 
-For other resource managers, `spark.authenticate.secret` must be configured on each of the nodes.
-This secret will be shared by all the daemons and applications, so this deployment configuration is
-not as secure as the above, especially when considering multi-tenant clusters. In this
-configuration, a user with the secret can effectively impersonate any other user.
+### Kubernetes
 
-The Rest Submission Server and the MesosClusterDispatcher do not support authentication. You should
-ensure that all network access to the REST API & MesosClusterDispatcher (port 6066 and 7077
-respectively by default) are restricted to hosts that are trusted to submit jobs.
+On Kubernetes, Spark will also automatically generate an authentication secret unique to each
+application. The secret is propagated to executor pods using environment variables. This means
+that any user that can list pods in the namespace where the Spark application is running can
+also see their authentication secret. Access control rules should be properly set up by the
+Kubernetes admin to ensure that Spark authentication is secure.
 
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
@@ -738,10 +746,10 @@ tokens for supported will be created.
 ## Secure Interaction with Kubernetes
 
 When talking to Hadoop-based services behind Kerberos, it was noted that Spark needs to obtain delegation tokens
-so that non-local processes can authenticate. These delegation tokens in Kubernetes are stored in Secrets that are 
-shared by the Driver and its Executors. As such, there are three ways of submitting a Kerberos job: 
+so that non-local processes can authenticate. These delegation tokens in Kubernetes are stored in Secrets that are
+shared by the Driver and its Executors. As such, there are three ways of submitting a Kerberos job:
 
-In all cases you must define the environment variable: `HADOOP_CONF_DIR` or 
+In all cases you must define the environment variable: `HADOOP_CONF_DIR` or
 `spark.kubernetes.hadoop.configMapName.`
 
 It also important to note that the KDC needs to be visible from inside the containers.
```
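For deployments without automatic secret generation, the doc text above says the secret must be set explicitly. A minimal sketch of that configuration (the secret value is a placeholder and must be identical across all daemons and applications):

```scala
import org.apache.spark.SparkConf

// Placeholder secret; distribute the same value to every node out of band.
val conf = new SparkConf()
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "<shared-secret>")
```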

docs/sql-migration-guide-upgrade.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -29,6 +29,8 @@ displayTitle: Spark SQL Upgrading Guide
 
 - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with last wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet), the behavior will be undefined.
 
+- In Spark version 2.4 and earlier, partition column value is converted as null if it can't be casted to corresponding user provided schema. Since 3.0, partition column value is validated with user provided schema. An exception is thrown if the validation fails. You can disable such validation by setting `spark.sql.sources.validatePartitionColumns` to `false`.
+
 - In Spark version 2.4 and earlier, the `SET` command works without any warnings even if the specified key is for `SparkConf` entries and it has no effect because the command does not update `SparkConf`, but the behavior might confuse users. Since 3.0, the command fails if a `SparkConf` key is used. You can disable such a check by setting `spark.sql.legacy.setCommandRejectsSparkCoreConfs` to `false`.
 
 - Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.
```
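Two of the migration notes above point at opt-out flags. A small sketch of restoring the pre-3.0 behaviour for them when building a session (the application name is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("legacy-behaviour-sketch")
  // Skip the new partition-column validation against the user-provided schema.
  .config("spark.sql.sources.validatePartitionColumns", "false")
  // Allow SET on SparkConf keys, as in Spark 2.4.
  .config("spark.sql.legacy.setCommandRejectsSparkCoreConfs", "false")
  .getOrCreate()
```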
