|
19 | 19 |
|
20 | 20 | import java.io.File;
|
21 | 21 | import java.io.IOException;
|
22 |
| -import java.lang.reflect.Method; |
23 |
| -import java.net.URL; |
24 |
| -import java.net.URLClassLoader; |
25 | 22 | import java.util.ArrayList;
|
26 | 23 | import java.util.HashMap;
|
27 | 24 | import java.util.List;
|
28 | 25 | import java.util.Map;
|
29 | 26 | import java.util.Properties;
|
30 |
| -import java.util.concurrent.atomic.AtomicLong; |
31 | 27 |
|
32 | 28 | /**
|
33 | 29 | * Launcher for Spark applications.
|
|
43 | 39 | */
|
44 | 40 | public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
|
45 | 41 |
|
46 |
| - private static final AtomicLong THREAD_ID = new AtomicLong(); |
47 |
| - |
48 | 42 | protected boolean verbose;
|
49 | 43 | protected String appName;
|
50 | 44 | protected String master;
|
@@ -139,78 +133,6 @@ public SparkLauncher setVerbose(boolean verbose) {
|
139 | 133 | return this;
|
140 | 134 | }
|
141 | 135 |
|
142 |
| - /** |
143 |
| - * Starts a new thread that will run the Spark application. |
144 |
| - * <p/> |
145 |
| - * The application will run on a separate thread and use a separate, isolated class loader. |
146 |
| - * No classes or resources from the current thread's class loader will be visible to the app. |
147 |
| - * <p/> |
148 |
| - * This mode does not support certain configuration parameters, like configuring the amount of |
149 |
| - * driver memory or custom driver command line options. If such configuration is detected, an |
150 |
| - * exception will be thrown. |
151 |
| - * <p/> |
152 |
| - * This is extremely experimental and should not be used in production environments. |
153 |
| - * <p/> |
154 |
| - * NOTE: SparkSubmit uses system properties to propagate some configuration value to the app |
155 |
| - * are run concurrently, they may affect each other's configurations. |
156 |
| - * <p/> |
157 |
| - * NOTE: for users running JDK versions older than 8, this option can add a lot of overhead |
158 |
| - * to the VM's perm gen. |
159 |
| - * |
160 |
| - * @param exceptionHandler Optional handler for handling exceptions in the app thread. |
161 |
| - * @param daemon Whether to start a daemon thread. |
162 |
| - * @return A non-daemon thread that will run the application using SparkSubmit. The thread will |
163 |
| - * already be started. |
164 |
| - */ |
165 |
| - public Thread start(Thread.UncaughtExceptionHandler handler, boolean daemon) throws IOException { |
166 |
| - // Do some sanity checking that incompatible driver options are not used, because they |
167 |
| - // cannot be set in this mode. |
168 |
| - Properties props = loadPropertiesFile(); |
169 |
| - String extraClassPath = null; |
170 |
| - if (isClientMode(props)) { |
171 |
| - checkState( |
172 |
| - find(DRIVER_EXTRA_JAVA_OPTIONS, conf, props) == null, |
173 |
| - "Cannot set driver VM options when running in-process."); |
174 |
| - checkState( |
175 |
| - find(DRIVER_EXTRA_LIBRARY_PATH, conf, props) == null, |
176 |
| - "Cannot set native library path when running in-process."); |
177 |
| - checkState( |
178 |
| - find(DRIVER_MEMORY, conf, props) == null, |
179 |
| - "Cannot set driver memory when running in-process."); |
180 |
| - extraClassPath = find(DRIVER_EXTRA_CLASSPATH, conf, props); |
181 |
| - } |
182 |
| - |
183 |
| - List<String> cp = buildClassPath(extraClassPath); |
184 |
| - URL[] cpUrls = new URL[cp.size()]; |
185 |
| - int idx = 0; |
186 |
| - for (String entry : cp) { |
187 |
| - cpUrls[idx++] = new File(entry).toURI().toURL(); |
188 |
| - } |
189 |
| - |
190 |
| - URLClassLoader cl = new URLClassLoader(cpUrls, null); |
191 |
| - |
192 |
| - Thread appThread; |
193 |
| - try { |
194 |
| - Class<?> sparkSubmit = cl.loadClass("org.apache.spark.deploy.SparkSubmit"); |
195 |
| - Method main = sparkSubmit.getDeclaredMethod("main", String[].class); |
196 |
| - List<String> args = buildSparkSubmitArgs(); |
197 |
| - appThread = new Thread(new SparkSubmitRunner(main, args)); |
198 |
| - } catch (ClassNotFoundException cnfe) { |
199 |
| - throw new IOException(cnfe); |
200 |
| - } catch (NoSuchMethodException nsme) { |
201 |
| - throw new IOException(nsme); |
202 |
| - } |
203 |
| - |
204 |
| - appThread.setName("SparkLauncher-Submit-" + THREAD_ID.incrementAndGet()); |
205 |
| - appThread.setContextClassLoader(cl); |
206 |
| - if (handler != null) { |
207 |
| - appThread.setUncaughtExceptionHandler(handler); |
208 |
| - } |
209 |
| - appThread.setDaemon(daemon); |
210 |
| - appThread.start(); |
211 |
| - return appThread; |
212 |
| - } |
213 |
| - |
214 | 136 | /**
|
215 | 137 | * Launches a sub-process that will start the configured Spark application.
|
216 | 138 | *
|
@@ -340,27 +262,4 @@ private boolean isClientMode(Properties userProps) {
|
340 | 262 | (deployMode == null && !userMaster.startsWith("yarn-"));
|
341 | 263 | }
|
342 | 264 |
|
343 |
| - private static class SparkSubmitRunner implements Runnable { |
344 |
| - |
345 |
| - private final Method main; |
346 |
| - private final Object args; |
347 |
| - |
348 |
| - SparkSubmitRunner(Method main, List<String> args) { |
349 |
| - this.main = main; |
350 |
| - this.args = args.toArray(new String[args.size()]); |
351 |
| - } |
352 |
| - |
353 |
| - @Override |
354 |
| - public void run() { |
355 |
| - try { |
356 |
| - main.invoke(null, args); |
357 |
| - } catch (RuntimeException re) { |
358 |
| - throw re; |
359 |
| - } catch (Exception e) { |
360 |
| - throw new RuntimeException(e); |
361 |
| - } |
362 |
| - } |
363 |
| - |
364 |
| - } |
365 |
| - |
366 | 265 | }
|
0 commit comments