@@ -48,9 +48,14 @@ private[spark] class Client(
48
48
49
49
val yarnConf : YarnConfiguration = new YarnConfiguration (hadoopConf)
50
50
51
+ /* ------------------------------------------------------------------------------------- *
52
+ | The following methods have much in common in the stable and alpha versions of Client, |
53
+ | but cannot be implemented in the parent trait due to subtle API differences across |
54
+ | hadoop versions. |
55
+ * ------------------------------------------------------------------------------------- */
56
+
51
57
/** Submit an application running our ApplicationMaster to the ResourceManager. */
52
58
override def submitApplication (): ApplicationId = {
53
- // Initialize and start the client service.
54
59
init(yarnConf)
55
60
start()
56
61
@@ -64,10 +69,8 @@ private[spark] class Client(
64
69
// Verify whether the cluster has enough resources for our AM.
65
70
verifyClusterResources(newAppResponse)
66
71
67
- // Set up ContainerLaunchContext to launch our AM container .
72
+ // Set up the appropriate contexts to launch our AM.
68
73
val containerContext = createContainerLaunchContext(newAppResponse)
69
-
70
- // Set up ApplicationSubmissionContext to submit our AM.
71
74
val appContext = createApplicationSubmissionContext(appId, containerContext)
72
75
73
76
// Finally, submit and monitor the application.
@@ -77,7 +80,9 @@ private[spark] class Client(
77
80
}
78
81
79
82
/**
80
- *
83
+ * Set up a context for launching our ApplicationMaster container.
84
+ * In the Yarn alpha API, the memory requirements of this container must be set in
85
+ * the ContainerLaunchContext instead of the ApplicationSubmissionContext.
81
86
*/
82
87
override def createContainerLaunchContext (newAppResponse : GetNewApplicationResponse )
83
88
: ContainerLaunchContext = {
@@ -88,9 +93,7 @@ private[spark] class Client(
88
93
containerContext
89
94
}
90
95
91
- /**
92
- *
93
- */
96
+ /** Set up the context for submitting our ApplicationMaster. */
94
97
def createApplicationSubmissionContext (
95
98
appId : ApplicationId ,
96
99
containerContext : ContainerLaunchContext ): ApplicationSubmissionContext = {
@@ -104,25 +107,30 @@ private[spark] class Client(
104
107
}
105
108
106
109
/**
107
- *
110
+ * Set up security tokens for launching our ApplicationMaster container.
111
+ * ContainerLaunchContext#setContainerTokens is renamed `setTokens` in the stable API.
108
112
*/
109
- override def getAMMemory (newApp : GetNewApplicationResponse ): Int = {
110
- val minResMemory = newApp.getMinimumResourceCapability().getMemory()
111
- val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
112
- ((if ((args.amMemory % minResMemory) == 0 ) 0 else minResMemory) - memoryOverhead)
113
- amMemory
114
- }
115
-
116
- /** */
117
113
override def setupSecurityToken (amContainer : ContainerLaunchContext ): Unit = {
118
114
val dob = new DataOutputBuffer ()
119
115
credentials.writeTokenStorageToStream(dob)
120
116
amContainer.setContainerTokens(ByteBuffer .wrap(dob.getData()))
121
117
}
122
118
119
+ /**
120
+ * Return the amount of memory for launching the ApplicationMaster container (MB).
121
+ * GetNewApplicationResponse#getMinimumResourceCapability does not exist in the stable API.
122
+ */
123
+ override def getAMMemory (newAppResponse : GetNewApplicationResponse ): Int = {
124
+ val minResMemory = newAppResponse.getMinimumResourceCapability().getMemory()
125
+ val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
126
+ ((if ((args.amMemory % minResMemory) == 0 ) 0 else minResMemory) - memoryOverhead)
127
+ amMemory
128
+ }
129
+
123
130
/**
124
131
* Return the security token used by this client to communicate with the ApplicationMaster.
125
132
* If no security is enabled, the token returned by the report is null.
133
+ * ApplicationReport#getClientToken is renamed `getClientToAMToken` in the stable API.
126
134
*/
127
135
override def getClientToken (report : ApplicationReport ): String =
128
136
Option (report.getClientToken).getOrElse(" " )
0 commit comments