Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions airflow/models/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,10 +685,8 @@ def _deserialize_value(result: XCom, orm: bool) -> Any:
except pickle.UnpicklingError:
return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)
else:
try:
return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)
except (json.JSONDecodeError, UnicodeDecodeError):
return pickle.loads(result.value)
# Since xcom_pickling is disabled, we should only try to deserialize with JSON
return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)

@staticmethod
def deserialize_value(result: XCom) -> Any:
Expand Down
3 changes: 3 additions & 0 deletions tests/api_connexion/schemas/test_xcom_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from airflow.models import DagRun, XCom
from airflow.utils.dates import parse_execution_date
from airflow.utils.session import create_session
from tests.test_utils.config import conf_vars

pytestmark = pytest.mark.db_test

Expand Down Expand Up @@ -188,6 +189,7 @@ class TestXComSchema:
default_time = "2016-04-02T21:00:00+00:00"
default_time_parsed = parse_execution_date(default_time)

@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_serialize(self, create_xcom, session):
create_xcom(
dag_id="test_dag",
Expand All @@ -208,6 +210,7 @@ def test_serialize(self, create_xcom, session):
"map_index": -1,
}

@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_deserialize(self):
xcom_dump = {
"key": "test_key",
Expand Down
18 changes: 9 additions & 9 deletions tests/models/test_xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def test_xcom_deserialize_with_json_to_pickle_switch(self, task_instance, sessio
ret_value = XCom.get_value(key="xcom_test3", ti_key=ti_key, session=session)
assert ret_value == {"key": "value"}

def test_xcom_deserialize_with_pickle_to_json_switch(self, task_instance, session):
def test_xcom_deserialize_pickle_when_xcom_pickling_is_disabled(self, task_instance, session):
with conf_vars({("core", "enable_xcom_pickling"): "True"}):
XCom.set(
key="xcom_test3",
Expand All @@ -151,14 +151,14 @@ def test_xcom_deserialize_with_pickle_to_json_switch(self, task_instance, sessio
session=session,
)
with conf_vars({("core", "enable_xcom_pickling"): "False"}):
ret_value = XCom.get_one(
key="xcom_test3",
dag_id=task_instance.dag_id,
task_id=task_instance.task_id,
run_id=task_instance.run_id,
session=session,
)
assert ret_value == {"key": "value"}
with pytest.raises(UnicodeDecodeError):
XCom.get_one(
key="xcom_test3",
dag_id=task_instance.dag_id,
task_id=task_instance.task_id,
run_id=task_instance.run_id,
session=session,
)

@conf_vars({("core", "xcom_enable_pickling"): "False"})
def test_xcom_disable_pickle_type_fail_on_non_json(self, task_instance, session):
Expand Down