further updates
cpelley committed Oct 11, 2024
1 parent 870d2ad commit 581b5a5
Showing 4 changed files with 35 additions and 24 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -53,6 +53,8 @@ This demonstrates:
- Passing data in memory.
- Execution with our chosen scheduler.

Our networkx graph is constructed from a list of edges (see official [edge](https://networkx.org/documentation/stable/reference/glossary.html#term-edge) definition) and settings. The former defines the connections between 'nodes' (see official [node](https://networkx.org/documentation/stable/reference/glossary.html#term-node) definition), while the latter defines a lookup between each node and that node's attributes (see official [node attributes](https://networkx.org/documentation/stable/reference/glossary.html#term-node-attribute) definition). It is the node's attributes that instruct DAGRunner on how to execute that given 'node'. In short, these attributes are passed directly to DAGRunner's [plugin-executor](https://github.com/MetOffice/dagrunner/blob/main/docs/dagrunner.execute_graph.md#function-plugin_executor) function (by default, see [Customise node execution](#customise-node-execution)). This takes a 'call' argument specifying which plugin to call, along with any keyword arguments.
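
As a rough illustration of the edges/settings split described above, the sketch below walks a small graph in dependency order and dispatches each node's 'call' attribute. It is not DAGRunner's implementation: `add`, `Scale`, `run_call` and `run_graph` are hypothetical names, the standard-library `graphlib` stands in for networkx and the scheduler, and the one-element form is assumed to be written as a real tuple, `(add,)`.

```python
from graphlib import TopologicalSorter


def add(*args, offset=0):
    """Hypothetical function plugin: sum upstream results, add an offset."""
    return sum(args) + offset


class Scale:
    """Hypothetical class plugin: configured on init, then called."""

    def __init__(self, factor=1):
        self.factor = factor

    def __call__(self, *args):
        return sum(args) * self.factor


# Edges define the connections between nodes as (source, target) pairs.
edges = [("a", "c"), ("b", "c"), ("c", "d")]

# Settings map each node to its attributes; 'call' names the plugin to run.
settings = {
    "a": {"call": (add, {"offset": 1})},
    "b": {"call": (add, {"offset": 2})},
    "c": {"call": (add,)},
    "d": {"call": (Scale, {"factor": 10}, {})},
}


def run_call(call, *args):
    """Illustrative dispatch of a 'call' tuple (an assumption, not the real code)."""
    target, *kwargs = call
    if isinstance(target, type):
        # (CallableClass, kwargs_init, kwargs_call)
        kwargs_init = kwargs[0] if len(kwargs) > 0 else {}
        kwargs_call = kwargs[1] if len(kwargs) > 1 else {}
        return target(**kwargs_init)(*args, **kwargs_call)
    # (callable, kwargs)
    kwargs_call = kwargs[0] if kwargs else {}
    return target(*args, **kwargs_call)


def run_graph(edges, settings):
    """Run every node after its predecessors, passing results in memory."""
    predecessors = {node: [] for node in settings}
    for source, target in edges:
        predecessors[target].append(source)
    results = {}
    for node in TopologicalSorter(predecessors).static_order():
        inputs = [results[p] for p in predecessors[node]]
        results[node] = run_call(settings[node]["call"], *inputs)
    return results


results = run_graph(edges, settings)
```

Here nodes `a` and `b` have no inputs, `c` receives both of their results as positional arguments, and `d` receives the result of `c`. Keeping the plugin and its keyword arguments in the node attributes, rather than in the edges, means the graph topology can be reused with different plugins.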

### Defining a custom processing module (plugin)

First, ensure that 'dagrunner' is on the `PYTHONPATH` (see [installation](#installation)).
23 changes: 14 additions & 9 deletions dagrunner/execute_graph.py
@@ -101,9 +101,13 @@ def plugin_executor(
Args:
- `*args`: Positional arguments to be passed to the plugin callable.
- `call`: A tuple containing the callable object or python dot path to one, keyword
arguments to instantiate this class (optional and where this callable is a class)
and finally the keyword arguments to be passed to this callable.
- `call`: A tuple containing the callable object (plugin) or Python dot path to one,
optionally followed by keyword arguments for instantiating and calling that plugin:
- `(CallableClass, kwargs_init, kwargs_call)` -> `CallableClass(**kwargs_init)(*args, **kwargs_call)`
- `(CallableClass, {}, kwargs_call)` -> `CallableClass()(*args, **kwargs_call)`
- `(CallableClass)` -> `CallableClass()(*args)`
- `(callable, kwargs)` -> `callable(*args, **kwargs)`
- `(callable)` -> `callable(*args)`
- `verbose`: A boolean indicating whether to print verbose output.
- `dry_run`: A boolean indicating whether to perform a dry run without executing
the plugin.
@@ -119,7 +123,7 @@ def plugin_executor(
Raises:
- ValueError: If the `call` argument is not provided.
"""
""" # noqa: E501
logger.client_attach_socket_handler()

if common_kwargs is None:
@@ -269,7 +273,7 @@ def __init__(
networkx_graph: str,
networkx_graph_kwargs: dict = None,
plugin_executor: callable = plugin_executor,
scheduler: str = "processes",
scheduler: str = "multiprocessing",
num_workers: int = 1,
profiler_filepath: str = None,
dry_run: bool = False,
@@ -281,11 +285,12 @@ def __init__(
Args:
- `networkx_graph` (networkx.DiGraph, callable or str):
A networkx graph; dot path to a networkx graph or callable that returns
one; tuple representing (edges, nodes) or callable object that
returns a networkx.
Python dot path to a `networkx.DiGraph` or tuple(edges, settings) object, or
callable that returns one. When called via the library, we support passing
the `networkx.DiGraph` or `tuple(edges, settings)` objects directly.
- `networkx_graph_kwargs` (dict):
Keyword arguments to pass to the networkx graph callable. Optional.
Keyword arguments to pass to the `networkx_graph` when it represents a
callable. Optional.
- `plugin_executor` (callable):
A callable object that executes a plugin function or method with the provided
arguments and keyword arguments. By default, uses the `plugin_executor`
33 changes: 19 additions & 14 deletions docs/dagrunner.execute_graph.md
@@ -16,17 +16,17 @@ see [function: dagrunner.utils.visualisation.visualise_graph](dagrunner.utils.vi

## class: `ExecuteGraph`

[Source](../dagrunner/execute_graph.py#L266)
[Source](../dagrunner/execute_graph.py#L270)

### Call Signature:

```python
ExecuteGraph(networkx_graph: str, networkx_graph_kwargs: dict = None, <function plugin_executor>, scheduler: str = 'processes', num_workers: int = 1, profiler_filepath: str = None, dry_run: bool = False, verbose: bool = False, **kwargs)
ExecuteGraph(networkx_graph: str, networkx_graph_kwargs: dict = None, <function plugin_executor>, scheduler: str = 'multiprocessing', num_workers: int = 1, profiler_filepath: str = None, dry_run: bool = False, verbose: bool = False, **kwargs)
```

### function: `__call__`

[Source](../dagrunner/execute_graph.py#L363)
[Source](../dagrunner/execute_graph.py#L368)

#### Call Signature:

@@ -38,23 +38,24 @@ Call self as a function.

### function: `__init__`

[Source](../dagrunner/execute_graph.py#L267)
[Source](../dagrunner/execute_graph.py#L271)

#### Call Signature:

```python
__init__(self, networkx_graph: str, networkx_graph_kwargs: dict = None, <function plugin_executor>, scheduler: str = 'processes', num_workers: int = 1, profiler_filepath: str = None, dry_run: bool = False, verbose: bool = False, **kwargs)
__init__(self, networkx_graph: str, networkx_graph_kwargs: dict = None, <function plugin_executor>, scheduler: str = 'multiprocessing', num_workers: int = 1, profiler_filepath: str = None, dry_run: bool = False, verbose: bool = False, **kwargs)
```

Execute a networkx graph using a chosen scheduler.

Args:
- `networkx_graph` (networkx.DiGraph, callable or str):
A networkx graph; dot path to a networkx graph or callable that returns
one; tuple representing (edges, nodes) or callable object that
returns a networkx.
Python dot path to a `networkx.DiGraph` or tuple(edges, settings) object, or
callable that returns one. When called via the library, we support passing
the `networkx.DiGraph` or `tuple(edges, settings)` objects directly.
- `networkx_graph_kwargs` (dict):
Keyword arguments to pass to the networkx graph callable. Optional.
Keyword arguments to pass to the `networkx_graph` when it represents a
callable. Optional.
- `plugin_executor` (callable):
A callable object that executes a plugin function or method with the provided
arguments and keyword arguments. By default, uses the `plugin_executor`
@@ -78,7 +79,7 @@ Args:

### function: `visualise`

[Source](../dagrunner/execute_graph.py#L360)
[Source](../dagrunner/execute_graph.py#L365)

#### Call Signature:

@@ -107,7 +108,7 @@ Status: experimental.

## function: `main`

[Source](../dagrunner/execute_graph.py#L374)
[Source](../dagrunner/execute_graph.py#L379)

### Call Signature:

@@ -138,9 +139,13 @@ is the `SKIP_EVENT` object.

Args:
- `*args`: Positional arguments to be passed to the plugin callable.
- `call`: A tuple containing the callable object or python dot path to one, keyword
arguments to instantiate this class (optional and where this callable is a class)
and finally the keyword arguments to be passed to this callable.
- `call`: A tuple containing the callable object (plugin) or Python dot path to one,
optionally followed by keyword arguments for instantiating and calling that plugin:
- `(CallableClass, kwargs_init, kwargs_call)` -> `CallableClass(**kwargs_init)(*args, **kwargs_call)`
- `(CallableClass, {}, kwargs_call)` -> `CallableClass()(*args, **kwargs_call)`
- `(CallableClass)` -> `CallableClass()(*args)`
- `(callable, kwargs)` -> `callable(*args, **kwargs)`
- `(callable)` -> `callable(*args)`
- `verbose`: A boolean indicating whether to print verbose output.
- `dry_run`: A boolean indicating whether to perform a dry run without executing
the plugin.
1 change: 0 additions & 1 deletion docs/dagrunner.utils.logger.md
@@ -68,7 +68,6 @@ attributes in a LogRecord are described by:
(typically at application startup time)
%(thread)d Thread ID (if available)
%(threadName)s Thread name (if available)
%(taskName)s Task name (if available)
%(process)d Process ID (if available)
%(message)s The result of record.getMessage(), computed just as
the record is emitted