Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve kedro run as a package #1423

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
import importlib
from pathlib import Path
import sys

from kedro.framework.cli.utils import KedroCliError, load_entry_points
from kedro.framework.project import configure_project
Expand Down Expand Up @@ -36,11 +37,19 @@ def _find_run_command_in_plugins(plugins):
return group.commands["run"]


def main(*args, **kwargs):
def main(**kwargs):
package_name = Path(__file__).parent.name
configure_project(package_name)
run = _find_run_command(package_name)
run(*args, **kwargs)
# This is what happens under the hood of run.__call__. The click context contains
# arguments coming from the CLI command (sys.argv, e.g. ["--pipeline", "ds"]) and
# default values of arguments that are not supplied. We forward the context to the
# invocation of run. Any **kwargs supplied (e.g. `main(pipeline="ds")` will
# overwrite the arguments supplied by the context. Overall this means we have a
# system where the same main function can handle both arguments coming from CLI and
# a Python API.
with run.make_context("run", sys.argv[1:]) as ctx:
return ctx.forward(run, **kwargs)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion kedro/framework/cli/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def run(
node_names = _get_values_as_tuple(node_names) if node_names else node_names

with KedroSession.create(env=env, extra_params=params) as session:
session.run(
return session.run(
tags=tag,
runner=runner(is_async=is_async),
node_names=node_names,
Expand Down
107 changes: 104 additions & 3 deletions kedro/framework/project/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
"""``kedro.framework.project`` module provides utitlity to
configure a Kedro project and access its settings."""
# pylint: disable=redefined-outer-name,unused-argument,global-statement
import logging

import sys

import inspect

import importlib
import logging.config
import operator
from collections.abc import MutableMapping
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Callable, List

from dynaconf import LazySettings
from dynaconf.validator import ValidationError, Validator
Expand Down Expand Up @@ -95,11 +101,22 @@ def inner(self, *args, **kwargs):
return inner


def _load_callable_wrapper(func):
"""Wrap a method in _ProjectPipelines so that data is loaded on first access.
Taking inspiration from dynaconf.utils.functional.new_method_proxy
"""
# pylint: disable=protected-access
def inner(self, *args, **kwargs):
self._load_callable()
return func(self, *args, **kwargs)

return inner


class _ProjectPipelines(MutableMapping):
"""A read-only lazy dictionary-like object to hold the project pipelines.
On configure it will store the pipelines module.
On first data access, e.g. through __getitem__, it will load the registered pipelines and merge
them with pipelines defined from hooks.
On first data access, e.g. through __getitem__, it will load the registered pipelines.
"""

def __init__(self) -> None:
Expand Down Expand Up @@ -150,13 +167,94 @@ def configure(self, pipelines_module: Optional[str] = None) -> None:
__str__ = _load_data_wrapper(str)


class _Run(Callable):
def __init__(self) -> None:
self._cli_module: Optional[str] = None
self._is_callable_loaded = False
self._callable: Optional[Callable] = None

@staticmethod
def _get_run_callable(cli_module: str):
from kedro.framework.cli.utils import (
KedroCliError,
load_entry_points,
) ### needs to go in here to avoid circular imports

def _find_run_command_in_plugins(plugins):
for group in plugins:
if "run" in group.commands:
return group.commands["run"]

try:
project_cli = importlib.import_module(cli_module)
# fail gracefully if cli.py does not exist
except ModuleNotFoundError as exc:
if cli_module not in str(exc):
raise
plugins = load_entry_points("project")
run = _find_run_command_in_plugins(plugins) if plugins else None
if run:
# use run command from installed plugin if it exists
return run
# use run command from the framework project
from kedro.framework.cli.project import run

return run
# fail badly if cli.py exists, but has no `cli` in it
if not hasattr(project_cli, "cli"):
raise KedroCliError(f"Cannot load commands from {cli_module}")
return project_cli.run

def _load_callable(self):
if self._cli_module is None or self._is_callable_loaded:
return

self._callable = self._get_run_callable(self._cli_module)
self._is_callable_loaded = True

self.__signature__ = inspect.signature(self._callable.callback)
self.__annotations__ = self._callable.callback.__annotations__
with self._callable.make_context("run", []) as ctx:
self.__doc__ = self._callable.get_help(ctx)

def configure(self, cli_module: Optional[str] = None) -> None:
self._cli_module = cli_module
self._is_callable_loaded = False
self._callable = None
# NEED ALL THREE OF THESE??

@_load_callable_wrapper
def __call__(self, args: Optional[List[str]] = None, **kwargs):
# This is what happens under the hood of click. The click context contains
# a list of arguments (e.g. ["--pipeline", "ds"]) and default values of
# arguments that are not supplied. We forward the context to the
# invocation of run. Any **kwargs supplied (e.g. `pipeline="ds"` will
# overwrite the arguments supplied by the context. Overall this means
# that an instantiation of _Run can handle both arguments from the CLI
# (when invoked from a packaged project through the main entry point) and
# using a Python API.
# NOTE args rather than *args. So do run(["-p", "ds"]), not run("-p", "ds").
args = args or []
logging.info(args)
logging.info(kwargs)
with self._callable.make_context("run", args) as ctx:
return ctx.forward(self._callable, **kwargs)

# Presentation methods
__repr__ = _load_callable_wrapper(repr)
__str__ = _load_callable_wrapper(str)
__doc__ = _load_callable_wrapper(__doc__)


PACKAGE_NAME = None
LOGGING = None

settings = _ProjectSettings()

pipelines = _ProjectPipelines()

run = _Run()


def configure_project(package_name: str):
"""Configure a Kedro project by populating its settings with values
Expand All @@ -168,6 +266,9 @@ def configure_project(package_name: str):
pipelines_module = f"{package_name}.pipeline_registry"
pipelines.configure(pipelines_module)

cli_module = f"{package_name}.cli"
run.configure(cli_module)

# Once the project is successfully configured once, store PACKAGE_NAME as a
# global variable to make it easily accessible. This is used by validate_settings()
# below, and also by ParallelRunner on Windows, as package_name is required every
Expand Down
Original file line number Diff line number Diff line change
@@ -1,47 +1,28 @@
"""{{ cookiecutter.project_name }} file for ensuring the package is executable
as `{{ cookiecutter.repo_name }}` and `python -m {{ cookiecutter.python_package }}`
"""
import importlib
import sys
from pathlib import Path

from kedro.framework.cli.utils import KedroCliError, load_entry_points
from kedro.framework.project import configure_project
from kedro.framework.project import configure_project, run


def _find_run_command(package_name):
try:
project_cli = importlib.import_module(f"{package_name}.cli")
# fail gracefully if cli.py does not exist
except ModuleNotFoundError as exc:
if f"{package_name}.cli" not in str(exc):
raise
plugins = load_entry_points("project")
run = _find_run_command_in_plugins(plugins) if plugins else None
if run:
# use run command from installed plugin if it exists
return run
# use run command from `kedro.framework.cli.project`
from kedro.framework.cli.project import run
# def main(args=None, **kwargs):
# package_name = Path(__file__).parent.name
# configure_project(package_name)
# run(args or sys.argv[1:], **kwargs)
# return 0

return run
# fail badly if cli.py exists, but has no `cli` in it
if not hasattr(project_cli, "cli"):
raise KedroCliError(f"Cannot load commands from {package_name}.cli")
return project_cli.run


def _find_run_command_in_plugins(plugins):
for group in plugins:
if "run" in group.commands:
return group.commands["run"]


def main(*args, **kwargs):
def main(**kwargs):
package_name = Path(__file__).parent.name
configure_project(package_name)
run = _find_run_command(package_name)
run(*args, **kwargs)
if kwargs:
run(**kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is unfinished? as I can see in the other __main__.py the result of session.run is return but here it doesn't return the output

else:
run(sys.argv[1:])
return 0


if __name__ == "__main__":
main()
sys.exit(main())