Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,29 +118,32 @@ def backfill(args, dag=None):


def trigger_dag(args):
session = settings.Session()
# TODO: verify dag_id
dag = get_dag(args)

if not dag:
logging.error("Cannot find dag {}".format(args.dag_id))
sys.exit(1)
Copy link
Contributor

@aoen aoen May 17, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should use different exit codes for different errors statuses, also it looks like the other parts of the CLI raise instead of exiting (though exiting is the right call since this is user facing), maybe turn the raises in the top-level functions into sys.exits too to be consistent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aoen I agree, but the full change should be part of a different PR I think. I can change to raising and create a Jira for it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM, and thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


execution_date = datetime.now()
run_id = args.run_id or "manual__{0}".format(execution_date.isoformat())
dr = session.query(DagRun).filter(
DagRun.dag_id == args.dag_id, DagRun.run_id == run_id).first()

conf = {}
if args.conf:
conf = json.loads(args.conf)
dr = DagRun.find(dag_id=args.dag_id, run_id=run_id)
if dr:
logging.error("This run_id already exists")
else:
trigger = DagRun(
dag_id=args.dag_id,
run_id=run_id,
execution_date=execution_date,
state=State.RUNNING,
conf=conf,
external_trigger=True)
session.add(trigger)
logging.info("Created {}".format(trigger))
session.commit()
logging.error("This run_id {} already exists".format(run_id))
raise AirflowException()

run_conf = {}
if args.conf:
run_conf = json.loads(args.conf)

trigger = dag.create_dagrun(
run_id=run_id,
execution_date=execution_date,
state=State.RUNNING,
conf=run_conf,
external_trigger=True
)
logging.info("Created {}".format(trigger))


def variables(args):
Expand Down
14 changes: 5 additions & 9 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,11 @@ def schedule_dag(self, dag):
if dag.schedule_interval:
DagRun = models.DagRun
session = settings.Session()
qry = session.query(DagRun).filter(
DagRun.dag_id == dag.dag_id,
DagRun.external_trigger == False,
DagRun.state == State.RUNNING,
active_runs = DagRun.find(
dag_id=dag.dag_id,
state=State.RUNNING,
external_trigger=False
)
active_runs = qry.all()
if len(active_runs) >= dag.max_active_runs:
return
for dr in active_runs:
Expand Down Expand Up @@ -457,16 +456,13 @@ def schedule_dag(self, dag):
return

if next_run_date and schedule_end and schedule_end <= datetime.now():
next_run = DagRun(
dag_id=dag.dag_id,
next_run = dag.create_dagrun(
run_id='scheduled__' + next_run_date.isoformat(),
execution_date=next_run_date,
start_date=datetime.now(),
state=State.RUNNING,
external_trigger=False
)
session.add(next_run)
session.commit()
return next_run

def process_dag(self, dag, queue):
Expand Down
97 changes: 96 additions & 1 deletion airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1935,7 +1935,8 @@ def dag(self, dag):
"The DAG assigned to {} can not be changed.".format(self))
elif self.task_id not in dag.task_dict:
dag.add_task(self)
self._dag = dag

self._dag = dag

def has_dag(self):
"""
Expand Down Expand Up @@ -3073,6 +3074,56 @@ def cli(self):
args = parser.parse_args()
args.func(args, self)

@provide_session
def create_dagrun(self,
run_id,
execution_date,
state,
start_date=None,
external_trigger=False,
conf=None,
session=None):
"""
Creates a dag run from this dag including the tasks associated with this dag. Returns the dag
run.
:param run_id: defines the the run id for this dag run
:type run_id: string
:param execution_date: the execution date of this dag run
:type execution_date: datetime
:param state: the state of the dag run
:type state: State
:param start_date: the date this dag run should be evaluated
:type state_date: datetime
:param external_trigger: whether this dag run is externally triggered
:type external_trigger: bool
:param session: database session
:type session: Session
"""
run = DagRun(
dag_id=self.dag_id,
run_id=run_id,
execution_date=execution_date,
start_date=start_date,
external_trigger=external_trigger,
conf=conf,
state=state
)
session.add(run)

# create the associated taskinstances
# state is None at the moment of creation
for task in self.tasks:
if task.adhoc:
continue

ti = TaskInstance(task, execution_date)
session.add(ti)

session.commit()

run.refresh_from_db()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of this (and maybe consider adding a comment in the code too with your answer)?

Copy link
Contributor Author

@bolkedebruin bolkedebruin May 17, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah the important bit: this create_dagrun creates the full dagrun and thus includes creating the taskinstances for this dagrun. Considering that a Dag is an ordered combination of tasks a DagRun should also be this at any given point in time. Ie. the current situation violates this: DagRuns are created shallow, without TaskInstances.

Having this in resolves issues with some occurrences of deadlocks and depend_on_past and I do consider it important for the integrity of airflow going forward and is one of the reasons of using "create_dagrun" to make sure we will be consistent in creating dag runs.

state for the create TaskInstances will be None / NULL and that does not interfere with any of the checks (confirmed by inspection and tests)

And yes I will add a comment in the code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, thanks for the thorough explanation!

return run


class Chart(Base):
__tablename__ = "chart"
Expand Down Expand Up @@ -3368,6 +3419,50 @@ def __repr__(self):
def id_for_date(klass, date, prefix=ID_FORMAT_PREFIX):
return prefix.format(date.isoformat()[:19])

@provide_session
def refresh_from_db(self, session=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: docs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

about docstrings, I had pylint setup on landscape.io to only require a docstring if the method is 10+ lines which I thought was reasonable

"""
Reloads the current dagrun from the database
:param session: database session
"""
DR = DagRun

dr = session.query(DR).filter(
DR.dag_id == self.dag_id,
DR.execution_date == self.execution_date,
DR.run_id == self.run_id
).one()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh first time I see one(). sqla has a huge api... I just looked it up and one() raises if empty, meaning the if bellow is not necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, I was wondering why it wasn't used a bit more. Kept the 'if' in for consistency, but obviously it should be removed. I will provide a small update for that

if dr:
self.id = dr.id
self.state = dr.state

@staticmethod
@provide_session
def find(dag_id, run_id=None, state=None, external_trigger=None, session=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for encapsulating the logic by the way, this is amazing.

"""
Returns a set of dag runs for the given search criteria.
:param run_id: defines the the run id for this dag run
:type run_id: string
:param state: the state of the dag run
:type state: State
:param external_trigger: whether this dag run is externally triggered
:type external_trigger: bool
:param session: database session
:type session: Session
"""
DR = DagRun

qry = session.query(DR).filter(DR.dag_id == dag_id)
if run_id:
qry = qry.filter(DR.run_id == run_id)
if state:
qry = qry.filter(DR.state == state)
if external_trigger:
qry = qry.filter(DR.external_trigger == external_trigger)
dr = qry.all()

return dr


class Pool(Base):
__tablename__ = "slot_pool"
Expand Down
33 changes: 11 additions & 22 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def test_schedule_dag_no_previous_runs(self):
.format(dag_run.execution_date))
assert dag_run.state == State.RUNNING
assert dag_run.external_trigger == False
dag.clear()

def test_schedule_dag_fake_scheduled_previous(self):
"""
Expand All @@ -131,14 +132,10 @@ def test_schedule_dag_fake_scheduled_previous(self):
owner='Also fake',
start_date=DEFAULT_DATE))
scheduler = jobs.SchedulerJob(test_mode=True)
trigger = models.DagRun(
dag_id=dag.dag_id,
run_id=models.DagRun.id_for_date(DEFAULT_DATE),
execution_date=DEFAULT_DATE,
state=State.SUCCESS,
external_trigger=True)
settings.Session().add(trigger)
settings.Session().commit()
dag.create_dagrun(run_id=models.DagRun.id_for_date(DEFAULT_DATE),
execution_date=DEFAULT_DATE,
state=State.SUCCESS,
external_trigger=True)
dag_run = scheduler.schedule_dag(dag)
assert dag_run is not None
assert dag_run.dag_id == dag.dag_id
Expand Down Expand Up @@ -166,6 +163,7 @@ def test_schedule_dag_once(self):

assert dag_run is not None
assert dag_run2 is None
dag.clear()

def test_schedule_dag_start_end_dates(self):
"""
Expand All @@ -180,16 +178,13 @@ def test_schedule_dag_start_end_dates(self):
start_date=start_date,
end_date=end_date,
schedule_interval=delta)
dag.add_task(models.BaseOperator(task_id='faketastic',
owner='Also fake'))

# Create and schedule the dag runs
dag_runs = []
scheduler = jobs.SchedulerJob(test_mode=True)
for i in range(runs):
date = dag.start_date + i * delta
task = models.BaseOperator(task_id='faketastic__%s' % i,
owner='Also fake',
start_date=date)
dag.task_dict[task.task_id] = task
dag_runs.append(scheduler.schedule_dag(dag))

additional_dag_run = scheduler.schedule_dag(dag)
Expand Down Expand Up @@ -219,19 +214,12 @@ def test_schedule_dag_no_end_date_up_to_today_only(self):
dag = DAG(TEST_DAG_ID + 'test_schedule_dag_no_end_date_up_to_today_only',
start_date=start_date,
schedule_interval=delta)
dag.add_task(models.BaseOperator(task_id='faketastic',
owner='Also fake'))

dag_runs = []
scheduler = jobs.SchedulerJob(test_mode=True)
for i in range(runs):
# Create the DagRun
date = dag.start_date + i * delta
task = models.BaseOperator(task_id='faketastic__%s' % i,
owner='Also fake',
start_date=date)

dag.task_dict[task.task_id] = task

# Schedule the DagRun
dag_run = scheduler.schedule_dag(dag)
dag_runs.append(dag_run)

Expand Down Expand Up @@ -730,6 +718,7 @@ def test_trigger_dag(self):
cli.trigger_dag,
self.parser.parse_args([
'trigger_dag', 'example_bash_operator',
'--run_id', 'trigger_dag_xxx',
'-c', 'NOT JSON'])
)

Expand Down
6 changes: 3 additions & 3 deletions tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ def test_dagrun_root_fail(self):

def test_dagrun_deadlock(self):
"""
Deadlocked DagRun is marked a failure
Do not deadlock

Test that a deadlocked dagrun is marked as a failure by having
Test that a dagrun is marked as a running by having
depends_on_past and an execution_date after the start_date
"""
self.evaluate_dagrun(
Expand All @@ -263,7 +263,7 @@ def test_dagrun_deadlock(self):
'test_depends_on_past': None,
'test_depends_on_past_2': None,
},
dagrun_state=State.FAILED,
dagrun_state=State.RUNNING,
advance_execution_date=True)

def test_scheduler_pooled_tasks(self):
Expand Down