2020from ..core import core , job_environment , logger , utils
2121
2222
23+ # pylint: disable=duplicate-code
2324class OarInfoWatcher (core .InfoWatcher ):
2425
2526 submitit_state_mapping = {
@@ -38,7 +39,7 @@ class OarInfoWatcher(core.InfoWatcher):
3839 }
3940
4041 def _make_command (self ) -> tp .Optional [tp .List [str ]]:
41- to_check = { x for x in self ._registered - self ._finished }
42+ to_check = sorted ( set ( self ._registered - self ._finished ))
4243 if not to_check :
4344 return None
4445 command = ["oarstat" , "-f" , "-J" ]
@@ -88,7 +89,7 @@ def __init__(self, folder: tp.Union[Path, str], job_id: str, tasks: tp.Sequence[
8889 if len (tasks ) > 1 :
8990 raise NotImplementedError
9091 super ().__init__ (folder , job_id , tasks )
91- self ._resubmitted_job = None
92+ self ._resubmitted_job : tp . Optional [ OarJob [ core . R ]] = None
9293
9394 def _interrupt (self , timeout : bool = False ) -> None :
9495 """Sends preemption or timeout signal to the job (for testing purpose)
@@ -174,8 +175,6 @@ def _get_resubmitted_job(self) -> tp.Optional["OarJob[core.R]"]:
174175 folder = self ._paths .folder , job_id = resubmitted_job_id , tasks = [0 ]
175176 )
176177 return self ._resubmitted_job
177- else :
178- return None
179178 except Exception as e :
180179 logger .get_logger ().error (
181180 f"Getting error with _get_resubmitted_job() by command { command } :\n "
@@ -216,6 +215,10 @@ class OarExecutor(core.PicklingExecutor):
216215 max_num_timeout: int
217216 Maximum number of time the job can be resubmitted after timeout (if
218217 the instance is derived from helpers.Checkpointable)
218+ python: str
219+ Command to launch python. This allow to use singularity for example.
220+ Caller is responsible to provide a valid shell command here.
221+ By default reuse the current python executable
219222
220223 Note
221224 ----
@@ -231,8 +234,9 @@ class OarExecutor(core.PicklingExecutor):
231234 job_class = OarJob
232235 watcher = OarInfoWatcher (delay_s = 600 )
233236
234- def __init__ (self , folder : tp .Union [Path , str ], max_num_timeout : int = 3 ) -> None :
237+ def __init__ (self , folder : tp .Union [Path , str ], max_num_timeout : int = 3 , python : str = None ) -> None :
235238 super ().__init__ (folder , max_num_timeout = max_num_timeout )
239+ self .python = python
236240 if not self .affinity () > 0 :
237241 raise RuntimeError ('Could not detect "oarsub", are you indeed on a OAR cluster?' )
238242
@@ -259,7 +263,7 @@ def _internal_update_parameters(self, **kwargs: tp.Any) -> None:
259263 Parameters
260264 ----------
261265 See oar documentation for most parameters.
262- Most useful parameters are: core , walltime, gpu, queue.
266+ Most useful parameters are: cores , walltime, gpu, queue.
263267
264268 Below are the parameters that differ from OAR documentation:
265269
@@ -326,7 +330,7 @@ def _internal_process_submissions(
326330 if any (isinstance (d .function , helpers .Checkpointable ) for d in delayed_submissions ) and any (
327331 not isinstance (d .function , helpers .Checkpointable ) for d in delayed_submissions
328332 ):
329- raise Exception (
333+ raise ValueError (
330334 "OarExecutor does not support a job array that mixes checkpointable and non-checkpointable functions."
331335 "\n Please make groups of similar function calls in the job array."
332336 )
@@ -361,9 +365,8 @@ def _internal_process_submissions(
361365
362366 @property
363367 def _submitit_command_str (self ) -> str :
364- return " " .join (
365- [shlex .quote (sys .executable ), "-u -m submitit.core._submit" , shlex .quote (str (self .folder ))]
366- )
368+ python = self .python or shlex .quote (sys .executable )
369+ return " " .join ([python , "-u -m submitit.core._submit" , shlex .quote (str (self .folder ))])
367370
368371 def _num_tasks (self ) -> int :
369372 return 1
@@ -434,7 +437,7 @@ def _make_oarsub_string(
434437 folder : tp .Union [str , Path ],
435438 map_count : tp .Optional [int ] = None , # used internally
436439 nodes : tp .Optional [int ] = None ,
437- core : tp .Optional [int ] = None ,
440+ cores : tp .Optional [int ] = None ,
438441 gpu : tp .Optional [int ] = None ,
439442 walltime : tp .Optional [str ] = None ,
440443 timeout_min : tp .Optional [int ] = None ,
@@ -449,7 +452,7 @@ def _make_oarsub_string(
449452 Parameters
450453 ----------
451454 See oar documentation for most parameters.
452- Most useful parameters are: core , walltime, gpu, queue.
455+ Most useful parameters are: cores , walltime, gpu, queue.
453456
454457 Below are the parameters that differ from OAR documentation:
455458
@@ -474,13 +477,13 @@ def _make_oarsub_string(
474477 # OAR resource hierarchy: nodes > gpu > core
475478 resource_hierarchy = ""
476479 if nodes is not None :
477- resource_hierarchy += "/nodes=%d" % nodes
480+ resource_hierarchy += f "/nodes={ nodes } "
478481 if gpu is not None :
479- resource_hierarchy += "/gpu=%d" % gpu
480- if core is not None :
481- resource_hierarchy += "/core=%d" % core
482+ resource_hierarchy += f "/gpu={ gpu } "
483+ if cores is not None :
484+ resource_hierarchy += f "/core={ cores } "
482485 if walltime is not None :
483- walltime = "walltime=%s" % walltime
486+ walltime = f "walltime={ walltime } "
484487 resource_request = "," .join (filter (None , (resource_hierarchy , walltime )))
485488 if resource_request :
486489 parameters ["l" ] = resource_request
0 commit comments