Skip to content
Closed
8 changes: 2 additions & 6 deletions airflow-core/src/airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,7 @@ class GroupCommand(NamedTuple):
name="list",
help="List the tasks within a DAG",
func=lazy_load_command("airflow.cli.commands.task_command.task_list"),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE),
args=(ARG_DAG_ID, ARG_VERBOSE),
),
ActionCommand(
name="clear",
Expand All @@ -1187,7 +1187,6 @@ class GroupCommand(NamedTuple):
ARG_TASK_REGEX,
ARG_START_DATE,
ARG_END_DATE,
ARG_SUBDIR,
ARG_UPSTREAM,
ARG_DOWNSTREAM,
ARG_YES,
Expand All @@ -1205,7 +1204,6 @@ class GroupCommand(NamedTuple):
ARG_DAG_ID,
ARG_TASK_ID,
ARG_LOGICAL_DATE_OR_RUN_ID,
ARG_SUBDIR,
ARG_VERBOSE,
ARG_MAP_INDEX,
),
Expand All @@ -1219,7 +1217,7 @@ class GroupCommand(NamedTuple):
"and then run by an executor."
),
func=lazy_load_command("airflow.cli.commands.task_command.task_failed_deps"),
args=(ARG_DAG_ID, ARG_TASK_ID, ARG_LOGICAL_DATE_OR_RUN_ID, ARG_SUBDIR, ARG_MAP_INDEX, ARG_VERBOSE),
args=(ARG_DAG_ID, ARG_TASK_ID, ARG_LOGICAL_DATE_OR_RUN_ID, ARG_MAP_INDEX, ARG_VERBOSE),
),
ActionCommand(
name="render",
Expand All @@ -1229,7 +1227,6 @@ class GroupCommand(NamedTuple):
ARG_DAG_ID,
ARG_TASK_ID,
ARG_LOGICAL_DATE_OR_RUN_ID,
ARG_SUBDIR,
ARG_VERBOSE,
ARG_MAP_INDEX,
),
Expand All @@ -1246,7 +1243,6 @@ class GroupCommand(NamedTuple):
ARG_DAG_ID,
ARG_TASK_ID,
ARG_LOGICAL_DATE_OR_RUN_ID_OPTIONAL,
ARG_SUBDIR,
ARG_DRY_RUN,
ARG_TASK_PARAMS,
ARG_POST_MORTEM,
Expand Down
22 changes: 12 additions & 10 deletions airflow-core/src/airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ def task_failed_deps(args) -> None:
Trigger Rule: Task's trigger rule 'all_success' requires all upstream tasks
to have succeeded, but found 1 non-success(es).
"""
dag = get_dag(args.subdir, args.dag_id)
dag = get_dag(subdir=None, dag_id=args.dag_id, from_db=True)

task = dag.get_task(task_id=args.task_id)
ti, _ = _get_ti(task, args.map_index, logical_date_or_run_id=args.logical_date_or_run_id)
dep_context = DepContext(deps=SCHEDULER_QUEUED_DEPS)
Expand All @@ -255,7 +256,8 @@ def task_state(args) -> None:
>>> airflow tasks state tutorial sleep 2015-01-01
success
"""
dag = get_dag(args.subdir, args.dag_id, from_db=True)
dag = get_dag(subdir=None, dag_id=args.dag_id, from_db=True)

task = dag.get_task(task_id=args.task_id)
ti, _ = _get_ti(task, args.map_index, logical_date_or_run_id=args.logical_date_or_run_id)
print(ti.state)
Expand All @@ -266,7 +268,8 @@ def task_state(args) -> None:
@providers_configuration_loaded
def task_list(args, dag: DAG | None = None) -> None:
"""List the tasks within a DAG at the command line."""
dag = dag or get_dag(args.subdir, args.dag_id)
dag = dag or get_dag(subdir=None, dag_id=args.dag_id, from_db=True)

tasks = sorted(t.task_id for t in dag.tasks)
print("\n".join(tasks))

Expand Down Expand Up @@ -365,7 +368,7 @@ def task_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> N
env_vars.update(args.env_vars)
os.environ.update(env_vars)

dag = dag or get_dag(args.subdir, args.dag_id)
dag = dag or get_dag(subdir=None, dag_id=args.dag_id, from_db=True)

dag = DAG.from_sdk_dag(dag)

Expand Down Expand Up @@ -423,8 +426,8 @@ def task_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> N
@providers_configuration_loaded
def task_render(args, dag: DAG | None = None) -> None:
"""Render and displays templated fields for a given task."""
if not dag:
dag = get_dag(args.subdir, args.dag_id)
dag = dag or get_dag(subdir=None, dag_id=args.dag_id, from_db=True)

task = dag.get_task(task_id=args.task_id)
ti, _ = _get_ti(
task, args.map_index, logical_date_or_run_id=args.logical_date_or_run_id, create_if_necessary="memory"
Expand All @@ -448,13 +451,12 @@ def task_render(args, dag: DAG | None = None) -> None:
def task_clear(args) -> None:
"""Clear all task instances or only those matched by regex for a DAG(s)."""
logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
if args.dag_id and not args.subdir and not args.dag_regex and not args.task_regex:

if args.dag_id and not args.dag_regex and not args.task_regex:
dags = [get_dag_by_file_location(args.dag_id)]
else:
# todo clear command only accepts a single dag_id. no reason for get_dags with 's' except regex?
# Reading from_db because clear method still not implemented in Task SDK DAG
dags = get_dags(args.subdir, args.dag_id, use_regex=args.dag_regex, from_db=True)

dags = get_dags(None, args.dag_id, use_regex=args.dag_regex)
if args.task_regex:
for idx, dag in enumerate(dags):
dags[idx] = dag.partial_subset(
Expand Down
7 changes: 5 additions & 2 deletions airflow-core/src/airflow/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,11 @@ def get_dags(subdir: str | None, dag_id: str, use_regex: bool = False, from_db:
from airflow.models import DagBag

if not use_regex:
return [get_dag(subdir=subdir, dag_id=dag_id, from_db=from_db)]
dagbag = DagBag(process_subdir(subdir))
return [get_dag(subdir=None, dag_id=dag_id, from_db=True)]

dagbag = DagBag(read_dags_from_db=True)
dagbag.collect_dags_from_db()

matched_dags = [dag for dag in dagbag.dags.values() if re.search(dag_id, dag.dag_id)]
if not matched_dags:
raise AirflowException(
Expand Down
6 changes: 5 additions & 1 deletion airflow-core/tests/unit/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,14 @@ def test_task_render(self):
assert 'echo "2016-01-01"' in output
assert 'echo "2016-01-08"' in output

def test_mapped_task_render(self):
def test_mapped_task_render(self, get_test_dag):
"""
tasks render should render and displays templated fields for a given mapping task
"""

# Retrieve and serialize a test DAG for unit testing.
get_test_dag("test_mapped_classic")

with redirect_stdout(io.StringIO()) as stdout:
task_command.task_render(
self.parser.parse_args(
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/tests/unit/utils/test_cli_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def test_success_function(self):
def test_process_subdir_path_with_placeholder(self):
assert os.path.join(settings.DAGS_FOLDER, "abc") == cli.process_subdir("DAGS_FOLDER/abc")

def test_get_dags(self):
def test_get_dags(self, get_test_dag):
get_test_dag("test_example_bash_operator")
dags = cli.get_dags(None, "test_example_bash_operator")
assert len(dags) == 1

Expand Down
Loading