Source PostHog: incremental streams read only relevant pages #4001

Merged
merged 9 commits into from
Jul 27, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "af6d50ee-dddf-4126-a8ee-7faee990774f",
"name": "PostHog",
"dockerRepository": "airbyte/source-posthog",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/posthog"
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
- sourceDefinitionId: af6d50ee-dddf-4126-a8ee-7faee990774f
name: PostHog
dockerRepository: airbyte/source-posthog
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/posthog
- sourceDefinitionId: cd42861b-01fc-4658-a8ab-5d11d0510f01
name: Recurly
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#


def test_dummy():
"""This is just to fix the customIntegration task in Gradle. It fails when pytest is unable to find tests."""
assert True
4 changes: 0 additions & 4 deletions airbyte-integrations/connectors/source-posthog/CHANGELOG.md

This file was deleted.

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-posthog/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-posthog
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
validate_output_from_all_streams: yes
empty_streams:
- events_sessions
- insights_path
- insights_sessions
- trends
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/state.json"
future_state_path: "integration_tests/future_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ docker run --rm -it \
-v $(pwd):/test_input \
airbyte/source-acceptance-test \
--acceptance-test-config /test_input

Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "events_sessions",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": null,
"default_cursor_field": null,
"source_defined_primary_key": [["global_session_id"]],
"namespace": null
},
"sync_mode": "full_refresh",
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "feature_flags",
Expand Down Expand Up @@ -82,6 +97,7 @@
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": null,
"default_cursor_field": null,
"source_defined_primary_key": null,
"namespace": null
},
"sync_mode": "full_refresh",
Expand All @@ -96,6 +112,7 @@
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": null,
"default_cursor_field": null,
"source_defined_primary_key": null,
"namespace": null
},
"sync_mode": "full_refresh",
Expand Down Expand Up @@ -125,6 +142,7 @@
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": null,
"default_cursor_field": null,
"source_defined_primary_key": null,
"namespace": null
},
"sync_mode": "full_refresh",
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#


def test_dummy():
"""This is just to fix the customIntegration task in Gradle. It fails when pytest is unable to find tests."""
assert True
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
{
"feature_flags": { "id": 100000000000000000000000000 },
"events": { "timestamp": "2121-04-13T18:13:51.504000+00:00" },
"persons": { "created_at": "2121-04-13T18:13:54.269000Z" },
"annotations": { "updated_at": "2121-05-27T14:09:29.961933Z" }
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@
}
},
"email": {
"type": "string"
"type": ["null", "string"]
},
"session_recordings": {
"type": "array",
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator

from .streams import ( # EventsSessions,
from .streams import (
Annotations,
Cohorts,
Events,
EventsSessions,
FeatureFlags,
Insights,
InsightsPath,
Expand All @@ -49,14 +50,14 @@
class SourcePosthog(AbstractSource):
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
try:
_ = pendulum.parse(config["start_date"], strict=True)
_ = pendulum.parse(config["start_date"])
authenticator = TokenAuthenticator(token=config["api_key"])
stream = PingMe(authenticator=authenticator)
records = stream.read_records(sync_mode=SyncMode.full_refresh)
_ = next(records)
return True, None
except Exception as e:
if isinstance(e, requests.exceptions.HTTPError) and e.response.status_code == 401:
if isinstance(e, requests.exceptions.HTTPError) and e.response.status_code == requests.codes.UNAUTHORIZED:
return False, f"Please check your api_key. Error: {repr(e)}"
return False, repr(e)
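
Note on the status-code change above: replacing the literal 401 with a named constant makes the intent of the branch easier to read. A minimal standalone sketch of the same idea follows. It uses the standard library's `http.HTTPStatus` instead of `requests.codes` so that it runs without third-party packages, and `describe_failure` is a hypothetical helper name, not part of the connector.

```python
from http import HTTPStatus
from typing import Optional, Tuple


def describe_failure(status_code: Optional[int], detail: str) -> Tuple[bool, str]:
    """Map a failed connection check to the (ok, message) pair (hypothetical helper)."""
    # HTTPStatus is an IntEnum, so it compares equal to the plain integer 401.
    if status_code == HTTPStatus.UNAUTHORIZED:
        return False, f"Please check your api_key. Error: {detail}"
    # Any other failure is reported as-is.
    return False, detail
```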

Expand All @@ -72,9 +73,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
Annotations(authenticator=authenticator, start_date=config["start_date"]),
Cohorts(authenticator=authenticator),
Events(authenticator=authenticator, start_date=config["start_date"]),
# disabled because the endpoint returns only active sessions and they have TTL=24h
# so most of the time it will be empty
# EventsSessions(authenticator=authenticator),
EventsSessions(authenticator=authenticator),
FeatureFlags(authenticator=authenticator),
Insights(authenticator=authenticator),
InsightsPath(authenticator=authenticator),
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
import math
import urllib.parse
from abc import ABC, abstractmethod
from typing import Any, Iterable, Mapping, MutableMapping, Optional
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Sequence

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream


Expand All @@ -48,8 +49,14 @@ def request_headers(self, **kwargs) -> Mapping[str, Any]:
return {"Content-Type": "application/json", "User-Agent": "posthog-python/1.4.0"}

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
response_json = response.json()
yield from response_json.get(self.data_field, [])
response_data = response.json()
if self.data_field:
response_data = response_data.get(self.data_field)

if isinstance(response_data, Sequence):
yield from response_data
elif response_data:
yield response_data
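
Note on the new `parse_response` above: it appears to handle three payload shapes, namely a list under `data_field`, a single object when `data_field` is `None` (as `PingMe` now sets), and a missing or empty field. The sketch below reproduces that behaviour outside the CDK so it can be run in isolation. `extract_records` is a hypothetical name, and the extra guard against strings is an addition for the sketch, not something the PR does.

```python
from collections.abc import Sequence
from typing import Any, Iterator, Mapping, Optional


def extract_records(payload: Mapping[str, Any], data_field: Optional[str]) -> Iterator[Any]:
    """Yield records from a decoded JSON payload (hypothetical helper)."""
    data: Any = payload
    if data_field:
        # Narrow to the nested field; a missing field yields None.
        data = payload.get(data_field)

    # A str is also a Sequence, so exclude it explicitly (assumption of this sketch).
    if isinstance(data, Sequence) and not isinstance(data, (str, bytes)):
        yield from data
    elif data:
        # A single non-empty object is emitted as one record.
        yield data
```

With `data_field=None` the whole payload is emitted as one record, which is why `PingMe` no longer needs its own `parse_response` override.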

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
Expand Down Expand Up @@ -86,12 +93,11 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
"""
Return next page token until we reach the page with records older than state/start_date
"""
min_state = self._initial_state or self._start_date
response_json = response.json()
data = response_json.get(self.data_field, [])
latest_record = data[-1] if data else None # records are ordered so we check only last one

if not latest_record or latest_record[self.cursor_field] > min_state:
if not latest_record or latest_record[self.cursor_field] > self._initial_state:
return super().next_page_token(response=response)

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
Expand All @@ -104,11 +110,28 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
return {self.cursor_field: max(latest_state, current_state)}

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
"""
Filter records by initial_state value
"""
data = super().parse_response(response=response, stream_state=stream_state, **kwargs)
for record in data:
if record.get(self.cursor_field) >= stream_state.get(self.cursor_field, self._start_date):
if record.get(self.cursor_field) >= self._initial_state:
yield record

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
"""
Initialize initial_state value
"""
stream_state = stream_state or {}
self._initial_state = self._initial_state or stream_state.get(self.cursor_field) or self._start_date
return super().read_records(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
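
Note on the incremental changes above: the state used for filtering and for stopping pagination is now fixed once per read (saved cursor value, else `start_date`) instead of being re-read from the moving `stream_state`. The sketch below models that flow with plain lists standing in for API pages. It assumes records arrive newest first and that cursor values are ISO-8601 strings in one consistent format, so string comparison matches chronological order. `resolve_initial_state` and `read_relevant_pages` are hypothetical names.

```python
from typing import Any, Iterable, Iterator, List, Mapping, Optional


def resolve_initial_state(stream_state: Optional[Mapping[str, Any]], cursor_field: str, start_date: str) -> str:
    """Pick the saved cursor value if present, otherwise fall back to start_date."""
    stream_state = stream_state or {}
    return stream_state.get(cursor_field) or start_date


def read_relevant_pages(
    pages: Iterable[List[Mapping[str, Any]]], cursor_field: str, initial_state: str
) -> Iterator[Mapping[str, Any]]:
    """Yield records at or after initial_state and stop requesting older pages."""
    for page in pages:
        for record in page:
            if record[cursor_field] >= initial_state:
                yield record
        # Records are ordered newest first, so only the last one needs checking.
        last_record = page[-1] if page else None
        if last_record and last_record[cursor_field] <= initial_state:
            break  # every later page would contain only older records
```

Passing an iterator as `pages` makes the early stop visible: pages after the break are never consumed, which corresponds to the connector not requesting them.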


class Annotations(IncrementalPosthogStream):
"""
Expand Down Expand Up @@ -159,9 +182,10 @@ class EventsSessions(PosthogStream):
Docs: https://posthog.com/docs/api/events
"""

primary_key = "global_session_id"
data_field = "result"

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
def path(self, **kwargs) -> str:
return "event/sessions"

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
Expand All @@ -174,7 +198,7 @@ class FeatureFlags(PosthogStream):
Docs: https://posthog.com/docs/api/feature-flags
"""

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
def path(self, **kwargs) -> str:
return "feature_flag"


Expand All @@ -184,7 +208,7 @@ class Insights(PosthogStream):
Endpoint does not support incremental read because id, created_at and last_refresh are not ordered in any particular way
"""

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
def path(self, **kwargs) -> str:
return "insight"


Expand All @@ -193,9 +217,10 @@ class InsightsPath(PosthogStream):
Docs: https://posthog.com/docs/api/insights
"""

primary_key = None
data_field = "result"

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
def path(self, **kwargs) -> str:
return "insight/path"


Expand All @@ -204,9 +229,10 @@ class InsightsSessions(PosthogStream):
Docs: https://posthog.com/docs/api/insights
"""

primary_key = None
data_field = "result"

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
def path(self, **kwargs) -> str:
return "insight/session"


Expand All @@ -215,7 +241,7 @@ class Persons(PosthogStream):
Docs: https://posthog.com/docs/api/people
"""

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
def path(self, **kwargs) -> str:
return "person"


Expand All @@ -224,9 +250,10 @@ class Trends(PosthogStream):
Docs: https://posthog.com/docs/api/insights
"""

primary_key = None
data_field = "result"

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
def path(self, **kwargs) -> str:
return "insight/trend"


Expand All @@ -235,9 +262,7 @@ class PingMe(PosthogStream):
Docs: https://posthog.com/docs/api/user
"""

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return "users/@me"
data_field = None

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
response_json = response.json()
yield response_json
def path(self, **kwargs) -> str:
return "users/@me"
5 changes: 3 additions & 2 deletions docs/integrations/sources/posthog.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Please follow these [steps](https://posthog.com/docs/api/overview#how-to-obtain-

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.1.2 | 2021-07-15 | [4692](https://github.com/airbytehq/airbyte/pull/4692) | Source PostHog: Use account information for checking the connection
| 0.1.3 | 2021-07-20 | [4001](https://github.com/airbytehq/airbyte/pull/4001) | Incremental streams read only relevant pages|
| 0.1.2 | 2021-07-15 | [4692](https://github.com/airbytehq/airbyte/pull/4692) | Use account information for checking the connection|
| 0.1.1 | 2021-07-05 | [4539](https://github.com/airbytehq/airbyte/pull/4539) | Add `AIRBYTE_ENTRYPOINT` env variable for kubernetes support|
| 0.1.0 | 2021-06-08 | [3768](https://github.com/airbytehq/airbyte/pull/3768) | Initial Release |
| 0.1.0 | 2021-06-08 | [3768](https://github.com/airbytehq/airbyte/pull/3768) | Initial Release|