Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import json
import logging
import os
import re
import shutil
import subprocess
import sys
Expand All @@ -35,6 +36,9 @@
from typing import TYPE_CHECKING, Any, Callable, NamedTuple, cast

import lazy_object_proxy
from packaging.requirements import InvalidRequirement, Requirement
from packaging.specifiers import InvalidSpecifier
from packaging.version import InvalidVersion

from airflow.exceptions import (
AirflowConfigException,
Expand Down Expand Up @@ -848,10 +852,38 @@ def execute_callable(self):

def _iter_serializable_context_keys(self):
    """
    Yield the names of the context keys that may be serialized for the callable.

    The base keys are always yielded. The Airflow keys and the pendulum keys are added when
    the matching package is expected to be available: either because system site-packages
    are used, or because ``self.requirements`` names the package. A requirement on
    ``apache-airflow`` also enables the pendulum keys.

    :raises ValueError: if a requirement line is not a valid requirement specifier.
    """
    yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS

    found_airflow = found_pendulum = False

    if self.system_site_packages:
        # If we're using system packages, assume both are present
        found_airflow = found_pendulum = True
    else:
        requirements = self.requirements or ()
        if isinstance(requirements, str):
            # A bare string would otherwise be iterated character by character.
            requirements = (requirements,)

        # An entry may carry several lines (the text of a requirements file),
        # so every line of every entry is inspected on its own.
        for raw_str in (line for entry in requirements for line in entry.splitlines()):
            line = raw_str.strip()
            # Skip blank lines, full-line comments and pip option lines such as
            # "--extra-index-url ..." or "-r other.txt": a package name can never start
            # with "-", so such a line cannot name airflow or pendulum.
            if not line or line.startswith(("#", "-")):
                continue

            # Strip off any inline comment
            # e.g. turn "foo==1.2.3 # comment" into "foo==1.2.3"
            req_str = line.partition("#")[0].strip()

            try:
                req = Requirement(req_str)
            except (InvalidRequirement, InvalidSpecifier, InvalidVersion) as e:
                raise ValueError(f"Invalid requirement '{raw_str}': {e}") from e

            # Compare normalized names so that equivalent spellings such as
            # "Apache_Airflow" or "apache.airflow" are recognised as well.
            name = re.sub(r"[-_.]+", "-", req.name).lower()
            if name == "apache-airflow":
                found_airflow = found_pendulum = True
                break
            if name == "pendulum":
                found_pendulum = True

    if found_airflow:
        yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS
        yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
    elif found_pendulum:
        yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS


Expand Down
93 changes: 93 additions & 0 deletions providers/standard/tests/unit/standard/operators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,99 @@ def f(

self.run_as_task(f, serializer=serializer, system_site_packages=False, requirements=None)

@pytest.mark.parametrize(
    "requirements, system_site, want_airflow, want_pendulum",
    [
        # no requirements and no site-packages: base keys only
        ([], False, False, False),
        # system site-packages: base, airflow and pendulum keys
        ([], True, True, True),
        # apache-airflow without a version constraint
        (["apache-airflow"], False, True, True),
        # apache-airflow pinned to an exact version
        (["apache-airflow==2.10.2"], False, True, True),
        # apache-airflow with a lower bound
        (["apache-airflow>=2.10"], False, True, True),
        # pendulum without a version constraint
        (["pendulum"], False, False, True),
        # pendulum with a compatible-release constraint
        (["pendulum~=2.1.0"], False, False, True),
        # an unrelated package
        (["foo==1.0.0"], False, False, False),
        # apache-airflow next to an unrelated package
        (["apache-airflow", "foo"], False, True, True),
        # nothing but a full-line comment
        (["# comment"], False, False, False),
        # requirement followed by an inline comment
        (["apache-airflow==2.10.2 # comment"], False, True, True),
        # blank entry followed by a requirement
        (["", "pendulum"], False, False, True),
        # indented comment followed by a requirement
        ([" # comment", "pendulum~=2.1.0"], False, False, True),
    ],
)
def test_iter_serializable_context_keys(self, requirements, system_site, want_airflow, want_pendulum):
    """Check which key groups are yielded for a given requirements / site-packages setup."""

    def func():
        return "test_return_value"

    operator = PythonVirtualenvOperator(
        python_callable=func,
        task_id="task",
        system_site_packages=system_site,
        requirements=requirements,
    )
    yielded = set(operator._iter_serializable_context_keys())

    # The base keys are yielded whatever the configuration is.
    assert yielded.issuperset(operator.BASE_SERIALIZABLE_CONTEXT_KEYS)

    # The airflow keys must be fully present or fully absent.
    airflow_keys = set(operator.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS)
    if want_airflow:
        assert airflow_keys.issubset(yielded), f"expected AIRFLOW keys for requirements: {requirements}"
    else:
        assert airflow_keys.isdisjoint(yielded), f"unexpected AIRFLOW keys for requirements: {requirements}"

    # Same all-or-nothing expectation for the pendulum keys.
    pendulum_keys = set(operator.PENDULUM_SERIALIZABLE_CONTEXT_KEYS)
    if want_pendulum:
        assert pendulum_keys.issubset(yielded), f"expected PENDULUM keys for requirements: {requirements}"
    else:
        assert pendulum_keys.isdisjoint(yielded), f"unexpected PENDULUM keys for requirements: {requirements}"

@pytest.mark.parametrize(
    "preceding_requirements",
    [
        # the invalid entry is the only requirement
        [],
        # one valid requirement ahead of the invalid entry
        ["foo==1.0.0"],
        # several valid requirements ahead of the invalid entry
        ["pendulum", "foo"],
    ],
)
@pytest.mark.parametrize(
    "invalid_requirement",
    [
        # invalid version format
        "pendulum==3..0",
        # invalid operator (=< instead of <=)
        "apache-airflow=<2.0",
        # same invalid operator on pendulum
        "pendulum=<3.0",
        # totally malformed
        "invalid requirement",
    ],
)
def test_iter_serializable_context_keys_invalid_requirement(
    self, invalid_requirement, preceding_requirements
):
    """
    An unparsable requirement must raise ``ValueError`` naming the offending entry.

    Each invalid entry is tried alone and after one or more valid requirements, so lists
    holding more than one requirement are covered too. None of the preceding requirements
    is ``apache-airflow``, because the scan stops at the first ``apache-airflow`` entry
    and would never reach the invalid one.
    """

    def func():
        return "test_return_value"

    op = PythonVirtualenvOperator(
        task_id="task",
        python_callable=func,
        requirements=[*preceding_requirements, invalid_requirement],
        system_site_packages=False,
    )

    with pytest.raises(ValueError) as exc_info:
        # Consume the generator to trigger parsing
        list(op._iter_serializable_context_keys())

    msg = str(exc_info.value)
    assert f"Invalid requirement '{invalid_requirement}'" in msg


# when venv tests are run in parallel with other tests they create new processes and this might take
# quite some time in a shared Docker environment and hit some contention even between different containers
Expand Down