Skip to content

Commit ba00291

Browse files
committed
refactor: Moves execution to CLI package; cleans up duplicated code
Also adds docstrings, and sends error messages to `stderr` instead of `stdout` by passing `err=True` to the `click.echo` calls.
1 parent a8a4a1d commit ba00291

File tree

3 files changed

+92
-128
lines changed

3 files changed

+92
-128
lines changed

back-end/CLI/cli.py

Lines changed: 92 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,123 @@
1-
import sys
2-
31
import click
4-
import os
5-
import uuid
2+
import json
63

7-
from pyworkflow import Workflow
4+
from pyworkflow import Workflow, WorkflowException
85
from pyworkflow import NodeException
6+
from pyworkflow.nodes import ReadCsvNode, WriteCsvNode
97

108

119
class Config(object):
1210
def __init__(self):
1311
self.verbose = False
1412

13+
1514
pass_config = click.make_pass_decorator(Config, ensure=True)
1615

16+
1717
@click.group()
1818
def cli():
1919
pass
2020

2121

2222
@cli.command()
23-
@click.argument('filename', type=click.Path(exists=True), nargs=-1)
23+
@click.argument('filenames', type=click.Path(exists=True), nargs=-1)
2424
@click.option('--verbose', is_flag=True, help='Enables verbose mode.')
25-
def execute(filename, verbose):
25+
def execute(filenames, verbose):
26+
"""Execute Workflow file(s)."""
27+
# Check whether to log to terminal, or redirect output
28+
log = click.get_text_stream('stdout').isatty()
2629

27-
write_to_stdout = not click.get_text_stream('stdout').isatty()
30+
# Execute each workflow in the args
31+
for workflow_file in filenames:
2832

29-
#execute each one of the workflows in the ar
30-
for workflow_file in filename:
33+
if workflow_file is None:
34+
click.echo('Please specify a workflow to run', err=True)
35+
return
3136

32-
stdin_files = []
37+
if log:
38+
click.echo('Loading workflow file from %s' % workflow_file)
3339

34-
if not click.get_text_stream('stdin').isatty():
35-
stdin_text = click.get_text_stream('stdin')
40+
try:
41+
workflow = open_workflow(workflow_file)
42+
execute_workflow(workflow, log, verbose)
43+
except OSError as e:
44+
click.echo(f"Issues loading workflow file: {e}", err=True)
45+
except WorkflowException as e:
46+
click.echo(f"Issues during workflow execution\n{e}", err=True)
47+
48+
49+
def execute_workflow(workflow, log, verbose):
50+
"""Execute a workflow file, node-by-node.
51+
52+
Retrieves the execution order from the Workflow and iterates through nodes.
53+
If any I/O nodes are present AND stdin/stdout redirection is provided in the
54+
command-line, overwrite the stored options and then replace before saving.
55+
56+
Args:
57+
workflow - Workflow object loaded from file
58+
log - True, for outputting to terminal; False for stdout redirection
59+
verbose - True, for outputting debug information; False otherwise
60+
"""
61+
execution_order = workflow.execution_order()
62+
63+
# Execute each node in the order returned by the Workflow
64+
for node in execution_order:
65+
try:
66+
node_to_execute = workflow.get_node(node)
67+
original_file_option = pre_execute(workflow, node_to_execute, log)
3668

37-
# write standard in to a new file in local filesystem
38-
file_name = str(uuid.uuid4())
69+
if verbose:
70+
print('Executing node of type ' + str(type(node_to_execute)))
3971

40-
# TODO small issue here, might be better to upload this file to the workflow directory instead of cwd
41-
new_file_path = os.path.join(os.getcwd(), file_name)
72+
# perform execution
73+
executed_node = workflow.execute(node)
4274

43-
# read from std in and upload a new file in project directory
44-
with open(new_file_path, 'w') as f:
45-
f.write(stdin_text.read())
75+
# If file was replaced with stdin/stdout, restore original option
76+
if original_file_option is not None:
77+
executed_node.option_values["file"] = original_file_option
4678

47-
stdin_files.append(file_name)
79+
# Update Node in Workflow with changes (saved data file)
80+
workflow.update_or_add_node(executed_node)
81+
except NodeException as e:
82+
click.echo(f"Issues during node execution\n{e}", err=True)
4883

49-
if workflow_file is None:
50-
click.echo('Please specify a workflow to run')
51-
return
52-
try:
53-
if not write_to_stdout:
54-
click.echo('Loading workflow file from %s' % workflow_file)
84+
if verbose:
85+
click.echo('Completed workflow execution!')
5586

56-
Workflow.execute_workflow(workflow_file, stdin_files, write_to_stdout, verbose)
5787

58-
if verbose:
59-
click.echo('Completed workflow execution!')
88+
def pre_execute(workflow, node_to_execute, log):
89+
"""Pre-execution steps, to overwrite file options with stdin/stdout.
90+
91+
If stdin is not a tty, and the Node is ReadCsv, replace file with buffer.
92+
If stdout is not a tty, and the Node is WriteCsv, replace file with buffer.
93+
94+
Args:
95+
workflow - Workflow object loaded from file
96+
node_to_execute - The Node to execute
97+
log - True, for outputting to terminal; False for stdout redirection
98+
"""
99+
stdin = click.get_text_stream('stdin')
100+
101+
if type(node_to_execute) is ReadCsvNode and not stdin.isatty():
102+
new_file_location = stdin
103+
elif type(node_to_execute) is WriteCsvNode and not log:
104+
new_file_location = click.get_text_stream('stdout')
105+
else:
106+
# No file redirection needed
107+
return None
108+
109+
# save original file info
110+
original_file_option = node_to_execute.option_values["file"]
111+
112+
# replace with value from stdin and save
113+
node_to_execute.option_values["file"] = new_file_location
114+
workflow.update_or_add_node(node_to_execute)
115+
116+
return original_file_option
117+
60118

119+
def open_workflow(workflow_file):
120+
with open(workflow_file) as f:
121+
json_content = json.load(f)
61122

62-
except NodeException as ne:
63-
click.echo("Issues during node execution")
64-
click.echo(ne)
123+
return Workflow.from_json(json_content['pyworkflow'])

back-end/pyworkflow/pyworkflow/nodes/io/read_csv.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,3 @@ def execute(self, predecessor_data, flow_vars):
4646
return df.to_json()
4747
except Exception as e:
4848
raise NodeException('read csv', str(e))
49-
50-
def execute_for_read(self, predecessor_data, flow_vars, file_to_read):
51-
try:
52-
fname = file_to_read
53-
sep = self.options["sep"].get_value()
54-
hdr = self.options["header"].get_value()
55-
df = pd.read_csv(fname, sep=sep, header=hdr)
56-
return df.to_json()
57-
except Exception as e:
58-
raise NodeException('read csv', str(e))

back-end/pyworkflow/pyworkflow/workflow.py

Lines changed: 0 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -556,91 +556,6 @@ def to_session_dict(self):
556556
except nx.NetworkXError as e:
557557
raise WorkflowException('to_session_dict', str(e))
558558

559-
def execute_read_csv(self, node_id, csv_location):
560-
# TODO: some duplicated code here from execute common method. Need to refactor.
561-
"""Execute read_csv from a file specified to standard input.
562-
Current use case: CLI.
563-
"""
564-
preceding_data = list()
565-
flow_vars = list()
566-
node_to_execute = self.get_node(node_id)
567-
if node_to_execute is None:
568-
raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id)
569-
# Pass in data to current Node to use in execution
570-
output = node_to_execute.execute_for_read(preceding_data.append(None), flow_vars.append(None), csv_location)
571-
572-
# Save new execution data to disk
573-
node_to_execute.data = Workflow.store_node_data(self, node_id, output)
574-
575-
if node_to_execute.data is None:
576-
raise WorkflowException('execute', 'There was a problem saving node output.')
577-
578-
return node_to_execute
579-
580-
def execute_write_csv(self, node_id):
581-
node_to_execute = self.get_node(node_id)
582-
583-
if node_to_execute is None:
584-
raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id)
585-
586-
# Load predecessor data and FlowNode values
587-
preceding_data = self.load_input_data(node_to_execute.node_id)
588-
flow_nodes = self.load_flow_nodes(node_to_execute.option_replace)
589-
590-
try:
591-
# Validate input data, and replace flow variables
592-
node_to_execute.validate_input_data(len(preceding_data))
593-
execution_options = node_to_execute.get_execution_options(self, flow_nodes)
594-
# Pass in data to current Node to use in execution
595-
output = node_to_execute.execute(preceding_data, execution_options)
596-
597-
#printing the output in order to include in std_out
598-
print(output)
599-
600-
# Save new execution data to disk
601-
node_to_execute.data = Workflow.store_node_data(self, node_id, output)
602-
except NodeException as e:
603-
raise e
604-
605-
if node_to_execute.data is None:
606-
raise WorkflowException('execute', 'There was a problem saving node output.')
607-
608-
return node_to_execute
609-
610-
@staticmethod
611-
def execute_workflow(workflow_location, stdin_files, write_to_stdout, verbose_mode):
612-
"""Execute entire workflow at a certain location.
613-
Current use case: CLI.
614-
"""
615-
#load the file at workflow_location
616-
with open(workflow_location) as f:
617-
json_content = json.load(f)
618-
619-
#convert it to a workflow
620-
workflow_instance = Workflow.from_json(json_content['pyworkflow'])
621-
622-
#get the execution order
623-
execution_order = workflow_instance.execution_order()
624-
625-
#execute each node in the order returned by execution order method
626-
#TODO exception handling: stop and provide details on which node failed to execute
627-
for node in execution_order:
628-
629-
if verbose_mode:
630-
print('Executing node of type ' + str(type(workflow_instance.get_node(node))))
631-
632-
if type(workflow_instance.get_node(node)) is ReadCsvNode and len(stdin_files) > 0:
633-
csv_location = stdin_files[0]
634-
executed_node = workflow_instance.execute_read_csv(node, csv_location)
635-
# delete file at index 0
636-
del stdin_files[0]
637-
elif type(workflow_instance.get_node(node)) is WriteCsvNode and write_to_stdout:
638-
executed_node = workflow_instance.execute_write_csv(node)
639-
else:
640-
executed_node = workflow_instance.execute(node)
641-
642-
workflow_instance.update_or_add_node(executed_node)
643-
644559

645560
class WorkflowUtils:
646561
@staticmethod

0 commit comments

Comments
 (0)