Skip to content

Commit

Permalink
Source Jira: refactor state handler (airbytehq#39347)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristoGrab authored and kabeer27 committed Jun 11, 2024
1 parent 695043d commit 051ebe3
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 44 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-jira/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 68e63de2-bb83-4c7e-93fa-a8a9051e3993
dockerImageTag: 2.0.2
dockerImageTag: 2.0.3
dockerRepository: airbyte/source-jira
documentationUrl: https://docs.airbyte.com/integrations/sources/jira
githubIssueLabel: source-jira
Expand Down
363 changes: 328 additions & 35 deletions airbyte-integrations/connectors/source-jira/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-jira/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.0.2"
version = "2.0.3"
name = "source-jira"
description = "Source implementation for Jira."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -17,7 +17,7 @@ include = "source_jira"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "0.80.0"
airbyte-cdk = "0.90.0"

[tool.poetry.scripts]
source-jira = "source_jira.run:run"
Expand Down
19 changes: 15 additions & 4 deletions airbyte-integrations/connectors/source-jira/source_jira/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import requests
from airbyte_cdk.logger import AirbyteLogger as Logger
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams import CheckpointMixin, Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
Expand Down Expand Up @@ -149,14 +149,24 @@ def __init__(
self._start_date = start_date


class IncrementalJiraStream(StartDateJiraStream, ABC):
class IncrementalJiraStream(StartDateJiraStream, CheckpointMixin, ABC):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._starting_point_cache = {}
self._state = None

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
@property
def state(self) -> Mapping[str, Any]:
return self._state

@state.setter
def state(self, value: Mapping[str, Any]):
self._state = value

def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
updated_state = latest_record[self.cursor_field]
stream_state_value = current_stream_state.get(self.cursor_field)
current_stream_state = current_stream_state or {}
stream_state_value = current_stream_state.get(self.cursor_field, {})
if stream_state_value:
updated_state = max(updated_state, stream_state_value)
current_stream_state[self.cursor_field] = updated_state
Expand Down Expand Up @@ -187,6 +197,7 @@ def read_records(
start_point = self.get_starting_point(stream_state=stream_state)
for record in super().read_records(stream_slice=stream_slice, stream_state=stream_state, **kwargs):
cursor_value = pendulum.parse(record[self.cursor_field])
self.state = self._get_updated_state(self.state, record)
if not start_point or cursor_value >= start_point:
yield record

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ def test_python_issues_stream_updated_state(config):
args = {"authenticator": authenticator, "domain": config["domain"], "projects": config["projects"]}
stream = Issues(**args)

updated_state = stream.get_updated_state(
updated_state = stream._get_updated_state(
current_stream_state={"updated": "2021-01-01T00:00:00Z"},
latest_record={"updated": "2021-01-02T00:00:00Z"}
)
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/jira.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ The Jira connector should not run into Jira API limitations under normal usage.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.0.3 | 2024-06-10 | [39347](https://github.com/airbytehq/airbyte/pull/39347) | Update state handling for incremental Python streams |
| 2.0.2 | 2024-06-06 | [39310](https://github.com/airbytehq/airbyte/pull/39310) | Fix projects substreams for deleted projects |
| 2.0.1 | 2024-05-20 | [38341](https://github.com/airbytehq/airbyte/pull/38341) | Update CDK authenticator package |
| 2.0.0 | 2024-04-20 | [37374](https://github.com/airbytehq/airbyte/pull/37374) | Migrate to low-code and fix `Project Avatars` stream |
Expand Down Expand Up @@ -191,4 +192,4 @@ The Jira connector should not run into Jira API limitations under normal usage.
| 0.2.4 | | | Implementing base_read acceptance test dived by stream groups. |
| 0.2.3 | | | Implementing incremental sync. Migrated to airbyte-cdk. Adding all available entities in Jira Cloud. |

</details>
</details>

0 comments on commit 051ebe3

Please sign in to comment.