[AIRFLOW-6004] Untangle Executors class to avoid cyclic imports #6596
I fixed a few of those in the fixup that's coming, but I left the gist of it for a follow-up commit, where I also implemented validation in pre-commit.
I think NOW I have resolved everything that needs to be resolved in this commit (some more commits are following), so I am letting Travis CI take a look now. I also made the commit message much shorter: we do not need the full context there, and I left more of it in JIRA. @ashb -> let me know if you think it's OK.
@@ -66,20 +63,17 @@ def execute_async(self,
                       command: CommandType,
                       queue: Optional[str] = None,
                       executor_config: Optional[Any] = None) -> None:
-        if not self.futures:
-            raise AirflowException("Executor should be started first.")
+        assert self.futures, NOT_STARTED_MESSAGE
I would probably leave the if conditions instead of asserts.
I am happy to discuss it and hear others' opinions. I was doubtful about asserts myself, but they really are nicer, and in many cases they simply make sense (see my comments above). And if we discourage asserts, we should remove them everywhere (they are used in other parts of the code).
I think this is a good case for asserts.
Under normal circumstances they should pass, and it's really only while developing that you might get this wrong, so having these be asserts that could be disabled by python -O seems like the right thing to do.
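To make the python -O point concrete, here is a minimal sketch (not Airflow code; the snippet and the helper name run_snippet are made up for illustration) that runs the same assert-based check in a fresh interpreter with and without -O. It assumes PYTHONOPTIMIZE is not set in the environment.

```python
import subprocess
import sys

# Hypothetical stand-in for the executor check discussed above.
SNIPPET = """
futures = None  # the executor was never started
assert futures, "Executor should be started first."
print("reached code after the check")
"""


def run_snippet(optimize: bool) -> subprocess.CompletedProcess:
    """Run SNIPPET in a fresh interpreter, optionally with -O."""
    cmd = [sys.executable]
    if optimize:
        cmd.append("-O")  # -O strips assert statements at compile time
    cmd += ["-c", SNIPPET]
    return subprocess.run(cmd, capture_output=True, text=True)


normal = run_snippet(optimize=False)
optimized = run_snippet(optimize=True)

# Without -O the assert fires and the process exits with an error.
print("failed without -O:", normal.returncode != 0)
# With -O the check is silently skipped and execution continues.
print("output with -O:", optimized.stdout.strip())
```

So an assert only guards development runs; anything that must hold in production regardless of interpreter flags needs an explicit if/raise.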
I'd rather check our options first, to see how we might avoid assertions.
1. If we need a one-liner:
       if not self.futures: raise AirflowException(NOT_STARTED_MESSAGE)
   It is not very elegant, but it is easy to understand.
2. Another option to consider is writing a single checker and calling it wherever we need it:
       def _check_started(self):
           if None in (self.client, self.futures):
               raise AirflowException(NOT_STARTED_MESSAGE)
3. Or go more Pythonic and introduce a decorator:
       def check_futures(fn):
           def wrapped(self, *args, **kwargs):
               if not self.futures:
                   raise AirflowException(NOT_STARTED_MESSAGE)
               return fn(self, *args, **kwargs)
           return wrapped

       @check_futures
       def execute_async(self):
           <...>
4. If we expect that some methods need to wait some time for the executor to start, tenacity could be useful:
       from tenacity import retry, stop_after_attempt

       @retry(stop=stop_after_attempt(3))
       def execute_async(self):
           try:
               <...>
           except Exception:
               raise AirflowException(NOT_STARTED_MESSAGE)
My vote goes to a decorator solution, either 3 or 4. Anyway, I think assertions could occasionally be useful, but for internal checks only, such as when you check that all is good and then raise your own exception with a meaningful message for the end user. Otherwise it looks like a dirty hack. A bare AssertionError would confuse the average Pythonista, as developers expect (and the official docs clearly state) that assertions are used in test environments only. The harm from confusion would outweigh the readability benefit.
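For reference, option 3 could look roughly like the runnable sketch below. ToyExecutor, ExecutorNotStarted and the message text are stand-ins invented for the example, not Airflow's real classes. The sketch checks "is None" rather than truthiness, so an executor that has started but holds no futures yet still passes.

```python
import functools
from typing import Any, Callable, Dict, Optional

# Assumed message text; the real constant's value is not shown in this PR excerpt.
NOT_STARTED_MESSAGE = "Executor should be started first."


class ExecutorNotStarted(Exception):
    """Hypothetical stand-in for AirflowException."""


def check_started(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Raise a descriptive error if the executor has not been started."""

    @functools.wraps(fn)  # keep the wrapped method's name and docstring
    def wrapped(self: Any, *args: Any, **kwargs: Any) -> Any:
        if self.futures is None:
            raise ExecutorNotStarted(NOT_STARTED_MESSAGE)
        return fn(self, *args, **kwargs)

    return wrapped


class ToyExecutor:
    """Toy executor that only records submitted commands."""

    def __init__(self) -> None:
        self.futures: Optional[Dict[str, str]] = None

    def start(self) -> None:
        self.futures = {}

    @check_started
    def execute_async(self, key: str, command: str) -> None:
        self.futures[key] = command


executor = ToyExecutor()
try:
    executor.execute_async("task-1", "echo hello")
except ExecutorNotStarted as err:
    print("rejected:", err)

executor.start()
executor.execute_async("task-1", "echo hello")
print(executor.futures)
```

Unlike an assert, this check stays active under python -O, at the cost of one extra call frame per decorated method.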
Solution no. 3 is not mypy-compatible: mypy is not smart enough to understand it. Most often, we want to have assertions to make pylint happy.
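A small sketch of the type-checking side of this, using a made-up NarrowingExecutor class: mypy narrows an Optional attribute after an inline assert (or an inline if/raise) in the same function body, but it cannot see a check that is hidden inside a decorator.

```python
from typing import Dict, Optional

# Assumed message text, for illustration only.
NOT_STARTED_MESSAGE = "Executor should be started first."


class NarrowingExecutor:
    """Hypothetical executor used only to illustrate type narrowing."""

    def __init__(self) -> None:
        self.futures: Optional[Dict[str, str]] = None

    def start(self) -> None:
        self.futures = {}

    def execute_async(self, key: str, command: str) -> None:
        # After this line mypy treats self.futures as Dict[str, str], not Optional.
        assert self.futures is not None, NOT_STARTED_MESSAGE
        # Without the inline check above, mypy would report that an
        # Optional value is not indexable on the next line.
        self.futures[key] = command


executor = NarrowingExecutor()
executor.start()
executor.execute_async("task-1", "echo hello")
print(executor.futures)
```

An explicit "if self.futures is None: raise ..." placed inline narrows the type in the same way, so the narrowing argument is about where the check lives rather than about assert specifically.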
airflow/utils/dag_processing.py
        self._max_runs: int = max_runs
        self._processor_factory: Callable[[str, List[Any]], AbstractDagFileProcessor] = processor_factory
        self._signal_conn: Connection = signal_conn
        self._async_mode: bool = async_mode
Style point that we should decide upon: in cases like this, where the function arg is typed, we don't also need to specify the type on the instance attribute; mypy will infer that automatically.
So this could be written as:
    def __init__(self,
                 dag_directory: str,
                 file_paths: List[str],
                 max_runs: int,
                 processor_factory: Callable[[str, List[Any]], AbstractDagFileProcessor],
                 processor_timeout: timedelta,
                 signal_conn: Connection,
                 async_mode: bool = True):
        self._file_paths = file_paths
        # An empty literal gives mypy nothing to infer from, so this one keeps its annotation.
        self._file_path_queue: List[str] = []
        self._dag_directory = dag_directory
        self._max_runs = max_runs
        self._processor_factory = processor_factory
        self._signal_conn = signal_conn
        self._async_mode = async_mode
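A self-contained version of the same idea, with a made-up FileQueue class so that it runs without Airflow installed: attributes assigned from annotated parameters pick up their type from the parameter, and only the empty list carries its own annotation.

```python
from typing import List


class FileQueue:
    """Hypothetical class illustrating inferred attribute types."""

    def __init__(self, file_paths: List[str], max_runs: int) -> None:
        # mypy infers List[str] and int from the annotated parameters.
        self._file_paths = file_paths
        self._max_runs = max_runs
        # Nothing to infer from an empty literal, so mypy usually asks
        # for an explicit annotation here.
        self._file_path_queue: List[str] = []

    def enqueue_all(self) -> None:
        self._file_path_queue.extend(self._file_paths)


queue = FileQueue(["dags/a.py", "dags/b.py"], max_runs=1)
queue.enqueue_all()
print(queue._file_path_queue)
```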
Yeah. I can fix it here. And I agree it's not needed.
Pre-emptively approving once the three changes (UPDATING, remove two instances of load_test_config) are done.
Rebased with latest master and pushed all changes.
And @ashb -> regarding "UPDATING.md": I think there was no warning because it was not possible to detect this way of importing. So I will leave it in UPDATING just in case someone has a problem with this way of importing not working.
There are cyclic imports detected seemingly randomly by pylint checks when some of the PRs are run in CI. It was not deterministic because pylint usually uses as many processors as are available and splits the list of .py files between the separate pylint processes; depending on how the split is done, the pylint check might or might not detect the cycle. The cycle is always detected when all files are checked together.
In order to make it more deterministic, all pylint and mypy errors were resolved in the whole executors package and in dag_processor. At the same time, plugins_manager was moved out of the executors and out of all of the operators/hooks/sensors/macros, because it was also causing cyclic dependencies, and it's far easier to untangle those dependencies in the executors when the initialisation of all plugins is moved to plugins_manager.
Additionally, require_serial is set in the pre-commit configuration to make sure cycle detection is deterministic.
There are cyclic imports detected seemingly randomly by pylint checks when some
of the PRs are run in CI:
************* Module airflow.utils.log.json_formatter
airflow/utils/log/json_formatter.py:1:0: R0401: Cyclic import
(airflow.executors -> airflow.executors.kubernetes_executor ->
airflow.kubernetes.pod_generator) (cyclic-import)
airflow/utils/log/json_formatter.py:1:0: R0401: Cyclic import (airflow ->
airflow.executors -> airflow.executors.kubernetes_executor ->
airflow.kubernetes.pod_launcher) (cyclic-import)
airflow/utils/log/json_formatter.py:1:0: R0401: Cyclic import
(airflow.executors -> airflow.executors.kubernetes_executor ->
airflow.kubernetes.worker_configuration -> airflow.kubernetes.pod_generator)
(cyclic-import)
The problem is that airflow's __init__ contains a few convenience imports
(AirflowException, Executors etc.) but it also imports a number of packages
(for example kubernetes_executor) that in turn import objects from the airflow
package, for example airflow.Executor. This leads to cyclic imports if you
import the executors before airflow. A similar problem happens with the
executors' __init__.py, which contains the class "Executors" imported by all
executors, while at the same time some of the executors (for example
KubernetesExecutor) import the very same Executors class.
This might happen in pylint checks in pre-commit because they split the files
they process between the multiple threads available on your machine, and
sometimes the files happen to be imported in a different order.
As a solution, the executors "list" should be moved to a separate module.
Also, the names of the constants were changed so that they are not confused
with class names, and the Executors class was renamed to AvailableExecutors.
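The shape of the problem and of the fix can be reproduced outside Airflow with the sketch below. It writes two throwaway packages into a temporary directory and tries to import each in a fresh interpreter. All names (tangled, untangled, kube, constants, LOCAL_EXECUTOR) are invented for the illustration and do not mirror Airflow's actual module layout.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def write_package(root: Path, name: str, fixed: bool) -> None:
    """Create a toy package, with or without the cyclic import."""
    pkg = root / name
    pkg.mkdir()
    if fixed:
        # The constant lives in its own module, so kube.py never needs
        # anything from the half-initialised package __init__.
        (pkg / "constants.py").write_text('LOCAL_EXECUTOR = "LocalExecutor"\n')
        (pkg / "__init__.py").write_text(
            f"from {name}.kube import KubeExecutor\n"
            f"from {name}.constants import LOCAL_EXECUTOR\n"
        )
        constant_source = f"{name}.constants"
    else:
        # __init__ imports kube before defining the constant that kube
        # imports back from the package: a cycle.
        (pkg / "__init__.py").write_text(
            f"from {name}.kube import KubeExecutor\n"
            'LOCAL_EXECUTOR = "LocalExecutor"\n'
        )
        constant_source = name
    (pkg / "kube.py").write_text(
        f"from {constant_source} import LOCAL_EXECUTOR\n"
        "class KubeExecutor:\n"
        "    fallback = LOCAL_EXECUTOR\n"
    )


def imports_cleanly(root: Path, name: str) -> bool:
    """Import the package in a fresh interpreter and report success."""
    code = f"import sys; sys.path.insert(0, {str(root)!r}); import {name}"
    result = subprocess.run([sys.executable, "-c", code], capture_output=True)
    return result.returncode == 0


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    write_package(root, "tangled", fixed=False)
    write_package(root, "untangled", fixed=True)
    tangled_ok = imports_cleanly(root, "tangled")
    untangled_ok = imports_cleanly(root, "untangled")

print("tangled imports cleanly:", tangled_ok)      # False
print("untangled imports cleanly:", untangled_ok)  # True
```

In this toy case the cycle fails at import time on every run. In the PR the cycle only surfaced as a pylint R0401 warning, and only for some ways of splitting the files, which is why it looked random.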
Make sure you have checked all steps below.
Jira
Description
Tests
Commits
Documentation