@@ -114,7 +114,8 @@ public class TezSessionState {
114114 private static final String LLAP_TASK_COMMUNICATOR = LlapTaskCommunicator .class .getName ();
115115
116116 private final HiveConf conf ;
117- private Path tezScratchDir ;
117+ @ VisibleForTesting
118+ Path tezScratchDir ;
118119 private LocalResource appJarLr ;
119120 private TezClient session ;
120121 private Future <TezClient > sessionFuture ;
@@ -260,7 +261,8 @@ public void beginOpen(String[] additionalFiles, LogHelper console)
260261 openInternal (additionalFiles , true , console , null );
261262 }
262263
263- protected void openInternal (String [] additionalFilesNotFromConf ,
264+ @ VisibleForTesting
265+ void openInternal (String [] additionalFilesNotFromConf ,
264266 boolean isAsync , LogHelper console , HiveResources resources )
265267 throws IOException , URISyntaxException , TezException {
266268 // TODO Why is the queue name set again. It has already been setup via setQueueName. Do only one of the two.
@@ -272,9 +274,6 @@ protected void openInternal(String[] additionalFilesNotFromConf,
272274 this .queueName = confQueueName ;
273275 this .doAsEnabled = conf .getBoolVar (HiveConf .ConfVars .HIVE_SERVER2_ENABLE_DOAS );
274276
275- final boolean llapMode = "llap" .equalsIgnoreCase (HiveConf .getVar (
276- conf , HiveConf .ConfVars .HIVE_EXECUTION_MODE ));
277-
278277 // TODO This - at least for the session pool - will always be the hive user. How does doAs above this affect things ?
279278 UserGroupInformation ugi = Utils .getUGI ();
280279 user = ugi .getShortUserName ();
@@ -292,12 +291,37 @@ protected void openInternal(String[] additionalFilesNotFromConf,
292291 LOG .info ("Created new resources: " + this .resources );
293292 }
294293
295- // unless already installed on all the cluster nodes, we'll have to
296- // localize hive-exec.jar as well.
294+ // unless already installed on all the cluster nodes, we'll have to localize hive-exec.jar as well.
297295 appJarLr = createJarLocalResource (utils .getExecJarPathLocal (conf ));
298296
299- // configuration for the application master
300- final Map <String , LocalResource > commonLocalResources = new HashMap <String , LocalResource >();
297+ try {
298+ openInternalUnsafe (isAsync , console );
299+ } catch (Exception e ) {
300+ LOG .info ("Failed to open session, deleting scratch dir to prevent resource leak..." , e );
301+ cleanupScratchDir ();
302+ throw e ;
303+ }
304+ }
305+
306+ /**
307+ * Opens a Tez session without performing a complete rollback/cleanup on failure.
308+ *
309+ * <p><strong>Callers MUST guard this method with try/catch and perform cleanup</strong>
310+ * of partially initialized state (such as localized files in the scratch directory).
311+ * This method is not safe on its own.</p>
312+ *
313+ * @param isAsync whether to open the Tez session asynchronously in a separate thread
314+ * @param console a {@link LogHelper} used to log session startup events
315+ *
316+ * @throws TezException if the session fails to start (including failures during
317+ * container launch or session initialization)
318+ * @throws IOException if local resource localization or I/O setup fails
319+ */
320+ @ VisibleForTesting
321+ void openInternalUnsafe (boolean isAsync , LogHelper console ) throws TezException , IOException {
322+ final Map <String , LocalResource > commonLocalResources = new HashMap <>();
323+ final boolean llapMode = "llap" .equalsIgnoreCase (HiveConf .getVar (conf , HiveConf .ConfVars .HIVE_EXECUTION_MODE ));
324+
301325 commonLocalResources .put (DagUtils .getBaseName (appJarLr ), appJarLr );
302326 for (LocalResource lr : this .resources .localizedResources ) {
303327 commonLocalResources .put (DagUtils .getBaseName (lr ), lr );
@@ -312,7 +336,7 @@ protected void openInternal(String[] additionalFilesNotFromConf,
312336 }
313337
314338 // Create environment for AM.
315- Map <String , String > amEnv = new HashMap <String , String >();
339+ Map <String , String > amEnv = new HashMap <>();
316340 MRHelpers .updateEnvBasedOnMRAMEnv (conf , amEnv );
317341
318342 // and finally we're ready to create and start the session
@@ -383,27 +407,24 @@ protected void openInternal(String[] additionalFilesNotFromConf,
383407 startSessionAndContainers (session , conf , commonLocalResources , tezConfig , false );
384408 this .session = session ;
385409 } else {
386- FutureTask <TezClient > sessionFuture = new FutureTask <>(new Callable <TezClient >() {
387- @ Override
388- public TezClient call () throws Exception {
389- TezClient result = null ;
390- try {
391- result = startSessionAndContainers (
392- session , conf , commonLocalResources , tezConfig , true );
393- } catch (Throwable t ) {
394- // The caller has already stopped the session.
395- LOG .error ("Failed to start Tez session" , t );
396- throw (t instanceof Exception ) ? (Exception )t : new Exception (t );
397- }
398- // Check interrupt at the last moment in case we get cancelled quickly.
399- // This is not bulletproof but should allow us to close session in most cases.
400- if (Thread .interrupted ()) {
401- LOG .info ("Interrupted while starting Tez session" );
402- closeAndIgnoreExceptions (result );
403- return null ;
404- }
405- return result ;
410+ FutureTask <TezClient > sessionFuture = new FutureTask <>(() -> {
411+ TezClient result = null ;
412+ try {
413+ result = startSessionAndContainers (
414+ session , conf , commonLocalResources , tezConfig , true );
415+ } catch (Throwable t ) {
416+ // The caller has already stopped the session.
417+ LOG .error ("Failed to start Tez session" , t );
418+ throw (t instanceof Exception ) ? (Exception )t : new Exception (t );
419+ }
420+ // Check interrupt at the last moment in case we get cancelled quickly.
421+ // This is not bulletproof but should allow us to close session in most cases.
422+ if (Thread .interrupted ()) {
423+ LOG .info ("Interrupted while starting Tez session" );
424+ closeAndIgnoreExceptions (result );
425+ return null ;
406426 }
427+ return result ;
407428 });
408429 new Thread (sessionFuture , "Tez session start thread" ).start ();
409430 // We assume here nobody will try to get session before open() returns.
0 commit comments