Skip to content

Commit

Permalink
Merge branch '0.1.12' of https://github.com/dyvenia/viadot into 0.1.12
Browse files Browse the repository at this point in the history
  • Loading branch information
trymzet committed Jul 8, 2021
2 parents 3d82d46 + 506d8b3 commit d7a103d
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions viadot/flows/flow_of_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,31 @@ class Pipeline(Flow):
def __init__(
self,
name: str,
project_name: str,
extract_flows_names: List[str],
transform_flow_name: str,
*args: List[any],
**kwargs: Dict[str, Any]
):
self.extract_flows_names = extract_flows_names
self.transform_flow_name = transform_flow_name
self.project_name = project_name
super().__init__(*args, name=name, **kwargs)
self.gen_flow()

def gen_start_flow_run_task(self, flow_name: str, flow: Flow = None) -> Task:
t = start_flow_run_task.bind(flow_name=flow_name, flow=flow)
t = start_flow_run_task.bind(
flow_name=flow_name, project_name=self.project_name, flow=flow
)
return t

def gen_flow(self) -> Flow:
extract_flow_runs = apply_map(
self.gen_start_flow_run_task, self.extract_flows_names, flow=self
)
transform_flow_run = start_flow_run_task_2.bind(
flow_name=self.transform_flow_name, flow=self
flow_name=self.transform_flow_name,
project_name=self.project_name,
flow=self,
)
transform_flow_run.set_upstream(extract_flow_runs, flow=self)

0 comments on commit d7a103d

Please sign in to comment.