Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,73 @@ class UtcAwareFilterConverter(fab_sqlafilters.SQLAFilterConverter):
"""Retrieve conversion tables for UTC-Aware filters."""


class XComFilterStartsWith(fab_sqlafilters.FilterStartsWith):
"""Starts With filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
return query.filter(field.cast(types.String).ilike(value + "%"))


class XComFilterEndsWith(fab_sqlafilters.FilterEndsWith):
"""Ends With filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
return query.filter(field.cast(types.String).ilike("%" + value))


class XComFilterEqual(fab_sqlafilters.FilterEqual):
"""Equality filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
value = set_value_to_type(self.datamodel, self.column_name, value)
return query.filter(field.cast(types.String) == value)


class XComFilterContains(fab_sqlafilters.FilterContains):
"""Not Equal To filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
return query.filter(field.cast(types.String).ilike("%" + value + "%"))


class XComFilterNotStartsWith(fab_sqlafilters.FilterNotStartsWith):
"""Not Starts With filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
return query.filter(~field.cast(types.String).ilike(value + "%"))


class XComFilterNotEndsWith(fab_sqlafilters.FilterNotEndsWith):
"""Not Starts With filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
return query.filter(~field.cast(types.String).ilike(value + "%"))


class XComFilterNotContains(fab_sqlafilters.FilterNotContains):
"""Not Starts With filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
return query.filter(~field.cast(types.String).ilike("%" + value + "%"))


class XComFilterNotEqual(fab_sqlafilters.FilterNotEqual):
"""Not Starts With filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
value = set_value_to_type(self.datamodel, self.column_name, value)

return query.filter(field.cast(types.String) != value)


class AirflowFilterConverter(fab_sqlafilters.SQLAFilterConverter):
"""Retrieve conversion tables for Airflow-specific filters."""

Expand All @@ -800,6 +867,19 @@ class AirflowFilterConverter(fab_sqlafilters.SQLAFilterConverter):
"is_extendedjson",
[],
),
(
"is_xcom_value",
[
XComFilterStartsWith,
XComFilterEndsWith,
XComFilterEqual,
XComFilterContains,
XComFilterNotStartsWith,
XComFilterNotEndsWith,
XComFilterNotContains,
XComFilterNotEqual,
],
),
*fab_sqlafilters.SQLAFilterConverter.conversion_table,
)

Expand Down Expand Up @@ -864,6 +944,10 @@ def is_extendedjson(self, col_name):
)
return False

def is_xcom_value(self, col_name):
"""Check if it is col_name is value of xcom table."""
return col_name == "value" and self.obj.__tablename__ == "xcom"

def get_col_default(self, col_name: str) -> Any:
if col_name not in self.list_columns:
# Handle AssociationProxy etc, or anything that isn't a "real" column
Expand Down