Skip to content

Commit

Permalink
Source Klaviyo: use CheckpointMixin for handling state updates (#38879)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristoGrab authored Jun 6, 2024
1 parent c1de750 commit 209f5b8
Show file tree
Hide file tree
Showing 6 changed files with 365 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
definitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:1.2.2@sha256:57703de3b4c4204bd68a7b13c9300f8e03c0189bffddaffc796f1da25d2dbea0
dockerImageTag: 2.6.3
dockerImageTag: 2.6.4
dockerRepository: airbyte/source-klaviyo
githubIssueLabel: source-klaviyo
icon: klaviyo.svg
Expand Down
378 changes: 336 additions & 42 deletions airbyte-integrations/connectors/source-klaviyo/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-klaviyo/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.6.3"
version = "2.6.4"
name = "source-klaviyo"
description = "Source implementation for Klaviyo."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -17,7 +17,7 @@ include = "source_klaviyo"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "^0"
airbyte-cdk = "=0.90.0"

[tool.poetry.scripts]
source-klaviyo = "source_klaviyo.run:run"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,36 @@
import pendulum
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.streams.core import CheckpointMixin, StreamData
from airbyte_cdk.sources.streams.http import HttpStream
from requests import Response

from .availability_strategy import KlaviyoAvailabilityStrategy
from .exceptions import KlaviyoBackoffError


class KlaviyoStream(HttpStream, ABC):
class KlaviyoStream(HttpStream, CheckpointMixin, ABC):
"""Base stream for api version v2023-10-15"""

url_base = "https://a.klaviyo.com/api/"
primary_key = "id"
page_size = None
api_revision = "2023-10-15"

@property
def state(self):
return self._state

@state.setter
def state(self, value):
self._state = value

def __init__(self, api_key: str, start_date: Optional[str] = None, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._api_key = api_key
self._start_ts = start_date
if not hasattr(self, "_state"):
self._state = {}

@property
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
Expand Down Expand Up @@ -86,7 +96,7 @@ def map_record(self, record: MutableMapping[str, Any]) -> MutableMapping[str, An
record[self.cursor_field] = record["attributes"][self.cursor_field]
return record

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Override to determine the latest state after reading the latest record.
This typically compared the cursor_field from the latest record and the current state and picks
Expand Down Expand Up @@ -118,8 +128,13 @@ def read_records(
stream_slice: Optional[Mapping[str, Any]] = None,
stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:

current_state = self.state or {}
try:
yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
for record in super().read_records(sync_mode, cursor_field, stream_slice, current_state):
self.state = self._get_updated_state(current_state, record)
yield record

except KlaviyoBackoffError as e:
self.logger.warning(repr(e))

Expand Down Expand Up @@ -167,7 +182,7 @@ def request_params(
class IncrementalKlaviyoStreamWithArchivedRecords(IncrementalKlaviyoStream, ABC):
"""A base class which should be used when archived records need to be read"""

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Extend the stream state with `archived` property to store such records' state separately from the stream state
"""
Expand All @@ -180,7 +195,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
current_stream_state["archived"] = {self.cursor_field: latest_archived_cursor.isoformat()}
return current_stream_state
else:
return super().get_updated_state(current_stream_state, latest_record)
return super()._get_updated_state(current_stream_state, latest_record)

def stream_slices(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def test_request_params(self, config_start_date, stream_state_date, next_page_to
)
def test_get_updated_state(self, config_start_date, current_cursor, latest_cursor, expected_cursor):
stream = SomeIncrementalStream(api_key=API_KEY, start_date=config_start_date)
assert stream.get_updated_state(
assert stream._get_updated_state(
current_stream_state={stream.cursor_field: current_cursor} if current_cursor else {},
latest_record={stream.cursor_field: latest_cursor},
) == {stream.cursor_field: expected_cursor}
Expand Down Expand Up @@ -446,7 +446,7 @@ def test_read_records(self, requests_mock):
)
def test_get_updated_state(self, latest_record, current_stream_state, expected_state):
stream = Campaigns(api_key=API_KEY)
assert stream.get_updated_state(current_stream_state, latest_record) == expected_state
assert stream._get_updated_state(current_stream_state, latest_record) == expected_state

def test_stream_slices(self):
stream = Campaigns(api_key=API_KEY)
Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/sources/klaviyo.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ Stream `Lists Detailed` contains field `profile_count` in addition to info from

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------|
| 2.6.3 | 2024-06-04 | [38935](https://github.com/airbytehq/airbyte/pull/38935) | [autopull] Upgrade base image to v1.2.1 |
| `2.6.4` | 2024-06-06 | [38879](https://github.com/airbytehq/airbyte/pull/38879) | Implement `CheckpointMixin` for handling state in Python streams |
| `2.6.3` | 2024-06-04 | [38935](https://github.com/airbytehq/airbyte/pull/38935) | [autopull] Upgrade base image to v1.2.1 |
| `2.6.2` | 2024-05-08 | [37789](https://github.com/airbytehq/airbyte/pull/37789) | Move stream schemas and spec to manifest |
| `2.6.1` | 2024-05-07 | [38010](https://github.com/airbytehq/airbyte/pull/38010) | Add error handler for `5XX` status codes |
| `2.6.0` | 2024-04-19 | [37370](https://github.com/airbytehq/airbyte/pull/37370) | Add streams `campaigns_detailed` and `lists_detailed` |
Expand Down Expand Up @@ -107,4 +108,4 @@ Stream `Lists Detailed` contains field `profile_count` in addition to info from
| `0.1.3` | 2021-12-09 | [8592](https://github.com/airbytehq/airbyte/pull/8592) | Improve performance, make Global Exclusions stream incremental and enable Metrics stream. |
| `0.1.2` | 2021-10-19 | [6952](https://github.com/airbytehq/airbyte/pull/6952) | Update schema validation in SAT |

</details>
</details>

0 comments on commit 209f5b8

Please sign in to comment.