
SubDag Operator broken when used with pools - infinite retry of tasks #1225

@syvineckruyk


Dear Airflow Maintainers,

Before I tell you about my issue, let me describe my environment:

Environment

  • CentOS 6
  • CeleryExecutor (Redis)
  • Two workers, one scheduler, one webserver
  • MySQL

  • Version of Airflow: 1.6.2.10, 1.7.0rc1, HEAD
  • Example code to reproduce the bug:
```python
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators import BashOperator, SubDagOperator

dag_name = 'test_18'
test_pool = 'test_pool'  # this pool must be created before the DAG runs
email_to = 'your@email.com'

# Start at 12:00 yesterday.
now = datetime.now()
start_time = datetime(now.year, now.month, now.day, 12, 0, 0)
start_time = start_time + timedelta(days=-1)

default_args = {
    'owner': 'Test',
    'depends_on_past': True,
    'start_date': start_time,
    'email': [email_to],
    'email_on_failure': True,
    'email_on_retry': True,
    'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval='0 * * * *')


def get_subdag(dag, sd_id, pool=None):
    """Build a SubDagOperator whose two sub-tasks run in `pool` (None = no pool)."""
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 10',
        dag=subdag,
        pool=pool
    )

    # Always exits 1, so the subdag's failure handling is exercised.
    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 10 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    # retries/retry_delay apply to the SubDagOperator itself; the sub-tasks
    # do not set retries, so they keep the default of 0.
    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=2,
        retry_delay=timedelta(seconds=30),
        dag=dag,
        depends_on_past=True,
    )

    return sdo


# Control case: no pool.
sd1 = get_subdag(dag, sd_id='test_subdag')
# Same subdag, but both sub-tasks run in test_pool.
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)
```
  • Screenshots of your DAG's graph and tree views:
  1. A sub-task of the non-pooled subdag fails.
    (screenshot: af_issue_1)
  2. The subdag enters the retry state.
    (screenshot: af_issue_2)
  3. After all retries are exhausted, the non-pooled subdag fails.
    (screenshot: af_issue_6)
  4. A sub-task of the pooled subdag fails.
    (screenshot: af_issue_3)
  5. The pooled subdag remains in the running state, and its sub-task enters the queued state.
    (screenshot: af_issue_4)
  6. The sub-task of the pooled subdag enters the running state.
    (screenshot: af_issue_5)
  7. Every sub-task failure of the pooled subdag generates an email alert.
    (screenshot: af_issue_7)
  8. Logs (from the UI) also show the observed behavior:
```
Id    Dttm            Dag Id                         Task Id                  Event    Execution Date  Owner  Extra
1910  03-27T15:37:36  test_18.test_subdag            test_task_2_with_exit_1  running  03-26T12:00:00  Test
1911  03-27T15:37:46  test_18.test_subdag            test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1916  03-27T15:39:18  test_18.test_subdag            test_task_2_with_exit_1  running  03-26T12:00:00  Test
1918  03-27T15:39:28  test_18.test_subdag            test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1924  03-27T15:41:18  test_18.test_subdag            test_task_2_with_exit_1  running  03-26T12:00:00  Test
1930  03-27T15:41:28  test_18.test_subdag            test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1912  03-27T15:37:54  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1914  03-27T15:38:05  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1917  03-27T15:39:23  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1920  03-27T15:39:33  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1921  03-27T15:40:54  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1922  03-27T15:41:04  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1925  03-27T15:41:26  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1926  03-27T15:41:36  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1927  03-27T15:41:52  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1928  03-27T15:42:02  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1929  03-27T15:42:18  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1931  03-27T15:42:28  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1933  03-27T15:43:52  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1934  03-27T15:44:02  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1935  03-27T15:44:30  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1936  03-27T15:44:40  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1937  03-27T15:44:56  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1938  03-27T15:45:06  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1939  03-27T15:46:35  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1940  03-27T15:46:45  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1941  03-27T15:47:42  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1942  03-27T15:47:52  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1943  03-27T15:49:20  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1944  03-27T15:49:30  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
```
...
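The log excerpt can be summarized with a small counting sketch. It assumes each SubDagOperator attempt runs the failing sub-task at most once (the sub-tasks in the repro keep the default retries of 0) and that the operator itself is attempted at most retries + 1 = 3 times. The helper names `max_expected_runs` and `count_runs` are hypothetical, and the event lists are condensed by hand from the excerpt rather than queried from Airflow.

```python
from collections import Counter
from typing import Dict, List

# Event column for test_task_2_with_exit_1 (execution date 03-26T12:00:00),
# condensed from the UI log excerpt. The excerpt is cut off ("..."),
# so the pooled count is only a lower bound.
EVENTS: Dict[str, List[str]] = {
    "test_18.test_subdag": ["running", "failed"] * 3,
    "test_18.test_subdag_with_pool": ["running", "failed"] * 12,
}


def max_expected_runs(subtask_retries: int, subdag_retries: int) -> int:
    """Upper bound on runs of one sub-task for one execution date.

    Assumes each SubDagOperator attempt runs the sub-task at most
    (subtask_retries + 1) times, and the operator itself is attempted
    at most (subdag_retries + 1) times.
    """
    return (subtask_retries + 1) * (subdag_retries + 1)


def count_runs(events: List[str]) -> int:
    """Number of times the task instance entered the running state."""
    return Counter(events)["running"]


# Sub-tasks keep the default retries=0; the SubDagOperator has retries=2.
bound = max_expected_runs(subtask_retries=0, subdag_retries=2)
for dag_id, events in EVENTS.items():
    runs = count_runs(events)
    status = "OK" if runs <= bound else "exceeds retry budget"
    print(f"{dag_id}: {runs} runs (bound {bound}) -> {status}")
```

Running it reports 3 runs for test_18.test_subdag, within the bound of 3, and 12 runs for test_18.test_subdag_with_pool, which already exceeds the bound even though the excerpt is truncated.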

Now that you know a little about me, let me tell you about the issue I am having:

Description of Issue

When subdag operators are used with pools, sub-task failures are not "bubbled up" to the subdag operator. Instead, the failed sub-task is retried indefinitely, even when retries is set to 0.

When a sub-task fails, the subdag operator remains in the running state, the sub-task quickly enters the queued state, and shortly thereafter it re-enters the running state.

  • What did you expect to happen?
      • The sub-task fails.
      • The subdag operator enters the retry state.
      • Once retry_delay has elapsed, the subdag operator re-enters the running state.
      • The sub-tasks are queued.
      • The sub-tasks enter the running state.
  • What happened instead?
      • The sub-task fails.
      • The sub-task enters the queued state.
      • The sub-task runs.
      • Rinse and repeat.
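For comparison with the observed loop, the expected sequence can be written as a toy model. This is a hypothetical sketch, not Airflow's scheduler code: it tracks only test_task_2_with_exit_1, reuses Airflow's state names (queued, running, failed, up_for_retry) as plain strings, and assumes the sub-task keeps retries=0 while the SubDagOperator has retries=2 as in the repro DAG.

```python
from typing import List, Tuple

Event = Tuple[str, str]  # (who, state)


def expected_events(subdag_retries: int) -> List[Event]:
    """Toy model of the expected behavior for one execution date.

    The failing sub-task has retries=0, so each failure should be handed
    back to the SubDagOperator, which retries until its own budget runs out.
    """
    events: List[Event] = [("subdag", "running")]
    for attempt in range(subdag_retries + 1):
        events += [("sub-task", "queued"),
                   ("sub-task", "running"),
                   ("sub-task", "failed")]
        if attempt < subdag_retries:
            # Failure bubbles up; after retry_delay the operator reruns the subdag.
            events += [("subdag", "up_for_retry"), ("subdag", "running")]
        else:
            # Retries exhausted: the SubDagOperator itself fails.
            events.append(("subdag", "failed"))
    return events


for who, state in expected_events(subdag_retries=2):
    print(f"{who:8} {state}")
```

With subdag_retries=2 the sub-task runs three times and the sequence ends with the subdag operator failing, which is what the non-pooled subdag does. In the pooled case the subdag operator stays in running instead, so neither up_for_retry nor the final failed state is ever reached.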
  • Here is how you can reproduce this issue on your machine:

Reproduction Steps

  • Create a pool.
  • Set the email address and pool name in the included DAG code.
  • Copy the DAG file into your dags folder.
  • Observe the non-pooled subdag handle the failure as expected.
  • Observe the pooled subdag enter an infinite retry loop.

I have observed this behavior on multiple versions; the testing for this report was done on 1.7.0rc1.

I also tested on HEAD as of 2016-03-26, where the issue appears to be worse: the non-pooled subdag (which works as expected on 1.7.0rc1) enters the retry state as it should, but then never re-enters the running state and hangs indefinitely.

Labels: kind:bug