Update / add task execution hooks #1269
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| ============================== | ||
| plugins.h_rich.RichProgressBar | ||
| ============================== | ||
|
|
||
| Provides a progress bar for Hamilton execution. Must have `rich` installed to use it: | ||
|
|
||
| `pip install sf-hamilton[rich]` (use quotes if using zsh) | ||
|
|
||
|
|
||
| .. autoclass:: hamilton.plugins.h_rich.RichProgressBar | ||
| :special-members: __init__ | ||
| :members: | ||
| :inherited-members: |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| =================================== | ||
| lifecycle.api.TaskExecutionHook | ||
| =================================== | ||
|
|
||
|
|
||
| .. autoclass:: hamilton.lifecycle.api.TaskExecutionHook | ||
| :special-members: __init__ | ||
| :members: | ||
| :inherited-members: |
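
As a rough illustration of how the two arguments this PR adds to the task execution hooks, `spawning_task_id` and `purpose`, might be used, the sketch below records which tasks were spawned by which parent task. It is written to run without Hamilton installed, so several names are assumptions: `SpawnTracker` is hypothetical, and the `NodeGroupPurpose` enum is a stand-in whose member names should be checked against `hamilton.execution.grouping`. In real use the class would presumably subclass `hamilton.lifecycle.api.TaskExecutionHook`. The `task_id` argument of `run_after_task_execution` is assumed by symmetry with the "before" method, since the diff elides that part of the signature.

```python
import enum
from typing import Any, Dict, List, Optional


class NodeGroupPurpose(enum.Enum):
    """Stand-in enum; member names are assumptions, not taken from this diff."""

    EXPAND_UNORDERED = "expand_unordered"
    GATHER = "gather"
    EXECUTE_BLOCK = "execute_block"
    EXECUTE_SINGLE = "execute_single"


class SpawnTracker:
    """Hypothetical hook: maps each spawning task to the tasks it spawned."""

    def __init__(self) -> None:
        self.children: Dict[str, List[str]] = {}
        self.failed: List[str] = []

    def run_before_task_execution(
        self,
        *,
        run_id: str,
        task_id: str,
        nodes: List[Any],
        inputs: Dict[str, Any],
        overrides: Dict[str, Any],
        spawning_task_id: Optional[str],
        purpose: NodeGroupPurpose,
        **future_kwargs: Any,
    ) -> None:
        # Tasks with no parent have spawning_task_id=None; skip those.
        if spawning_task_id is not None:
            self.children.setdefault(spawning_task_id, []).append(task_id)

    def run_after_task_execution(
        self,
        *,
        run_id: str,
        task_id: str,  # assumed; elided in the diff
        results: Optional[Dict[str, Any]],
        success: bool,
        error: Optional[Exception],
        spawning_task_id: Optional[str],
        purpose: NodeGroupPurpose,
        **future_kwargs: Any,  # absorbs any arguments not listed here
    ) -> None:
        if not success:
            self.failed.append(task_id)
```

Accepting `**future_kwargs` in both methods mirrors the signatures in the PR, so the hook would keep working if further keyword arguments are added later.
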
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| =================================== | ||
| lifecycle.api.TaskGroupingHook | ||
| =================================== | ||
|
|
||
|
|
||
| .. autoclass:: hamilton.lifecycle.api.TaskGroupingHook | ||
| :special-members: __init__ | ||
| :members: | ||
| :inherited-members: |
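
A minimal sketch of the use case mentioned in the review discussion, counting the number of expanded tasks, could look like the following. To stay runnable without Hamilton installed, it defines a stand-in base class, `TaskGroupingHookSketch` (a hypothetical name), that mirrors the two abstract methods this PR adds; in real use one would presumably subclass `hamilton.lifecycle.api.TaskGroupingHook` instead. `ExpansionCounter` is also hypothetical, and it assumes that `parameters` holds one entry per expanded task.

```python
import abc
from typing import Any, Dict, List


class TaskGroupingHookSketch(abc.ABC):
    """Stand-in mirroring the abstract methods of TaskGroupingHook in this PR."""

    @abc.abstractmethod
    def run_after_task_grouping(
        self, *, run_id: str, task_ids: List[str], **future_kwargs: Any
    ) -> None:
        ...

    @abc.abstractmethod
    def run_after_task_expansion(
        self, *, run_id: str, task_id: str, parameters: Dict[str, Any], **future_kwargs: Any
    ) -> None:
        ...


class ExpansionCounter(TaskGroupingHookSketch):
    """Hypothetical hook that records grouped task IDs and expansion sizes."""

    def __init__(self) -> None:
        self.grouped_task_ids: List[str] = []
        self.expanded_counts: Dict[str, int] = {}

    def run_after_task_grouping(
        self, *, run_id: str, task_ids: List[str], **future_kwargs: Any
    ) -> None:
        # Copy the list so later changes by the caller do not affect us.
        self.grouped_task_ids = list(task_ids)

    def run_after_task_expansion(
        self, *, run_id: str, task_id: str, parameters: Dict[str, Any], **future_kwargs: Any
    ) -> None:
        # Assumption: one entry in `parameters` per expanded task.
        self.expanded_counts[task_id] = len(parameters)
```

A progress bar such as the one documented above could use a count like this to set its total once the expansion is known.
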
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,7 +15,10 @@ | |
| # To really fix this we should move everything user-facing out of base, which is a pretty sloppy name for a package anyway | ||
| # And put it where it belongs. For now we're OK with the TYPE_CHECKING hack | ||
| if TYPE_CHECKING: | ||
| from hamilton.execution.grouping import NodeGroupPurpose | ||
| from hamilton.graph import FunctionGraph | ||
| else: | ||
| NodeGroupPurpose = None | ||
|
|
||
| from hamilton.graph_types import HamiltonGraph, HamiltonNode | ||
| from hamilton.lifecycle.base import ( | ||
|
|
@@ -27,6 +30,8 @@ | |
| BasePostGraphExecute, | ||
| BasePostNodeExecute, | ||
| BasePostTaskExecute, | ||
| BasePostTaskExpand, | ||
| BasePostTaskGroup, | ||
| BasePreGraphExecute, | ||
| BasePreNodeExecute, | ||
| BasePreTaskExecute, | ||
|
|
@@ -379,13 +384,17 @@ def pre_task_execute( | |
| nodes: List["node.Node"], | ||
| inputs: Dict[str, Any], | ||
| overrides: Dict[str, Any], | ||
| spawning_task_id: Optional[str], | ||
| purpose: NodeGroupPurpose, | ||
| ): | ||
| self.run_before_task_execution( | ||
| run_id=run_id, | ||
| task_id=task_id, | ||
| nodes=[HamiltonNode.from_node(n) for n in nodes], | ||
| inputs=inputs, | ||
| overrides=overrides, | ||
| spawning_task_id=spawning_task_id, | ||
| purpose=purpose, | ||
| ) | ||
|
|
||
| def post_task_execute( | ||
|
|
@@ -397,6 +406,8 @@ def post_task_execute( | |
| results: Optional[Dict[str, Any]], | ||
| success: bool, | ||
| error: Exception, | ||
| spawning_task_id: Optional[str], | ||
| purpose: NodeGroupPurpose, | ||
| ): | ||
| self.run_after_task_execution( | ||
| run_id=run_id, | ||
|
|
@@ -405,6 +416,8 @@ def post_task_execute( | |
| results=results, | ||
| success=success, | ||
| error=error, | ||
| spawning_task_id=spawning_task_id, | ||
| purpose=purpose, | ||
| ) | ||
|
|
||
| @abc.abstractmethod | ||
|
|
@@ -416,6 +429,8 @@ def run_before_task_execution( | |
| nodes: List[HamiltonNode], | ||
| inputs: Dict[str, Any], | ||
| overrides: Dict[str, Any], | ||
| spawning_task_id: Optional[str], | ||
| purpose: NodeGroupPurpose, | ||
| **future_kwargs, | ||
| ): | ||
| """Implement this to run something before task execution. Tasks are tools used to group nodes. | ||
|
|
@@ -428,6 +443,8 @@ def run_before_task_execution( | |
| :param inputs: Inputs to the task | ||
| :param overrides: Overrides passed to the task | ||
| :param future_kwargs: Reserved for backwards compatibility. | ||
| :param spawning_task_id: ID of the task that spawned this task | ||
| :param purpose: Purpose of the current task group | ||
| """ | ||
| pass | ||
|
|
||
|
|
@@ -441,6 +458,8 @@ def run_after_task_execution( | |
| results: Optional[Dict[str, Any]], | ||
| success: bool, | ||
| error: Exception, | ||
| spawning_task_id: Optional[str], | ||
| purpose: NodeGroupPurpose, | ||
| **future_kwargs, | ||
| ): | ||
| """Implement this to run something after task execution. See note in run_before_task_execution. | ||
|
|
@@ -452,6 +471,8 @@ def run_after_task_execution( | |
| :param success: Whether the task was successful | ||
| :param error: The error the task threw, if any | ||
| :param future_kwargs: Reserved for backwards compatibility. | ||
| :param spawning_task_id: ID of the task that spawned this task | ||
| :param purpose: Purpose of the current task group | ||
| """ | ||
| pass | ||
|
|
||
|
|
@@ -614,6 +635,42 @@ def validate_graph( | |
| return self.run_to_validate_graph(graph=HamiltonGraph.from_graph(graph)) | ||
|
|
||
|
|
||
| class TaskGroupingHook(BasePostTaskGroup, BasePostTaskExpand): | ||
| """Implement this to run something after task grouping or task expansion. This will allow you to | ||
| capture information about the tasks during `Parallelize`/`Collect` blocks in dynamic DAG execution.""" | ||
|
|
||
| @override | ||
| @final | ||
| def post_task_group(self, *, run_id: str, task_ids: List[str]): | ||
| return self.run_after_task_grouping(run_id=run_id, task_ids=task_ids) | ||
|
|
||
| @override | ||
| @final | ||
| def post_task_expand(self, *, run_id: str, task_id: str, parameters: Dict[str, Any]): | ||
|
Contributor
Idea -- maybe we allow parameters to be
||
| return self.run_after_task_expansion(run_id=run_id, task_id=task_id, parameters=parameters) | ||
|
|
||
| @abc.abstractmethod | ||
| def run_after_task_grouping(self, *, run_id: str, task_ids: List[str], **future_kwargs): | ||
| """Hook that is called after task grouping. | ||
| :param run_id: ID of the run, unique in scope of the driver. | ||
| :param task_ids: List of tasks that were grouped together. | ||
| :param future_kwargs: Additional keyword arguments -- this is kept for backwards compatibility. | ||
| """ | ||
| pass | ||
|
|
||
| @abc.abstractmethod | ||
| def run_after_task_expansion( | ||
| self, *, run_id: str, task_id: str, parameters: Dict[str, Any], **future_kwargs | ||
| ): | ||
| """Hook that is called after task expansion. | ||
| :param run_id: ID of the run, unique in scope of the driver. | ||
| :param task_id: ID of the task that was expanded. | ||
| :param parameters: Parameters that were passed to the task. | ||
| :param future_kwargs: Additional keyword arguments -- this is kept for backwards compatibility. | ||
| """ | ||
| pass | ||
|
|
||
|
|
||
| class GraphConstructionHook(BasePostGraphConstruct, abc.ABC): | ||
| """Hook that is run after graph construction. This allows you to register/capture info on the graph. | ||
| Note that, in the case of materialization, this may be called multiple times (once when we create the graph, | ||
|
|
||
So, there's a slight design issue here, and I'm curious as to your thoughts. The fact that we create a dict with all the parameterization values is not part of the contract: the idea is that we could move to a generator, where the values are not all decided up front. Not married to this (it's a bit baked in that it's a list now), but curious what you're planning on using this value for?
Conversely, it could change to storing the whole set of values as a list regardless of whether it's a generator or not, only if this hook exists -- that could be an implementation detail we could handle later (e.g. add something that materializes it) -- this would allow us to release this now and not change the contract.
It's funny: while I was writing this I was concerned about exhausting the parameterization values if it was (or became) a generator. My immediate use case was to determine the number of expanded tasks, but I thought it might be useful to someone down the road to inspect the expansion results. That said, I absolutely love the idea of materializing the values only when the hook is defined. I am going to add a comment to the code.
Yep, I think a comment is good here -- it's kind of a fun one. You could also have the hook take in an optional value...
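
The idea discussed in this thread, materializing the parameterization values only when an expansion hook is registered, might be sketched roughly as below. This is not Hamilton's implementation: `expand_tasks` and the `ExpandHook` alias are hypothetical names, and the sketch assumes the values arrive as an iterable of `(name, value)` pairs that may be a one-shot generator.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple

# Hypothetical hook signature: receives the task ID and all parameter values.
ExpandHook = Callable[[str, Dict[str, Any]], None]


def expand_tasks(
    task_id: str,
    values: Iterable[Tuple[str, Any]],
    hooks: List[ExpandHook],
) -> Iterator[Tuple[str, Any]]:
    """Return an iterator over the parameter values, notifying hooks if any exist."""
    if hooks:
        # A hook needs to see every value, so the iterable is consumed once
        # into a dict and the same values are handed on afterwards.
        materialized = dict(values)
        for hook in hooks:
            hook(task_id, materialized)
        return iter(materialized.items())
    # No hooks registered: stay lazy and leave the generator untouched.
    return iter(values)
```

With this shape, a generator is never exhausted by a hook before the executor reads it, and the cost of holding all values in memory is paid only when a hook asks for them.
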