2020import org .apache .commons .cli .HelpFormatter ;
2121import org .apache .commons .cli .Options ;
2222import org .apache .commons .cli .ParseException ;
23+ import org .apache .commons .io .FileUtils ;
2324import org .apache .hadoop .yarn .api .records .ApplicationId ;
2425import org .apache .hadoop .yarn .exceptions .YarnException ;
26+ import org .apache .hadoop .yarn .submarine .client .cli .param .ParametersHolder ;
2527import org .apache .hadoop .yarn .submarine .client .cli .param .RunJobParameters ;
28+ import org .apache .hadoop .yarn .submarine .client .cli .param .RunJobParameters .UnderscoreConverterPropertyUtils ;
29+ import org .apache .hadoop .yarn .submarine .client .cli .param .yaml .YamlConfigFile ;
30+ import org .apache .hadoop .yarn .submarine .client .cli .param .yaml .YamlParseException ;
2631import org .apache .hadoop .yarn .submarine .common .ClientContext ;
2732import org .apache .hadoop .yarn .submarine .common .exception .SubmarineException ;
2833import org .apache .hadoop .yarn .submarine .runtimes .common .JobMonitor ;
2934import org .apache .hadoop .yarn .submarine .runtimes .common .JobSubmitter ;
3035import org .apache .hadoop .yarn .submarine .runtimes .common .StorageKeyConstants ;
3136import org .slf4j .Logger ;
3237import org .slf4j .LoggerFactory ;
38+ import org .yaml .snakeyaml .Yaml ;
39+ import org .yaml .snakeyaml .constructor .Constructor ;
3340
41+ import java .io .File ;
42+ import java .io .FileNotFoundException ;
3443import java .io .IOException ;
3544import java .util .HashMap ;
3645import java .util .Map ;
3746
3847public class RunJobCli extends AbstractCli {
3948 private static final Logger LOG =
4049 LoggerFactory .getLogger (RunJobCli .class );
50+ private static final String YAML_PARSE_FAILED = "Failed to parse " +
51+ "YAML config" ;
4152
4253 private Options options ;
4354 private RunJobParameters parameters = new RunJobParameters ();
@@ -51,10 +62,10 @@ public RunJobCli(ClientContext cliContext) {
5162 }
5263
5364 @ VisibleForTesting
54- public RunJobCli (ClientContext cliContext , JobSubmitter jobSubmitter ,
65+ RunJobCli (ClientContext cliContext , JobSubmitter jobSubmitter ,
5566 JobMonitor jobMonitor ) {
5667 super (cliContext );
57- options = generateOptions ();
68+ this . options = generateOptions ();
5869 this .jobSubmitter = jobSubmitter ;
5970 this .jobMonitor = jobMonitor ;
6071 }
@@ -65,6 +76,8 @@ public void printUsages() {
6576
6677 private Options generateOptions () {
6778 Options options = new Options ();
79+ options .addOption (CliConstants .YAML_CONFIG , true ,
80+ "Config file (in YAML format)" );
6881 options .addOption (CliConstants .NAME , true , "Name of the job" );
6982 options .addOption (CliConstants .INPUT_PATH , true ,
7083 "Input of the job, could be local or other FS directory" );
@@ -77,7 +90,7 @@ private Options generateOptions() {
7790 + "exported model is not placed under ${checkpoint_path}"
7891 + "could be local or other FS directory. This will be used to serve." );
7992 options .addOption (CliConstants .N_WORKERS , true ,
80- "Numnber of worker tasks of the job, by default it's 1" );
93+ "Number of worker tasks of the job, by default it's 1" );
8194 options .addOption (CliConstants .N_PS , true ,
8295 "Number of PS tasks of the job, by default it's 0" );
8396 options .addOption (CliConstants .WORKER_RES , true ,
@@ -119,7 +132,7 @@ private Options generateOptions() {
119132 + "uses --" + CliConstants .DOCKER_IMAGE + " as default." );
120133 options .addOption (CliConstants .QUICKLINK , true , "Specify quicklink so YARN"
121134 + "web UI shows link to given role instance and port. When "
122- + "--tensorboard is speciied , quicklink to tensorboard instance will "
135+ + "--tensorboard is specified , quicklink to tensorboard instance will "
123136 + "be added automatically. The format of quick link is: "
124137 + "Quick_link_label=http(or https)://role-name:port. For example, "
125138 + "if want to link to first worker's 7070 port, and text of quicklink "
@@ -149,7 +162,7 @@ private Options generateOptions() {
149162 "by the job under security environment" );
150163 options .addOption (CliConstants .DISTRIBUTE_KEYTAB , false , "Distribute " +
151164 "local keytab to cluster machines for service authentication. If not " +
152- "sepcified , pre-destributed keytab of which path specified by" +
165+ "specified , pre-distributed keytab of which path specified by" +
153166 " parameter" + CliConstants .KEYTAB + " on cluster machines will be " +
154167 "used" );
155168 options .addOption ("h" , "help" , false , "Print help" );
@@ -180,10 +193,10 @@ private void parseCommandLineAndGetRunJobParameters(String[] args)
180193 // Do parsing
181194 GnuParser parser = new GnuParser ();
182195 CommandLine cli = parser .parse (options , args );
183- parameters . updateParametersByParsedCommandline (cli , options ,
184- clientContext );
196+ ParametersHolder parametersHolder = createParametersHolder (cli );
197+ parameters . updateParameters ( parametersHolder , clientContext );
185198 } catch (ParseException e ) {
186- LOG .error ("Exception in parse:" , e .getMessage ());
199+ LOG .error ("Exception in parse: {} " , e .getMessage ());
187200 printUsages ();
188201 throw e ;
189202 }
@@ -195,6 +208,51 @@ private void parseCommandLineAndGetRunJobParameters(String[] args)
195208 replacePatternsInParameters ();
196209 }
197210
211+ private ParametersHolder createParametersHolder (CommandLine cli ) {
212+ String yamlConfigFile =
213+ cli .getOptionValue (CliConstants .YAML_CONFIG );
214+ if (yamlConfigFile != null ) {
215+ YamlConfigFile yamlConfig = readYamlConfigFile (yamlConfigFile );
216+ if (yamlConfig == null ) {
217+ throw new YamlParseException (String .format (
218+ YAML_PARSE_FAILED + ", file is empty: %s" , yamlConfigFile ));
219+ } else if (yamlConfig .getConfigs () == null ) {
220+ throw new YamlParseException (String .format (YAML_PARSE_FAILED +
221+ ", config section should be defined, but it cannot be found in " +
222+ "YAML file '%s'!" , yamlConfigFile ));
223+ }
224+ LOG .info ("Using YAML configuration!" );
225+ return ParametersHolder .createWithCmdLineAndYaml (cli , yamlConfig );
226+ } else {
227+ LOG .info ("Using CLI configuration!" );
228+ return ParametersHolder .createWithCmdLine (cli );
229+ }
230+ }
231+
232+ private YamlConfigFile readYamlConfigFile (String filename ) {
233+ Constructor constructor = new Constructor (YamlConfigFile .class );
234+ constructor .setPropertyUtils (new UnderscoreConverterPropertyUtils ());
235+ try {
236+ LOG .info ("Reading YAML configuration from file: {}" , filename );
237+ Yaml yaml = new Yaml (constructor );
238+ return yaml .loadAs (FileUtils .openInputStream (new File (filename )),
239+ YamlConfigFile .class );
240+ } catch (FileNotFoundException e ) {
241+ logExceptionOfYamlParse (filename , e );
242+ throw new YamlParseException (YAML_PARSE_FAILED +
243+ ", file does not exist!" );
244+ } catch (Exception e ) {
245+ logExceptionOfYamlParse (filename , e );
246+ throw new YamlParseException (
247+ String .format (YAML_PARSE_FAILED + ", details: %s" , e .getMessage ()));
248+ }
249+ }
250+
251+ private void logExceptionOfYamlParse (String filename , Exception e ) {
252+ LOG .error (String .format ("Exception while parsing YAML file %s" , filename ),
253+ e );
254+ }
255+
198256 private void setDefaultDirs () throws IOException {
199257 // Create directories if needed
200258 String jobDir = parameters .getCheckpointPath ();
@@ -248,8 +306,7 @@ private void storeJobInformation(String jobName, ApplicationId applicationId,
248306
249307 @ Override
250308 public int run (String [] args )
251- throws ParseException , IOException , YarnException , InterruptedException ,
252- SubmarineException {
309+ throws ParseException , IOException , YarnException , SubmarineException {
253310 if (CliUtils .argsForHelp (args )) {
254311 printUsages ();
255312 return 0 ;
0 commit comments