17
17
18
18
package org .apache .spark .deploy .client
19
19
20
- import java .util .concurrent .TimeUnit
21
-
22
- import scala .concurrent .Await
23
- import scala .concurrent .duration .{Duration , FiniteDuration }
20
+ import scala .concurrent ._
24
21
25
22
import akka .actor ._
26
- import akka .actor .Actor .emptyBehavior
27
- import akka .pattern .ask
28
- import akka .remote .RemotingLifecycleEvent
29
23
30
24
import org .apache .spark .Logging
31
- import org .apache .spark .deploy .{ DeployMessage , DriverDescription }
25
+ import org .apache .spark .deploy .DriverDescription
32
26
import org .apache .spark .deploy .DeployMessages ._
33
27
import org .apache .spark .deploy .master .Master
34
28
import org .apache .spark .util .{AkkaUtils , Utils }
35
29
36
30
/**
37
- * Actor that sends a single message to the standalone master and then shuts down.
31
+ * Actor that sends a single message to the standalone master and returns the response in the
32
+ * given promise.
38
33
*/
39
- private [spark] abstract class SingleMessageClient (
40
- actorSystem : ActorSystem , master : String , message : DeployMessage )
41
- extends Logging {
42
-
43
- // Concrete child classes must implement
44
- def handleResponse (response : Any )
45
-
46
- var actor : ActorRef = actorSystem.actorOf(Props (new DriverActor ()))
47
-
48
- class DriverActor extends Actor with Logging {
49
- override def preStart () {
50
- context.system.eventStream.subscribe(self, classOf [RemotingLifecycleEvent ])
51
- logInfo(" Sending message to master " + master + " ..." )
52
- val masterActor = context.actorSelection(Master .toAkkaUrl(master))
53
- val timeoutDuration : FiniteDuration = Duration .create(
54
- System .getProperty(" spark.akka.askTimeout" , " 10" ).toLong, TimeUnit .SECONDS )
55
- val submitFuture = masterActor.ask(message)(timeoutDuration)
56
- handleResponse(Await .result(submitFuture, timeoutDuration))
57
- actorSystem.stop(actor)
58
- actorSystem.shutdown()
34
+ class DriverActor (master : String , response : Promise [(Boolean , String )]) extends Actor with Logging {
35
+ override def receive = {
36
+ case SubmitDriverResponse (success, message) => {
37
+ response.success((success, message))
59
38
}
60
39
61
- override def receive = emptyBehavior
62
- }
63
- }
64
-
65
- /**
66
- * Submits a driver to the master.
67
- */
68
- private [spark] class SubmissionClient (actorSystem : ActorSystem , master : String ,
69
- driverDescription : DriverDescription )
70
- extends SingleMessageClient (actorSystem, master, RequestSubmitDriver (driverDescription)) {
71
-
72
- override def handleResponse (response : Any ) {
73
- val resp = response.asInstanceOf [SubmitDriverResponse ]
74
- if (! resp.success) {
75
- logError(s " Error submitting driver to $master" )
76
- logError(resp.message)
40
+ case KillDriverResponse (success, message) => {
41
+ response.success((success, message))
77
42
}
78
- }
79
- }
80
43
81
- /**
82
- * Terminates a client at the master.
83
- */
84
- private [spark] class TerminationClient (actorSystem : ActorSystem , master : String , driverId : String )
85
- extends SingleMessageClient (actorSystem, master, RequestKillDriver (driverId)) {
86
-
87
- override def handleResponse (response : Any ) {
88
- val resp = response.asInstanceOf [KillDriverResponse ]
89
- if (! resp.success) {
90
- logError(s " Error terminating $driverId at $master" )
91
- logError(resp.message)
44
+ // Relay all other messages to the server.
45
+ case message => {
46
+ logInfo(s " Sending message to master $master... " )
47
+ val masterActor = context.actorSelection(Master .toAkkaUrl(master))
48
+ masterActor ! message
92
49
}
93
50
}
94
51
}
95
52
96
53
/**
97
54
* Executable utility for starting and terminating drivers inside of a standalone cluster.
98
55
*/
99
- object DriverClient {
56
+ object DriverClient extends Logging {
100
57
101
58
def main (args : Array [String ]) {
102
59
val driverArgs = new DriverClientArguments (args)
@@ -105,6 +62,9 @@ object DriverClient {
105
62
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
106
63
val (actorSystem, boundPort) = AkkaUtils .createActorSystem(
107
64
" driverClient" , Utils .localHostName(), 0 )
65
+ val master = driverArgs.master
66
+ val response = promise[(Boolean , String )]
67
+ val driver : ActorRef = actorSystem.actorOf(Props (new DriverActor (driverArgs.master, response)))
108
68
109
69
driverArgs.cmd match {
110
70
case " launch" =>
@@ -116,13 +76,22 @@ object DriverClient {
116
76
driverArgs.driverOptions,
117
77
driverArgs.driverJavaOptions,
118
78
driverArgs.driverEnvVars)
119
- val client = new SubmissionClient (actorSystem, driverArgs.master, driverDescription)
79
+ driver ! RequestSubmitDriver ( driverDescription)
120
80
121
81
case " kill" =>
122
- val master = driverArgs.master
123
82
val driverId = driverArgs.driverId
124
- val client = new TerminationClient (actorSystem, master, driverId)
83
+ driver ! RequestKillDriver ( driverId)
125
84
}
85
+
86
+ val (success, message) =
87
+ try {
88
+ Await .result(response.future, AkkaUtils .askTimeout)
89
+ } catch {
90
+ case e : TimeoutException => (false , s " Master $master failed to respond in time " )
91
+ }
92
+ if (success) logInfo(message) else logError(message)
93
+ actorSystem.stop(driver)
94
+ actorSystem.shutdown()
126
95
actorSystem.awaitTermination()
127
96
}
128
97
}
0 commit comments