Skip to content

Commit

Permalink
fixed typo attribute name
Browse files Browse the repository at this point in the history
  • Loading branch information
LanderOtto committed Jan 30, 2024
1 parent 247dea6 commit 9dc1fc9
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions streamflow/workflow/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -1066,15 +1066,15 @@ async def run(self):
class LoopCombinatorStep(CombinatorStep):
def __init__(self, name: str, workflow: Workflow, combinator: Combinator):
super().__init__(name, workflow, combinator)
self.iteration_terminaton_checklist: MutableMapping[str, set[str]] = {}
self.iteration_termination_checklist: MutableMapping[str, set[str]] = {}

async def run(self):
# Set default status to SKIPPED
status = Status.SKIPPED
if self.input_ports:
input_tasks, terminated = [], []
for port_name, port in self.get_input_ports().items():
self.iteration_terminaton_checklist[port_name] = set()
self.iteration_termination_checklist[port_name] = set()
input_tasks.append(
asyncio.create_task(
port.get(posixpath.join(self.name, port_name)), name=port_name
Expand All @@ -1098,13 +1098,13 @@ async def run(self):
terminated.append(task_name)
# If an IterationTerminationToken is received, mark the corresponding iteration as terminated
elif check_iteration_termination(token):
if token.tag in self.iteration_terminaton_checklist[task_name]:
if token.tag in self.iteration_termination_checklist[task_name]:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"Step {self.name} received iteration termination token {token.tag} "
f"for port {task_name}"
)
self.iteration_terminaton_checklist[task_name].remove(
self.iteration_termination_checklist[task_name].remove(
token.tag
)
# Otherwise, build combination and set default status to COMPLETED
Expand All @@ -1117,9 +1117,9 @@ async def run(self):
status = Status.COMPLETED
if (
".".join(token.tag.split(".")[:-1])
not in self.iteration_terminaton_checklist[task_name]
not in self.iteration_termination_checklist[task_name]
):
self.iteration_terminaton_checklist[task_name].add(
self.iteration_termination_checklist[task_name].add(
token.tag
)

Expand All @@ -1138,8 +1138,8 @@ async def run(self):
)
# Create a new task in place of the completed one if the port is not terminated
if not (
task_name in terminated
and len(self.iteration_terminaton_checklist[task_name]) == 0
task_name in terminated
and len(self.iteration_termination_checklist[task_name]) == 0
):
input_tasks.append(
asyncio.create_task(
Expand Down

0 comments on commit 9dc1fc9

Please sign in to comment.