Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,20 @@ protected void openInternal(String[] additionalFilesNotFromConf,
}
}

/**
* Builds the map of common local resources (app jar + localized resources) for the Tez session.
* Extracted for testability to ensure resources are properly registered.
*/
@VisibleForTesting
Map<String, LocalResource> buildCommonLocalResources() {
final Map<String, LocalResource> commonLocalResources = new HashMap<>();
commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr);
for (LocalResource lr : this.resources.localizedResources) {
commonLocalResources.put(DagUtils.getBaseName(lr), lr);
}
return commonLocalResources;
}

/**
* Opens a Tez session without performing a complete rollback/cleanup on failure.
*
Expand All @@ -318,14 +332,9 @@ protected void openInternal(String[] additionalFilesNotFromConf,
*/
@VisibleForTesting
void openInternalUnsafe(boolean isAsync, LogHelper console) throws TezException, IOException {
final Map<String, LocalResource> commonLocalResources = new HashMap<>();
final Map<String, LocalResource> commonLocalResources = buildCommonLocalResources();
final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));

commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr);
for (LocalResource lr : this.resources.localizedResources) {
commonLocalResources.put(DagUtils.getBaseName(lr), lr);
}

if (llapMode) {
// localize llap client jars
addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.hive.conf.HiveConf;
Expand Down Expand Up @@ -90,4 +91,42 @@ void openInternalUnsafe(boolean isAsync, SessionState.LogHelper console) throws
Assert.assertFalse("Scratch dir is not supposed to exist after cleanup: " + scratchDirPath.get(),
Files.exists(Paths.get(scratchDirPath.get())));
}

/**
* Tests whether commonLocalResources is populated with app jar and localized resources when opening
* a Tez session.
*/
@Test
public void testCommonLocalResourcesPopulatedOnSessionOpen() throws Exception {
Path jarPath = Files.createTempFile("test-jar", ".jar");
Files.write(jarPath, "testCommonLocalResourcesPopulated".getBytes(), StandardOpenOption.APPEND);

HiveConf hiveConf = new HiveConfForTest(getClass());
hiveConf.set("hive.security.authorization.manager",
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory");
SessionState.start(hiveConf);

TezSessionState.HiveResources resources =
new TezSessionState.HiveResources(new org.apache.hadoop.fs.Path("/tmp"));

TezSessionState tempSession = new TezSessionState(SessionState.get().getSessionId(), hiveConf);

LocalResource localizedLr = tempSession.createJarLocalResource(jarPath.toUri().toString());
resources.localizedResources.add(localizedLr);

final TezSessionState sessionStateForTest = new TezSessionState(SessionState.get().getSessionId(), hiveConf) {
@Override
void openInternalUnsafe(boolean isAsync, SessionState.LogHelper console) {
Map<String, LocalResource> commonLocalResources = buildCommonLocalResources();
Assert.assertEquals("commonLocalResources must contain exactly 2 jars (hive-exec app jar + localized test jar)",
2, commonLocalResources.size());
Assert.assertTrue("commonLocalResources must contain the hive-exec app jar",
commonLocalResources.keySet().stream().anyMatch(k -> k.contains("hive-exec")));
Assert.assertTrue("commonLocalResources must contain the added localized test jar",
commonLocalResources.containsKey(DagUtils.getBaseName(localizedLr)));
}
};

sessionStateForTest.open(resources);
}
}