Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
<td>String</td>
<td>A string of default JVM options to prepend to <code class="highlighter-rouge">env.java.opts.taskmanager</code>. This is intended to be set by administrators.</td>
</tr>
<tr>
<td><h5>env.java.home</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Location where Java is installed. If not specified, Flink will use your default Java installation.</td>
</tr>
<tr>
<td><h5>env.java.opts.all</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1740,6 +1740,8 @@ public final class ConfigConstants {

// ----------------------------- Environment Variables ----------------------------

public static final String ENV_JAVA_HOME = "JAVA_HOME";

/** The environment variable name which contains the location of the configuration directory. */
public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME;
import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL;
import static org.apache.flink.configuration.StructuredOptionsSplitter.escapeWithSingleQuote;
Expand Down Expand Up @@ -665,6 +666,29 @@ public static boolean filterPrefixMapKey(String key, String candidate) {
return candidate.startsWith(prefixKey);
}

/**
* Set the JAVA_HOME variable in the provided environment map.
*
* <p>This method follows a specific priority order to determine the JAVA_HOME value:
*
* <ol>
* <li>If the environment map already contains the JAVA_HOME key, the method does nothing.
* <li>Otherwise, it attempts to retrieve JAVA_HOME from the Flink configuration using {@link
* CoreOptions#FLINK_JAVA_HOME}.
* <li>If it isn't found in configuration, it falls back to the system environment variable.
* <li>If a value is found through either source, it is added to the environment map.
* </ol>
*/
public static void setJavaHomeEnv(Configuration configuration, Map<String, String> env) {
if (!env.containsKey(ENV_JAVA_HOME)) {
Optional.ofNullable(
configuration
.getOptional(CoreOptions.FLINK_JAVA_HOME)
.orElse(System.getenv(ENV_JAVA_HOME)))
.ifPresent(javaHomeStr -> env.put(ENV_JAVA_HOME, javaHomeStr));
}
}

static Map<String, String> convertToPropertiesPrefixed(
Map<String, Object> confData, String key, boolean standardYaml) {
final String prefixKey = key + ".";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,17 @@ public static String[] mergeListsToArray(List<String> base, List<String> append)
// process parameters
// ------------------------------------------------------------------------

public static final ConfigOption<String> FLINK_JAVA_HOME =
ConfigOptions.key("env.java.home")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"Location where Java is installed. If not specified,"
+ " Flink will use your default Java installation.")
.build());

public static final ConfigOption<String> FLINK_JVM_OPTIONS =
ConfigOptions.key("env.java.opts.all")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.clusterframework;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.ResourceManagerOptions;

import java.util.HashMap;
Expand Down Expand Up @@ -90,6 +91,9 @@ public static ContaineredTaskManagerParameters create(
}
}

// set JAVA_HOME
ConfigurationUtils.setJavaHomeEnv(config, envVars);

// done
return new ContaineredTaskManagerParameters(taskExecutorProcessSpec, envVars);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1969,6 +1969,9 @@ Map<String, String> generateApplicationMasterEnv(
ConfigurationUtils.getPrefixedKeyValuePairs(
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX,
this.flinkConfiguration));

// set JAVA_HOME
ConfigurationUtils.setJavaHomeEnv(this.flinkConfiguration, env);
// set Flink app class path
env.put(ENV_FLINK_CLASSPATH, classPathStr);
// Set FLINK_LIB_DIR to `lib` folder under working dir in container
Expand Down
78 changes: 64 additions & 14 deletions flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
Expand All @@ -44,8 +45,11 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME;
import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
import static org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -56,6 +60,19 @@ class UtilsTest {
private static final String YARN_RM_ARBITRARY_SCHEDULER_CLAZZ =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler";

private static final TaskExecutorProcessSpec TASK_EXECUTOR_PROCESS_SPEC =
new TaskExecutorProcessSpec(
new CPUResource(1.0),
new MemorySize(0), // frameworkHeapSize
new MemorySize(0), // frameworkOffHeapSize
new MemorySize(111), // taskHeapSize
new MemorySize(0), // taskOffHeapSize
new MemorySize(222), // networkMemSize
new MemorySize(0), // managedMemorySize
new MemorySize(333), // jvmMetaspaceSize
new MemorySize(0), // jvmOverheadSize
Collections.emptyList());

@Test
void testDeleteApplicationFiles(@TempDir Path tempDir) throws Exception {
final Path applicationFilesDir = Files.createTempDirectory(tempDir, ".flink");
Expand Down Expand Up @@ -208,20 +225,8 @@ void testGetYarnConfiguration() {
@Test
void testGetTaskManagerShellCommand() {
final Configuration cfg = new Configuration();
final TaskExecutorProcessSpec taskExecutorProcessSpec =
new TaskExecutorProcessSpec(
new CPUResource(1.0),
new MemorySize(0), // frameworkHeapSize
new MemorySize(0), // frameworkOffHeapSize
new MemorySize(111), // taskHeapSize
new MemorySize(0), // taskOffHeapSize
new MemorySize(222), // networkMemSize
new MemorySize(0), // managedMemorySize
new MemorySize(333), // jvmMetaspaceSize
new MemorySize(0), // jvmOverheadSize
Collections.emptyList());
final ContaineredTaskManagerParameters containeredParams =
new ContaineredTaskManagerParameters(taskExecutorProcessSpec, new HashMap<>());
new ContaineredTaskManagerParameters(TASK_EXECUTOR_PROCESS_SPEC, new HashMap<>());

// no logging, with/out krb5
final String java = "$JAVA_HOME/bin/java";
Expand All @@ -238,7 +243,8 @@ void testGetTaskManagerShellCommand() {
+ " -Dlog4j.configurationFile=file:./conf/log4j.properties"; // if set
final String mainClass = "org.apache.flink.yarn.UtilsTest";
final String dynamicConfigs =
TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec).trim();
TaskExecutorProcessUtils.generateDynamicConfigsStr(TASK_EXECUTOR_PROCESS_SPEC)
.trim();
final String basicArgs = "--configDir ./conf";
final String mainArgs = "-Djobmanager.rpc.address=host1 -Dkey.a=v1";
final String args = dynamicConfigs + " " + basicArgs + " " + mainArgs;
Expand Down Expand Up @@ -674,6 +680,50 @@ void testGenerateJvmOptsString() {
Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS));
}

@Test
void testGetTaskManagerEnvsWithJavaHomeSet() {
final Configuration cfg = new Configuration();
cfg.set(CoreOptions.FLINK_JAVA_HOME, "/opt/jdk");
cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val");
final ContaineredTaskManagerParameters containeredParams =
ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC);
final Map<String, String> envVars = containeredParams.taskManagerEnv();
assertThat(envVars).containsEntry(ENV_JAVA_HOME, "/opt/jdk").containsEntry("key", "val");
}

@Test
void testGetTaskManagerEnvsWithEnvJavaHomeSet() {
final Configuration cfg = new Configuration();
final String newJavaHome = "/usr/lib/jvm/java-openjdk-17";
final Map<String, String> oldEnv = System.getenv();
try {
Map<String, String> newEnv = new HashMap<>(System.getenv());
newEnv.put(ConfigConstants.ENV_JAVA_HOME, newJavaHome);
CommonTestUtils.setEnv(newEnv);

cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val");
final ContaineredTaskManagerParameters containeredParams =
ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC);
final Map<String, String> envVars = containeredParams.taskManagerEnv();
assertThat(envVars)
.containsEntry(ENV_JAVA_HOME, newJavaHome)
.containsEntry("key", "val");
} finally {
CommonTestUtils.setEnv(oldEnv);
}
}

@Test
void testGetTaskManagerEnvsWithoutJavaHomeSet() {
final Configuration cfg = new Configuration();
final String origJavaHome = System.getenv(ConfigConstants.ENV_JAVA_HOME);
cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val");
final ContaineredTaskManagerParameters containeredParams =
ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC);
final Map<String, String> envVars = containeredParams.taskManagerEnv();
assertThat(envVars.get(ConfigConstants.ENV_JAVA_HOME)).isEqualTo(origJavaHome);
}

private static void verifyUnitResourceVariousSchedulers(
YarnConfiguration yarnConfig, int minMem, int minVcore, int incMem, int incVcore) {
yarnConfig.set(YarnConfiguration.RM_SCHEDULER, Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,11 +921,14 @@ public void testGenerateApplicationMasterEnv(@TempDir File flinkHomeDir) throws
final String fakeLocalFlinkJar = "./lib/flink_dist.jar";
final String fakeClassPath = fakeLocalFlinkJar + ":./usrlib/user.jar";
final ApplicationId appId = ApplicationId.newInstance(0, 0);
final Configuration flinkConfig = new Configuration();
flinkConfig.set(CoreOptions.FLINK_JAVA_HOME, "/opt/jdk");
final Map<String, String> masterEnv =
getTestMasterEnv(
new Configuration(), flinkHomeDir, fakeClassPath, fakeLocalFlinkJar, appId);
flinkConfig, flinkHomeDir, fakeClassPath, fakeLocalFlinkJar, appId);

assertThat(masterEnv)
.containsEntry(ConfigConstants.ENV_JAVA_HOME, "/opt/jdk")
.containsEntry(ConfigConstants.ENV_FLINK_LIB_DIR, "./lib")
.containsEntry(YarnConfigKeys.ENV_APP_ID, appId.toString())
.containsEntry(
Expand All @@ -940,6 +943,44 @@ public void testGenerateApplicationMasterEnv(@TempDir File flinkHomeDir) throws
.containsEntry(YarnConfigKeys.ENV_CLIENT_HOME_DIR, flinkHomeDir.getPath());
}

@Test
public void testContainerEnvJavaHomeNotOverriddenByDefault(@TempDir File flinkHomeDir)
throws IOException {
final Configuration flinkConfig = new Configuration();
final String origJavaHome = System.getenv(ConfigConstants.ENV_JAVA_HOME);
final Map<String, String> masterEnv =
getTestMasterEnv(
flinkConfig,
flinkHomeDir,
"",
"./lib/flink_dist.jar",
ApplicationId.newInstance(0, 0));
assertThat(masterEnv.get(ConfigConstants.ENV_JAVA_HOME)).isEqualTo(origJavaHome);
}

@Test
public void testContainerEnvJavaHomeNewValue(@TempDir File flinkHomeDir) throws IOException {
final Configuration flinkConfig = new Configuration();
final String newJavaHome = "/usr/lib/jvm/java-openjdk-17";
final Map<String, String> oldEnv = System.getenv();

try {
Map<String, String> newEnv = new HashMap<>(System.getenv());
newEnv.put(ConfigConstants.ENV_JAVA_HOME, newJavaHome);
CommonTestUtils.setEnv(newEnv);
final Map<String, String> masterEnv =
getTestMasterEnv(
flinkConfig,
flinkHomeDir,
"",
"./lib/flink_dist.jar",
ApplicationId.newInstance(0, 0));
assertThat(masterEnv.get(ConfigConstants.ENV_JAVA_HOME)).isEqualTo(newJavaHome);
} finally {
CommonTestUtils.setEnv(oldEnv);
}
}

@Test
public void testEnvFlinkLibDirVarNotOverriddenByContainerEnv(@TempDir File tmpDir)
throws IOException {
Expand Down
Loading