Create guide for Machine Learning Engine operators #8207 #8968

Closed
Changes from 1 commit
Commits
155 commits
a28c66f
[AIRFLOW-4734] Upsert functionality for PostgresHook.insert_rows() (#…
oxymor0n Apr 30, 2020
4a1d71d
Fix the process of requirements generations (#8648)
potiuk Apr 30, 2020
b185b36
Reduce response payload size of /dag_stats and /task_stats (#8633)
XD-DENG Apr 30, 2020
4421f01
Improve template capabilities of EMR job and step operators (#8572)
oripwk May 1, 2020
6560f29
Enhanced documentation around Cluster Policy (#8661)
vardancse May 1, 2020
511d98e
[AIRFLOW-4363] Fix JSON encoding error (#8287)
retornam May 1, 2020
ce50538
Add check for pre-2.0 style hostname_callable config value (#8637)
dimberman May 1, 2020
0a7b500
Fix displaying Executor Class Name in "Base Job" table (#8679)
kaxil May 2, 2020
d92e848
Persist start/end date and duration for DummyOperator Task Instance (…
XD-DENG May 2, 2020
0954140
Ensure "started"/"ended" in tooltips are not shown if job not started…
XD-DENG May 2, 2020
19ac45a
Add support for fetching logs from running pods (#8626)
msumit May 3, 2020
1100cea
Remove _get_pretty_exception_message in PrestoHook
fusuiyi123 May 3, 2020
62796b9
Improve tutorial - Include all imports statements (#8670)
Lyalpha May 3, 2020
dd6a7bc
Group Google services in one section (#8623)
mik-laj May 3, 2020
ac59735
Refactor test_variable_command.py (#8535)
May 3, 2020
bc45fa6
Add system test and docs for Facebook Ads operators (#8503)
randr97 May 3, 2020
0b598a2
Fix connection add/edit for spark (#8685)
XD-DENG May 3, 2020
ffbbbfc
Sort connection type list in add/edit page alphabetically (#8692)
XD-DENG May 3, 2020
d8cb0b5
Support k8s auth method in Vault Secrets provider (#8640)
May 3, 2020
67caae0
Add system test for gcs_to_bigquery (#8556)
joppevos May 4, 2020
aec768b
[AIRFLOW-7008] Add perf kit with common used decorators/contexts (#7650)
mik-laj May 4, 2020
c3a46b9
Invalid output in test_variable assertion (#8698)
mik-laj May 4, 2020
5ddc458
Change provider:GCP to provider:Google for Labeler Bot (#8697)
mik-laj May 4, 2020
caa60b1
Remove config side effects from tests (#8607)
turbaszek May 4, 2020
923f423
Check consistency between the reference list and howto directory (#8690)
mik-laj May 4, 2020
b31ad51
Prevent clickable sorting on non sortable columns in TI view (#8681)
Acehaidrey May 4, 2020
6600e47
Import Connection directly from multiprocessing.connection. (#8711)
jhtimmins May 4, 2020
2c92a29
Fix typo in Google Display & Video 360 guide
michalslowikowski00 May 5, 2020
41b4c27
Carefully parse warning messages when building documentation (#8693)
mik-laj May 5, 2020
8d6f1aa
Support num_retries field in env var for GCP connection (#8700)
mik-laj May 5, 2020
c717d12
Add __repr__ for DagTag so tags display properly in /dagmodel/show (#…
XD-DENG May 5, 2020
487b5cc
Add guide for Apache Spark operators (#8305)
May 5, 2020
520aeed
Fix pickling failure when spawning processes (#8671)
jhtimmins May 6, 2020
25ee421
Support all RuntimeEnvironment parameters in DataflowTemplatedJobStar…
mik-laj May 6, 2020
d923b5b
Add jinja template test for AirflowVersion (#8505)
mik-laj May 6, 2020
e673413
Avoid loading executors in jobs (#7888)
mik-laj May 6, 2020
3437bea
Optimize count query on /home (#8729)
mik-laj May 6, 2020
336aa27
Correctly deserialize dagrun_timeout field on DAGs (#8735)
ashb May 6, 2020
2e9ef45
Stop Stalebot on Github issues (#8738)
kaxil May 6, 2020
fd6e057
Make loading plugins from entrypoint fault-tolerant (#8732)
kaxil May 6, 2020
bd29ee3
Ensure test_logging_config.test_reload_module works in spawn mode. (#…
jhtimmins May 6, 2020
d15839d
Latest debian-buster release broke image build (#8758)
potiuk May 7, 2020
ff5b701
Add google_api_to_s3_transfer example dags and system tests (#8581)
feluelle May 7, 2020
7c04604
Add google_api_to_s3_transfer docs howto link (#8761)
feluelle May 7, 2020
723c52c
Add documentation for SpannerDeployInstanceOperator (#8750)
ephraimbuddy May 7, 2020
6e4f5fa
[AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise …
lokeshlal May 7, 2020
b7566e1
Add SQL query tracking for pytest (#8754)
mik-laj May 8, 2020
58aefb2
Added SDFtoGCSOperator (#8740)
michalslowikowski00 May 8, 2020
b37ce29
Patch Pool.DEFAULT_POOL_NAME in BaseOperator (#8587)
vshshjn7 May 8, 2020
2bd3e76
Support same short flags for `create user` as 1.10 did for `user_crea…
ashb May 8, 2020
09770e4
Add WorldRemit as Airflow user (#8786)
May 8, 2020
a091c1f
fix typing errors reported by dmypy (#8773)
May 8, 2020
42c5975
Update example SingularityOperator DAG (#8790)
ashb May 8, 2020
791d1a7
Backport packages are renamed to include backport in their name (#8767)
potiuk May 9, 2020
100f530
Fixed test-target command (#8795)
potiuk May 9, 2020
db1b51d
Make celery worker_prefetch_multiplier configurable (#8695)
nadflinn May 9, 2020
bc19778
[AIP-31] Implement XComArg to pass output from one operator to the ne…
jonathanshir May 9, 2020
7506c73
Add default `conf` parameter to Spark JDBC Hook (#8787)
May 9, 2020
5e1c33a
Fix docs on creating CustomOperator (#8678)
JonnyWaffles May 10, 2020
21cc7d7
Document default timeout value for SSHOperator (#8744)
abhilash1in May 10, 2020
cd635dd
[AIRFLOW-5906] Add authenticator parameter to snowflake_hook (#8642)
koszti May 10, 2020
c7788a6
Add imap_attachment_to_s3 example dag and system test (#8669)
feluelle May 10, 2020
a715aa6
Correctly store non-default Nones in serialized tasks/dags (#8772)
ashb May 10, 2020
280f1f0
Correctly restore upstream_task_ids when deserializing Operators (#8775)
ashb May 10, 2020
cbebed2
Allow passing backend_kwargs to AWS SSM client (#8802)
kaxil May 10, 2020
79ef8be
Added Upload Multiple Entity Read Files to specified big query datase…
michalslowikowski00 May 10, 2020
e1cc17e
Remove old airflow logger causing side effects in tests (#8746)
kaxil May 10, 2020
9bb91ef
Add comments to breeze scripts (#8797)
potiuk May 10, 2020
493b685
Add separate example DAGs and system tests for google cloud speech (#…
ephraimbuddy May 10, 2020
bed1995
Avoid color info in response of /dag_stats & /task_stats (#8742)
XD-DENG May 11, 2020
b59adab
Support cron presets in date_range function (#7777)
Rcharriol May 11, 2020
5f3774a
[AIRFLOW-6921] Fetch celery states in bulk (#7542)
mik-laj May 11, 2020
d5c4001
Useful help information in test-target and docker-compose commands (#…
potiuk May 11, 2020
a6434a5
Fix bash command in performance test dag (#8812)
ashb May 11, 2020
0c3db84
[AIRFLOW-7068] Create EC2 Hook, Operator and Sensor (#7731)
mustafagok May 11, 2020
5ae76d8
Option to set end_date for performance testing dag. (#8817)
ashb May 11, 2020
2ec0130
[AIRFLOW-4549] Allow skipped tasks to satisfy wait_for_downstream (#7…
teddyhartanto May 11, 2020
1fb9f07
Synchronize extras between airflow and providers (#8819)
potiuk May 11, 2020
d590e5e
Add option to propagate tags in ECSOperator (#8811)
JPonte May 11, 2020
f410d64
Use fork when test relies on mock.patch in parent process. (#8794)
jhtimmins May 11, 2020
3ad4f96
[AIRFLOW-1156] BugFix: Unpausing a DAG with catchup=False creates an …
kaxil May 11, 2020
4375607
Fix typo. 'zobmies' => 'zombies'. (#8832)
jhtimmins May 12, 2020
78a48db
Add support for non-default orientation in `dag show` command (#8834)
klsnreddy May 12, 2020
7533378
Access function to be pickled as attribute, not method, to avoid erro…
jhtimmins May 12, 2020
1d12c34
Refactor BigQuery check operators (#8813)
turbaszek May 12, 2020
4b06fde
Fix Flake8 errors (#8841)
kaxil May 12, 2020
6911dfe
Fix template fields in Google operators (#8840)
turbaszek May 12, 2020
01db738
Azure storage 0.37.0 is not installable any more (#8833)
potiuk May 12, 2020
578fc51
[AIRFLOW-4543] Update slack operator to support slackclient v2 (#5519)
serkef May 12, 2020
7236862
[AIRFLOW-2310] Enable AWS Glue Job Integration (#6007)
abdulbasitds May 12, 2020
8b54919
Refactor BigQuery hook methods to use python library (#8631)
turbaszek May 12, 2020
7d69987
Remove duplicate code from perf_kit (#8843)
kaxil May 12, 2020
e1e833b
Update GoogleBaseHook to not follow 308 and use 60s timeout (#8816)
waiyan1612 May 13, 2020
8a94d18
Fix Environment Variable in perf/scheduler_dag_execution_timing.py (#…
kaxil May 13, 2020
ed3f513
Correctly pass sleep time from AWSAthenaOperator down to the hook. (#…
ashb May 13, 2020
f1dc2e0
The librabbitmq library stopped installing for python3.7 (#8853)
potiuk May 13, 2020
c3af681
Convert tests/jobs/test_base_job.py to pytest (#8856)
ashb May 13, 2020
81fb9d6
Add metric for monitoring email notification failures (#8771)
May 13, 2020
2878f17
Relax Flask-Appbuilder version to ~=2.3.4 (#8857)
feluelle May 13, 2020
e61b9bb
Add AWS EMR System tests (#8618)
xinbinhuang May 13, 2020
fc862a3
Do not create a separate process for one task in CeleryExecutor (#8855)
mik-laj May 14, 2020
961c710
Make Custom XCom backend a subsection of XCom docs (#8869)
turbaszek May 14, 2020
fe42191
Don't use ProcessorAgent to test ProcessorManager (#8871)
ashb May 14, 2020
4813b94
Create log file w/abs path so tests pass on MacOS (#8820)
jhtimmins May 14, 2020
35c523f
Fix list formatting of plugins doc. (#8873)
ashb May 15, 2020
85bbab2
Add EMR operators howto docs (#8863)
xinbinhuang May 15, 2020
f82ad45
Fix KubernetesPodOperator pod name length validation (#8829)
dsaiztc May 15, 2020
92585ca
Added automated release notes generation for backport operators (#8807)
potiuk May 15, 2020
82de6f7
Spend less time waiting for DagFileProcessor processes to complete (#…
ashb May 15, 2020
a3a4bac
JIRA and Github issues explanation (#8539)
mschickensoup May 16, 2020
f4edd90
Speed up TestAwsLambdaHook by not actually running a function (#8882)
ashb May 16, 2020
15273f0
Check for same task instead of Equality to detect Duplicate Tasks (#8…
kaxil May 16, 2020
a3a3411
Fix master failing on generating requirements (#8885)
potiuk May 16, 2020
f3521fb
Regenerate readme files for backport package release (#8886)
potiuk May 16, 2020
f6d5917
Updated docs for experimental API /dags/<DAG_ID>/dag_runs (#8800)
randr97 May 16, 2020
707bb0c
[AIRFLOW-6535] Add AirflowFailException to fail without any retry (#7…
jstern May 16, 2020
a546a10
Add Snowflake system test (#8422)
dhuang May 16, 2020
8985df0
Monitor pods by labels instead of names (#6377)
dimberman May 16, 2020
ff342fc
Added SalesforceHook missing method to return only dataframe (#8565) …
pranjalmittal May 17, 2020
12c5e5d
Prepare release candidate for backport packages (#8891)
potiuk May 17, 2020
2121f49
Avoid failure on transient requirements in CI image (#8892)
potiuk May 17, 2020
841d816
Allow setting the pooling time in DLPHook (#8824)
xuan616 May 19, 2020
dd57ec9
Fix task and dag stats on home page (#8865)
May 19, 2020
375d1ca
Release candidate 2 for backport packages 2020.05.20 (#8898)
potiuk May 19, 2020
bae5cc2
Fix race in Celery tests by pre-creating result tables (#8909)
ashb May 19, 2020
499493c
[AIRFLOW-6586] Improvements to gcs sensor (#7197)
May 19, 2020
ce7fdea
UX Fix: Prevent undesired text selection with DAG title selection in …
ryanahamilton May 19, 2020
fef00e5
Use Debian's provided JRE from Buster (#8919)
ashb May 20, 2020
5360045
Fix incorrect Env Var to stop Scheduler from creating DagRuns (#8920)
kaxil May 20, 2020
51d9557
Re-run all tests when Dockerfile or Github worflow change (#8924)
ashb May 20, 2020
c6224e2
Remove unused self.max_threads argument in SchedulerJob (#8935)
kaxil May 21, 2020
12c22e0
Added Greytip to Airflow Users list (#8887)
Sarankrishna May 21, 2020
8476c1e
Hive/Hadoop minicluster needs JDK8 and JAVA_HOME to work (#8938)
ashb May 21, 2020
f17b4bb
Fix DagRun Prefix for Performance script (#8934)
kaxil May 21, 2020
a9dfd7d
Remove side-effect of session in FAB (#8940)
mik-laj May 21, 2020
f3f74c7
Add TaskInstance state to TI Tooltip to be colour-blind friendlier (#…
harrisjoseph May 21, 2020
8d3acd7
Fix docstring in DagFileProcessor._schedule_task_instances (#8948)
kaxil May 21, 2020
47413d9
Remove singularity from CI images (#8945)
ashb May 21, 2020
16206cd
Update example webserver_config.py to show correct CSRF config (#8944)
ashb May 21, 2020
97b6cc7
Add note in Updating.md about the removel of DagRun.ID_PREFIX (#8949)
kaxil May 21, 2020
41481bb
Python base images are stored in cache (#8943)
potiuk May 21, 2020
b26b3ca
Don't hard-code constants in scheduler_dag_execution_timing (#8950)
ashb May 21, 2020
113982b
Make scheduler_dag_execution_timing grok dynamic start date of elasti…
ashb May 21, 2020
90a07d8
Cache 1 10 ci images (#8955)
dimberman May 21, 2020
dd72040
Pin Version of Azure Cosmos to <4 (#8956)
kaxil May 21, 2020
94a7673
Pin google-cloud-datacatalog to <0.8 (#8957)
kaxil May 21, 2020
9a4a2d1
[AIRFLOW-5262] Update timeout exception to include dag (#8466)
curiousjazz77 May 22, 2020
b055151
Add context to execution_date_fn in ExternalTaskSensor (#8702)
Acehaidrey May 22, 2020
f107338
Add support for spark python and submit tasks in Databricks operator(…
siddartha-ravichandran May 22, 2020
e742ef7
Fix typo in test_project_structure (#8978)
mik-laj May 23, 2020
4d67704
Remove duplicate line from CONTRIBUTING.rst (#8981)
kaxil May 23, 2020
db70da2
Flush pending Sentry exceptions before exiting (#7232)
mikeclarke May 23, 2020
cf5cf45
Support YAML input for CloudBuildCreateOperator (#8808)
joppevos May 23, 2020
bdb8369
Add secrets to test_deprecated_packages (#8979)
mik-laj May 23, 2020
f3456b1
Fix formatting code block in TESTING.rst (#8985)
ad-m May 23, 2020
[AIRFLOW-6535] Add AirflowFailException to fail without any retry (#7133)



* use preferred boolean check idiom

Co-Authored-By: Jarek Potiuk <jarek@potiuk.com>

* add test coverage for AirflowFailException

* add docs for some exception usage patterns

* autoformatting

* remove extraneous newline, poke travis build

* clean up TaskInstance.handle_failure

Try to reduce nesting and repetition of logic for different conditions.
Also try to tighten up the scope of the exception handling ... it looks
like the large block that catches an Exception and logs it as a failure
to send an email may have been swallowing some TypeErrors coming out
of trying to compose a log info message and calling strftime on
start_date and end_date when they're set to None; this is why I've added
lines in the test to set those values on the TaskInstance objects.
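The TypeError the author describes comes from calling ``strftime`` on a ``start_date``/``end_date`` that is ``None``. The defensive pattern the commit settles on (a helper that tolerates missing or ``None`` date attributes) can be sketched standalone; the ``TI`` stub and ``safe_date`` name here are illustrative, not Airflow's API:

```python
from datetime import datetime


def safe_date(obj, attr, fmt):
    # Return a formatted date, or '' when the attribute is absent or None,
    # so composing a log message can never raise AttributeError/TypeError.
    value = getattr(obj, attr, None)
    if value is not None:
        return value.strftime(fmt)
    return ''


class TI:
    # Hypothetical stand-in for a TaskInstance with one unset date.
    start_date = None
    end_date = datetime(2020, 5, 16, 12, 0, 0)


ti = TI()
print(safe_date(ti, 'start_date', '%Y%m%dT%H%M%S'))  # '' instead of a TypeError
print(safe_date(ti, 'end_date', '%Y%m%dT%H%M%S'))    # 20200516T120000
```

This is why the tests now set those date attributes explicitly: the old code only surfaced the bug when a date was ``None`` at failure-logging time.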

* let sphinx generate docs for exceptions module

* keep session kwarg last in handle_failure

* explain allowed_top_level

* add black-box tests for retry/fail immediately cases

* don't lose safety measures in logging date attrs

* fix flake8 too few blank lines

* grammar nitpick

* add import to AirflowFailException example

Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
jstern and potiuk authored May 16, 2020
commit 707bb0c725fbc32929eea162993aa8fb9854fa9a
4 changes: 4 additions & 0 deletions airflow/exceptions.py
@@ -79,6 +79,10 @@ class AirflowSkipException(AirflowException):
"""Raise when the task should be skipped"""


class AirflowFailException(AirflowException):
"""Raise when the task should be failed without retrying"""


class AirflowDagCycleException(AirflowException):
"""Raise when there is a cycle in Dag definition"""

112 changes: 55 additions & 57 deletions airflow/models/taskinstance.py
@@ -41,7 +41,8 @@
from airflow import settings
from airflow.configuration import conf
from airflow.exceptions import (
AirflowException, AirflowRescheduleException, AirflowSkipException, AirflowTaskTimeout,
AirflowException, AirflowFailException, AirflowRescheduleException, AirflowSkipException,
AirflowTaskTimeout,
)
from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
from airflow.models.log import Log
@@ -1067,6 +1068,10 @@ def signal_handler(signum, frame):
self.refresh_from_db()
self._handle_reschedule(actual_start_date, reschedule_exception, test_mode, context)
return
except AirflowFailException as e:
self.refresh_from_db()
self.handle_failure(e, test_mode, context, force_fail=True)
raise
except AirflowException as e:
self.refresh_from_db()
# for case when task is marked as success/failed externally
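The position of the new ``except AirflowFailException`` clause above matters: it is a subclass of ``AirflowException``, so if it were listed after the base-class handler it would never match. A minimal illustration of that ordering rule, using stub classes rather than Airflow's real ones:

```python
class AirflowException(Exception):
    pass


class AirflowFailException(AirflowException):
    # Subclass: must be caught before the base class in a try/except chain.
    pass


def run(exc):
    try:
        raise exc
    except AirflowFailException:
        # Force-fail path: no retry, mirrors handle_failure(force_fail=True).
        return "force_fail"
    except AirflowException:
        # Ordinary failure path: retry eligibility is evaluated.
        return "maybe_retry"


print(run(AirflowFailException()))  # force_fail
print(run(AirflowException()))      # maybe_retry
```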
@@ -1177,7 +1182,7 @@ def _handle_reschedule(self, actual_start_date, reschedule_exception, test_mode=
self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE')

@provide_session
def handle_failure(self, error, test_mode=None, context=None, session=None):
def handle_failure(self, error, test_mode=None, context=None, force_fail=False, session=None):
if test_mode is None:
test_mode = self.test_mode
if context is None:
@@ -1198,64 +1203,51 @@ def handle_failure(self, error, test_mode=None, context=None, session=None):
if context is not None:
context['exception'] = error

# Let's go deeper
try:
# Since this function is called only when the TaskInstance state is running,
# try_number contains the current try_number (not the next). We
# only mark task instance as FAILED if the next task instance
# try_number exceeds the max_tries.
if self.is_eligible_to_retry():
self.state = State.UP_FOR_RETRY
self.log.info('Marking task as UP_FOR_RETRY')
if task.email_on_retry and task.email:
self.email_alert(error)
# Set state correctly and figure out how to log it,
# what callback to call if any, and how to decide whether to email

# Since this function is called only when the TaskInstance state is running,
# try_number contains the current try_number (not the next). We
# only mark task instance as FAILED if the next task instance
# try_number exceeds the max_tries ... or if force_fail is truthy

if force_fail or not self.is_eligible_to_retry():
self.state = State.FAILED
if force_fail:
log_message = "Immediate failure requested. Marking task as FAILED."
else:
self.state = State.FAILED
if task.retries:
self.log.info(
'All retries failed; marking task as FAILED.'
'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
self.dag_id,
self.task_id,
self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'execution_date') and self.execution_date else '',
self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'start_date') and self.start_date else '',
self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'end_date') and self.end_date else '')
else:
self.log.info(
'Marking task as FAILED.'
'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
self.dag_id,
self.task_id,
self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'execution_date') and self.execution_date else '',
self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'start_date') and self.start_date else '',
self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'end_date') and self.end_date else '')
if task.email_on_failure and task.email:
self.email_alert(error)
except Exception as e2:
self.log.error('Failed to send email to: %s', task.email)
self.log.exception(e2)
log_message = "Marking task as FAILED."
email_for_state = task.email_on_failure
callback = task.on_failure_callback
else:
self.state = State.UP_FOR_RETRY
log_message = "Marking task as UP_FOR_RETRY."
email_for_state = task.email_on_retry
callback = task.on_retry_callback

self.log.info(
'%s dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
log_message,
self.dag_id,
self.task_id,
self._safe_date('execution_date', '%Y%m%dT%H%M%S'),
self._safe_date('start_date', '%Y%m%dT%H%M%S'),
self._safe_date('end_date', '%Y%m%dT%H%M%S')
)
if email_for_state and task.email:
try:
self.email_alert(error)
except Exception as e2:
self.log.error('Failed to send email to: %s', task.email)
self.log.exception(e2)

# Handling callbacks pessimistically
try:
if self.state == State.UP_FOR_RETRY and task.on_retry_callback:
task.on_retry_callback(context)
if self.state == State.FAILED and task.on_failure_callback:
task.on_failure_callback(context)
except Exception as e3:
self.log.error("Failed at executing callback")
self.log.exception(e3)
if callback:
try:
callback(context)
except Exception as e3:
self.log.error("Failed at executing callback")
self.log.exception(e3)

if not test_mode:
session.merge(self)
@@ -1265,6 +1257,12 @@ def is_eligible_to_retry(self):
"""Is task instance is eligible for retry"""
return self.task.retries and self.try_number <= self.max_tries

def _safe_date(self, date_attr, fmt):
result = getattr(self, date_attr, None)
if result is not None:
return result.strftime(fmt)
return ''

@provide_session
def get_template_context(self, session=None) -> Dict[str, Any]:
task = self.task
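The refactored ``handle_failure`` reduces the old nested branches to one state decision: ``force_fail`` or exhausted retries means FAILED, anything else means UP_FOR_RETRY. A sketch of that decision table, mirroring ``is_eligible_to_retry`` from the diff (the function name here is illustrative):

```python
def decide_state(force_fail, retries, try_number, max_tries):
    # is_eligible_to_retry in the diff: self.task.retries and
    # self.try_number <= self.max_tries. force_fail overrides eligibility.
    eligible = retries and try_number <= max_tries
    if force_fail or not eligible:
        return "FAILED"
    return "UP_FOR_RETRY"


assert decide_state(True, 3, 1, 3) == "FAILED"        # immediate failure requested
assert decide_state(False, 3, 1, 3) == "UP_FOR_RETRY"  # retries remain
assert decide_state(False, 3, 4, 3) == "FAILED"        # retries exhausted
assert decide_state(False, 0, 1, 0) == "FAILED"        # no retries configured
```

Collapsing the state choice into one branch is also what lets the email and callback handling pick ``email_for_state``/``callback`` once instead of duplicating the logging block per state.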
13 changes: 12 additions & 1 deletion docs/autoapi_templates/index.rst
@@ -357,10 +357,21 @@ persisted in the database.

airflow/models/index

.. _pythonapi:exceptions:

Exceptions
----------

.. toctree::
:includehidden:
:glob:
:maxdepth: 1

airflow/exceptions/index

Secrets Backends
----------------
Airflow uses relies on secrets backends to retrieve :class:`~airflow.models.connection.Connection` objects.
Airflow relies on secrets backends to retrieve :class:`~airflow.models.connection.Connection` objects.
All secrets backends derive from :class:`~airflow.secrets.BaseSecretsBackend`.

.. toctree::
45 changes: 45 additions & 0 deletions docs/concepts.rst
@@ -1216,6 +1216,51 @@ template string:
See `Jinja documentation <https://jinja.palletsprojects.com/en/master/api/#jinja2.Environment>`_
to find all available options.

.. _exceptions:

Exceptions
==========

Airflow defines a number of exceptions; most of these are used internally, but a few
are relevant to authors of custom operators or python callables called from ``PythonOperator``
tasks. Normally any exception raised from an ``execute`` method or python callable will either
cause a task instance to fail if it is not configured to retry or has reached its limit on
retry attempts, or to be marked as "up for retry". A few exceptions can be used when different
behavior is desired:

* ``AirflowSkipException`` can be raised to set the state of the current task instance to "skipped"
* ``AirflowFailException`` can be raised to set the state of the current task to "failed" regardless
of whether there are any retry attempts remaining.

This example illustrates some possibilities

.. code:: python

from airflow.exceptions import AirflowFailException, AirflowSkipException

def fetch_data():
try:
data = get_some_data(get_api_key())
if not data:
# Set state to skipped and do not retry
# Downstream task behavior will be determined by trigger rules
raise AirflowSkipException("No data available.")
except Unauthorized:
# If we retry, our api key will still be bad, so don't waste time retrying!
# Set state to failed and move on
raise AirflowFailException("Our api key is bad!")
except TransientError:
print("Looks like there was a blip.")
# Raise the exception and let the task retry unless max attempts were reached
raise
handle(data)

task = PythonOperator(task_id="fetch_data", python_callable=fetch_data, retries=10)

.. seealso::
- :ref:`List of Airflow exceptions <pythonapi:exceptions>`


Packaged DAGs
'''''''''''''
While often you will specify DAGs in a single ``.py`` file it might sometimes
6 changes: 5 additions & 1 deletion docs/conf.py
@@ -211,9 +211,13 @@
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))

# Generate top-level

# do not exclude these top-level modules from the doc build:
allowed_top_level = ("exceptions.py",)

for path in glob(f"{ROOT_DIR}/airflow/*"):
name = os.path.basename(path)
if os.path.isfile(path):
if os.path.isfile(path) and not path.endswith(allowed_top_level):
exclude_patterns.append(f"_api/airflow/{name.rpartition('.')[0]}")
browsable_packages = ["operators", "hooks", "sensors", "providers", "executors", "models", "secrets"]
if os.path.isdir(path) and name not in browsable_packages:
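The ``conf.py`` change relies on ``str.endswith`` accepting a tuple of suffixes, so one check keeps whitelisted top-level modules like ``exceptions.py`` in the doc build. A small demonstration of that idiom (paths are made up for illustration):

```python
# str.endswith accepts a tuple: True if the string ends with any member.
allowed_top_level = ("exceptions.py",)

paths = [
    "/repo/airflow/exceptions.py",  # whitelisted, stays in the doc build
    "/repo/airflow/settings.py",    # not whitelisted, gets excluded
]

# Mirror the conf.py condition: exclude files NOT ending in an allowed name.
excluded = [p for p in paths if not p.endswith(allowed_top_level)]
print(excluded)  # ['/repo/airflow/settings.py']
```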
58 changes: 57 additions & 1 deletion tests/models/test_taskinstance.py
@@ -31,7 +31,7 @@
from sqlalchemy.orm.session import Session

from airflow import models, settings
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowException, AirflowFailException, AirflowSkipException
from airflow.models import (
DAG, DagRun, Pool, RenderedTaskInstanceFields, TaskFail, TaskInstance as TI, TaskReschedule, Variable,
)
@@ -1514,6 +1514,62 @@ def test_handle_failure(self):
context_arg_2 = mock_on_retry_2.call_args[0][0]
assert context_arg_2 and "task_instance" in context_arg_2

# test the scenario where normally we would retry but have been asked to fail
mock_on_failure_3 = mock.MagicMock()
mock_on_retry_3 = mock.MagicMock()
task3 = DummyOperator(task_id="test_handle_failure_on_force_fail",
on_failure_callback=mock_on_failure_3,
on_retry_callback=mock_on_retry_3,
retries=1,
dag=dag)
ti3 = TI(task=task3, execution_date=start_date)
ti3.state = State.FAILED
ti3.handle_failure("test force_fail handling", force_fail=True)

context_arg_3 = mock_on_failure_3.call_args[0][0]
assert context_arg_3 and "task_instance" in context_arg_3
mock_on_retry_3.assert_not_called()

def test_does_not_retry_on_airflow_fail_exception(self):
def fail():
raise AirflowFailException("hopeless")

dag = models.DAG(dag_id='test_does_not_retry_on_airflow_fail_exception')
task = PythonOperator(
task_id='test_raise_airflow_fail_exception',
dag=dag,
python_callable=fail,
owner='airflow',
start_date=timezone.datetime(2016, 2, 1, 0, 0, 0),
retries=1
)
ti = TI(task=task, execution_date=timezone.utcnow())
try:
ti.run()
except AirflowFailException:
pass # expected
self.assertEqual(State.FAILED, ti.state)

def test_retries_on_other_exceptions(self):
def fail():
raise AirflowException("maybe this will pass?")

dag = models.DAG(dag_id='test_retries_on_other_exceptions')
task = PythonOperator(
task_id='test_raise_other_exception',
dag=dag,
python_callable=fail,
owner='airflow',
start_date=timezone.datetime(2016, 2, 1, 0, 0, 0),
retries=1
)
ti = TI(task=task, execution_date=timezone.utcnow())
try:
ti.run()
except AirflowException:
pass # expected
self.assertEqual(State.UP_FOR_RETRY, ti.state)

def _env_var_check_callback(self):
self.assertEqual('test_echo_env_variables', os.environ['AIRFLOW_CTX_DAG_ID'])
self.assertEqual('hive_in_python_op', os.environ['AIRFLOW_CTX_TASK_ID'])