Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions airflow/jobs/backfill_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,8 +656,6 @@ def _per_task_process(key, ti: TaskInstance, session):
_per_task_process(key, ti, session)
try:
session.commit()
# break the retry loop
break
Comment on lines -659 to -660
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My personal opinion: nothing bad with break in try block, try ... else do not add viable benefits here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try block should contain only code that is expected to throw an exception.

Copy link
Member

@potiuk potiuk Sep 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started to like more the try/except/else. It does have a nice property where it clerly separates the "exception handling" logic with "no exception" logic. In this case it does not add much value, but I think there is an educational value. I think one reason we do not use it is that it's somewhat under-used (even if pretty old) Python language construct and by using it in more places in Airflow we might get people learn more about it and use more of it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We step on the field: tabs vs spaces and " vs '.

According to the documentation for python

The try … except statement has an optional else clause, which, when present, must follow all except clauses. 
It is useful for code that must be executed if the try clause does not raise an exception.

Personally I do not know the tool which automatically replace it, so I can't see how we can't prevent new code without else clause appear in codebase. UnboundLocalError should in theory prevented by ruff or mypy anyway

Copy link
Member

@potiuk potiuk Sep 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tihink we can't enforce it, but we can at least have a lot examples in our code that people learn to use it.

But yes. this is a bit of "tabs vs. spaces". Maybe worth discussing it at the devlist what we prefer if this is controversial and get to lazy consensus or even vote, This has been done multiple times in the past for various conventions we use. We even enforced some of those. And as everything here - as long as we agree on something as a convention, the "disagree but engage" rule will kick in and we will all follow.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so @eumiro - maybe that's a good point to start a discussion on devlist where you would explain and ask people for opinions (just be prepared for a bit of flamewar)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First of all I'm not against rule "use else in try..except", we just need to use in places where it actually need, rather than always. IMO this is informal rule, in category "might/might not" and not in "should/should not" and definitely not in "must".

I'm sure that we have a rule readability first and in previous version it very easy to see what is going on:

  1. Try to commit
  2. If success exit from loop
  3. If OperationError happen ...

In new version

  1. Try to commit
  2. If OperationError happen ...
  3. (in previous series of try..except tv series 🤣 ) if no exception happen then exit from loop

Someone could also appeal to the fact that else in try...except would cost additional ... couple nanoseconds (OMG in Application which communicate with DB right after commit 🤣 ), and we could easily start to play code golf and optimise in place where it not required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And we just started tabs vs spaces and " vs '

I've also have another topic for "Python Holy War": "use := since we have Python 3.8 or try to avoid this operator"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:D

except OperationalError:
self.log.error(
"Failed to commit task state due to operational error. "
Expand All @@ -669,6 +667,9 @@ def _per_task_process(key, ti: TaskInstance, session):
if i == max_attempts - 1:
raise
# retry the loop
else:
# break the retry loop
break
except (NoAvailablePoolSlot, DagConcurrencyLimitReached, TaskConcurrencyLimitReached) as e:
self.log.debug(e)

Expand Down Expand Up @@ -815,11 +816,10 @@ def _execute_dagruns(
for dagrun_info in dagrun_infos:
for dag in self._get_dag_with_subdags():
dag_run = self._get_dag_run(dagrun_info, dag, session=session)
if dag_run is None:
continue
tis_map = self._task_instances_for_dag_run(dag, dag_run, session=session)
ti_status.active_runs.append(dag_run)
ti_status.to_run.update(tis_map or {})
if dag_run is not None:
tis_map = self._task_instances_for_dag_run(dag, dag_run, session=session)
ti_status.active_runs.append(dag_run)
ti_status.to_run.update(tis_map or {})

processed_dag_run_dates = self._process_backfill_task_instances(
ti_status=ti_status,
Expand Down
23 changes: 10 additions & 13 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -897,13 +897,11 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
)
for dag_run in paused_runs:
dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
if dag is None:
continue

dag_run.dag = dag
_, callback_to_run = dag_run.update_state(execute_callbacks=False, session=session)
if callback_to_run:
self._send_dag_callbacks_to_processor(dag, callback_to_run)
if dag is not None:
dag_run.dag = dag
_, callback_to_run = dag_run.update_state(execute_callbacks=False, session=session)
if callback_to_run:
self._send_dag_callbacks_to_processor(dag, callback_to_run)
except Exception as e: # should not fail the scheduler
self.log.exception("Failed to update dag run state for paused dags due to %s", e)

Expand Down Expand Up @@ -1073,13 +1071,12 @@ def _do_scheduling(self, session: Session) -> int:
)
for dag_run, callback_to_run in callback_tuples:
dag = cached_get_dag(dag_run.dag_id)

if not dag:
if dag:
# Sending callbacks there as in standalone_dag_processor they are adding to the database,
# so it must be done outside of prohibit_commit.
self._send_dag_callbacks_to_processor(dag, callback_to_run)
else:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
# Sending callbacks there as in standalone_dag_processor they are adding to the database,
# so it must be done outside of prohibit_commit.
self._send_dag_callbacks_to_processor(dag, callback_to_run)

with prohibit_commit(session) as guard:
# Without this, the session has an invalid view of the DB
Expand Down