15
15
* limitations under the License.
16
16
*/
17
17
18
- package org .apache .spark .deploy . worker
18
+ package org .apache .spark .deploy
19
19
20
20
import org .apache .spark .{Logging , SparkConf , SecurityManager }
21
21
import org .apache .spark .util .Utils
@@ -24,8 +24,8 @@ import org.apache.spark.network.netty.SparkTransportConf
24
24
import org .apache .spark .network .sasl .SaslRpcHandler
25
25
import org .apache .spark .network .server .TransportServer
26
26
import org .apache .spark .network .shuffle .ExternalShuffleBlockHandler
27
-
28
27
import java .util .concurrent .CountDownLatch
28
+ import org .apache .spark .annotation .DeveloperApi
29
29
30
30
/**
31
31
* Provides a server from which Executors can read shuffle files (rather than reading directly from
@@ -34,7 +34,7 @@ import java.util.concurrent.CountDownLatch
34
34
*
35
35
* Optionally requires SASL authentication in order to read. See [[SecurityManager ]].
36
36
*/
37
- private [worker ]
37
+ private [deploy ]
38
38
class ExternalShuffleService (sparkConf : SparkConf , securityManager : SecurityManager )
39
39
extends Logging {
40
40
@@ -54,14 +54,19 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
54
54
/** Starts the external shuffle service if the user has configured us to. */
55
55
def startIfEnabled () {
56
56
if (enabled) {
57
- require(server == null , " Shuffle server already started" )
58
- logInfo(s " Starting shuffle service on port $port with useSasl = $useSasl" )
59
- server = transportContext.createServer(port)
57
+ start()
60
58
}
61
59
}
62
60
61
+ /** Start the external shuffle service */
62
+ def start () {
63
+ require(server == null , " Shuffle server already started" )
64
+ logInfo(s " Starting shuffle service on port $port with useSasl = $useSasl" )
65
+ server = transportContext.createServer(port)
66
+ }
67
+
63
68
def stop () {
64
- if (enabled && server != null ) {
69
+ if (server != null ) {
65
70
server.close()
66
71
server = null
67
72
}
@@ -86,14 +91,15 @@ object ExternalShuffleService extends Logging {
86
91
// and we assume the user really wants it to be running
87
92
sparkConf.set(" spark.shuffle.service.enabled" , " true" )
88
93
server = new ExternalShuffleService (sparkConf, securityManager)
94
+ server.start()
89
95
90
96
installShutdownHook()
91
97
92
98
// keep running until the process is terminated
93
99
barrier.await()
94
100
}
95
101
96
- private def installShutdownHook () = {
102
+ private def installShutdownHook (): Unit = {
97
103
Runtime .getRuntime.addShutdownHook(new Thread (" External Shuffle Service shutdown thread" ) {
98
104
override def run () {
99
105
logInfo(" Shutting down shuffle service." )
0 commit comments