airflow/jobs.py (174 changes: 90 additions & 84 deletions)
@@ -20,7 +20,6 @@
from past.builtins import basestring
from collections import defaultdict, Counter
from datetime import datetime
from itertools import product
import getpass
import logging
import socket
@@ -40,7 +39,9 @@
from airflow.utils.email import send_email
from airflow.utils.logging import LoggingMixin
from airflow.utils import asciiart
from airflow.settings import Stats

DagRun = models.DagRun
Base = models.Base
ID_LEN = models.ID_LEN
Stats = settings.Stats
@@ -401,11 +402,16 @@ def schedule_dag(self, dag):
dr.end_date = datetime.now()
session.commit()

qry = session.query(func.max(DagRun.execution_date)).filter_by(
dag_id = dag.dag_id).filter(
or_(DagRun.external_trigger == False,
# add % as a wildcard for the like query
DagRun.run_id.like(DagRun.ID_PREFIX+'%')))
# this query should be replaced by DagRun.find (see the sketch below)
qry = (
session.query(func.max(DagRun.execution_date))
.filter_by(dag_id=dag.dag_id)
.filter(or_(
DagRun.external_trigger == False,
# add % as a wildcard for the like query
DagRun.run_id.like(DagRun.ID_PREFIX+'%')
))
)
last_scheduled_run = qry.scalar()
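The comment above flags this query for cleanup; a minimal sketch of what that might look like, assuming DagRun.find (used further down in this file) accepts an external_trigger filter, and leaving aside the run_id LIKE branch of the or_:

# hypothetical sketch only; assumes DagRun.find can filter on
# external_trigger, and ignores the run_id prefix branch above
runs = DagRun.find(dag_id=dag.dag_id, external_trigger=False, session=session)
last_scheduled_run = max(r.execution_date for r in runs) if runs else None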

# don't schedule @once again
@@ -464,7 +470,6 @@ def process_dag(self, dag, queue):
function takes a lock on the DAG and timestamps the last run
in ``last_scheduler_run``.
"""
TI = models.TaskInstance
DagModel = models.DagModel
session = settings.Session()

@@ -474,84 +479,78 @@ def process_dag(self, dag, queue):
executors.LocalExecutor, executors.SequentialExecutor):
pickle_id = dag.pickle(session).id

db_dag = session.query(DagModel).filter_by(dag_id=dag.dag_id).first()
# obtain db lock
db_dag = session.query(DagModel).filter_by(
dag_id=dag.dag_id
).with_for_update().one()

last_scheduler_run = db_dag.last_scheduler_run or datetime(2000, 1, 1)
secs_since_last = (
datetime.now() - last_scheduler_run).total_seconds()
# if db_dag.scheduler_lock or
secs_since_last = (datetime.now() - last_scheduler_run).total_seconds()

if secs_since_last < self.heartrate:
# release db lock
session.commit()
session.close()
return None
else:
# Taking a lock
db_dag.scheduler_lock = True
db_dag.last_scheduler_run = datetime.now()
session.commit()

active_runs = dag.get_active_runs()
# Release the db lock
# the assumption here is that process_dag will take less
# time than self.heartrate; otherwise we might unlock too
# quickly and this release should move further down, but that
# would keep the record locked longer and block other calls.
db_dag.last_scheduler_run = datetime.now()
session.commit()
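To spell out the locking protocol: with_for_update() issues a SELECT ... FOR UPDATE, so the DagModel row stays locked until the transaction ends, and each session.commit() above both persists pending changes and releases the lock. A condensed sketch of the gate, using the same names as the code above:

# condensed sketch of the lock / gate / release sequence
db_dag = session.query(DagModel).filter_by(dag_id=dag.dag_id).with_for_update().one()
last_run = db_dag.last_scheduler_run or datetime(2000, 1, 1)
if (datetime.now() - last_run).total_seconds() < self.heartrate:
    session.commit()  # too soon; committing just releases the row lock
else:
    db_dag.last_scheduler_run = datetime.now()
    session.commit()  # persists the timestamp and releases the lock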

self.logger.info('Getting list of tasks to skip for active runs.')
skip_tis = set()
if active_runs:
qry = (
session.query(TI.task_id, TI.execution_date)
.filter(
TI.dag_id == dag.dag_id,
TI.execution_date.in_(active_runs),
TI.state.in_((State.RUNNING, State.SUCCESS, State.FAILED)),
)
)
skip_tis = {(ti[0], ti[1]) for ti in qry.all()}
# update the state of the previously active dag runs
dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING)
active_dag_runs = []
for run in dag_runs:
# do not consider runs that are executed in the future
if run.execution_date > datetime.now():
continue

descartes = [obj for obj in product(dag.tasks, active_runs)]
could_not_run = set()
self.logger.info('Checking dependencies on {} task instances, minus {} '
'skippable ones'.format(len(descartes), len(skip_tis)))
# todo: run.task is transient but needs to be set
run.dag = dag
# todo: preferably the integrity check happens at dag collection time
run.verify_integrity()
run.update_state()
if run.state == State.RUNNING:
active_dag_runs.append(run)

for run in active_dag_runs:
tis = run.get_task_instances(session=session, state=(State.NONE,
State.UP_FOR_RETRY))

# this loop is quite slow as it uses are_dependencies_met for
# every task (in ti.is_runnable); update_state above runs the
# same check and has already covered these tasks
for ti in tis:
task = dag.get_task(ti.task_id)

for task, dttm in descartes:
if task.adhoc or (task.task_id, dttm) in skip_tis:
continue
ti = TI(task, dttm)
# fixme: ti.task is transient but needs to be set
ti.task = task

ti.refresh_from_db()
if ti.state in (
State.RUNNING, State.QUEUED, State.SUCCESS, State.FAILED):
continue
elif ti.is_runnable(flag_upstream_failed=True):
self.logger.debug('Queuing task: {}'.format(ti))
queue.put((ti.key, pickle_id))
elif ti.is_premature():
continue
else:
self.logger.debug('Adding task: {} to the COULD_NOT_RUN set'.format(ti))
could_not_run.add(ti)

# this type of deadlock happens when dagruns can't even start and so
# the TI's haven't been persisted to the database.
if len(could_not_run) == len(descartes) and len(could_not_run) > 0:
self.logger.error(
'Dag runs are deadlocked for DAG: {}'.format(dag.dag_id))
(session
.query(models.DagRun)
.filter(
models.DagRun.dag_id == dag.dag_id,
models.DagRun.state == State.RUNNING,
models.DagRun.execution_date.in_(active_runs))
.update(
{models.DagRun.state: State.FAILED},
synchronize_session='fetch'))

# Releasing the lock
self.logger.debug("Unlocking DAG (scheduler_lock)")
db_dag = (
session.query(DagModel)
.filter(DagModel.dag_id == dag.dag_id)
.first()
)
db_dag.scheduler_lock = False
session.merge(db_dag)
session.commit()
# future: remove adhoc
if task.adhoc:
continue

if ti.is_runnable(flag_upstream_failed=True):
self.logger.debug('Queuing task: {}'.format(ti))

ti.refresh_from_db(session=session, lock_for_update=True)
# another scheduler could have picked this task
# todo: UP_FOR_RETRY still could create a race condition
if ti.state == State.SCHEDULED:
session.commit()
self.logger.debug("Task {} was picked up by another scheduler"
.format(ti))
continue
elif ti.state is State.NONE:
ti.state = State.SCHEDULED
session.merge(ti)

session.commit()
queue.put((ti.key, pickle_id))

session.close()
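The queueing path above is a lock-and-recheck handoff so that two schedulers cannot double-queue the same task instance. Stripped of logging, the claim protocol per task is roughly:

# condensed claim protocol; refresh_from_db(lock_for_update=True)
# is assumed to take a row lock on the task instance, as above
ti.refresh_from_db(session=session, lock_for_update=True)
if ti.state == State.SCHEDULED:
    session.commit()  # already claimed by another scheduler; release the lock
else:
    ti.state = State.SCHEDULED  # claim it while holding the row lock
    session.merge(ti)
    session.commit()  # publish the claim and release the lock
    queue.put((ti.key, pickle_id))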

@@ -643,13 +642,13 @@ def prioritize_queued(self, session, executor, dagbag):

session.commit()

def _split_dags(self, dags, size):
def _split(self, items, size):
"""
This function splits a list of dags into chunks of int size.
_split_dags([1,2,3,4,5,6], 3) becomes [[1,2,3],[4,5,6]]
This function splits a list of items into chunks of the given size.
_split([1,2,3,4,5,6], 3) becomes [[1,2,3],[4,5,6]]
"""
size = max(1, size)
return [dags[i:i + size] for i in range(0, len(dags), size)]
return [items[i:i + size] for i in range(0, len(items), size)]
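A quick usage note: size is clamped to at least 1, and any remainder ends up in a shorter final chunk, e.g.:

# _split([1, 2, 3, 4, 5, 6, 7], 3) -> [[1, 2, 3], [4, 5, 6], [7]]
# _split([1, 2, 3], 0)             -> [[1], [2], [3]]  (size clamped to 1)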

def _do_dags(self, dagbag, dags, tis_out):
"""
@@ -713,7 +712,7 @@ def _execute(self):
format(multiprocessing.cpu_count(),
self.max_threads,
len(dags)))
dags = self._split_dags(dags, math.ceil(len(dags) / self.max_threads))
dags = self._split(dags, math.ceil(len(dags) / self.max_threads))
tis_q = multiprocessing.Queue()
jobs = [multiprocessing.Process(target=self._do_dags,
args=(dagbag, dags[i], tis_q))
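To make the chunk sizing concrete, assuming true division semantics here:

# e.g. len(dags) == 10, max_threads == 4:
#   math.ceil(10 / 4) == 3
#   _split(dags, 3) -> chunk sizes [3, 3, 3, 1] -> 4 worker processes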
@@ -738,6 +737,7 @@ def _execute(self):
"heartbeat")
duration_sec = (datetime.now() - loop_start_dttm).total_seconds()
self.logger.info("Loop took: {} seconds".format(duration_sec))
Stats.timing("scheduler_loop", duration_sec * 1000)
try:
self.import_errors(dagbag)
except Exception as e:
@@ -843,22 +843,22 @@ def _execute(self):
while tasks_to_run and not deadlocked:
not_ready.clear()
for key, ti in list(tasks_to_run.items()):

ti.refresh_from_db()
ti.refresh_from_db(session=session, lock_for_update=True)
ignore_depends_on_past = (
self.ignore_first_depends_on_past and
ti.execution_date == (start_date or ti.start_date))

# The task was already marked successful or skipped by a
# different Job. Don't rerun it.
if key not in started:
if ti.state == State.SUCCESS:
succeeded.add(key)
tasks_to_run.pop(key)
session.commit()
continue
elif ti.state == State.SKIPPED:
skipped.add(key)
tasks_to_run.pop(key)
session.commit()
continue

# Is the task runnable? -- then run it
@@ -867,6 +867,10 @@
ignore_depends_on_past=ignore_depends_on_past,
flag_upstream_failed=True):
self.logger.debug('Sending {} to executor'.format(ti))
if ti.state == State.NONE:
ti.state = State.SCHEDULED
session.merge(ti)
session.commit()
executor.queue_task_instance(
ti,
mark_success=self.mark_success,
@@ -880,6 +884,8 @@
elif ti.state in (State.NONE, State.UPSTREAM_FAILED):
not_ready.add(key)

session.commit()

self.heartbeat()
executor.heartbeat()

@@ -950,7 +956,7 @@ def _execute(self):

# executor reports success but task does not - this is weird
elif ti.state not in (
State.SUCCESS,
State.SCHEDULED,
State.QUEUED,
State.UP_FOR_RETRY):
self.logger.error(