diff --git a/streamflow/cwl/command.py b/streamflow/cwl/command.py
index 2b93e92a..6122b08a 100644
--- a/streamflow/cwl/command.py
+++ b/streamflow/cwl/command.py
@@ -334,7 +334,7 @@ async def _save_additional_params(
             },
         }
 
-    def _get_timeout(self, job: Job, step: Step) -> int | None:
+    def _get_timeout(self, job: Job) -> int | None:
         timeout = 0
         if isinstance(self.time_limit, int):
             timeout = self.time_limit
@@ -355,7 +355,7 @@ def _get_timeout(self, job: Job, step: Step) -> int | None:
             )
         if timeout and timeout < 0:
             raise WorkflowDefinitionException(
-                f"Invalid time limit for step {step.name}: {timeout}. Time limit should be >= 0."
+                f"Invalid time limit for step {self.step.name}: {timeout}. Time limit should be >= 0."
             )
         elif timeout == 0:
             return None
@@ -824,7 +824,7 @@ async def execute(self, job: Job) -> CWLCommandOutput:
             else stdout
         )
         # Get timeout
-        timeout = self._get_timeout(job=job, step=self.step)
+        timeout = self._get_timeout(job=job)
         # Execute remote command
         start_time = time.time_ns()
         result, exit_code = await connector.run(
diff --git a/streamflow/deployment/connector/kubernetes.py b/streamflow/deployment/connector/kubernetes.py
index c7bedf17..fd8a2c8c 100644
--- a/streamflow/deployment/connector/kubernetes.py
+++ b/streamflow/deployment/connector/kubernetes.py
@@ -215,7 +215,7 @@ def __init__(
     def _configure_incluster_namespace(self):
         if self.namespace is None:
             if not os.path.isfile(SERVICE_NAMESPACE_FILENAME):
-                raise ConfigException("Service namespace file does not exists.")
+                raise ConfigException("Service namespace file does not exist.")
             with open(SERVICE_NAMESPACE_FILENAME) as f:
                 self.namespace = f.read()
 
diff --git a/streamflow/deployment/connector/queue_manager.py b/streamflow/deployment/connector/queue_manager.py
index b8f9d0f5..01e4e559 100644
--- a/streamflow/deployment/connector/queue_manager.py
+++ b/streamflow/deployment/connector/queue_manager.py
@@ -389,12 +389,13 @@ def __init__(
             template_map=files_map,
         )
         self.maxConcurrentJobs: int = maxConcurrentJobs
-        if logger.isEnabledFor(logging.WARNING):
-            logger.warning(
-                "The `maxConcurrentJobs` parameter is set to the default value 1, which prevents "
-                "multiple jobs to be concurrently submitted to the queue manager. Consider raising "
-                "this value to improve performance."
-            )
+        if self.maxConcurrentJobs == 1:
+            if logger.isEnabledFor(logging.WARNING):
+                logger.warning(
+                    "The `maxConcurrentJobs` parameter is set to the default value 1, which prevents "
+                    "multiple jobs to be concurrently submitted to the queue manager. Consider raising "
+                    "this value to improve performance."
+                )
         self.pollingInterval: int = pollingInterval
         self.scheduledJobs: MutableSequence[str] = []
         self.jobsCache: cachetools.Cache = cachetools.TTLCache(
diff --git a/streamflow/deployment/connector/schemas/kubernetes.json b/streamflow/deployment/connector/schemas/kubernetes.json
index c594be08..594a6872 100644
--- a/streamflow/deployment/connector/schemas/kubernetes.json
+++ b/streamflow/deployment/connector/schemas/kubernetes.json
@@ -8,7 +8,7 @@
       "items": {
         "type": "string"
       },
-      "description": "A list of yaml file to deploy. Files will be deployed in direct order and undeplyoed in reverse order"
+      "description": "A list of yaml file to deploy. Files will be deployed in direct order and undeployed in reverse order"
     },
     "debug": {
       "type": "boolean",
diff --git a/streamflow/deployment/connector/schemas/ssh.json b/streamflow/deployment/connector/schemas/ssh.json
index cab7240a..63440d54 100644
--- a/streamflow/deployment/connector/schemas/ssh.json
+++ b/streamflow/deployment/connector/schemas/ssh.json
@@ -29,7 +29,7 @@
             "string",
             "null"
           ],
-          "description": "Path to the SSH key needed to connect with Slurm environment"
+          "description": "Path to the SSH key needed to connect with the environment"
        },
        "sshKeyPassphraseFile": {
          "type": [
@@ -133,7 +133,7 @@
     },
     "sshKey": {
       "type": "string",
-      "description": "Path to the SSH key needed to connect with Slurm environment"
+      "description": "Path to the SSH key needed to connect with the environment"
     },
     "sshKeyPassphraseFile": {
       "type": "string",
diff --git a/streamflow/log_handler.py b/streamflow/log_handler.py
index 5906cdf6..1e964398 100644
--- a/streamflow/log_handler.py
+++ b/streamflow/log_handler.py
@@ -89,7 +89,7 @@ def filter(self, record):
 
     def highlight(self, msg):
         msg = str(msg)
-        msg_tok = msg.split()
+        msg_tok = msg.split(" ")
         for pattern, category in self.patterns.items():
             if msg_tok[0] == pattern:
                 if category == 0:
diff --git a/streamflow/parser.py b/streamflow/parser.py
index 87c9d2a7..58e163dd 100644
--- a/streamflow/parser.py
+++ b/streamflow/parser.py
@@ -127,7 +127,7 @@ def __call__(self, _parser, namespace, values, option_string=None):
     "--all",
     "-a",
     action="store_true",
-    help="If true, include all executions of the selected worwflow. "
+    help="If true, include all executions of the selected workflow. "
     "If false, include just the last one. (default: false)",
 )
 prov_parser.add_argument(
@@ -168,7 +168,7 @@ def __call__(self, _parser, namespace, values, option_string=None):
     "--all",
     "-a",
     action="store_true",
-    help="If true, include all executions of the selected worwflow. "
+    help="If true, include all executions of the selected workflow. "
     "If false, include just the last one. (default: false)",
 )
 report_parser.add_argument(
diff --git a/streamflow/workflow/step.py b/streamflow/workflow/step.py
index b14cc8db..5fd1ec65 100644
--- a/streamflow/workflow/step.py
+++ b/streamflow/workflow/step.py
@@ -514,10 +514,10 @@ async def run(self):
                 )
                 # Propagate the connector in the output port
                 self.get_output_port().put(
-                    await self.persist_token(
+                    await self._persist_token(
                         token=Token(value=self.deployment_config.name),
                         port=self.get_output_port(),
-                        inputs=_get_token_ids(inputs.values()),
+                        input_token_ids=_get_token_ids(inputs.values()),
                     )
                 )
             else: