Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,19 +250,19 @@ def execute_bteq_script_at_local(
temp_file_read_encoding: str | None,
) -> int | None:
verify_bteq_installed()
bteq_command_str = prepare_bteq_command_for_local_execution(
bteq_command_list = prepare_bteq_command_for_local_execution(
self.get_conn(),
timeout=timeout,
bteq_script_encoding=bteq_script_encoding or "",
bteq_session_encoding=bteq_session_encoding or "",
timeout_rc=timeout_rc or -1,
)
process = subprocess.Popen(
bteq_command_str,
bteq_command_list,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
shell=False,
start_new_session=True,
)
encode_bteq_script = bteq_script.encode(str(temp_file_read_encoding or "UTF-8"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,20 @@ def _prepare_bteq_command(
bteq_session_encoding: str,
timeout_rc: int,
) -> list[str]:
bteq_core_cmd = ["bteq"]
if bteq_session_encoding:
bteq_core_cmd.append(f" -e {bteq_script_encoding}")
bteq_core_cmd.append(f" -c {bteq_session_encoding}")
bteq_core_cmd.append('"')
bteq_core_cmd.append(f".SET EXITONDELAY ON MAXREQTIME {timeout}")
cmd = ["bteq"]
if bteq_session_encoding and bteq_script_encoding:
cmd.extend(["-e", bteq_script_encoding])
cmd.extend(["-c", bteq_session_encoding])

script_parts = [f".SET EXITONDELAY ON MAXREQTIME {timeout}"]
if timeout_rc is not None and timeout_rc >= 0:
bteq_core_cmd.append(f" RC {timeout_rc}")
bteq_core_cmd.append(";")
script_parts.append(f"RC {timeout_rc}")
script_parts.append(";")
# Airflow doesn't display the script of BTEQ in UI but only in log so WIDTH is 500 enough
bteq_core_cmd.append(" .SET WIDTH 500;")
return bteq_core_cmd
script_parts.append(".SET WIDTH 500;")

cmd.append(" ".join(script_parts))
return cmd


def prepare_bteq_command_for_remote_execution(
Expand All @@ -141,9 +143,9 @@ def prepare_bteq_command_for_remote_execution(
timeout_rc: int,
) -> str:
"""Prepare the BTEQ command with necessary parameters."""
bteq_core_cmd = _prepare_bteq_command(timeout, bteq_script_encoding, bteq_session_encoding, timeout_rc)
bteq_core_cmd.append('"')
return " ".join(bteq_core_cmd)
cmd = _prepare_bteq_command(timeout, bteq_script_encoding, bteq_session_encoding, timeout_rc)
cmd.append('"')
return " ".join(cmd)


def prepare_bteq_command_for_local_execution(
Expand All @@ -152,16 +154,14 @@ def prepare_bteq_command_for_local_execution(
bteq_script_encoding: str,
bteq_session_encoding: str,
timeout_rc: int,
) -> str:
) -> list[str]:
"""Prepare the BTEQ command with necessary parameters."""
bteq_core_cmd = _prepare_bteq_command(timeout, bteq_script_encoding, bteq_session_encoding, timeout_rc)
cmd = _prepare_bteq_command(timeout, bteq_script_encoding, bteq_session_encoding, timeout_rc)
host = conn["host"]
login = conn["login"]
password = conn["password"]
bteq_core_cmd.append(f" .LOGON {host}/{login},{password}")
bteq_core_cmd.append('"')
bteq_command_str = " ".join(bteq_core_cmd)
return bteq_command_str
cmd[-1] += f" .LOGON {host}/{login},{password}"
return cmd


def is_valid_file(file_path: str) -> bool:
Expand Down
6 changes: 3 additions & 3 deletions providers/teradata/tests/unit/teradata/hooks/test_bteq.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ def test_execute_bteq_script_at_local_timeout(
"sp": None,
},
)
@patch("airflow.providers.teradata.hooks.bteq.verify_bteq_installed") # <- patch here
@patch("airflow.providers.teradata.hooks.bteq.prepare_bteq_command_for_local_execution") # <- patch here too
@patch("airflow.providers.teradata.hooks.bteq.verify_bteq_installed")
@patch("airflow.providers.teradata.hooks.bteq.prepare_bteq_command_for_local_execution")
def test_execute_bteq_script_at_local_success(
mock_prepare_cmd,
mock_verify_bteq,
Expand Down Expand Up @@ -149,7 +149,7 @@ def test_execute_bteq_script_at_local_success(
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
shell=False,
start_new_session=True,
)
assert ret_code == 0
Expand Down