Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[airflow] Update AIR302 to check for deprecated context keys #15144

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Check arguments and function decorated with @task
  • Loading branch information
sunank200 committed Jan 23, 2025
commit 3687e7913bcacad4243087e03f03786e987738e8
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,9 +1,44 @@
import pendulum
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime
from airflow.plugins_manager import AirflowPlugin
from airflow.decorators import task, get_current_context
from airflow.models.baseoperator import BaseOperator
from airflow.decorators import dag, task
from airflow.providers.standard.operators.python import PythonOperator


# AIR302 fixture: plain (undecorated) callable that subscripts its keyword-argument
# context with "conf", one of the removed context keys. It is passed as
# `python_callable` to the `PythonOperator` created inside `invalid_dag`.
def access_invalid_key_in_context(**context):
print("access invalid key", context["conf"])


# AIR302 fixture: `@task`-decorated function declared at module level (outside
# the `@dag` function) that reads the removed key "conf" via `context.get(...)`.
@task
def access_invalid_key_task_out_of_dag(**context):
print("access invalid key", context.get("conf"))



# AIR302 fixture DAG: combines a nested `@task()` function and a `PythonOperator`
# whose callables both read the removed context key "conf".
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=[""],
)
def invalid_dag():
# Call-style decorator (`@task()`), unlike the bare `@task` used elsewhere in this file.
@task()
def access_invalid_key_task(**context):
print("access invalid key", context.get("conf"))

# Classic operator whose callable subscripts the context with "conf".
task1 = PythonOperator(
task_id="task1",
python_callable=access_invalid_key_in_context,
)
access_invalid_key_task() >> task1
access_invalid_key_task_out_of_dag()


invalid_dag()

@task
def print_config(**context):
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -74,3 +109,16 @@ def execute(self, context):
tomorrow_ds = context["tomorrow_ds"]
yesterday_ds = context["yesterday_ds"]
yesterday_ds_nodash = context["yesterday_ds_nodash"]

# AIR302 fixture: `@task` function that names the removed key `execution_date`
# as an explicit parameter and additionally reads "conf" via `context.get(...)`.
@task
def access_invalid_argument_task_out_of_dag(execution_date, **context):
print("execution date", execution_date)
print("access invalid key", context.get("conf"))

# AIR302 fixture: decorator used with arguments (`@task(task_id=...)`) and a
# catch-all named `kwargs` rather than `context`; reads the removed key
# "tomorrow_ds" through `kwargs.get(...)`.
@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
"""Print the Airflow context and ds variable from the context."""
print(ds)
print(kwargs.get("tomorrow_ds"))

run_this = print_context()
133 changes: 115 additions & 18 deletions crates/ruff_linter/src/rules/airflow/rules/removal_in_3.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use crate::checkers::ast::Checker;
use ruff_diagnostics::{Diagnostic, Edit, Fix, FixAvailability, Violation};
use ruff_macros::{derive_message_formats, ViolationMetadata};
use ruff_python_ast::helpers::map_callable;
use ruff_python_ast::{
name::QualifiedName, Arguments, Expr, ExprAttribute, ExprCall, ExprContext, ExprName,
ExprStringLiteral, ExprSubscript, StmtClassDef,
ExprStringLiteral, ExprSubscript, Stmt, StmtClassDef, StmtFunctionDef,
};
use ruff_python_semantic::analyze::typing;
use ruff_python_semantic::Modules;
use ruff_python_semantic::ScopeKind;
use ruff_text_size::Ranged;
use ruff_text_size::TextRange;

use crate::checkers::ast::Checker;

/// ## What it does
/// Checks for uses of deprecated Airflow functions and values.
///
Expand Down Expand Up @@ -71,6 +71,21 @@ impl Violation for Airflow3Removal {
}
}

/// Task-context key names that this rule reports as removed.
///
/// Shared by the checks in this module: a key from this list that is looked up
/// on the context, or declared as a parameter of a `@task` function, produces an
/// `Airflow3Removal` diagnostic with `Replacement::None`.
const REMOVED_CONTEXT_KEYS: [&str; 12] = [
"conf",
"execution_date",
"next_ds",
"next_ds_nodash",
"next_execution_date",
"prev_ds",
"prev_ds_nodash",
"prev_execution_date",
"prev_execution_date_success",
"tomorrow_ds",
"yesterday_ds",
"yesterday_ds_nodash",
];

fn extract_name_from_slice(slice: &Expr) -> Option<String> {
match slice {
Expr::StringLiteral(ExprStringLiteral { value, .. }) => Some(value.to_string()),
Expand All @@ -79,21 +94,6 @@ fn extract_name_from_slice(slice: &Expr) -> Option<String> {
}

pub(crate) fn removed_context_variable(checker: &mut Checker, expr: &Expr) {
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
const REMOVED_CONTEXT_KEYS: [&str; 12] = [
"conf",
"execution_date",
"next_ds",
"next_ds_nodash",
"next_execution_date",
"prev_ds",
"prev_ds_nodash",
"prev_execution_date",
"prev_execution_date_success",
"tomorrow_ds",
"yesterday_ds",
"yesterday_ds_nodash",
];

if let Expr::Subscript(ExprSubscript { value, slice, .. }) = expr {
if let Expr::Name(ExprName { id, .. }) = &**value {
if id.as_str() == "context" {
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -144,6 +144,7 @@ pub(crate) fn removed_in_3(checker: &mut Checker, expr: &Expr) {
check_call_arguments(checker, &qualname, arguments);
};
check_method(checker, call_expr);
check_context_get(checker, call_expr);
}
Expr::Attribute(attribute_expr @ ExprAttribute { attr, .. }) => {
check_name(checker, expr, attr.range());
Expand Down Expand Up @@ -312,6 +313,50 @@ fn check_class_attribute(checker: &mut Checker, attribute_expr: &ExprAttribute)
}
}

/// Check whether a removed context key is accessed through `context.get("key")`.
///
/// ```python
/// from airflow.decorators import task
///
///
/// @task
/// def access_invalid_key_task_out_of_dag(**context):
/// print("access invalid key", context.get("conf"))
/// ```
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
fn check_context_get(checker: &mut Checker, call_expr: &ExprCall) {
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
if is_task_context_referenced(checker, &call_expr.func) {
return;
}

let Expr::Attribute(ExprAttribute { value, attr, .. }) = &*call_expr.func else {
return;
};

if !value
.as_name_expr()
.is_some_and(|name| matches!(name.id.as_str(), "context" | "kwargs"))
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
{
return;
}

if attr.as_str() != "get" {
return;
}

for removed_key in REMOVED_CONTEXT_KEYS {
if let Some(argument) = call_expr.arguments.find_argument_value(removed_key, 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can use 0 as the default value here. What would happen if it's a positional parameter which is expected to be at position 1 or 2? This would return the wrong argument.

I think we should also add test cases where there are multiple arguments that are deprecated in the same function intermixed with non-deprecated arguments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I am also doing a check if let Expr::StringLiteral(ExprStringLiteral { value, .. }) = argument here as well.

I have added the test cases for multiple arguments that are deprecated in the same function intermixed with non-deprecated arguments.

Copy link
Member

@dhruvmanila dhruvmanila Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry, this is for the call expression. I think I confused this with the function parameter check.

Does `context.get` allow keyword arguments? `find_argument_value` first checks whether there's a keyword with the given name (as indicated by the `removed_key` variable in this case) and then checks for a positional argument. I'm assuming that `context.get(conf="conf")` is not allowed and only `context.get("conf")` is allowed, which is why I think we should use `find_positional` directly instead.

(I've marked this as unresolved to make sure I don't forget to check this)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can only do context.get("conf")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using find_positional directly.

checker.diagnostics.push(Diagnostic::new(
Airflow3Removal {
deprecated: removed_key.to_string(),
replacement: Replacement::None,
},
argument.range(),
));
return;
}
}
}

/// Check whether a removed Airflow class method is called.
///
/// For example:
Expand Down Expand Up @@ -920,3 +965,55 @@ fn is_airflow_builtin_or_provider(segments: &[&str], module: &str, symbol_suffix
_ => false,
}
}

fn is_task_context_referenced(checker: &mut Checker, expr: &Expr) -> bool {
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
let parents: Vec<_> = checker.semantic().current_statements().collect();

for stmt in parents {
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
if let Stmt::FunctionDef(function_def) = stmt {
if is_decorated_with(checker, function_def) {
let arguments = extract_task_function_arguments(function_def);

for deprecated_arg in REMOVED_CONTEXT_KEYS {
if arguments.contains(&deprecated_arg.to_string()) {
checker.diagnostics.push(Diagnostic::new(
Airflow3Removal {
deprecated: deprecated_arg.to_string(),
replacement: Replacement::None,
},
expr.range(),
));
return true;
}
}
}
}
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
}

false
}

fn extract_task_function_arguments(stmt: &StmtFunctionDef) -> Vec<String> {
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
let mut arguments = Vec::new();
sunank200 marked this conversation as resolved.
Show resolved Hide resolved

for param in &stmt.parameters.args {
arguments.push(param.parameter.name.to_string());
}

if let Some(vararg) = &stmt.parameters.kwarg {
arguments.push(format!("**{}", vararg.name));
}
sunank200 marked this conversation as resolved.
Show resolved Hide resolved

arguments
}

/// Returns `true` if any decorator on `stmt` resolves to `airflow.decorators.task`.
///
/// Each decorator expression is passed through `map_callable` before resolution
/// (presumably so that call-style decorators such as `@task()` resolve to the
/// same name as a bare `@task`; confirm against `map_callable`). Decorators whose
/// qualified name cannot be resolved are ignored.
fn is_decorated_with(checker: &mut Checker, stmt: &StmtFunctionDef) -> bool {
    let semantic = checker.semantic();

    for decorator in &stmt.decorator_list {
        let callee = map_callable(&decorator.expression);
        let Some(qualified_name) = semantic.resolve_qualified_name(callee) else {
            continue;
        };
        if matches!(
            qualified_name.segments(),
            ["airflow", "decorators", "task"]
        ) {
            return true;
        }
    }

    false
}
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
Loading