[AIRFLOW-6004] Untangle Executors class to avoid cyclic imports (#6596)
Cyclic imports were being detected, seemingly at random, by the pylint checks when some PRs ran in CI.

The detection was non-deterministic because pylint usually uses as many processors as are available and splits the list of .py files between separate pylint processes - depending on how the split is done, a pylint check might or might not detect a given cycle. The cycles are always detected when all files are checked together.

To make detection deterministic, all pylint and mypy errors were resolved across the whole executors package and in dag_processor.

At the same time, plugin integration was moved out of the executors package - and out of operators/hooks/sensors/macros - into plugins_manager, because the per-package integration was also causing cyclic dependencies, and it is far easier to untangle the executors' dependencies once the initialisation of all plugins lives in plugins_manager.

Additionally, require_serial is set in the pre-commit configuration to make sure that cycle detection is deterministic.
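The shape of the fix, in outline: instead of each of the executors/hooks/operators/sensors/macros packages importing plugin modules at import time (which is what tied the packages to each other), a single entry point in plugins_manager injects plugin modules into sys.modules once, at startup. A minimal sketch of that pattern - only the integrate_plugins name comes from this commit; the registry below is illustrative:

import sys
from types import ModuleType
from typing import List

# Hypothetical registry, populated when plugins are scanned; stands in
# for plugins_manager's internal plugin-module lists.
_plugin_modules: List[ModuleType] = []


def integrate_plugins() -> None:
    """Expose each plugin module under its own name in sys.modules.

    Because this runs once from airflow/__init__.py, the individual
    packages no longer need to import plugins_manager back, which is
    what created the import cycles.
    """
    for module in _plugin_modules:
        sys.modules[module.__name__] = module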
potiuk authored Dec 3, 2019
1 parent 9fed459 commit a36cfe0
Showing 65 changed files with 1,221 additions and 934 deletions.
4 changes: 3 additions & 1 deletion .pre-commit-config.yaml
@@ -264,12 +264,14 @@ repos:
       files: \.py$
       exclude: ^tests/.*\.py$|^airflow/_vendor/.*$
       pass_filenames: true
-    - id: pylint
+      require_serial: true # Pylint tests should be run in one chunk to detect all cycles
+    - id: pylint-tests
       name: Run pylint for tests
       language: system
       entry: "./scripts/ci/pre_commit_pylint_tests.sh"
       files: ^tests/.*\.py$
       pass_filenames: true
+      require_serial: false # For tests, it's perfectly OK to run pylint in parallel
     - id: flake8
       name: Run flake8
       language: system
4 changes: 2 additions & 2 deletions BREEZE.rst
@@ -872,7 +872,7 @@ This is the current syntax for `./breeze <./breeze>`_:
   -S, --static-check <STATIC_CHECK>
           Run selected static checks for currently changed files. You should specify static check that
           you would like to run or 'all' to run all checks. One of
-          [ all all-but-pylint check-apache-license check-executables-have-shebangs check-hooks-apply check-merge-conflict check-xml debug-statements doctoc detect-private-key end-of-file-fixer flake8 forbid-tabs insert-license lint-dockerfile mixed-line-ending mypy pylint setup-order shellcheck].
+          [ all all-but-pylint check-apache-license check-executables-have-shebangs check-hooks-apply check-merge-conflict check-xml debug-statements doctoc detect-private-key end-of-file-fixer flake8 forbid-tabs insert-license lint-dockerfile mixed-line-ending mypy pylint pylint-test setup-order shellcheck].
           You can pass extra arguments including options to the pre-commit framework as
           <EXTRA_ARGS> passed after --. For example:
@@ -886,7 +886,7 @@ This is the current syntax for `./breeze <./breeze>`_:
   -F, --static-check-all-files <STATIC_CHECK>
           Run selected static checks for all applicable files. You should specify static check that
           you would like to run or 'all' to run all checks. One of
-          [ all all-but-pylint check-apache-license check-executables-have-shebangs check-hooks-apply check-merge-conflict check-xml debug-statements doctoc detect-private-key end-of-file-fixer flake8 forbid-tabs insert-license lint-dockerfile mixed-line-ending mypy pylint setup-order shellcheck].
+          [ all all-but-pylint check-apache-license check-executables-have-shebangs check-hooks-apply check-merge-conflict check-xml debug-statements doctoc detect-private-key end-of-file-fixer flake8 forbid-tabs insert-license lint-dockerfile mixed-line-ending mypy pylint pylint-test setup-order shellcheck].
           You can pass extra arguments including options to the pre-commit framework as
           <EXTRA_ARGS> passed after --. For example:
4 changes: 3 additions & 1 deletion CONTRIBUTING.rst
@@ -454,7 +454,9 @@ image built locally):
 ----------------------------------- ---------------------------------------------------------------- ------------
 ``pydevd``                          Check for accidentally committed pydevd statements.
 ----------------------------------- ---------------------------------------------------------------- ------------
-``pylint``                          Runs pylint.                                                     *
+``pylint``                          Runs pylint for main code.                                       *
+----------------------------------- ---------------------------------------------------------------- ------------
+``pylint-tests``                    Runs pylint for tests.                                           *
 ----------------------------------- ---------------------------------------------------------------- ------------
 ``python-no-log-warn``              Checks that there are no deprecated log.warn usages.
 ----------------------------------- ---------------------------------------------------------------- ------------
5 changes: 5 additions & 0 deletions UPDATING.md
@@ -41,6 +41,11 @@ assists users migrating to a new version.

 ## Airflow Master
 
+### Removal of airflow.AirflowMacroPlugin class
+
+The class was in the airflow package but has not been used (apparently since 2015).
+It has been removed.
+
 ### Changes to settings
 
 CONTEXT_MANAGER_DAG was removed from settings. Its role has been taken by `DagContext` in
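As context for the UPDATING note above: DagContext takes over the role of the removed settings.CONTEXT_MANAGER_DAG as the holder of the "current" DAG inside a `with DAG(...)` block. A hedged sketch of that pattern - the class name is in the diff, but the method names and internals below are assumptions based on the described role:

from typing import Any, List, Optional


class DagContext:
    """Simplified stand-in for the class replacing settings.CONTEXT_MANAGER_DAG."""

    _context_managed_dags: List[Any] = []

    @classmethod
    def push_context_managed_dag(cls, dag: Any) -> None:
        # Called when a `with DAG(...)` block is entered.
        cls._context_managed_dags.insert(0, dag)

    @classmethod
    def pop_context_managed_dag(cls) -> Optional[Any]:
        # Called when the `with DAG(...)` block exits.
        return cls._context_managed_dags.pop(0) if cls._context_managed_dags else None

    @classmethod
    def get_current_dag(cls) -> Optional[Any]:
        # Used by operators instantiated inside a `with DAG(...)` block.
        return cls._context_managed_dags[0] if cls._context_managed_dags else None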
21 changes: 3 additions & 18 deletions airflow/__init__.py
@@ -44,23 +44,8 @@

 settings.initialize()
 
-login = None  # type: Optional[Callable]
+from airflow.plugins_manager import integrate_plugins
 
-from airflow import executors
-from airflow import hooks
-from airflow import macros
-from airflow import operators
-from airflow import sensors
+login: Optional[Callable] = None
 
-
-class AirflowMacroPlugin:
-    # pylint: disable=missing-docstring
-    def __init__(self, namespace):
-        self.namespace = namespace
-
-
-operators._integrate_plugins()  # pylint: disable=protected-access
-sensors._integrate_plugins()  # pylint: disable=protected-access
-hooks._integrate_plugins()  # pylint: disable=protected-access
-executors._integrate_plugins()  # pylint: disable=protected-access
-macros._integrate_plugins()  # pylint: disable=protected-access
+integrate_plugins()
2 changes: 1 addition & 1 deletion airflow/cli/commands/flower_command.py
@@ -23,7 +23,7 @@
 import daemon
 from daemon.pidfile import TimeoutPIDLockFile
 
-from airflow import conf
+from airflow.configuration import conf
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations, sigint_handler
 
2 changes: 1 addition & 1 deletion airflow/cli/commands/serve_logs_command.py
@@ -18,7 +18,7 @@
"""Serve logs command"""
import os

from airflow import conf
from airflow.configuration import conf
from airflow.utils import cli as cli_utils


4 changes: 2 additions & 2 deletions airflow/cli/commands/task_command.py
@@ -25,7 +25,7 @@
 from contextlib import redirect_stderr, redirect_stdout
 
 from airflow import DAG, AirflowException, conf, jobs, settings
-from airflow.executors import get_default_executor
+from airflow.executors.executor_loader import ExecutorLoader
 from airflow.models import DagPickle, TaskInstance
 from airflow.ti_deps.dep_context import SCHEDULER_QUEUED_DEPS, DepContext
 from airflow.utils import cli as cli_utils, db
@@ -69,7 +69,7 @@ def _run(args, dag, ti):
         print(e)
         raise e
 
-    executor = get_default_executor()
+    executor = ExecutorLoader.get_default_executor()
     executor.start()
     print("Sending to executor.")
     executor.queue_task_instance(
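For callers, the migration shown above is mechanical: the module-level helper becomes a class method. A short sketch of the new call site (surrounding setup omitted; only what this hunk shows):

from airflow.executors.executor_loader import ExecutorLoader

# Replaces the removed module-level airflow.executors.get_default_executor();
# from the caller's perspective the behaviour is the same: a configured,
# cached executor instance.
executor = ExecutorLoader.get_default_executor()
executor.start()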
3 changes: 2 additions & 1 deletion airflow/cli/commands/worker_command.py
@@ -23,7 +23,8 @@
 import daemon
 from daemon.pidfile import TimeoutPIDLockFile
 
-from airflow import conf, settings
+from airflow import settings
+from airflow.configuration import conf
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations, setup_logging, sigint_handler
 
84 changes: 1 addition & 83 deletions airflow/executors/__init__.py
@@ -1,5 +1,3 @@
-# -*- coding: utf-8 -*-
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -16,84 +14,4 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-# pylint: disable=missing-docstring
-
-import sys
-from typing import Optional
-
-from airflow.configuration import conf
-from airflow.exceptions import AirflowException
-from airflow.executors.base_executor import BaseExecutor
-from airflow.executors.local_executor import LocalExecutor
-from airflow.executors.sequential_executor import SequentialExecutor
-from airflow.utils.log.logging_mixin import LoggingMixin
-
-DEFAULT_EXECUTOR = None  # type: Optional[BaseExecutor]
-
-
-def _integrate_plugins():
-    """Integrate plugins to the context."""
-    from airflow.plugins_manager import executors_modules
-    for executors_module in executors_modules:
-        sys.modules[executors_module.__name__] = executors_module
-        globals()[executors_module._name] = executors_module  # pylint: disable=protected-access
-
-
-def get_default_executor():
-    """Creates a new instance of the configured executor if none exists and returns it"""
-    global DEFAULT_EXECUTOR  # pylint: disable=global-statement
-
-    if DEFAULT_EXECUTOR is not None:
-        return DEFAULT_EXECUTOR
-
-    executor_name = conf.get('core', 'EXECUTOR')
-
-    DEFAULT_EXECUTOR = _get_executor(executor_name)
-
-    log = LoggingMixin().log
-    log.info("Using executor %s", executor_name)
-
-    return DEFAULT_EXECUTOR
-
-
-class Executors:
-    LocalExecutor = "LocalExecutor"
-    SequentialExecutor = "SequentialExecutor"
-    CeleryExecutor = "CeleryExecutor"
-    DaskExecutor = "DaskExecutor"
-    KubernetesExecutor = "KubernetesExecutor"
-
-
-def _get_executor(executor_name):
-    """
-    Creates a new instance of the named executor.
-    In case the executor name is not know in airflow,
-    look for it in the plugins
-    """
-    if executor_name == Executors.LocalExecutor:
-        return LocalExecutor()
-    elif executor_name == Executors.SequentialExecutor:
-        return SequentialExecutor()
-    elif executor_name == Executors.CeleryExecutor:
-        from airflow.executors.celery_executor import CeleryExecutor
-        return CeleryExecutor()
-    elif executor_name == Executors.DaskExecutor:
-        from airflow.executors.dask_executor import DaskExecutor
-        return DaskExecutor()
-    elif executor_name == Executors.KubernetesExecutor:
-        from airflow.executors.kubernetes_executor import KubernetesExecutor
-        return KubernetesExecutor()
-    else:
-        # Loading plugins
-        _integrate_plugins()
-        executor_path = executor_name.split('.')
-        if len(executor_path) != 2:
-            raise AirflowException(
-                "Executor {0} not supported: "
-                "please specify in format plugin_module.executor".format(executor_name))
-
-        if executor_path[0] in globals():
-            return globals()[executor_path[0]].__dict__[executor_path[1]]()
-        else:
-            raise AirflowException("Executor {0} not supported.".format(executor_name))
+"""Executors."""