@@ -124,22 +124,15 @@ def _get_parent_run_facet(task_instance):

def _run_api_call(hook: DatabricksSqlHook | DatabricksHook, query_ids: list[str]) -> list[dict]:
"""Retrieve execution details for specific queries from Databricks's query history API."""
try:
token = hook._get_token(raise_error=True)
# https://docs.databricks.com/api/azure/workspace/queryhistory/list
response = requests.get(
url=f"https://{hook.host}/api/2.0/sql/history/queries",
headers={"Authorization": f"Bearer {token}"},
data=json.dumps({"filter_by": {"statement_ids": query_ids}}),
timeout=2,
)
response.raise_for_status()
except Exception as e:
log.warning(
"OpenLineage could not retrieve Databricks queries details. Error received: `%s`.",
e,
)
return []
token = hook._get_token(raise_error=True)
# https://docs.databricks.com/api/azure/workspace/queryhistory/list
response = requests.get(
url=f"https://{hook.host}/api/2.0/sql/history/queries",
headers={"Authorization": f"Bearer {token}"},
data=json.dumps({"filter_by": {"statement_ids": query_ids}}),
timeout=3,
)
response.raise_for_status()

return response.json()["res"]
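Not part of the diff: a minimal, self-contained sketch of exercising the reworked helper with the HTTP layer mocked out. The hook object and the response payload are stand-ins; only the top-level "res" list is assumed, since that is what the return statement above reads.

from unittest import mock

from airflow.providers.databricks.utils.openlineage import _run_api_call

# Hypothetical hook stand-in; only the attributes the helper touches are faked.
fake_hook = mock.Mock(host="adb-123.azuredatabricks.net")
fake_hook._get_token.return_value = "dummy-token"

# Assumed payload shape: a JSON object whose "res" key holds the list of query dicts.
fake_response = mock.Mock(status_code=200)
fake_response.json.return_value = {"res": [{"query_id": "ABC"}]}
fake_response.raise_for_status.return_value = None

with mock.patch("requests.get", return_value=fake_response):
    assert _run_api_call(hook=fake_hook, query_ids=["ABC"]) == [{"query_id": "ABC"}]

# With the try/except removed here, token and transport errors now propagate to the caller,
# which logs and swallows them in _get_queries_details_from_databricks (next hunk).
with mock.patch("requests.get", side_effect=RuntimeError("request error")):
    try:
        _run_api_call(hook=fake_hook, query_ids=["ABC"])
    except RuntimeError:
        pass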

@@ -176,7 +169,11 @@ def _get_queries_details_from_databricks(
if query_info["query_id"]
}
except Exception as e:
log.warning("OpenLineage could not retrieve extra metadata from Databricks. Error encountered: %s", e)
log.info(
"OpenLineage encountered an error while retrieving additional metadata about SQL queries"
" from Databricks. The process will continue with default values. Error details: %s",
e,
)

return query_details

@@ -121,9 +121,8 @@ def test_run_api_call_request_error():
mock_response.status_code = 200

with mock.patch("requests.get", side_effect=RuntimeError("request error")):
result = _run_api_call(mock_hook, ["123"])

assert result == []
with pytest.raises(RuntimeError):
_run_api_call(mock_hook, ["123"])


def test_run_api_call_token_error():
@@ -135,9 +134,8 @@ def test_run_api_call_token_error():
mock_response.status_code = 200

with mock.patch("requests.get", return_value=mock_response):
result = _run_api_call(mock_hook, ["123"])

assert result == []
with pytest.raises(RuntimeError):
_run_api_call(mock_hook, ["123"])


def test_process_data_from_api():
@@ -194,6 +192,18 @@ def test_get_queries_details_from_databricks_empty_query_ids():
assert details == {}


@mock.patch("airflow.providers.databricks.utils.openlineage._run_api_call")
def test_get_queries_details_from_databricks_error(mock_api_call):
mock_api_call.side_effect = RuntimeError("Token error")

hook = DatabricksSqlHook()
query_ids = ["ABC"]

details = _get_queries_details_from_databricks(hook, query_ids)
mock_api_call.assert_called_once_with(hook=hook, query_ids=query_ids)
assert details == {}


@mock.patch("airflow.providers.databricks.utils.openlineage._run_api_call")
def test_get_queries_details_from_databricks(mock_api_call):
hook = DatabricksSqlHook()
@@ -199,6 +199,7 @@ def _run_single_query_with_hook(hook: SnowflakeHook, sql: str) -> list[dict]:
with closing(hook.get_conn()) as conn:
hook.set_autocommit(conn, False)
with hook._get_cursor(conn, return_dictionaries=True) as cur:
cur.execute("ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 3;") # only for this session
cur.execute(sql)
result = cur.fetchall()
conn.commit()
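Background, not part of the diff: STATEMENT_TIMEOUT_IN_SECONDS is a Snowflake session parameter, so setting it via ALTER SESSION cancels long-running statements only on this connection and leaves other sessions untouched. A minimal sketch of the same pattern with the plain Snowflake Python connector; the connection parameters are hypothetical.

import snowflake.connector  # assumes snowflake-connector-python is installed
from snowflake.connector import DictCursor

# Hypothetical credentials, for illustration only.
conn = snowflake.connector.connect(account="my_account", user="my_user", password="***")
try:
    cur = conn.cursor(DictCursor)
    # Scoped to this session: statements issued on this connection are cancelled after 3 seconds.
    cur.execute("ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 3")
    cur.execute("SELECT CURRENT_VERSION() AS VERSION")
    rows = cur.fetchall()  # e.g. [{'VERSION': '8.x.y'}]
    cur.close()
finally:
    conn.close()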
@@ -232,11 +233,16 @@ def _get_queries_details_from_snowflake(
if not query_ids:
return {}
query_condition = f"IN {tuple(query_ids)}" if len(query_ids) > 1 else f"= '{query_ids[0]}'"
# https://docs.snowflake.com/en/sql-reference/account-usage#differences-between-account-usage-and-information-schema
# INFORMATION_SCHEMA.QUERY_HISTORY has no latency, so it's better than ACCOUNT_USAGE.QUERY_HISTORY
# https://docs.snowflake.com/en/sql-reference/functions/query_history
# The SNOWFLAKE.INFORMATION_SCHEMA.QUERY_HISTORY() table function seems the most suitable for the job:
# it returns the history of queries executed by the current user, using the same credentials.
query = (
"SELECT "
"QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, ERROR_CODE, ERROR_MESSAGE "
"FROM "
"table(information_schema.query_history()) "
"table(snowflake.information_schema.query_history()) "
f"WHERE "
f"QUERY_ID {query_condition}"
f";"
@@ -250,7 +256,11 @@
else:
result = _run_single_query_with_hook(hook=hook, sql=query)
except Exception as e:
log.warning("OpenLineage could not retrieve extra metadata from Snowflake. Error encountered: %s", e)
log.info(
"OpenLineage encountered an error while retrieving additional metadata about SQL queries"
" from Snowflake. The process will continue with default values. Error details: %s",
e,
)
result = []

return {row["QUERY_ID"]: row for row in result} if result else {}
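Worked example, not part of the diff: the statement this builds for one and for several query IDs, matching the expected_query strings asserted in the tests below. The IDs are hypothetical.

def build_query_history_sql(query_ids: list[str]) -> str:
    # Same construction as above, reproduced standalone for illustration.
    query_condition = f"IN {tuple(query_ids)}" if len(query_ids) > 1 else f"= '{query_ids[0]}'"
    return (
        "SELECT "
        "QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, ERROR_CODE, ERROR_MESSAGE "
        "FROM "
        "table(snowflake.information_schema.query_history()) "
        f"WHERE "
        f"QUERY_ID {query_condition}"
        f";"
    )

print(build_query_history_sql(["ABC"]))
# SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, ERROR_CODE, ERROR_MESSAGE
# FROM table(snowflake.information_schema.query_history()) WHERE QUERY_ID = 'ABC';
print(build_query_history_sql(["ABC", "DEF"]))
# ... WHERE QUERY_ID IN ('ABC', 'DEF');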
@@ -221,7 +221,9 @@ def test_run_single_query_with_hook(mock_get_cursor, mock_set_autocommit, mock_g
sql_query = "SELECT * FROM test_table;"
result = _run_single_query_with_hook(hook, sql_query)

mock_cursor.execute.assert_called_once_with(sql_query)
mock_cursor.execute.assert_has_calls(
[mock.call("ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 3;"), mock.call(sql_query)]
)
assert result == [{"col1": "value1"}, {"col2": "value2"}]


@@ -302,7 +304,7 @@ def test_get_queries_details_from_snowflake_single_query(mock_run_single_query):
details = _get_queries_details_from_snowflake(hook, query_ids)
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, ERROR_CODE, ERROR_MESSAGE "
"FROM table(information_schema.query_history()) "
"FROM table(snowflake.information_schema.query_history()) "
"WHERE QUERY_ID = 'ABC';"
)
mock_run_single_query.assert_called_once_with(hook=hook, sql=expected_query)
@@ -330,7 +332,7 @@ def test_get_queries_details_from_snowflake_single_query_api_hook(mock_run_singl

expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, ERROR_CODE, ERROR_MESSAGE "
"FROM table(information_schema.query_history()) "
"FROM table(snowflake.information_schema.query_history()) "
"WHERE QUERY_ID = 'ABC';"
)
expected_details = {
@@ -377,7 +379,7 @@ def test_get_queries_details_from_snowflake_multiple_queries(mock_run_single_que
expected_query_condition = f"IN {tuple(query_ids)}"
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, ERROR_CODE, ERROR_MESSAGE "
"FROM table(information_schema.query_history()) "
"FROM table(snowflake.information_schema.query_history()) "
f"WHERE QUERY_ID {expected_query_condition};"
)
mock_run_single_query.assert_called_once_with(hook=hook, sql=expected_query)
@@ -415,7 +417,7 @@ def test_get_queries_details_from_snowflake_multiple_queries_api_hook(mock_run_s
expected_query_condition = f"IN {tuple(query_ids)}"
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, ERROR_CODE, ERROR_MESSAGE "
"FROM table(information_schema.query_history()) "
"FROM table(snowflake.information_schema.query_history()) "
f"WHERE QUERY_ID {expected_query_condition};"
)
expected_details = [
@@ -453,7 +455,7 @@ def test_get_queries_details_from_snowflake_no_data_found(mock_run_single_query)
expected_query_condition = f"IN {tuple(query_ids)}"
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, ERROR_CODE, ERROR_MESSAGE "
"FROM table(information_schema.query_history()) "
"FROM table(snowflake.information_schema.query_history()) "
f"WHERE QUERY_ID {expected_query_condition};"
)
mock_run_single_query.assert_called_once_with(hook=hook, sql=expected_query)
@@ -471,7 +473,7 @@ def test_get_queries_details_from_snowflake_no_data_found_api_hook(mock_run_sing
expected_query_condition = f"IN {tuple(query_ids)}"
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, ERROR_CODE, ERROR_MESSAGE "
"FROM table(information_schema.query_history()) "
"FROM table(snowflake.information_schema.query_history()) "
f"WHERE QUERY_ID {expected_query_condition};"
)
mock_run_single_query_api.assert_called_once_with(hook=hook, sql=expected_query)
@@ -489,7 +491,7 @@ def test_get_queries_details_from_snowflake_error(mock_run_single_query):
expected_query_condition = f"IN {tuple(query_ids)}"
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, ERROR_CODE, ERROR_MESSAGE "
"FROM table(information_schema.query_history()) "
"FROM table(snowflake.information_schema.query_history()) "
f"WHERE QUERY_ID {expected_query_condition};"
)
mock_run_single_query.assert_called_once_with(hook=hook, sql=expected_query)
@@ -507,7 +509,7 @@ def test_get_queries_details_from_snowflake_error_api_hook(mock_run_single_query
expected_query_condition = f"IN {tuple(query_ids)}"
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, ERROR_CODE, ERROR_MESSAGE "
"FROM table(information_schema.query_history()) "
"FROM table(snowflake.information_schema.query_history()) "
f"WHERE QUERY_ID {expected_query_condition};"
)
mock_run_single_query_api.assert_called_once_with(hook=hook, sql=expected_query)
@@ -529,7 +531,7 @@ def test_get_queries_details_from_snowflake_error_api_hook_process_data(
expected_query_condition = f"IN {tuple(query_ids)}"
expected_query = (
"SELECT QUERY_ID, EXECUTION_STATUS, START_TIME, END_TIME, QUERY_TEXT, ERROR_CODE, ERROR_MESSAGE "
"FROM table(information_schema.query_history()) "
"FROM table(snowflake.information_schema.query_history()) "
f"WHERE QUERY_ID {expected_query_condition};"
)
mock_run_single_query_api.assert_called_once_with(hook=hook, sql=expected_query)