-
Notifications
You must be signed in to change notification settings - Fork 16.4k
[AIRFLOW-6590] Use batch db operations in jobs #7370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AIRFLOW-6590] Use batch db operations in jobs #7370
Conversation
The PR changes numerous single selects / updates in base, scheduler, and backfill jobs to bulk operations.
potiuk
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just one comment to Exception raised.
38f1feb to
a05147f
Compare
Codecov Report
@@ Coverage Diff @@
## master #7370 +/- ##
=========================================
Coverage ? 86.31%
=========================================
Files ? 873
Lines ? 40769
Branches ? 0
=========================================
Hits ? 35188
Misses ? 5581
Partials ? 0
Continue to review full report at Codecov.
|
|
Nice work - taking a look now. |
|
Please rebase to latest master @nuclearpinguin - we had a six drama on Travis but it's solved now. |
ashb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice change!
A couple of style points etc, and one question about the change to the tests.
f831a0e to
fb8500b
Compare
fb8500b to
543a91a
Compare
| # them in the executor. | ||
| simple_task_instances = [SimpleTaskInstance(ti) for ti in | ||
| tis_to_set_to_queued] | ||
| simple_task_instances = [SimpleTaskInstance(ti) for ti in tis_to_set_to_queued] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nuclearpinguin I believe that this should be before session.commit().
Session.commit() should flush the session, and then sqlalchemy will need to requery each TI individually to reconstruct the STI
* [AIRFLOW-6590] Use batch db operations in jobs The PR changes numerous single selects / updates in base, scheduler, and backfill jobs to bulk operations. * fixup! [AIRFLOW-6590] Use batch db operations in jobs * fixup! fixup! [AIRFLOW-6590] Use batch db operations in jobs
The PR changes numerous single selects / updates in base, scheduler, and backfill jobs to bulk operations.
Measure
To measure the number of SQL queries I required two things:
sqlalchemy_statsindebugsection of Airflow config..csvwith all queries per job. The script is located inscripts/perf/sql_queries.pyand it works out of the box in the breeze environment.I am using
SequentialExecutorandscripts/perf/dags/sql_perf_dag.pyand local Postgres.Result
On the average, the number of queries was reduced by ~14% and time fo queries by ~10.9%.
Thanks for helping to @evgenyshulman from Databand!
Issue link: AIRFLOW-6590
Make sure to mark the boxes below before creating PR: [x]
[AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID** For document-only changes commit message can start with
[AIRFLOW-XXXX].In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.