Skip to content

Commit 8de75f1

Browse files
committed
fix: api_version on on_kill method
1 parent d5ffb46 commit 8de75f1

File tree

2 files changed

+18
-1
lines changed

2 files changed

+18
-1
lines changed

providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
133133

134134
def on_kill(self):
135135
"""Cancel the job if task is cancelled."""
136-
hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_type=self.api_type)
136+
hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
137137
if self.job_id:
138138
self.log.info("on_kill: cancel the airbyte Job %s", self.job_id)
139139
hook.cancel_job(self.job_id)

providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,20 @@ def test_execute(self, mock_wait_for_job, mock_submit_sync_connection):
6969
mock_wait_for_job.assert_called_once_with(
7070
job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout
7171
)
72+
73+
@mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.cancel_job")
74+
def test_on_kill(self, mock_cancel_job):
75+
conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
76+
db.merge_conn(conn)
77+
78+
op = AirbyteTriggerSyncOperator(
79+
task_id="test_Airbyte_op",
80+
airbyte_conn_id=self.airbyte_conn_id,
81+
connection_id=self.connection_id,
82+
wait_seconds=self.wait_seconds,
83+
timeout=self.timeout,
84+
)
85+
op.job_id = self.job_id
86+
op.on_kill()
87+
88+
mock_cancel_job.assert_called_once_with(self.job_id)

0 commit comments

Comments
 (0)