-
Notifications
You must be signed in to change notification settings - Fork 16.4k
AIRFLOW-124 Implement create_dagrun #1506
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1935,7 +1935,8 @@ def dag(self, dag): | |
| "The DAG assigned to {} can not be changed.".format(self)) | ||
| elif self.task_id not in dag.task_dict: | ||
| dag.add_task(self) | ||
| self._dag = dag | ||
|
|
||
| self._dag = dag | ||
|
|
||
| def has_dag(self): | ||
| """ | ||
|
|
@@ -3073,6 +3074,56 @@ def cli(self): | |
| args = parser.parse_args() | ||
| args.func(args, self) | ||
|
|
||
| @provide_session | ||
| def create_dagrun(self, | ||
| run_id, | ||
| execution_date, | ||
| state, | ||
| start_date=None, | ||
| external_trigger=False, | ||
| conf=None, | ||
| session=None): | ||
| """ | ||
| Creates a dag run from this dag including the tasks associated with this dag. Returns the dag | ||
| run. | ||
| :param run_id: defines the the run id for this dag run | ||
| :type run_id: string | ||
| :param execution_date: the execution date of this dag run | ||
| :type execution_date: datetime | ||
| :param state: the state of the dag run | ||
| :type state: State | ||
| :param start_date: the date this dag run should be evaluated | ||
| :type state_date: datetime | ||
| :param external_trigger: whether this dag run is externally triggered | ||
| :type external_trigger: bool | ||
| :param session: database session | ||
| :type session: Session | ||
| """ | ||
| run = DagRun( | ||
| dag_id=self.dag_id, | ||
| run_id=run_id, | ||
| execution_date=execution_date, | ||
| start_date=start_date, | ||
| external_trigger=external_trigger, | ||
| conf=conf, | ||
| state=state | ||
| ) | ||
| session.add(run) | ||
|
|
||
| # create the associated taskinstances | ||
| # state is None at the moment of creation | ||
| for task in self.tasks: | ||
| if task.adhoc: | ||
| continue | ||
|
|
||
| ti = TaskInstance(task, execution_date) | ||
| session.add(ti) | ||
|
|
||
| session.commit() | ||
|
|
||
| run.refresh_from_db() | ||
|
||
| return run | ||
|
|
||
|
|
||
| class Chart(Base): | ||
| __tablename__ = "chart" | ||
|
|
@@ -3368,6 +3419,50 @@ def __repr__(self): | |
| def id_for_date(klass, date, prefix=ID_FORMAT_PREFIX): | ||
| return prefix.format(date.isoformat()[:19]) | ||
|
|
||
| @provide_session | ||
| def refresh_from_db(self, session=None): | ||
|
||
| """ | ||
| Reloads the current dagrun from the database | ||
| :param session: database session | ||
| """ | ||
| DR = DagRun | ||
|
|
||
| dr = session.query(DR).filter( | ||
| DR.dag_id == self.dag_id, | ||
| DR.execution_date == self.execution_date, | ||
| DR.run_id == self.run_id | ||
| ).one() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh first time I see
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah yes, I was wondering why it wasn't used a bit more. Kept the 'if' in for consistency, but obviously it should be removed. I will provide a small update for that |
||
| if dr: | ||
| self.id = dr.id | ||
| self.state = dr.state | ||
|
|
||
| @staticmethod | ||
| @provide_session | ||
| def find(dag_id, run_id=None, state=None, external_trigger=None, session=None): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for encapsulating the logic by the way, this is amazing. |
||
| """ | ||
| Returns a set of dag runs for the given search criteria. | ||
| :param run_id: defines the the run id for this dag run | ||
| :type run_id: string | ||
| :param state: the state of the dag run | ||
| :type state: State | ||
| :param external_trigger: whether this dag run is externally triggered | ||
| :type external_trigger: bool | ||
| :param session: database session | ||
| :type session: Session | ||
| """ | ||
| DR = DagRun | ||
|
|
||
| qry = session.query(DR).filter(DR.dag_id == dag_id) | ||
| if run_id: | ||
| qry = qry.filter(DR.run_id == run_id) | ||
| if state: | ||
| qry = qry.filter(DR.state == state) | ||
| if external_trigger: | ||
| qry = qry.filter(DR.external_trigger == external_trigger) | ||
| dr = qry.all() | ||
|
|
||
| return dr | ||
|
|
||
|
|
||
| class Pool(Base): | ||
| __tablename__ = "slot_pool" | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should use different exit codes for different errors statuses, also it looks like the other parts of the CLI raise instead of exiting (though exiting is the right call since this is user facing), maybe turn the raises in the top-level functions into sys.exits too to be consistent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aoen I agree, but the full change should be part of a different PR I think. I can change to raising and create a Jira for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SGTM, and thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://issues.apache.org/jira/browse/AIRFLOW-126