Skip to content

Commit

Permalink
[FLINK-11533] [container] Throw FlinkParseException in ParserResultFa…
Browse files Browse the repository at this point in the history
…ctory
  • Loading branch information
uce committed Mar 1, 2019
1 parent bd60104 commit b54fb2e
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

Expand Down Expand Up @@ -75,11 +76,10 @@ public Options getOptions() {
}

@Override
public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine commandLine) {
public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine commandLine) throws FlinkParseException {
final String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt());
final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
final int restPort = Integer.parseInt(restPortString);
final int restPort = getRestPort(commandLine);
final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
final SavepointRestoreSettings savepointRestoreSettings = CliFrontendParser.createSavepointRestoreSettings(commandLine);
Expand All @@ -96,11 +96,28 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma
jobId);
}

private static JobID getJobId(CommandLine commandLine) {
private int getRestPort(CommandLine commandLine) throws FlinkParseException {
final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
try {
return Integer.parseInt(restPortString);
} catch (NumberFormatException e) {
throw flinkParseException(REST_PORT_OPTION, e);
}
}

private static JobID getJobId(CommandLine commandLine) throws FlinkParseException {
String jobId = commandLine.getOptionValue(JOB_ID_OPTION.getOpt());
if (jobId == null) {
return DEFAULT_JOB_ID;
}
return JobID.fromHexString(jobId);
try {
return JobID.fromHexString(jobId);
} catch (IllegalArgumentException e) {
throw flinkParseException(JOB_ID_OPTION, e);
}
}

private static FlinkParseException flinkParseException(Option option, Exception cause) {
return new FlinkParseException(String.format("Failed to parse '--%s' option", option.getLongOpt()), cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.Optional;
import java.util.Properties;

import static org.apache.flink.container.entrypoint.StandaloneJobClusterConfigurationParserFactory.DEFAULT_JOB_ID;
Expand All @@ -35,6 +37,7 @@
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
Expand Down Expand Up @@ -115,15 +118,17 @@ public void testSetJobIdManually() throws FlinkParseException {
}

@Test
public void testInvalidJobIdThrows() throws FlinkParseException {
public void testInvalidJobIdThrows() {
final String invalidJobId = "0xINVALID";
final String[] args = {"--configDir", "/foo/bar", "--job-classname", "foobar", "--job-id", invalidJobId};

try {
commandLineParser.parse(args);
fail("Did not throw expected FlinkParseException");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString(invalidJobId));
} catch (FlinkParseException e) {
Optional<IllegalArgumentException> cause = ExceptionUtils.findThrowable(e, IllegalArgumentException.class);
assertTrue(cause.isPresent());
assertThat(cause.get().getMessage(), containsString(invalidJobId));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.entrypoint.parser;

import org.apache.flink.runtime.entrypoint.FlinkParseException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;

Expand All @@ -43,6 +45,7 @@ public interface ParserResultFactory<T> {
*
* @param commandLine to extract the options from
* @return Result of the parsing
* @throws FlinkParseException Thrown on failures while parsing command line arguments
*/
T createResult(@Nonnull CommandLine commandLine);
T createResult(@Nonnull CommandLine commandLine) throws FlinkParseException;
}

0 comments on commit b54fb2e

Please sign in to comment.