Skip to content

Commit

Permalink
🐛 Source Intercom: Fix handling of scroll param when it expired (#9513)
Browse files Browse the repository at this point in the history
* Add handling of scroll param when it expired

* Updated PR number

* Fix typo in docs

* Add unittest

* Updated scroll or standard switch mechanism

* Updated to linters

* Updated spec.yaml and defenitions
  • Loading branch information
lazebnyi authored Jan 19, 2022
1 parent 89308f7 commit 6990cc7
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "d8313939-3782-41b0-be29-b3ca20d8dd3a",
"name": "Intercom",
"dockerRepository": "airbyte/source-intercom",
"dockerImageTag": "0.1.11",
"dockerImageTag": "0.1.13",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/intercom",
"icon": "intercom.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@
- name: Intercom
sourceDefinitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a
dockerRepository: airbyte/source-intercom
dockerImageTag: 0.1.12
dockerImageTag: 0.1.13
documentationUrl: https://docs.airbyte.io/integrations/sources/intercom
icon: intercom.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3134,7 +3134,7 @@
oauthFlowInitParameters: []
oauthFlowOutputParameters:
- - "access_token"
- dockerImage: "airbyte/source-intercom:0.1.12"
- dockerImage: "airbyte/source-intercom:0.1.13"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/intercom"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-intercom/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ COPY source_intercom ./source_intercom
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.12
LABEL io.airbyte.version=0.1.13
LABEL io.airbyte.name=airbyte/source-intercom
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ class EndpointType(Enum):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._backoff_count = 0
self._use_standard = False
self._endpoint_type = self.EndpointType.scroll
self._total_count = None # uses for saving of a total_count value once

Expand All @@ -193,6 +194,9 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
return super().next_page_token(response)
return None

def need_use_standard(self):
return not self.can_use_scroll() or self._use_standard

def can_use_scroll(self):
"""Check backoff count"""
return self._backoff_count <= 3
Expand All @@ -202,38 +206,46 @@ def path(self, **kwargs) -> str:

@classmethod
def check_exists_scroll(cls, response: requests.Response) -> bool:
if response.status_code == 400:
if response.status_code in [400, 404]:
# example response:
# {..., "errors": [{'code': 'scroll_exists', 'message': 'scroll already exists for this workspace'}]}
# {..., "errors": [{'code': 'not_found', 'message':'scroll parameter not found'}]}
err_body = response.json()["errors"][0]
if err_body["code"] == "scroll_exists":
if err_body["code"] in ["scroll_exists", "not_found"]:
return True

return False

@property
def raise_on_http_errors(self) -> bool:
if not self.can_use_scroll() and self._endpoint_type == self.EndpointType.scroll:
if self.need_use_standard() and self._endpoint_type == self.EndpointType.scroll:
return False
return True

def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
yield None
if not self.can_use_scroll():
if self.need_use_standard():
self._endpoint_type = self.EndpointType.standard
yield None

def should_retry(self, response: requests.Response) -> bool:
if self.check_exists_scroll(response):
self._backoff_count += 1
if not self.can_use_scroll():
self.logger.error("Can't create a new scroll request within an minute. " "Let's try to use a standard non-scroll endpoint.")
if self.need_use_standard():
self.logger.error(
"Can't create a new scroll request within an minute or scroll param was expired. "
"Let's try to use a standard non-scroll endpoint."
)
return False

return True
return super().should_retry(response)

def backoff_time(self, response: requests.Response) -> Optional[float]:
if response.status_code == 404:
self._use_standard = True
# Need return value greater than zero to use UserDefinedBackoffException class
return 0.01
if self.check_exists_scroll(response):
self.logger.warning("A previous scroll request is exists. " "It must be deleted within an minute automatically")
# try to check 3 times
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pytest
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.auth import NoAuth
from source_intercom.source import Companies, Contacts, IntercomStream

Expand Down Expand Up @@ -46,3 +47,32 @@ def test_get_next_page_token(intercom_class, response_json, expected_output_toke
test = intercom_class(authenticator=NoAuth).next_page_token(response)

assert test == expected_output_token


def test_switch_to_standard_endpoint_if_scroll_expired(requests_mock):
"""
Test shows that if scroll param expired we try sync with standard API.
"""

url = "https://api.intercom.io/companies/scroll"
requests_mock.get(
url,
json={"type": "company.list", "data": [{"type": "company", "id": "530370b477ad7120001d"}], "scroll_param": "expired_scroll_param"},
)

url = "https://api.intercom.io/companies/scroll?scroll_param=expired_scroll_param"
requests_mock.get(url, json={"errors": [{"code": "not_found", "message": "scroll parameter not found"}]}, status_code=404)

url = "https://api.intercom.io/companies"
requests_mock.get(url, json={"type": "company.list", "data": [{"type": "company", "id": "530370b477ad7120001d"}]})

stream1 = Companies(authenticator=NoAuth())

records = []

assert stream1._endpoint_type == Companies.EndpointType.scroll

for slice in stream1.stream_slices(sync_mode=SyncMode.full_refresh):
records += list(stream1.read_records(sync_mode=SyncMode, stream_slice=slice))

assert stream1._endpoint_type == Companies.EndpointType.standard
5 changes: 3 additions & 2 deletions docs/integrations/sources/intercom.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ Please read [How to get your Access Token](https://developers.intercom.com/build

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.13 | 2022-01-14 | [9513](https://github.com/airbytehq/airbyte/pull/9513) | Added handling of scroll param when it expired |
| 0.1.12 | 2021-12-14 | [8429](https://github.com/airbytehq/airbyte/pull/8429) | Updated fields and descriptions |
| 0.1.11 | 2021-12-13 | [8685](https://github.com/airbytehq/airbyte/pull/8685) | Remove time.sleep for rate limit |
| 0.1.10 | 2021-12-10 | [8637](https://github.com/airbytehq/airbyte/pull/8637) | Fix 'conversations' order and sorting. Correction of the companies stream|
| 0.1.10 | 2021-12-10 | [8637](https://github.com/airbytehq/airbyte/pull/8637) | Fix 'conversations' order and sorting. Correction of the companies stream |
| 0.1.9 | 2021-12-03 | [8395](https://github.com/airbytehq/airbyte/pull/8395) | Fix backoff of 'companies' stream |
| 0.1.8 | 2021-11-09 | [7060](https://github.com/airbytehq/airbyte/pull/7060) | Added oauth support |
| 0.1.7 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies |
Expand All @@ -67,4 +68,4 @@ Please read [How to get your Access Token](https://developers.intercom.com/build
| 0.1.3 | 2021-09-08 | [5908](https://github.com/airbytehq/airbyte/pull/5908) | Corrected timestamp and arrays in schemas |
| 0.1.2 | 2021-08-19 | [5531](https://github.com/airbytehq/airbyte/pull/5531) | Corrected pagination |
| 0.1.1 | 2021-07-31 | [5123](https://github.com/airbytehq/airbyte/pull/5123) | Corrected rate limit |
| 0.1.0 | 2021-07-19 | [4676](https://github.com/airbytehq/airbyte/pull/4676) | Release Slack CDK Connector |
| 0.1.0 | 2021-07-19 | [4676](https://github.com/airbytehq/airbyte/pull/4676) | Release Intercom CDK Connector |

0 comments on commit 6990cc7

Please sign in to comment.