Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions providers/oracle/src/airflow/providers/oracle/hooks/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def get_conn(self) -> oracledb.Connection:


"""
conn = self.get_connection(self.oracle_conn_id) # type: ignore[attr-defined]
conn = self.get_connection(self.get_conn_id())
conn_config: dict[str, Any] = {"user": conn.login, "password": conn.password}
sid = conn.extra_dejson.get("sid")
mod = conn.extra_dejson.get("module")
Expand Down Expand Up @@ -231,18 +231,18 @@ def get_conn(self) -> oracledb.Connection:
if expire_time:
conn_config["expire_time"] = expire_time

conn = oracledb.connect(**conn_config) # type: ignore[assignment]
oracle_conn = oracledb.connect(**conn_config)
if mod is not None:
conn.module = mod # type: ignore[attr-defined]
oracle_conn.module = mod

# if Connection.schema is defined, set schema after connecting successfully
# cannot be part of conn_config
# https://python-oracledb.readthedocs.io/en/latest/api_manual/connection.html?highlight=schema#Connection.current_schema
# Only set schema when not using conn.schema as Service Name
if schema and service_name:
conn.current_schema = schema # type: ignore[attr-defined]
oracle_conn.current_schema = schema

return conn # type: ignore[return-value]
return oracle_conn

def insert_rows(
self,
Expand Down Expand Up @@ -292,7 +292,7 @@ def insert_rows(
conn = self.get_conn()
if self.supports_autocommit:
self.set_autocommit(conn, False)
cur = conn.cursor() # type: ignore[attr-defined]
cur = conn.cursor()
i = 0
for row in rows:
i += 1
Expand All @@ -312,11 +312,11 @@ def insert_rows(
sql = f"INSERT /*+ APPEND */ INTO {table} {target_fields} VALUES ({','.join(values)})"
cur.execute(sql)
if i % commit_every == 0:
conn.commit() # type: ignore[attr-defined]
conn.commit()
self.log.info("Loaded %s into %s rows so far", i, table)
conn.commit() # type: ignore[attr-defined]
conn.commit()
cur.close()
conn.close() # type: ignore[attr-defined]
conn.close()
self.log.info("Done loading. Loaded a total of %s rows", i)

def bulk_insert_rows(
Expand Down Expand Up @@ -349,7 +349,7 @@ def bulk_insert_rows(
conn = self.get_conn()
if self.supports_autocommit:
self.set_autocommit(conn, False)
cursor = conn.cursor() # type: ignore[attr-defined]
cursor = conn.cursor()
values_base = target_fields or rows[0]

if bool(sequence_column) ^ bool(sequence_name):
Expand Down Expand Up @@ -382,18 +382,18 @@ def bulk_insert_rows(
if row_count % commit_every == 0:
cursor.prepare(prepared_stm)
cursor.executemany(None, row_chunk)
conn.commit() # type: ignore[attr-defined]
conn.commit()
self.log.info("[%s] inserted %s rows", table, row_count)
# Empty chunk
row_chunk = []
# Commit the leftover chunk
if row_chunk:
cursor.prepare(prepared_stm)
cursor.executemany(None, row_chunk)
conn.commit() # type: ignore[attr-defined]
conn.commit()
self.log.info("[%s] inserted %s rows", table, row_count)
cursor.close()
conn.close() # type: ignore[attr-defined]
conn.close()

def callproc(
self,
Expand Down Expand Up @@ -452,7 +452,7 @@ def handler(cursor):

def get_uri(self) -> str:
"""Get the URI for the Oracle connection."""
conn = self.get_connection(self.oracle_conn_id) # type: ignore[attr-defined]
conn = self.get_connection(self.get_conn_id())
login = conn.login
password = conn.password
host = conn.host
Expand Down