Skip to content

Commit f1e9dc2

Browse files
hasnain-dbMridul Muralidharan
authored andcommitted
[SPARK-45408][CORE] Add RPC SSL settings to TransportConf
### What changes were proposed in this pull request? This change adds new settings to `TransportConf` which are needed for the RPC SSL functionality to work. Additionally, add some sample configurations which are used by tests in follow up PRs (see #42685 for the full context) ### Why are the changes needed? These changes are needed so that other modules can easily access configurations, and that the sample configurations are easily accessible for tests. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a test, then ran: ``` ./build/sbt > project network-common > testOnly org.apache.spark.network.TransportConfSuite ``` There are more follow up tests coming (see #42685) ### Was this patch authored or co-authored using generative AI tooling? No Closes #43220 from hasnain-db/spark-tls-configs-low. Authored-by: Hasnain Lakhani <hasnain.lakhani@databricks.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
1 parent e8eb245 commit f1e9dc2

File tree

3 files changed

+475
-0
lines changed

3 files changed

+475
-0
lines changed

common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.network.util;
1919

20+
import java.io.File;
2021
import java.util.Locale;
2122
import java.util.Properties;
2223
import java.util.concurrent.TimeUnit;
@@ -257,6 +258,157 @@ public int sslShuffleChunkSize() {
257258
conf.get("spark.network.ssl.maxEncryptedBlockSize", "64k")));
258259
}
259260

261+
/**
262+
* Whether Secure (SSL/TLS) RPC (including Block Transfer Service) is enabled
263+
*/
264+
public boolean sslRpcEnabled() {
265+
return conf.getBoolean("spark.ssl.rpc.enabled", false);
266+
}
267+
268+
/**
269+
* SSL protocol (remember that SSLv3 was compromised) supported by Java
270+
*/
271+
public String sslRpcProtocol() {
272+
return conf.get("spark.ssl.rpc.protocol", null);
273+
}
274+
275+
/**
276+
* A comma separated list of ciphers
277+
*/
278+
public String[] sslRpcRequestedCiphers() {
279+
String ciphers = conf.get("spark.ssl.rpc.enabledAlgorithms", null);
280+
return (ciphers != null ? ciphers.split(",") : null);
281+
}
282+
283+
/**
284+
* The key-store file; can be relative to the current directory
285+
*/
286+
public File sslRpcKeyStore() {
287+
String keyStore = conf.get("spark.ssl.rpc.keyStore", null);
288+
if (keyStore != null) {
289+
return new File(keyStore);
290+
} else {
291+
return null;
292+
}
293+
}
294+
295+
/**
296+
* The password to the key-store file
297+
*/
298+
public String sslRpcKeyStorePassword() {
299+
return conf.get("spark.ssl.rpc.keyStorePassword", null);
300+
}
301+
302+
/**
303+
* A PKCS#8 private key file in PEM format; can be relative to the current directory
304+
*/
305+
public File sslRpcPrivateKey() {
306+
String privateKey = conf.get("spark.ssl.rpc.privateKey", null);
307+
if (privateKey != null) {
308+
return new File(privateKey);
309+
} else {
310+
return null;
311+
}
312+
}
313+
314+
/**
315+
* The password to the private key
316+
*/
317+
public String sslRpcKeyPassword() {
318+
return conf.get("spark.ssl.rpc.keyPassword", null);
319+
}
320+
321+
/**
322+
* A X.509 certificate chain file in PEM format; can be relative to the current directory
323+
*/
324+
public File sslRpcCertChain() {
325+
String certChain = conf.get("spark.ssl.rpc.certChain", null);
326+
if (certChain != null) {
327+
return new File(certChain);
328+
} else {
329+
return null;
330+
}
331+
}
332+
333+
/**
334+
* The trust-store file; can be relative to the current directory
335+
*/
336+
public File sslRpcTrustStore() {
337+
String trustStore = conf.get("spark.ssl.rpc.trustStore", null);
338+
if (trustStore != null) {
339+
return new File(trustStore);
340+
} else {
341+
return null;
342+
}
343+
}
344+
345+
/**
346+
* The password to the trust-store file
347+
*/
348+
public String sslRpcTrustStorePassword() {
349+
return conf.get("spark.ssl.rpc.trustStorePassword", null);
350+
}
351+
352+
/**
353+
* If using a trust-store that that reloads its configuration is enabled.
354+
* If true, when the trust-store file on disk changes, it will be reloaded
355+
*/
356+
public boolean sslRpcTrustStoreReloadingEnabled() {
357+
return conf.getBoolean("spark.ssl.rpc.trustStoreReloadingEnabled", false);
358+
}
359+
360+
/**
361+
* The interval, in milliseconds, the trust-store will reload its configuration
362+
*/
363+
public int sslRpctrustStoreReloadIntervalMs() {
364+
return conf.getInt("spark.ssl.rpc.trustStoreReloadIntervalMs", 10000);
365+
}
366+
367+
/**
368+
* If the OpenSSL implementation is enabled,
369+
* (if available on host system), requires certChain and keyFile arguments
370+
*/
371+
public boolean sslRpcOpenSslEnabled() {
372+
return conf.getBoolean("spark.ssl.rpc.openSslEnabled", false);
373+
}
374+
375+
/**
376+
*
377+
* @return true if and only if RPC encryption is enabled and the relevant keys exist
378+
*/
379+
public boolean sslRpcEnabledAndKeysAreValid() {
380+
if (!sslRpcEnabled()) {
381+
return false;
382+
}
383+
if (sslRpcOpenSslEnabled()) {
384+
// OpenSSL requires both the privateKey and certChain
385+
File privateKey = sslRpcPrivateKey();
386+
if (privateKey == null || !privateKey.exists()) {
387+
return false;
388+
}
389+
File certChain = sslRpcCertChain();
390+
if (certChain == null || !certChain.exists()) {
391+
return false;
392+
}
393+
return true;
394+
} else {
395+
File keyStore = sslRpcKeyStore();
396+
if (keyStore == null || !keyStore.exists()) {
397+
return false;
398+
}
399+
// It's fine for the trust store to be missing, we would default to trusting all.
400+
return true;
401+
}
402+
}
403+
404+
/**
405+
* If we can dangerously fallback to unencrypted connections if RPC over SSL is enabled
406+
* but the key files are not present
407+
*/
408+
public boolean sslRpcDangerouslyFallbackIfKeysNotPresent() {
409+
return conf.getBoolean("spark.ssl.rpc.dangerouslyFallbackIfKeysNotPresent", false);
410+
}
411+
260412
/**
261413
* Flag indicating whether to share the pooled ByteBuf allocators between the different Netty
262414
* channels. If enabled then only two pooled ByteBuf allocators are created: one where caching
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.network;
18+
19+
import java.io.File;
20+
21+
import org.junit.jupiter.api.Test;
22+
import static org.junit.jupiter.api.Assertions.*;
23+
24+
import org.apache.spark.network.util.TransportConf;
25+
import org.apache.spark.network.ssl.SslSampleConfigs;
26+
27+
public class TransportConfSuite {
28+
29+
private TransportConf transportConf =
30+
new TransportConf(
31+
"shuffle", SslSampleConfigs.createDefaultConfigProviderForRpcNamespace());
32+
33+
@Test
34+
public void testKeyStorePath() {
35+
assertEquals(new File(SslSampleConfigs.keyStorePath), transportConf.sslRpcKeyStore());
36+
}
37+
38+
@Test
39+
public void testPrivateKeyPath() {
40+
assertEquals(new File(SslSampleConfigs.privateKeyPath), transportConf.sslRpcPrivateKey());
41+
}
42+
43+
@Test
44+
public void testCertChainPath() {
45+
assertEquals(new File(SslSampleConfigs.certChainPath), transportConf.sslRpcCertChain());
46+
}
47+
48+
@Test
49+
public void testTrustStorePath() {
50+
assertEquals(new File(SslSampleConfigs.trustStorePath), transportConf.sslRpcTrustStore());
51+
}
52+
53+
@Test
54+
public void testTrustStoreReloadingEnabled() {
55+
assertFalse(transportConf.sslRpcTrustStoreReloadingEnabled());
56+
}
57+
58+
@Test
59+
public void testOpenSslEnabled() {
60+
assertFalse(transportConf.sslRpcOpenSslEnabled());
61+
}
62+
63+
@Test
64+
public void testSslRpcEnabled() {
65+
assertTrue(transportConf.sslRpcEnabled());
66+
}
67+
68+
69+
@Test
70+
public void testSslKeyStorePassword() {
71+
assertEquals("password", transportConf.sslRpcKeyStorePassword());
72+
}
73+
74+
@Test
75+
public void testSslKeyPassword() {
76+
assertEquals("password", transportConf.sslRpcKeyPassword());
77+
}
78+
79+
@Test
80+
public void testSslTrustStorePassword() {
81+
assertEquals("password", transportConf.sslRpcTrustStorePassword());
82+
}
83+
84+
@Test
85+
public void testSsltrustStoreReloadIntervalMs() {
86+
assertEquals(10000, transportConf.sslRpctrustStoreReloadIntervalMs());
87+
}
88+
}

0 commit comments

Comments
 (0)