Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def _cast_basic_types(value):
return value.isoformat()
if isinstance(value, datetime.timedelta):
return f"{value.total_seconds()} seconds"
if isinstance(value, (set, list, tuple)):
if isinstance(value, (set, tuple)):
return str(list(value))
return value

Expand Down Expand Up @@ -214,6 +214,12 @@ class TaskInstanceInfo(InfoJsonEncodable):
}


class DatasetInfo(InfoJsonEncodable):
    """JSON-encodable representation of an Airflow Dataset object."""

    # Attribute names handed to the InfoJsonEncodable base class; presumably
    # these are the fields copied from the wrapped object into the output.
    includes = ["uri", "extra"]


class TaskInfo(InfoJsonEncodable):
"""Defines encoding BaseOperator/AbstractOperator object to JSON."""

Expand Down Expand Up @@ -242,6 +248,9 @@ class TaskInfo(InfoJsonEncodable):
"run_as_user",
"sla",
"task_id",
"trigger_dag_id",
"external_dag_id",
"external_task_id",
"trigger_rule",
"upstream_task_ids",
"wait_for_downstream",
Expand All @@ -255,6 +264,8 @@ class TaskInfo(InfoJsonEncodable):
if hasattr(task, "task_group") and getattr(task.task_group, "_group_id", None)
else None
),
"inlets": lambda task: [DatasetInfo(inlet) for inlet in task.inlets],
"outlets": lambda task: [DatasetInfo(outlet) for outlet in task.outlets],
}


Expand Down
26 changes: 26 additions & 0 deletions tests/providers/openlineage/plugins/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,32 @@ class Test:
}


def test_info_json_encodable_list_does_not_flatten():
    """Check that a list attribute round-trips through JSON as a list."""

    @define(slots=False)
    class Test:
        alist: list[str]

    class TestInfo(InfoJsonEncodable):
        includes = ["alist"]

    wrapped = TestInfo(Test(["a", "b", "c"]))

    # Serialize and parse again so the comparison is made on plain JSON data.
    round_tripped = json.loads(json.dumps(wrapped))
    expected = {"alist": ["a", "b", "c"]}

    assert round_tripped == expected


def test_info_json_encodable_list_does_include_nonexisting():
    """Check encoding when ``includes`` names an attribute the object lacks.

    Despite the test name, the expectation is that the missing attribute
    ("doesnotexist") is left out of the encoded output and only the existing
    one is present.
    """

    @define(slots=False)
    class Test:
        exists: str

    class TestInfo(InfoJsonEncodable):
        includes = ["exists", "doesnotexist"]

    wrapped = TestInfo(Test("something"))

    # Serialize and parse again so the comparison is made on plain JSON data.
    round_tripped = json.loads(json.dumps(wrapped))
    expected = {"exists": "something"}

    assert round_tripped == expected


def test_is_name_redactable():
class NotMixin:
def __init__(self):
Expand Down