Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
1d3dc64
refactor: Override the get_first and get_records method with Oracle e…
dabla Jan 27, 2026
b84a8da
Merge branch 'main' into feature/oracle-hook-resolve-oracledb-types-h…
dabla Jan 27, 2026
9492135
refactor: Added unit test for get_first and get_records method
dabla Jan 27, 2026
49730a4
Merge branch 'main' into feature/oracle-hook-resolve-oracledb-types-h…
dabla Jan 27, 2026
5bc5c17
refactor: Reformatted get_first method in OracleHook
dabla Jan 28, 2026
37b8df9
refactor: Fixed test_test_connection_use_dual_table
dabla Jan 28, 2026
afab71b
Merge branch 'main' into feature/oracle-hook-resolve-oracledb-types-h…
dabla Jan 28, 2026
c14f705
Update providers/oracle/src/airflow/providers/oracle/hooks/oracle.py
dabla Jan 28, 2026
0263e5a
refactor: Moved Oracle handlers in separate module like in common.sql
dabla Jan 28, 2026
7178b8b
refactor: Fixed return type of fetch_one_handler in common.sql provider
dabla Jan 28, 2026
49a4e11
Merge branch 'main' into feature/oracle-hook-resolve-oracledb-types-h…
dabla Jan 28, 2026
ce05d62
refactor: Fixed return type of fetch_one_handler in common.sql provider
dabla Jan 29, 2026
d6f6deb
refactor: Added handlers in python-module of Oracle hook
dabla Jan 29, 2026
b83516b
refactor: Added handlers test for Oracle provider
dabla Jan 29, 2026
4cca406
refactor: Updated hooks in provider info
dabla Jan 30, 2026
da5f0a0
refactor: Reformatted test files
dabla Jan 30, 2026
ef3c2e2
refactor: Updated test_fetch_one_handler as fetchone method is suppos…
dabla Jan 30, 2026
46e2b16
Merge branch 'main' into feature/oracle-hook-resolve-oracledb-types-h…
dabla Jan 30, 2026
aa28f49
Merge branch 'main' into feature/oracle-hook-resolve-oracledb-types-h…
dabla Jan 30, 2026
d2fe038
Merge branch 'main' into feature/oracle-hook-resolve-oracledb-types-h…
dabla Jan 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def fetch_all_handler(cursor) -> list[tuple] | None:
return None


def fetch_one_handler(cursor) -> list[tuple] | None:
def fetch_one_handler(cursor) -> tuple | None:
"""Return first result for DbApiHook.run()."""
if not hasattr(cursor, "description"):
raise RuntimeError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ def return_single_query_results(
sql: str | Iterable[str], return_last: bool, split_statements: bool | None
): ...
def fetch_all_handler(cursor) -> list[tuple] | None: ...
def fetch_one_handler(cursor) -> list[tuple] | None: ...
def fetch_one_handler(cursor) -> tuple | None: ...
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def fetch_all_handler(cursor) -> list[tuple] | None:
return handlers.fetch_all_handler(cursor)


def fetch_one_handler(cursor) -> list[tuple] | None:
def fetch_one_handler(cursor) -> tuple | None:
warnings.warn(WARNING_MESSAGE.format("fetch_one_handler"), DeprecationWarning, stacklevel=2)

return handlers.fetch_one_handler(cursor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def return_single_query_results(
sql: str | Iterable[str], return_last: bool, split_statements: bool | None
): ...
def fetch_all_handler(cursor) -> list[tuple] | None: ...
def fetch_one_handler(cursor) -> list[tuple] | None: ...
def fetch_one_handler(cursor) -> tuple | None: ...
def resolve_dialects() -> MutableMapping[str, MutableMapping]: ...

class ConnectorProtocol(Protocol):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ def test_fetch_all_handler(self):
def test_fetch_one_handler(self):
cursor = MagicMock()
cursor.description = [("col1", "int")]
cursor.fetchone.return_value = 1
cursor.fetchone.return_value = (1,)

assert fetch_one_handler(cursor) == (1)
assert fetch_one_handler(cursor) == (1,)

cursor.description = None
assert fetch_one_handler(cursor) is None
1 change: 1 addition & 0 deletions providers/oracle/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ operators:
hooks:
- integration-name: Oracle
python-modules:
- airflow.providers.oracle.hooks.handlers
- airflow.providers.oracle.hooks.oracle

transfers:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ def get_provider_info():
{"integration-name": "Oracle", "python-modules": ["airflow.providers.oracle.operators.oracle"]}
],
"hooks": [
{"integration-name": "Oracle", "python-modules": ["airflow.providers.oracle.hooks.oracle"]}
{
"integration-name": "Oracle",
"python-modules": [
"airflow.providers.oracle.hooks.handlers",
"airflow.providers.oracle.hooks.oracle",
],
}
],
"transfers": [
{
Expand Down
57 changes: 57 additions & 0 deletions providers/oracle/src/airflow/providers/oracle/hooks/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import oracledb


def _read_lob(val):
if isinstance(val, oracledb.LOB):
return val.read()
return val


def _read_lobs(row):
if row is not None:
return tuple([_read_lob(value) for value in row])
return row


def fetch_all_handler(cursor) -> list[tuple] | None:
"""Return results for DbApiHook.run(). If oracledb.LOB objects are present, then those will be read."""
if not hasattr(cursor, "description"):
raise RuntimeError(
"The database we interact with does not support DBAPI 2.0. Use operator and "
"handlers that are specifically designed for your database."
)
if cursor.description is not None:
results = [_read_lobs(row) for row in cursor.fetchall()]
return results
return None


def fetch_one_handler(cursor) -> tuple | None:
"""Return first result for DbApiHook.run(). If oracledb.LOB objects are present, then those will be read."""
if not hasattr(cursor, "description"):
raise RuntimeError(
"The database we interact with does not support DBAPI 2.0. Use operator and "
"handlers that are specifically designed for your database."
)
if cursor.description is not None:
return _read_lobs(cursor.fetchone())
return None
24 changes: 24 additions & 0 deletions providers/oracle/src/airflow/providers/oracle/hooks/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import math
import warnings
from collections.abc import Iterable, Mapping
from datetime import datetime
from typing import TYPE_CHECKING, Any

Expand All @@ -29,6 +30,7 @@
from airflow.providers.openlineage.sqlparser import DatabaseInfo

from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.providers.oracle.hooks import handlers

DEFAULT_DB_PORT = 1521
PARAM_TYPES = {bool, float, int, str}
Expand Down Expand Up @@ -288,6 +290,28 @@ def get_conn(self) -> oracledb.Connection:

return oracle_conn

def get_records(
self,
sql: str | list[str],
parameters: Iterable | Mapping[str, Any] | None = None,
) -> Any:
"""
Execute the sql and return a set of records.

:param sql: the sql statement to be executed (str) or a list of sql statements to execute
:param parameters: The parameters to render the SQL query with.
"""
return self.run(sql=sql, parameters=parameters, handler=handlers.fetch_all_handler)

def get_first(self, sql: str | list[str], parameters: Iterable | Mapping[str, Any] | None = None) -> Any:
"""
Execute the sql and return the first resulting row.

:param sql: the sql statement to be executed (str) or a list of sql statements to execute
:param parameters: The parameters to render the SQL query with.
"""
return self.run(sql=sql, parameters=parameters, handler=handlers.fetch_one_handler)

def insert_rows(
self,
table: str,
Expand Down
49 changes: 49 additions & 0 deletions providers/oracle/tests/unit/oracle/hooks/test_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from unittest.mock import MagicMock

from airflow.providers.oracle.hooks.handlers import (
fetch_all_handler,
fetch_one_handler,
)

from unit.oracle.test_utils import mock_oracle_lob


class TestHandlers:
def test_fetch_all_handler(self):
cursor = MagicMock()
cursor.description = [("col1", "int"), ("col2", "string")]
cursor.fetchall.return_value = [(1, mock_oracle_lob("hello"))]

assert fetch_all_handler(cursor) == [(1, "hello")]

cursor.description = None
assert fetch_all_handler(cursor) is None

def test_fetch_one_handler(self):
cursor = MagicMock()
cursor.description = [("col1", "int")]
cursor.fetchone.return_value = (mock_oracle_lob("hello"),)

assert fetch_one_handler(cursor) == ("hello",)

cursor.description = None
assert fetch_one_handler(cursor) is None
25 changes: 25 additions & 0 deletions providers/oracle/tests/unit/oracle/hooks/test_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
from airflow.models import Connection
from airflow.providers.oracle.hooks.oracle import OracleHook

from unit.oracle.test_utils import mock_oracle_lob


class TestOracleHookConn:
def setup_method(self):
Expand Down Expand Up @@ -545,6 +547,7 @@ def bindvar(value):
assert result == expected

def test_test_connection_use_dual_table(self):
self.cur.fetchone.return_value = (1,)
status, message = self.db_hook.test_connection()
self.cur.execute.assert_called_once_with("select 1 from dual")
assert status is True
Expand Down Expand Up @@ -589,3 +592,25 @@ def test_get_openlineage_database_info_with_sid(self):
assert db_info.normalize_name_method("employees") == "EMPLOYEES"
assert db_info.information_schema_table_name == "ALL_TAB_COLUMNS"
assert "owner" in db_info.information_schema_columns

def test_get_first(self):
statement = "SQL"

self.cur.fetchone.return_value = (mock_oracle_lob("hello"),)

assert self.db_hook.get_first(statement) == ("hello",)

assert self.conn.close.call_count == 1
assert self.cur.close.call_count == 1
self.cur.execute.assert_called_once_with(statement)

def test_get_records(self):
statement = "SQL"

self.cur.fetchall.return_value = (mock_oracle_lob("hello"),), (mock_oracle_lob("world"),)

assert self.db_hook.get_records(statement) == [("hello",), ("world",)]

assert self.conn.close.call_count == 1
assert self.cur.close.call_count == 1
self.cur.execute.assert_called_once_with(statement)
28 changes: 28 additions & 0 deletions providers/oracle/tests/unit/oracle/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from unittest.mock import MagicMock

import oracledb


def mock_oracle_lob(value):
mock_lob = MagicMock(spec=oracledb.LOB)
mock_lob.read.return_value = value
return mock_lob