Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions airflow/models/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def init_on_load(self):
self.value = self.orm_deserialize_value()

def __repr__(self):
"""Return the `string` representation of the object."""
if self.map_index < 0:
return f'<XCom "{self.key}" ({self.task_id} @ {self.run_id})>'
return f'<XCom "{self.key}" ({self.task_id}[{self.map_index}] @ {self.run_id})>'
Expand Down Expand Up @@ -757,18 +758,22 @@ def build_from_xcom_query(cls, query: Query) -> LazyXComAccess:
return cls(query=query.with_entities(XCom.value))

def __repr__(self) -> str:
"""Return the `string` representation of the object."""
return f"LazyXComAccess([{len(self)} items])"

def __str__(self) -> str:
"""Convert object to a `string` type."""
return str(list(self))

def __eq__(self, other: Any) -> bool:
"""Perform equality comparison."""
if isinstance(other, (list, LazyXComAccess)):
z = itertools.zip_longest(iter(self), iter(other), fillvalue=object())
return all(x == y for x, y in z)
return NotImplemented

def __getstate__(self) -> Any:
"""Return the state and properties of the query."""
# We don't want to go to the trouble of serializing the entire Query
# object, including its filters, hints, etc. (plus SQLAlchemy does not
# provide a public API to inspect a query's contents). Converting the
Expand All @@ -785,19 +790,23 @@ def __getstate__(self) -> Any:
return (str(statement), query.count())

def __setstate__(self, state: Any) -> None:
"""Set the state of the query."""
statement, self._len = state
self._query = Query(XCom.value).from_statement(text(statement))

def __len__(self):
"""Get the length of the object."""
if self._len is None:
with self._get_bound_query() as query:
self._len = query.count()
return self._len

def __iter__(self):
"""Iterate over an object."""
return _LazyXComAccessIterator(self._get_bound_query())

def __getitem__(self, key):
"""Get an item using its `key`."""
if not isinstance(key, int):
raise ValueError("only support index access for now")
try:
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1415,7 +1415,6 @@ combine-as-imports = true
"airflow/models/tasklog.py" = ["D105"]
"airflow/models/taskmixin.py" = ["D105"]
"airflow/models/variable.py" = ["D105"]
"airflow/models/xcom.py" = ["D105"]
"airflow/models/xcom_arg.py" = ["D105"]
"airflow/plugins_manager.py" = ["D105"]
"airflow/providers_manager.py" = ["D105"]
Expand Down