Skip to content

Commit 369727a

Browse files
Fix XCom PATCH/POST to store native values (instead of json.dumps) (#64220) (#67116)
(cherry picked from commit acdd9da) Co-authored-by: Henry Chen <henryhenry0512@gmail.com>
1 parent 9d80770 commit 369727a

2 files changed

Lines changed: 55 additions & 24 deletions

File tree

  • airflow-core
    • src/airflow/api_fastapi/core_api/routes/public
    • tests/unit/api_fastapi/core_api/routes/public

airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from __future__ import annotations
1818

1919
import copy
20-
import json
2120
from typing import Annotated
2221

2322
from fastapi import Depends, HTTPException, Query, status
@@ -291,27 +290,24 @@ def create_xcom_entry(
291290
)
292291

293292
try:
294-
value = json.dumps(request_body.value)
295-
except (ValueError, TypeError):
293+
XComModel.set(
294+
key=request_body.key,
295+
value=request_body.value,
296+
dag_id=dag_id,
297+
task_id=task_id,
298+
run_id=dag_run_id,
299+
map_index=request_body.map_index,
300+
serialize=False,
301+
session=session,
302+
)
303+
except (ValueError, TypeError) as e:
296304
raise HTTPException(
297305
status.HTTP_400_BAD_REQUEST, f"Couldn't serialise the XCom with key: `{request_body.key}`"
298-
)
299-
300-
new = XComModel(
301-
dag_run_id=dag_run.id,
302-
key=request_body.key,
303-
value=value,
304-
run_id=dag_run_id,
305-
task_id=task_id,
306-
dag_id=dag_id,
307-
map_index=request_body.map_index,
308-
)
309-
session.add(new)
310-
session.flush()
306+
) from e
311307

312308
xcom = session.scalar(
313309
select(XComModel)
314-
.filter(
310+
.where(
315311
XComModel.dag_id == dag_id,
316312
XComModel.task_id == task_id,
317313
XComModel.run_id == dag_run_id,
@@ -345,11 +341,12 @@ def update_xcom_entry(
345341
dag_run_id: str,
346342
xcom_key: str,
347343
patch_body: XComUpdateBody,
344+
*,
348345
session: SessionDep,
349346
) -> XComResponseNative:
350347
"""Update an existing XCom entry."""
351348
# Check if XCom entry exists
352-
xcom_entry = session.scalar(
349+
xcom_query = (
353350
select(XComModel)
354351
.where(
355352
XComModel.dag_id == dag_id,
@@ -361,16 +358,32 @@ def update_xcom_entry(
361358
.limit(1)
362359
.options(joinedload(XComModel.task), joinedload(XComModel.dag_run).joinedload(DR.dag_model))
363360
)
361+
xcom_entry = session.scalar(xcom_query)
364362

365363
if not xcom_entry:
366364
raise HTTPException(
367365
status.HTTP_404_NOT_FOUND,
368366
f"The XCom with key: `{xcom_key}` with mentioned task instance doesn't exist.",
369367
)
370368

371-
# Update XCom entry
372-
xcom_entry.value = json.dumps(patch_body.value)
369+
try:
370+
XComModel.set(
371+
key=xcom_key,
372+
value=patch_body.value,
373+
dag_id=dag_id,
374+
task_id=task_id,
375+
run_id=dag_run_id,
376+
map_index=patch_body.map_index,
377+
serialize=False,
378+
session=session,
379+
)
380+
except (ValueError, TypeError) as e:
381+
raise HTTPException(
382+
status.HTTP_400_BAD_REQUEST, f"Couldn't serialise the XCom with key: `{xcom_key}`"
383+
) from e
373384

385+
# Fetch after setting, to get fresh object for response
386+
xcom_entry = session.scalar(xcom_query)
374387
return XComResponseNative.model_validate(xcom_entry)
375388

376389

airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,7 @@ def test_create_xcom_entry(
680680
# Validate the created XCom response
681681
current_data = response.json()
682682
assert current_data["key"] == request_body.key
683-
assert current_data["value"] == XComModel.serialize_value(request_body.value)
683+
assert current_data["value"] == request_body.value
684684
assert current_data["dag_id"] == dag_id
685685
assert current_data["task_id"] == task_id
686686
assert current_data["run_id"] == dag_run_id
@@ -716,7 +716,7 @@ def test_create_xcom_entry_with_slash_key(self, test_client):
716716
)
717717
assert get_resp.status_code == 200
718718
assert get_resp.json()["key"] == slash_key
719-
assert get_resp.json()["value"] == json.dumps(TEST_XCOM_VALUE)
719+
assert get_resp.json()["value"] == TEST_XCOM_VALUE
720720

721721
@pytest.mark.parametrize(
722722
("key", "value"),
@@ -833,7 +833,7 @@ def test_patch_xcom_entry(self, key, patch_body, expected_status, expected_detai
833833
assert response.status_code == expected_status
834834

835835
if expected_status == 200:
836-
assert response.json()["value"] == json.dumps(patch_body["value"])
836+
assert response.json()["value"] == patch_body["value"]
837837
else:
838838
assert response.json()["detail"] == expected_detail
839839
check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None)
@@ -862,7 +862,25 @@ def test_patch_xcom_entry_with_slash_key(self, test_client, session):
862862
)
863863
assert response.status_code == 200
864864
assert response.json()["key"] == slash_key
865-
assert response.json()["value"] == json.dumps(new_value)
865+
assert response.json()["value"] == new_value
866+
check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None)
867+
868+
def test_patch_xcom_preserves_int_type(self, test_client, session):
869+
"""Test scenario described in #59032: if existing XCom value type is int,
870+
after patching with different value, it should still be int in the API response.
871+
"""
872+
key = "int_type_xcom"
873+
# Create with int value
874+
self._create_xcom(key, 42)
875+
patch_value = 100
876+
response = test_client.patch(
877+
f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{key}",
878+
json={"value": patch_value},
879+
)
880+
assert response.status_code == 200
881+
data = response.json()
882+
assert data["value"] == patch_value
883+
assert isinstance(data["value"], int), f"Expected int type but got {type(data['value'])}"
866884
check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None)
867885

868886
@pytest.mark.parametrize(

0 commit comments

Comments
 (0)