Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def get_xcom_entry(
stub.value = XCom.deserialize_value(stub)
item = stub

if stringify:
if stringify or conf.getboolean("core", "enable_xcom_pickling"):
return xcom_schema_string.dump(item)

return xcom_schema_native.dump(item)
2 changes: 2 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,8 @@ paths:
If set to true (default) the Any value will be returned as string, e.g. a Python representation
of a dict. If set to false it will return the raw data as dict, list, string or whatever was stored.

This parameter is not meaningful when using XCom pickling, then it is always returned as string.

*New in version 2.10.0*
responses:
"200":
Expand Down
30 changes: 30 additions & 0 deletions tests/api_connexion/endpoints/test_xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,36 @@ def test_should_respond_200_native(self):
"value": {"key": "value"},
}

@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_should_respond_200_native_for_pickled(self):
dag_id = "test-dag-id"
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
execution_date_parsed = parse_execution_date(execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
value_non_serializable_key = {("201009_NB502104_0421_AHJY23BGXG (SEQ_WF: 138898)", None): 82359}
self._create_xcom_entry(
dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": value_non_serializable_key}
)
response = self.client.get(
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
environ_overrides={"REMOTE_USER": "test"},
)
assert 200 == response.status_code

current_data = response.json
current_data["timestamp"] = "TIMESTAMP"
assert current_data == {
"dag_id": dag_id,
"execution_date": execution_date,
"key": xcom_key,
"task_id": task_id,
"map_index": -1,
"timestamp": "TIMESTAMP",
"value": f"{{'key': {str(value_non_serializable_key)}}}",
}

def test_should_raise_404_for_non_existent_xcom(self):
dag_id = "test-dag-id"
task_id = "test-task-id"
Expand Down