Apache Airflow Provider(s)
databricks
Versions of Apache Airflow Providers
The error was not present in apache-airflow-providers-databricks==4.7.0.
After upgrading to the latest release, apache-airflow-providers-databricks==6.0.0, the error appears.
Apache Airflow version
2.8.0
Operating System
Debian GNU/Linux 11 (bullseye)
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
What happened
e9b5cf0d8cb8
*** Found local files:
*** * /opt/airflow/logs/dag_id=dag_id/run_id=manual__2024-01-17T11:39:05+01:00/task_id=read/attempt=1.log
[2024-01-17, 11:39:11 CET] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dag_id.read manual__2024-01-17T11:39:05+01:00 [queued]>
[2024-01-17, 11:39:11 CET] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dag_id.read manual__2024-01-17T11:39:05+01:00 [queued]>
[2024-01-17, 11:39:11 CET] {taskinstance.py:2171} INFO - Starting attempt 1 of 1
[2024-01-17, 11:39:11 CET] {taskinstance.py:2192} INFO - Executing <Task(DatabricksSqlOperator): read> on 2024-01-17 10:39:05+00:00
[2024-01-17, 11:39:11 CET] {standard_task_runner.py:60} INFO - Started process 3561 to run task
[2024-01-17, 11:39:11 CET] {standard_task_runner.py:87} INFO - Running: ['airflow', 'tasks', 'run', 'dag_id', 'read', 'manual__2024-01-17T11:39:05+01:00', '--job-id', '23', '--raw', '--subdir', 'DAGS_FOLDER/file_uploads/dag-wn-equipment.py', '--cfg-path', '/tmp/tmp49oxl6yk']
[2024-01-17, 11:39:11 CET] {standard_task_runner.py:88} INFO - Job 23: Subtask read
[2024-01-17, 11:39:11 CET] {task_command.py:423} INFO - Running <TaskInstance: dag_id.read manual__2024-01-17T11:39:05+01:00 [running]> on host e9b5cf0d8cb8
[2024-01-17, 11:39:11 CET] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='team_analytics_foundation' AIRFLOW_CTX_DAG_ID='dag_id' AIRFLOW_CTX_TASK_ID='read' AIRFLOW_CTX_EXECUTION_DATE='2024-01-17T10:39:05+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-01-17T11:39:05+01:00'
[2024-01-17, 11:39:11 CET] {sql.py:276} INFO - Executing: SELECT * FROM catalog.schema.test_table LIMIT 10;
[2024-01-17, 11:39:11 CET] {base.py:83} INFO - Using connection ID 'tu-databricks-sp' for task execution.
[2024-01-17, 11:39:11 CET] {databricks_base.py:514} INFO - Using Service Principal Token.
[2024-01-17, 11:39:11 CET] {databricks_base.py:223} INFO - Existing Service Principal token is expired, or going to expire soon. Refreshing...
[2024-01-17, 11:39:12 CET] {databricks_base.py:514} INFO - Using Service Principal Token.
[2024-01-17, 11:39:12 CET] {client.py:200} INFO - Successfully opened session 01eeb524-XXXX-1b54-9b2e-b16859209198
[2024-01-17, 11:39:12 CET] {sql.py:450} INFO - Running statement: SELECT * FROM catalog.schema.test_table LIMIT 10, parameters: None
[2024-01-17, 11:39:12 CET] {client.py:258} INFO - Closing session 01eeb524-XXX-1b54-9b2e-b16859209198
[2024-01-17, 11:39:12 CET] {xcom.py:664} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config or make sure to decorate your object with attr.
[2024-01-17, 11:39:12 CET] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 91, in default
return serialize(o)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 147, in serialize
return encode(classname, version, serialize(data, depth + 1))
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 126, in serialize
return [serialize(d, depth + 1) for d in o]
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 126, in <listcomp>
return [serialize(d, depth + 1) for d in o]
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 126, in serialize
return [serialize(d, depth + 1) for d in o]
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 126, in <listcomp>
return [serialize(d, depth + 1) for d in o]
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 180, in serialize
raise TypeError(f"cannot serialize object of type {cls}")
TypeError: cannot serialize object of type <class 'airflow.providers.databricks.hooks.databricks_sql.Row'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 440, in _execute_task
task_instance.xcom_push(key=XCOM_RETURN_KEY, value=xcom_value, session=session)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2981, in xcom_push
XCom.set(
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 247, in set
value = cls.serialize_value(
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 662, in serialize_value
return json.dumps(value, cls=XComEncoder).encode("UTF-8")
File "/usr/local/lib/python3.10/json/__init__.py", line 238, in dumps
**kw).encode(obj)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 102, in encode
o = self.default(o)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 93, in default
return super().default(o)
File "/usr/local/lib/python3.10/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type tuple is not JSON serializable
[2024-01-17, 11:39:12 CET] {taskinstance.py:1138} INFO - Marking task as FAILED. dag_id=dag_id, task_id=read, execution_date=20240117T103905, start_date=20240117T103911, end_date=20240117T103912
[2024-01-17, 11:39:12 CET] {standard_task_runner.py:107} ERROR - Failed to execute job 23 for task read (Object of type tuple is not JSON serializable; 3561)
[2024-01-17, 11:39:12 CET] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-01-17, 11:39:12 CET] {taskinstance.py:3281} INFO - 0 downstream tasks scheduled from follow-on schedule check
I did a little investigation: I edited the hooks/databricks_sql.py file and added print statements in the _make_common_data_structure method:
result variable value:
[Row(currency_code='AUD', date_eom=datetime.date(2022, 11, 30), exchange_rate_avg=4.813328450816598, exchange_rate_closing=4.821134521880065, currency_date_code='AUD_2022-11-30'), Row(currency_code='BRL', date_eom=datetime.date(2018, 5, 31), exchange_rate_avg=1.7359416398794039, exchange_rate_closing=1.706230229679549, currency_date_code='BRL_2018-05-31'), Row(currency_code='CHF', date_eom=datetime.date(2018, 11, 30), exchange_rate_avg=6.558196767265858, exchange_rate_closing=6.580423280423281, currency_date_code='CHF_2018-11-30'), Row(currency_code='CHF', date_eom=datetime.date(2021, 2, 28), exchange_rate_avg=6.849691568142963, exchange_rate_closing=6.768705625341343, currency_date_code='CHF_2021-02-28'), Row(currency_code='CNY', date_eom=datetime.date(2008, 8, 31), exchange_rate_avg=0.7273119209984404, exchange_rate_closing=0.7405127390432313, currency_date_code='CNY_2008-08-31'), Row(currency_code='CYP', date_eom=datetime.date(2001, 1, 31), exchange_rate_avg=12.92422064597834, exchange_rate_closing=12.891303972216942, currency_date_code='CYP_2001-01-31'), Row(currency_code='CYP', date_eom=datetime.date(2001, 10, 31), exchange_rate_avg=12.94928151413741, exchange_rate_closing=12.961950370047887, currency_date_code='CYP_2001-10-31'), Row(currency_code='CZK', date_eom=datetime.date(2010, 6, 30), exchange_rate_avg=0.28863360451335074, exchange_rate_closing=0.289938110622397, currency_date_code='CZK_2010-06-30'), Row(currency_code='CZK', date_eom=datetime.date(2010, 12, 31), exchange_rate_avg=0.2960584828246455, exchange_rate_closing=0.29741430908583055, currency_date_code='CZK_2010-12-31'), Row(currency_code='DKK', date_eom=datetime.date(2017, 5, 31), exchange_rate_avg=1.0, exchange_rate_closing=1.0, currency_date_code='DKK_2017-05-31')]
rows_fields variable value:
row_fields <Row('currency_code', 'date_eom', 'exchange_rate_avg', 'exchange_rate_closing', 'currency_date_code')>
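For illustration only, here is a minimal sketch of the conversion I would expect before the result is pushed to XCom; it uses a plain collections.namedtuple as a hypothetical stand-in for the provider's Row class, not the provider's actual implementation:

from collections import namedtuple
import datetime
import json

# Hypothetical stand-in for airflow.providers.databricks.hooks.databricks_sql.Row
Row = namedtuple("Row", ["currency_code", "date_eom", "exchange_rate_avg"])

rows = [
    Row("AUD", datetime.date(2022, 11, 30), 4.813328450816598),
    Row("BRL", datetime.date(2018, 5, 31), 1.7359416398794039),
]

# Converting each Row to a plain dict (and dates to ISO strings) yields a
# structure that json.dumps, and therefore the default XCom backend, accepts.
serializable = [
    {**row._asdict(), "date_eom": row.date_eom.isoformat()} for row in rows
]
print(json.dumps(serializable))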
What you think should happen instead
No response
How to reproduce
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

test2 = DatabricksSqlOperator(
    task_id="read",
    databricks_conn_id="tu-databricks-sp",
    sql_endpoint_name="sql_endpoint_name",
    sql="SELECT * FROM catalog.schema.test_table LIMIT 10;",
)
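As a temporary workaround (not a fix), the result can be kept out of XCom so the failing serialization is never attempted. This sketch reuses the connection and endpoint names from above and relies only on the standard do_xcom_push flag from BaseOperator:

test2_no_xcom = DatabricksSqlOperator(
    task_id="read_no_xcom",
    databricks_conn_id="tu-databricks-sp",
    sql_endpoint_name="sql_endpoint_name",
    sql="SELECT * FROM catalog.schema.test_table LIMIT 10;",
    do_xcom_push=False,  # skip pushing the Row objects to XCom
)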
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct