Commit
šŸ› Source Mixpanel - Add new datatime formats for state for cohort_memā€¦
Browse files Browse the repository at this point in the history
ā€¦bers stream, added obsolete state reset for cohort_members stream (#38066)

Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: katmarkham <40400595+katmarkham@users.noreply.github.com>
Co-authored-by: Serhii Lazebnyi <serhii.lazebnyi@globallogic.com>
5 people authored May 23, 2024
1 parent b45b38d commit 751b7af
Showing 12 changed files with 464 additions and 60 deletions.
@@ -35,7 +35,28 @@
{
"type": "STREAM",
"stream": {
"stream_state": { "last_seen": "2030-01-01T00:00:00" },
"stream_state": {
"states": [
{
"partition": {
"id": 4269289,
"parent_slice": {}
},
"cursor": {
"last_seen": "2030-01-01T00:00:00"
}
},
{
"partition": {
"id": 1343181,
"parent_slice": {}
},
"cursor": {
"last_seen": "2030-01-01T00:00:00"
}
}
]
},
"stream_descriptor": { "name": "cohort_members" }
}
},
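The hunk above swaps the single legacy cursor for one cursor per cohort partition. The following is only a sketch of that shape change: `to_per_partition_state` and the explicit list of cohort IDs are hypothetical, and the commit itself asks users to reset the stream rather than convert old state.

```python
from typing import Any, Dict, List


def to_per_partition_state(legacy_state: Dict[str, Any], cohort_ids: List[int]) -> Dict[str, Any]:
    """Hypothetical helper: copy one legacy cursor into a per-partition state."""
    return {
        "states": [
            {
                # One entry per cohort; parent_slice is empty as in the fixture above.
                "partition": {"id": cohort_id, "parent_slice": {}},
                "cursor": dict(legacy_state),
            }
            for cohort_id in cohort_ids
        ]
    }


legacy = {"last_seen": "2030-01-01T00:00:00"}
new_state = to_per_partition_state(legacy, [4269289, 1343181])
```

Each partition gets its own copy of the cursor, so advancing one cohort's `last_seen` does not affect another's.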
22 changes: 17 additions & 5 deletions airbyte-integrations/connectors/source-mixpanel/metadata.yaml
@@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerImageTag: 2.3.1
dockerImageTag: 3.0.0
dockerRepository: airbyte/source-mixpanel
documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel
githubIssueLabel: source-mixpanel
@@ -26,18 +26,30 @@ data:
registries:
cloud:
enabled: true
dockerImageTag: 2.2.0 # temporary pin due to a bug in 2.3.0 https://github.com/airbytehq/airbyte/pull/38106
oss:
enabled: true
dockerImageTag: 2.2.0 # temporary pin due to a bug in 2.3.0 https://github.com/airbytehq/airbyte/pull/38106
releaseStage: generally_available
releases:
breakingChanges:
3.0.0:
message:
In this release, CohortMembers stream has changed due to changes in primary key and an improper state format.
Please reset CohortMembers stream. For more information, see our migration documentation.
upgradeDeadline: "2024-06-03"
2.0.0:
message: In this release, the default primary key for stream Export has been deleted, allowing users to select the key that best fits their data. Refreshing the source schema and resetting affected streams is necessary only if new primary keys are to be applied following the upgrade.
message:
In this release, the default primary key for stream Export has been
deleted, allowing users to select the key that best fits their data. Refreshing
the source schema and resetting affected streams is necessary only if new
primary keys are to be applied following the upgrade.
upgradeDeadline: "2023-11-30"
1.0.0:
message: In this release, the datetime field of stream engage has had its type changed from date-time to string due to inconsistent data from Mixpanel. Additionally, the primary key for stream export has been fixed to uniquely identify records. Users will need to refresh the source schema and reset affected streams after upgrading.
message:
In this release, the datetime field of stream engage has had its
type changed from date-time to string due to inconsistent data from Mixpanel.
Additionally, the primary key for stream export has been fixed to uniquely
identify records. Users will need to refresh the source schema and reset
affected streams after upgrading.
upgradeDeadline: "2023-10-31"
suggestedStreams:
streams:
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.3.1"
version = "3.0.0"
name = "source-mixpanel"
description = "Source implementation for Mixpanel."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -9,8 +9,6 @@
from airbyte_cdk.models import AirbyteMessage, SyncMode, Type
from airbyte_cdk.sources.declarative.extractors import DpathExtractor
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import LegacyToPerPartitionStateMigration
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement
@@ -261,6 +259,8 @@ class EngagePaginationStrategy(PageIncrement):
page - incremental page number
"""

_total = 0

def next_page_token(self, response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
"""
Determines page and subpage numbers for the `items` stream
@@ -281,6 +281,10 @@ def next_page_token(self, response, last_records: List[Mapping[str, Any]]) -> Op
self._total = None
return None

def reset(self) -> None:
super().reset()
self._total = 0


class EngageJsonFileSchemaLoader(JsonFileSchemaLoader):
"""Engage schema combines static and dynamic approaches"""
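The `reset()` override above clears `_total` along with the page counter, presumably because one strategy instance is reused for several partitions. A minimal sketch of why that matters, using a hypothetical `CountingPaginator` that is not the connector's class and does not use the CDK:

```python
from typing import Optional


class CountingPaginator:
    """Hypothetical stand-in for a page-increment strategy with a running total."""

    def __init__(self) -> None:
        self._page = 0
        self._total = 0

    def next_page(self, records_on_page: int, total_available: int) -> Optional[int]:
        # Accumulate what has been read so far and stop once everything is consumed.
        self._total += records_on_page
        if records_on_page and self._total < total_available:
            self._page += 1
            return self._page
        return None

    def reset(self) -> None:
        # Without clearing _total, the next partition would appear exhausted immediately.
        self._page = 0
        self._total = 0


paginator = CountingPaginator()
first = paginator.next_page(records_on_page=2, total_available=4)
second = paginator.next_page(records_on_page=2, total_available=4)
paginator.reset()
after_reset = paginator.next_page(records_on_page=2, total_available=4)
```

After `reset()`, the second partition starts counting from zero and requests its next page as expected.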
@@ -129,12 +129,14 @@ definitions:
record_selector:
$ref: "#/definitions/selector_empty_dpath"
record_filter:
condition: "{{ record['created'] >= stream_state.created if stream_state.created else true }}"
condition: "{{ record.created >= stream_interval.start_time }}"
incremental_sync:
type: DatetimeBasedCursor
cursor_field: created
cursor_datetime_formats:
- "%Y-%m-%dT%H:%M:%S"
- "%Y-%m-%d %H:%M:%S"
- "%Y-%m-%dT%H:%M:%SZ"
- "%Y-%m-%dT%H:%M:%S%z"
datetime_format: "%Y-%m-%d %H:%M:%S"
start_datetime:
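The `cursor_datetime_formats` list above lets a stored cursor value be read in any of several layouts. One plausible reading of such a list is "try each format in order". The sketch below shows that idea with the standard library only; `parse_cursor` is a hypothetical name and this is not the CDK's implementation.

```python
from datetime import datetime
from typing import Iterable

# The four formats listed in the hunk above.
CURSOR_DATETIME_FORMATS = [
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%dT%H:%M:%SZ",
    "%Y-%m-%dT%H:%M:%S%z",
]


def parse_cursor(value: str, formats: Iterable[str] = CURSOR_DATETIME_FORMATS) -> datetime:
    """Return the first successful parse; raise if no format matches."""
    for fmt in formats:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError(f"No configured format matches {value!r}")
```

With this approach, a state value written by an older connector version in a different layout can still be parsed instead of failing the sync.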
@@ -173,7 +175,7 @@ definitions:
record_selector:
$ref: "#/definitions/selector"
record_filter:
condition: "{{ record['$properties']['$last_seen'] >= stream_state.last_seen if stream_state.last_seen else true }}"
condition: "{{ record['$properties']['$last_seen'] >= stream_interval.start_time }}"
incremental_sync:
type: DatetimeBasedCursor
cursor_field: last_seen
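The new filter condition compares each record against the start of the current interval rather than against `stream_state`. A rough Python equivalent, assuming both values are strings in the same sortable layout (`keep_record` and the dictionary shape of `stream_interval` are illustrative, not the CDK's objects):

```python
from typing import Any, Dict


def keep_record(record: Dict[str, Any], stream_interval: Dict[str, str]) -> bool:
    """Keep records last seen at or after the interval start (string comparison)."""
    return record["$properties"]["$last_seen"] >= stream_interval["start_time"]


interval = {"start_time": "2024-01-01T00:00:00"}
newer = {"$properties": {"$last_seen": "2024-05-01T00:00:00"}}
older = {"$properties": {"$last_seen": "2023-12-31T23:59:59"}}
```

Because the comparison no longer branches on whether state exists, the same expression covers both the first sync and later incremental syncs.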
@@ -191,14 +193,17 @@ definitions:
fields:
- path:
- browser_version
value: "{{ record.browser_version | string }}"
value: "{{ record.browser_version | string if record.browser_version else '' }}"
schema_loader:
type: CustomSchemaLoader
class_name: "source_mixpanel.components.EngageJsonFileSchemaLoader"
file_path: "./source_mixpanel/schemas/{{ parameters['name'] }}.json"

cohort_members_stream:
$ref: "#/definitions/engage_stream"
primary_key:
- distinct_id
- cohort_id
$parameters:
name: cohort_members
path: 2.0/engage
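The composite primary key added above (`distinct_id` plus `cohort_id`) matters when the same user appears in more than one cohort. A small sketch of deduplication by that key, with `dedupe_by_key` as a hypothetical helper rather than anything the destination actually runs:

```python
from typing import Any, Dict, Iterable, List, Tuple


def dedupe_by_key(
    records: Iterable[Dict[str, Any]],
    key_fields: Tuple[str, ...] = ("distinct_id", "cohort_id"),
) -> List[Dict[str, Any]]:
    """Keep the last record seen for each composite key."""
    latest: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
    for record in records:
        latest[tuple(record[field] for field in key_fields)] = record
    return list(latest.values())


rows = [
    {"distinct_id": "u1", "cohort_id": 4269289, "seq": 1},
    {"distinct_id": "u1", "cohort_id": 1343181, "seq": 2},
    {"distinct_id": "u1", "cohort_id": 4269289, "seq": 3},
]
deduped = dedupe_by_key(rows)
by_distinct_id_only = dedupe_by_key(rows, key_fields=("distinct_id",))
```

Keyed on `distinct_id` alone, the user's membership in one of the two cohorts would be overwritten; the composite key keeps both.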
@@ -210,6 +215,10 @@ definitions:
http_method: POST
paginator:
$ref: "#/definitions/paginator"
record_selector:
$ref: "#/definitions/selector"
record_filter:
condition: "{{ record['$properties']['$last_seen'] >= stream_interval.start_time }}"
partition_router:
class_name: "source_mixpanel.components.CohortMembersSubstreamPartitionRouter"
parent_stream_configs:
@@ -232,7 +241,7 @@ definitions:
fields:
- path:
- browser_version
value: "{{ record.browser_version | string }}"
value: "{{ record.browser_version | string if record.browser_version else '' }}"

# No API docs! build based on singer source
revenue_stream:
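The revised `browser_version` expression falls back to an empty string when the field is missing or empty. In plain Python the intended behaviour is roughly the following; `browser_version_as_string` is a hypothetical name, and Jinja-specific details such as undefined handling are ignored here.

```python
from typing import Any, Dict


def browser_version_as_string(record: Dict[str, Any]) -> str:
    """Stringify browser_version, or return '' when it is absent or falsy."""
    value = record.get("browser_version")
    return str(value) if value else ""
```

Note that any falsy value, including `0`, also maps to the empty string under this rule.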
@@ -2,7 +2,6 @@
from .engage import EngageSchema
from .export import Export, ExportSchema


__all__ = [
"IncrementalMixpanelStream",
"MixpanelStream",
@@ -1,6 +1,8 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import os
from pathlib import Path

import pendulum
import pytest
@@ -37,3 +39,20 @@ def config_raw(config):
@pytest.fixture(autouse=True)
def patch_time(mocker):
mocker.patch("time.sleep")


ENV_REQUEST_CACHE_PATH = "REQUEST_CACHE_PATH"
os.environ["REQUEST_CACHE_PATH"] = ENV_REQUEST_CACHE_PATH

def delete_cache_files(cache_directory):
directory_path = Path(cache_directory)
if directory_path.exists() and directory_path.is_dir():
for file_path in directory_path.glob("*.sqlite"):
file_path.unlink()

@pytest.fixture(autouse=True)
def clear_cache_before_each_test():
# The problem: Once the first request is cached, we will keep getting the cached result no matter what setup we prepared for a particular test.
# Solution: We must delete the cache before each test because for the same URL, we want to define multiple responses and status codes.
delete_cache_files(os.getenv(ENV_REQUEST_CACHE_PATH))
yield
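The fixture above deletes cached `*.sqlite` files before every test. The helper's behaviour can be checked in isolation; this sketch repeats `delete_cache_files` from the hunk and runs it against a temporary directory whose file names are made up for illustration.

```python
import tempfile
from pathlib import Path


def delete_cache_files(cache_directory: str) -> None:
    """Remove every *.sqlite file directly inside cache_directory, if it exists."""
    directory_path = Path(cache_directory)
    if directory_path.exists() and directory_path.is_dir():
        for file_path in directory_path.glob("*.sqlite"):
            file_path.unlink()


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical file names: one cache file and one unrelated file.
    (Path(tmp) / "cohorts.sqlite").touch()
    (Path(tmp) / "notes.txt").touch()
    delete_cache_files(tmp)
    remaining = sorted(p.name for p in Path(tmp).iterdir())
```

Only the SQLite cache file is removed, and a missing directory is silently ignored, so the fixture is safe to run before the first request has created any cache.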
@@ -142,9 +142,9 @@ def test_streams_string_date(requests_mock, config_raw):
),
)
def test_config_validation(config, success, expected_error_message, requests_mock):
requests_mock.get("https://mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1}])
requests_mock.get("https://mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1}])
requests_mock.get("https://eu.mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1}])
requests_mock.get("https://mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1, 'created':'2021-02-11T00:00:00Z'}])
requests_mock.get("https://mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1, 'created':'2021-02-11T00:00:00Z'}])
requests_mock.get("https://eu.mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1, 'created':'2021-02-11T00:00:00Z'}])
try:
is_success, message = SourceMixpanel().check_connection(None, config)
except AirbyteTracedException as e: