Docs: large update to the concepts/ section #702

Merged · 17 commits · Feb 18, 2024
2 changes: 1 addition & 1 deletion data_quality.md
@@ -10,7 +10,7 @@ from hamilton.function_modifiers import check_output

@check_output(
data_type=np.int64,
-data_in_range=(0,100),
+range=(0,100),
importance="warn",
)
def some_int_data_between_0_and_100() -> pd.Series:
Binary file added docs/_static/execute_b.png
Binary file added docs/_static/execute_c.png
Binary file added docs/concepts/_function-modifiers/config_1.png
Binary file added docs/concepts/_function-modifiers/config_2.png
Binary file added docs/concepts/_function-modifiers/custom_viz.png
Binary file added docs/concepts/_function-modifiers/load_from.png
Binary file added docs/concepts/_function-modifiers/save_to.png
Binary file added docs/concepts/_function-modifiers/schema.png
28 changes: 28 additions & 0 deletions docs/concepts/_snippets/data_loader.py
@@ -0,0 +1,28 @@
import dataclasses
from os import PathLike
from typing import Any, Collection, Dict, Tuple, Type, Union

import xgboost

from hamilton.io import utils
from hamilton.io.data_adapters import DataLoader


@dataclasses.dataclass
class XGBoostJsonReader(DataLoader):
path: Union[str, bytearray, PathLike]

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [xgboost.XGBModel]

def load_data(self, type_: Type) -> Tuple[xgboost.XGBModel, Dict[str, Any]]:
# uses the XGBoost library
model = type_()
model.load_model(self.path)
metadata = utils.get_file_metadata(self.path)
return model, metadata

@classmethod
def name(cls) -> str:
return "json" # the name for `from_.{name}`
26 changes: 26 additions & 0 deletions docs/concepts/_snippets/data_saver.py
@@ -0,0 +1,26 @@
import dataclasses
from os import PathLike
from typing import Any, Collection, Dict, Type, Union

import xgboost

from hamilton.io import utils
from hamilton.io.data_adapters import DataSaver


@dataclasses.dataclass
class XGBoostJsonWriter(DataSaver):
path: Union[str, PathLike]

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [xgboost.XGBModel]

def save_data(self, data: xgboost.XGBModel) -> Dict[str, Any]:
# uses the XGBoost library
data.save_model(self.path)
return utils.get_file_metadata(self.path)

@classmethod
def name(cls) -> str:
return "json" # the name for `to.{name}`
Binary file added docs/concepts/_snippets/driver_ctx.png
28 changes: 28 additions & 0 deletions docs/concepts/_snippets/driver_ctx.py
@@ -0,0 +1,28 @@
import pandas as pd
import xgboost


def preprocessed_df(raw_df: pd.DataFrame) -> pd.DataFrame:
"""preprocess raw data"""
return ...


def model(preprocessed_df: pd.DataFrame) -> xgboost.XGBModel:
"""Train model on preprocessed data"""
return ...


if __name__ == "__main__":
import __main__

from hamilton import driver

dr = driver.Builder().with_modules(__main__).build()

data_path = "..."
model_dir = "..."
inputs = dict(raw_df=pd.read_parquet(data_path))
final_vars = ["model"]

results = dr.execute(final_vars, inputs=inputs)
results["model"].save_model(f"{model_dir}/model.json")
Binary file added docs/concepts/_snippets/materializer_ctx.png
33 changes: 33 additions & 0 deletions docs/concepts/_snippets/materializer_ctx.py
@@ -0,0 +1,33 @@
import pandas as pd
import xgboost


def preprocessed_df(raw_df: pd.DataFrame) -> pd.DataFrame:
"""preprocess raw data"""
return ...


def model(preprocessed_df: pd.DataFrame) -> xgboost.XGBModel:
"""Train model on preprocessed data"""
return ...


if __name__ == "__main__":
import __main__

from hamilton import driver
from hamilton.io.materialization import from_, to

# this registers DataSaver and DataLoader objects
from hamilton.plugins import pandas_extensions, xgboost_extensions # noqa: F401

dr = driver.Builder().with_modules(__main__).build()

data_path = "..."
model_dir = "..."
materializers = [
from_.parquet(path=data_path, target="raw_df"),
to.json(path=f"{model_dir}/model.json", dependencies=["model"], id="model__json"),
]

dr.materialize(*materializers)
Binary file added docs/concepts/_snippets/node_ctx.png
38 changes: 38 additions & 0 deletions docs/concepts/_snippets/node_ctx.py
@@ -0,0 +1,38 @@
import pandas as pd
import xgboost


def raw_df(data_path: str) -> pd.DataFrame:
"""Load raw data from parquet file"""
return pd.read_parquet(data_path)


def preprocessed_df(raw_df: pd.DataFrame) -> pd.DataFrame:
"""preprocess raw data"""
return ...


def model(preprocessed_df: pd.DataFrame) -> xgboost.XGBModel:
"""Train model on preprocessed data"""
return ...


def save_model(model: xgboost.XGBModel, model_dir: str) -> None:
"""Save trained model to JSON format"""
model.save_model(f"{model_dir}/model.json")


if __name__ == "__main__":
import __main__

from hamilton import driver

dr = driver.Builder().with_modules(__main__).build()

data_path = "..."
model_dir = "..."
inputs = dict(data_path=data_path, model_dir=model_dir)
final_vars = ["save_model"]

results = dr.execute(final_vars, inputs=inputs)
# results["save_model"] is None
Binary file added docs/concepts/_visualization/between.png
Binary file added docs/concepts/_visualization/custom_style.png
Binary file added docs/concepts/_visualization/display_all.png
Binary file added docs/concepts/_visualization/downstream.png
Binary file added docs/concepts/_visualization/execution.png
Binary file added docs/concepts/_visualization/materialization.png
Binary file added docs/concepts/_visualization/upstream.png
21 changes: 0 additions & 21 deletions docs/concepts/best-practices/function-modifiers.rst

This file was deleted.

1 change: 0 additions & 1 deletion docs/concepts/best-practices/index.rst
@@ -8,7 +8,6 @@ A set of best-practices to help you get the most out of Hamilton quickly and eas
function-naming
migrating-to-hamilton
code-organization
-function-modifiers
common-indices
output-immutability
using-within-your-etl-system
169 changes: 169 additions & 0 deletions docs/concepts/builder.rst
@@ -0,0 +1,169 @@
#######
Builder
#######

The :doc:`driver` page covered the basics of building the Driver, visualizing the dataflow, and executing the dataflow. We learned how to create the dataflow by passing a Python module to ``Builder().with_modules()``.

On this page, you'll learn how to configure your Driver with ``driver.Builder()``. Advanced concepts will come up along the way; each is explained further on its respective page.

.. note::

As your Builder code grows more complex, defining it over multiple lines can improve readability. This is possible by wrapping the expression in parentheses after the assignment ``=``:

.. code-block:: python

dr = (
driver.Builder()
.with_modules(my_dataflow)
.build()
)

The order of Builder statements doesn't matter as long as ``.build()`` is last.


with_modules()
--------------

This passes dataflow modules to the Driver. When passing multiple modules, the Driver assembles them into a single dataflow.

.. code-block:: python

# my_dataflow.py
def A() -> int:
"""Constant value 35"""
return 35

def B(A: int) -> float:
"""Divide A by 3"""
return A / 3

.. code-block:: python

# my_other_dataflow.py
def C(A: int, B: float) -> float:
"""Square A and multiply by B"""
return A**2 * B

.. code-block:: python

# run.py
from hamilton import driver
import my_dataflow
import my_other_dataflow

dr = driver.Builder().with_modules(my_dataflow, my_other_dataflow).build()

.. image:: ../_static/abc_basic.png
:align: center


This encourages organizing code into logical modules (e.g., feature processing, model training, model evaluation). For instance, ``features.py`` might depend on PySpark while ``model_training.py`` depends on XGBoost. Organizing modules by their dependencies makes it easier to reuse the XGBoost model training module in a project that doesn't use PySpark, and helps avoid version conflicts.

.. code-block:: python

# run.py
from hamilton import driver
import features
import model_training
import model_evaluation

dr = (
driver.Builder()
.with_modules(features, model_training, model_evaluation)
.build()
)


with_config()
-------------

This is directly related to the ``@config`` function decorator (see :ref:`config-decorators`) and has no effect in its absence. By passing a dictionary to ``with_config()``, you configure which functions will be used to create the dataflow. You can't change the config after the Driver is created; instead, you need to rebuild the Driver with the new config values, as sketched after the example below.

.. code-block:: python

# my_dataflow.py
from hamilton.function_modifiers import config

def A() -> int:
"""Constant value 35"""
return 35

@config.when_not(version="remote")
def B__default(A: int) -> float:
"""Divide A by 3"""
return A / 3

@config.when(version="remote")
def B__remote(A: int) -> float:
"""Divide A by 2"""
return A / 2

.. code-block:: python

# run.py
from hamilton import driver
import my_dataflow

dr = (
driver.Builder()
.with_modules(my_dataflow)
.with_config(dict(version="remote"))
.build()
)
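
Since the config can't be changed afterward, switching values means building a new Driver. A minimal sketch, reusing the module above:

.. code-block:: python

    # rebuild with a different config to select B__default instead of B__remote
    dr_default = (
        driver.Builder()
        .with_modules(my_dataflow)
        .with_config(dict(version="local"))  # any value other than "remote"
        .build()
    )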

with_adapters()
---------------

This allows you to add multiple lifecycle hooks to the Driver. Hooks are a very flexible abstraction for developing custom plugins for logging, telemetry, alerting, and more. The following adds a hook that launches a debugger when reaching the node ``"B"``:

.. code-block:: python

# run.py
from hamilton import driver, lifecycle
import my_dataflow

debug_hook = lifecycle.default.PDBDebugger(node_filter="B", during=True)
dr = (
driver.Builder()
.with_modules(my_dataflow)
.with_adapters(debug_hook)
.build()
)

Other hooks are available to output a progress bar in the terminal, do experiment tracking for your Hamilton runs, cache results to disk, send logs to DataDog, and more!
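
You can also write your own hook. A minimal sketch of a timing hook, assuming the ``NodeExecutionHook`` base class exposed by ``hamilton.lifecycle`` (check the lifecycle API reference for the exact signatures):

.. code-block:: python

    import time

    from hamilton import driver, lifecycle
    import my_dataflow

    class TimerHook(lifecycle.NodeExecutionHook):
        """Print the wall-clock duration of each node."""

        def __init__(self):
            self.start_times = {}

        def run_before_node_execution(self, *, node_name: str, **kwargs):
            self.start_times[node_name] = time.perf_counter()

        def run_after_node_execution(self, *, node_name: str, **kwargs):
            elapsed = time.perf_counter() - self.start_times.pop(node_name)
            print(f"{node_name} took {elapsed:.3f}s")

    dr = (
        driver.Builder()
        .with_modules(my_dataflow)
        .with_adapters(TimerHook())
        .build()
    )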

enable_dynamic_execution()
--------------------------

This directly relates to the Builder methods ``with_local_executor()`` and ``with_remote_executor()``, and to the ``Parallelizable/Collect`` functions (see :doc:`parallel-task`). For the Driver to be able to parse them, you need to set ``allow_experimental_mode=True``, as follows:

.. code-block:: python

# run.py
from hamilton import driver
import my_dataflow # <- this contains Parallelizable/Collect nodes

dr = (
driver.Builder()
.enable_dynamic_execution(allow_experimental_mode=True) # set True
.with_modules(my_dataflow)
.build()
)
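
For reference, a minimal sketch of what such a module might look like, using the ``Parallelizable``/``Collect`` types from ``hamilton.htypes`` (the functions here are hypothetical):

.. code-block:: python

    # my_dataflow.py
    from typing import List

    from hamilton.htypes import Collect, Parallelizable

    def url(urls: List[str]) -> Parallelizable[str]:
        """Fan out: each yielded URL becomes its own task."""
        for u in urls:
            yield u

    def url_length(url: str) -> int:
        """Runs once per URL, potentially in parallel."""
        return len(url)

    def total_length(url_length: Collect[int]) -> int:
        """Fan in: sum the per-URL results."""
        return sum(url_length)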

When dynamic execution is enabled, reasonable defaults are used for the local and remote executors. You can also specify them explicitly:

.. code-block:: python

# run.py
from hamilton import driver
from hamilton.execution import executors
import my_dataflow

dr = (
driver.Builder()
.with_modules(my_dataflow)
.enable_dynamic_execution(allow_experimental_mode=True)
.with_local_executor(executors.SynchronousLocalTaskExecutor())
.with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
.build()
)
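
Execution then proceeds as usual; for the hypothetical module sketched above:

.. code-block:: python

    results = dr.execute(
        ["total_length"],
        inputs=dict(urls=["https://a.example", "https://b.example"]),
    )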