You can use the `before_node_run` and `after_node_run` Hooks to add extra behaviour before and after a node's execution. You can apply this extra behaviour not only to an individual node or to the entire Kedro pipeline, but also to a subset of nodes, based on their tags or namespaces. For example, suppose we want to add the following extra behaviour to a node:
```python
from kedro.pipeline.node import Node


def say_hello(node: Node):
    """An extra behaviour for a node to say hello before running."""
    print(f"Hello from {node.name}")
```
Then you can either add it to a single node based on the node's name:
```python
# src/<package_name>/hooks.py
from kedro.framework.hooks import hook_impl
from kedro.pipeline.node import Node


class ProjectHooks:
    @hook_impl
    def before_node_run(self, node: Node):
        # adding extra behaviour to a single node
        if node.name == "hello":
            say_hello(node)
```
Or add it to a group of nodes based on their tags:
```python
# src/<package_name>/hooks.py
from kedro.framework.hooks import hook_impl
from kedro.pipeline.node import Node


class ProjectHooks:
    @hook_impl
    def before_node_run(self, node: Node):
        if "hello" in node.tags:
            say_hello(node)
```
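Or add it to the group of nodes under a given namespace, as mentioned earlier. Here is a minimal sketch that checks `node.namespace`, assuming a hypothetical `data_science` namespace:

```python
# src/<package_name>/hooks.py
from kedro.framework.hooks import hook_impl
from kedro.pipeline.node import Node


class ProjectHooks:
    @hook_impl
    def before_node_run(self, node: Node):
        # adding extra behaviour to nodes under the "data_science" namespace
        if node.namespace == "data_science":
            say_hello(node)
```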
Or add it to all nodes in the entire pipeline:
```python
# src/<package_name>/hooks.py
from kedro.framework.hooks import hook_impl
from kedro.pipeline.node import Node


class ProjectHooks:
    @hook_impl
    def before_node_run(self, node: Node):
        # adding extra behaviour to all nodes in the pipeline
        say_hello(node)
```
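The `after_node_run` Hook mentioned at the start of this section works the same way if you want behaviour to run once a node has finished. A minimal sketch:

```python
# src/<package_name>/hooks.py
from kedro.framework.hooks import hook_impl
from kedro.pipeline.node import Node


class ProjectHooks:
    @hook_impl
    def after_node_run(self, node: Node):
        # extra behaviour that runs after a node has finished executing
        print(f"Goodbye from {node.name}")
```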
If your use case takes advantage of a decorator, for example to retry a node's execution using a library such as `tenacity`, you can still decorate the node's function directly:
```python
from tenacity import retry


@retry
def my_flaky_node_function():
    ...
```
Or apply it in the `before_node_run` Hook as follows:
```python
# src/<package_name>/hooks.py
from tenacity import retry

from kedro.framework.hooks import hook_impl
from kedro.pipeline.node import Node


class ProjectHooks:
    @hook_impl
    def before_node_run(self, node: Node):
        # adding retrying behaviour to nodes tagged as flaky
        if "flaky" in node.tags:
            node.func = retry(node.func)
```
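Whichever variant you choose, the Hook only takes effect once `ProjectHooks` is registered in your project's `settings.py`. A minimal sketch, assuming the project package is named `my_project` as in the later examples:

```python
# src/my_project/settings.py
from my_project.hooks import ProjectHooks

HOOKS = (ProjectHooks(),)
```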
We recommend using the `before_dataset_loaded`/`after_dataset_loaded` and `before_dataset_saved`/`after_dataset_saved` Hooks to customise the dataset `load` and `save` methods where appropriate.
For example, you can add logging about the dataset load runtime as follows:
```python
import logging
import time
from typing import Any

from kedro.framework.hooks import hook_impl
from kedro.pipeline.node import Node


class LoggingHook:
    """A Hook that logs how long it takes to load each dataset."""

    def __init__(self):
        self._timers = {}

    @property
    def _logger(self):
        return logging.getLogger(__name__)

    @hook_impl
    def before_dataset_loaded(self, dataset_name: str, node: Node) -> None:
        start = time.time()
        self._timers[dataset_name] = start

    @hook_impl
    def after_dataset_loaded(self, dataset_name: str, data: Any, node: Node) -> None:
        start = self._timers[dataset_name]
        end = time.time()
        self._logger.info(
            "Loading dataset %s before node '%s' takes %0.2f seconds",
            dataset_name,
            node.name,
            end - start,
        )
```
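The save Hooks can be timed in the same way. Below is a minimal sketch using `before_dataset_saved`/`after_dataset_saved` with the same timer approach; the `SaveLoggingHook` name is illustrative:

```python
import logging
import time
from typing import Any

from kedro.framework.hooks import hook_impl
from kedro.pipeline.node import Node


class SaveLoggingHook:
    """A sketch of a Hook that logs how long it takes to save each dataset."""

    def __init__(self):
        self._timers = {}

    @property
    def _logger(self):
        return logging.getLogger(__name__)

    @hook_impl
    def before_dataset_saved(self, dataset_name: str, data: Any, node: Node) -> None:
        # record the time just before the dataset is saved
        self._timers[dataset_name] = time.time()

    @hook_impl
    def after_dataset_saved(self, dataset_name: str, data: Any, node: Node) -> None:
        # compute and log the elapsed save time
        self._logger.info(
            "Saving dataset %s after node '%s' takes %0.2f seconds",
            dataset_name,
            node.name,
            time.time() - self._timers[dataset_name],
        )
```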
We recommend using the `after_context_created` Hook to add credentials to the session's config loader instance from any external credentials manager. In this example we show how to load credentials from Azure KeyVault.
Suppose your KeyVault instance is named `keyvault-0542abb` and contains the secrets `azure-blob-store` and `s3-bucket-creds`. These secrets will be used as credentials to access the following datasets in the data catalog:
```yaml
weather:
  type: spark.SparkDataset
  filepath: s3a://your_bucket/data/01_raw/weather*
  file_format: csv
  credentials: s3_creds

cars:
  type: pandas.CSVDataset
  filepath: https://your_data_store.blob.core.windows.net/data/01_raw/cars.csv
  credentials: abs_creds
```
We can then use the following Hook implementation to fetch and inject these credentials:
```python
# hooks.py
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

from kedro.framework.hooks import hook_impl


class AzureSecretsHook:
    @hook_impl
    def after_context_created(self, context) -> None:
        keyVaultName = "keyvault-0542abb"  # or os.environ["KEY_VAULT_NAME"] if you would like to provide it through environment variables
        KVUri = f"https://{keyVaultName}.vault.azure.net"

        my_credential = DefaultAzureCredential()
        client = SecretClient(vault_url=KVUri, credential=my_credential)

        secrets = {
            "abs_creds": "azure-blob-store",
            "s3_creds": "s3-bucket-creds",
        }
        azure_creds = {
            cred_name: client.get_secret(secret_name).value
            for cred_name, secret_name in secrets.items()
        }

        context.config_loader["credentials"] = {
            **context.config_loader["credentials"],
            **azure_creds,
        }
```
Finally, register the Hook in `settings.py`:
```python
from my_project.hooks import AzureSecretsHook

HOOKS = (AzureSecretsHook(),)
```
Note: `DefaultAzureCredential()` is Azure's recommended approach to authorise access to data in your storage accounts. For more information, consult the [documentation about how to authenticate to Azure and authorize access to blob data](https://learn.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-python).
Use the `after_catalog_created` Hook to access dataset `metadata` and extend Kedro.
```python
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog


class MetadataHook:
    @hook_impl
    def after_catalog_created(
        self,
        catalog: DataCatalog,
    ):
        for dataset_name, dataset in catalog.datasets.__dict__.items():
            print(f"{dataset_name} metadata: \n {str(dataset.metadata)}")
```
You can use Hooks to launch a post-mortem debugging session with `pdb` when an error occurs during a pipeline run. `ipdb` could be integrated in the same manner.
To start a debugging session when an uncaught error is raised within your node, implement the `on_node_error` Hook specification:
```python
import pdb
import sys
import traceback

from kedro.framework.hooks import hook_impl


class PDBNodeDebugHook:
    """A Hook class for launching a post-mortem debugging session with the PDB debugger
    whenever an error is triggered within a node. The local scope from when the
    exception occurred is available within this debugging session.
    """

    @hook_impl
    def on_node_error(self):
        _, _, traceback_object = sys.exc_info()

        # Print the traceback information for debugging ease
        traceback.print_tb(traceback_object)

        # Drop into a post-mortem debugging session
        pdb.post_mortem(traceback_object)
```
You can then register this `PDBNodeDebugHook` in your project's `settings.py`:
```python
HOOKS = (PDBNodeDebugHook(),)
```
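The `ipdb` variant mentioned above follows the same pattern. Here is a minimal sketch, assuming `ipdb` is installed in your environment (the `IPDBNodeDebugHook` name is illustrative); register it in `settings.py` in the same way:

```python
import sys
import traceback

import ipdb

from kedro.framework.hooks import hook_impl


class IPDBNodeDebugHook:
    """A sketch of the same post-mortem Hook using ipdb instead of pdb."""

    @hook_impl
    def on_node_error(self):
        _, _, traceback_object = sys.exc_info()

        # Print the traceback information for debugging ease
        traceback.print_tb(traceback_object)

        # Drop into an ipdb post-mortem debugging session
        ipdb.post_mortem(traceback_object)
```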
To start a debugging session when an uncaught error is raised within your pipeline, implement the `on_pipeline_error` Hook specification:
```python
import pdb
import sys
import traceback

from kedro.framework.hooks import hook_impl


class PDBPipelineDebugHook:
    """A Hook class for launching a post-mortem debugging session with the PDB debugger
    whenever an error is triggered within a pipeline. The local scope from when the
    exception occurred is available within this debugging session.
    """

    @hook_impl
    def on_pipeline_error(self):
        # We don't need the actual exception since it is within this stack frame
        _, _, traceback_object = sys.exc_info()

        # Print the traceback information for debugging ease
        traceback.print_tb(traceback_object)

        # Drop into a post-mortem debugging session
        pdb.post_mortem(traceback_object)
```
You can then register this `PDBPipelineDebugHook` in your project's `settings.py`:
```python
HOOKS = (PDBPipelineDebugHook(),)
```