Skip to content

Resync Operator #140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
17 changes: 14 additions & 3 deletions fivetran_provider_async/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,21 +357,32 @@ def prep_connector(self, connector_id: str, schedule_type: str) -> None:
else:
self.log.debug("Schedule type for %s was already %s", connector_id, schedule_type)

def start_fivetran_sync(self, connector_id: str) -> str:
def start_fivetran_sync(self, connector_id: str, mode="sync", payload: dict | None = None) -> str:
"""
:param connector_id: Fivetran connector_id, found in connector settings
page in the Fivetran user interface.
:return: Timestamp of previously completed sync
"""
if payload and mode != "resync":
raise ValueError("payload should only be provided when doing a resync")

connector_details = self.get_connector(connector_id)
succeeded_at = connector_details["succeeded_at"]
failed_at = connector_details["failed_at"]
endpoint = self.api_path_connectors + connector_id
if self._do_api_call("GET", endpoint)["data"]["paused"] is True:

if connector_details["paused"] is True:
self._do_api_call("PATCH", endpoint, json={"paused": False})
if succeeded_at is None and failed_at is None:
succeeded_at = str(pendulum.now())
self._do_api_call("POST", endpoint + "/force")

api_call_args: dict[str, Any] = {
"method": "POST",
"endpoint": f"{endpoint}/{'resync' if mode == 'resync' else 'force'}",
}
if payload:
api_call_args["json"] = payload
self._do_api_call(**api_call_args)

failed_at_time = None
try:
Expand Down
16 changes: 15 additions & 1 deletion fivetran_provider_async/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def execute(self, context: Context) -> None | str:
"""Start the sync using synchronous hook"""
hook = self.hook
hook.prep_connector(self.connector_id, self.schedule_type)
last_sync = hook.start_fivetran_sync(self.connector_id)
last_sync = self._sync(hook)

if not self.wait_for_completion:
return last_sync
Expand Down Expand Up @@ -262,3 +262,17 @@ def get_openlineage_facets_on_start(self):

def get_openlineage_facets_on_complete(self, task_instance):
return self.get_openlineage_facets_on_start()

def _sync(self, hook: FivetranHook):
return hook.start_fivetran_sync(connector_id=self.connector_id)


class FivetranResyncOperator(FivetranOperator):
def __init__(self, scope: dict | None = None, **kwargs):
super().__init__(**kwargs)
self.scope = scope

def _sync(self, hook: FivetranHook):
return hook.start_fivetran_sync(
connector_id=self.connector_id, mode="resync", payload={"scope": self.scope} if self.scope else None
)
28 changes: 28 additions & 0 deletions tests/hooks/test_fivetran.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@
},
}

MOCK_FIVETRAN_OPERATION_PERFORMED_RESPONSE = {"code": "Success", "message": "Operation performed."}


class TestFivetranHookAsync:
@pytest.mark.asyncio
Expand Down Expand Up @@ -832,3 +834,29 @@ def test_prepare_api_call_kwargs_always_returns_tuple(self):
kwargs = hook._prepare_api_call_kwargs("POST", "v1/connectors/test", auth="BadAuth")
assert not isinstance(kwargs["auth"], BasicAuth)
assert is_container(kwargs["auth"]) and len(kwargs["auth"]) == 2

@requests_mock.mock()
def test_start_fivetran_resync(self, m):
m.get(
"https://api.fivetran.com/v1/connectors/interchangeable_revenge",
json=MOCK_FIVETRAN_RESPONSE_PAYLOAD,
)
m.post(
"https://api.fivetran.com/v1/connectors/interchangeable_revenge/resync",
json=MOCK_FIVETRAN_OPERATION_PERFORMED_RESPONSE,
)
hook = FivetranHook(
fivetran_conn_id="conn_fivetran",
)

payload = {"scope": {"schema": ["table1", "table2"]}}
result = hook.start_fivetran_sync(
connector_id="interchangeable_revenge",
mode="resync",
payload=payload,
)

assert m.last_request.path == "/v1/connectors/interchangeable_revenge/resync"
assert m.last_request.json() == payload
assert m.last_request.method == "POST"
assert result is not None
48 changes: 47 additions & 1 deletion tests/operators/test_fivetran.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pendulum.tz.timezone import Timezone

from fivetran_provider_async.hooks import FivetranHook
from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.operators import FivetranOperator, FivetranResyncOperator
from tests.common.static import (
MOCK_FIVETRAN_DESTINATIONS_RESPONSE_PAYLOAD_SHEETS,
MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD_SHEETS,
Expand Down Expand Up @@ -346,3 +346,49 @@ def test_fivetran_op_without_connector_id_error(self):
)

assert str(exc.value) == "No value specified for connector_id or to both connector_name and destination_name"


@mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@")
class TestFivetranResyncOperator(unittest.TestCase):
@mock.patch("fivetran_provider_async.operators.FivetranHook")
def test_fivetran_resync_no_scope(self, hook_cls):
"""Tests that task gets deferred after job submission"""
hook = hook_cls.return_value
hook.start_fivetran_sync.return_value = "2025-12-31T12:34:56Z"
hook.is_synced_after_target_time.return_value = False

task = FivetranResyncOperator(
task_id="fivetran_op_async",
fivetran_conn_id="conn_fivetran",
connector_id="interchangeable_revenge",
)
with pytest.raises(TaskDeferred):
task.execute({})

assert hook.start_fivetran_sync.call_args[1] == {
"connector_id": "interchangeable_revenge",
"mode": "resync",
"payload": None,
}

@mock.patch("fivetran_provider_async.operators.FivetranHook")
def test_fivetran_resync_scope(self, hook_cls):
"""Tests that task gets deferred after job submission"""
hook = hook_cls.return_value
hook.start_fivetran_sync.return_value = "2025-12-31T12:34:56Z"
hook.is_synced_after_target_time.return_value = False

task = FivetranResyncOperator(
task_id="fivetran_op_async",
fivetran_conn_id="conn_fivetran",
connector_id="interchangeable_revenge",
scope={"schema": ["table1", "table2"]},
)
with pytest.raises(TaskDeferred):
task.execute({})

assert hook.start_fivetran_sync.call_args[1] == {
"connector_id": "interchangeable_revenge",
"mode": "resync",
"payload": {"scope": {"schema": ["table1", "table2"]}},
}