Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions airflow-core/src/airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -925,8 +925,13 @@ def deserialize(cls, encoded_var: Any) -> Any:
elif type_ == DAT.DATETIME:
return from_timestamp(var)
elif type_ == DAT.POD:
if not _has_kubernetes():
raise RuntimeError("Cannot deserialize POD objects without kubernetes libraries installed!")
# Attempt to import kubernetes for deserialization. Using attempt_import=True allows
# lazy loading of kubernetes libraries only when actually needed for POD deserialization.
if not _has_kubernetes(attempt_import=True):
raise RuntimeError(
"Cannot deserialize POD objects without kubernetes libraries. "
"Please install the cncf.kubernetes provider."
)
pod = PodGenerator.deserialize_model_dict(var)
return pod
elif type_ == DAT.TIMEDELTA:
Expand Down Expand Up @@ -3799,13 +3804,20 @@ class SerializedAssetWatcher(AssetWatcher):
trigger: dict


def _has_kubernetes() -> bool:
def _has_kubernetes(attempt_import: bool = False) -> bool:
"""
Check if kubernetes libraries are available.

:param attempt_import: If true, attempt to import kubernetes libraries if not already loaded. If
False, only check if already in sys.modules (avoids expensive import).
:return: True if kubernetes libraries are available, False otherwise.
"""
global HAS_KUBERNETES
if "HAS_KUBERNETES" in globals():
return HAS_KUBERNETES

# Check if kubernetes is already imported before triggering expensive import
if "kubernetes.client" not in sys.modules:
if "kubernetes.client" not in sys.modules and not attempt_import:
HAS_KUBERNETES = False
return False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2531,7 +2531,7 @@ def execute_complete(self):


def test_kubernetes_optional():
"""Serialisation / deserialisation continues to work without kubernetes installed"""
"""Test that serialization module loads without kubernetes, but deserialization of PODs requires it"""

def mock__import__(name, globals_=None, locals_=None, fromlist=(), level=0):
if level == 0 and name.partition(".")[0] == "kubernetes":
Expand All @@ -2558,7 +2558,8 @@ def mock__import__(name, globals_=None, locals_=None, fromlist=(), level=0):
"__var": PodGenerator.serialize_pod(executor_config_pod),
}

with pytest.raises(RuntimeError):
# we should error if attempting to deserialize POD without kubernetes installed
with pytest.raises(RuntimeError, match="Cannot deserialize POD objects without kubernetes"):
module.BaseSerialization.from_dict(pod_override)

# basic serialization should succeed
Expand Down
Loading