🐛 Source Mixpanel - Add new datetime formats for state for cohort_members stream, added obsolete state reset for cohort_members stream #38066

Merged on May 23, 2024 (42 commits)
Commits
efc1ab0
Added new datatime formats for state for cohort_members stream, added…
midavadim May 8, 2024
253dfea
fix unit test
midavadim May 8, 2024
3b7ab40
format
midavadim May 8, 2024
85db1b2
Merge branch 'master' into midavadim/mixpanel-datatime-fix
lazebnyi May 9, 2024
ba2d280
updated test coverage
midavadim May 9, 2024
91c52b3
Merge remote-tracking branch 'origin/midavadim/mixpanel-datatime-fix'…
midavadim May 9, 2024
a60dba2
removed odd end date
midavadim May 9, 2024
e81326a
added client side filtration of old records
midavadim May 9, 2024
7046c2c
added change history for breaking change
midavadim May 9, 2024
d0d061e
added filtering based on start_date
midavadim May 13, 2024
a36f99c
Merge branch 'master' into midavadim/mixpanel-datatime-fix
midavadim May 13, 2024
2e07f3c
format
midavadim May 13, 2024
0947701
format
midavadim May 13, 2024
82d3717
added filter based on start_date fixed unit test
midavadim May 14, 2024
ef816af
update abnormal state
midavadim May 14, 2024
0cda51b
update upgradeDeadline
midavadim May 14, 2024
5caec34
Key is changed to new unique key (based on 'distinct_id' and 'cohort_…
midavadim May 14, 2024
a6a3ff2
removed odd pytz package for testing
midavadim May 14, 2024
c5f43d8
chore: auto-fix lint and format issues
octavia-squidington-iii May 14, 2024
32925d5
reverted state migration since due to breaking change state and data …
midavadim May 14, 2024
2b0d86c
Merge remote-tracking branch 'origin/midavadim/mixpanel-datatime-fix'…
midavadim May 14, 2024
45b3881
set key for cohort_members
midavadim May 14, 2024
8e5a1a1
Merge branch 'master' into midavadim/mixpanel-datatime-fix
midavadim May 14, 2024
463ac1b
updated change log message
midavadim May 14, 2024
dd88ee7
fix unit test for cohort_members
midavadim May 14, 2024
a383794
updated deadline, removed not used import
midavadim May 15, 2024
94194e8
format
midavadim May 15, 2024
422278c
removed dockerImageTag
midavadim May 16, 2024
8aafe69
updated change history messages
midavadim May 17, 2024
b966769
format
midavadim May 17, 2024
86007c1
added unit test for states in semiincremental stream. Use stream_inte…
midavadim May 20, 2024
fa2741b
added unit test for pagination for partition stream.
midavadim May 21, 2024
16a97a9
Merge branch 'master' into midavadim/mixpanel-datatime-fix
midavadim May 21, 2024
daa5975
Merge branch 'master' into midavadim/mixpanel-datatime-fix
midavadim May 22, 2024
2af956b
Merge branch 'master' into midavadim/mixpanel-datatime-fix
midavadim May 22, 2024
863d8d8
Merge remote-tracking branch 'origin/midavadim/mixpanel-datatime-fix'…
midavadim May 22, 2024
9d825ed
disabled cache before each test
midavadim May 22, 2024
87746bd
format
midavadim May 22, 2024
545ed7b
Update docs/integrations/sources/mixpanel-migrations.md
lazebnyi May 22, 2024
560f392
Updated breaking changes notes and message
lazebnyi May 22, 2024
422ff2f
Update airbyte-integrations/connectors/source-mixpanel/metadata.yaml
midavadim May 23, 2024
221c7ae
prettier format
midavadim May 23, 2024
@@ -35,7 +35,28 @@
{
"type": "STREAM",
"stream": {
"stream_state": { "last_seen": "2030-01-01T00:00:00" },
"stream_state": {
"states": [
{
"partition": {
"id": 4269289,
"parent_slice": {}
},
"cursor": {
"last_seen": "2030-01-01T00:00:00"
}
},
{
"partition": {
"id": 1343181,
"parent_slice": {}
},
"cursor": {
"last_seen": "2030-01-01T00:00:00"
}
}
]
},
"stream_descriptor": { "name": "cohort_members" }
}
},
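The fixture above moves from a single stream-level cursor to one cursor per cohort partition. A minimal sketch of that reshaping, assuming the cohort IDs are known up front; `to_per_partition_state` is a hypothetical helper for illustration only, not the CDK's migration class:

```python
from typing import Any, Dict, List


def to_per_partition_state(legacy_state: Dict[str, Any], cohort_ids: List[int]) -> Dict[str, Any]:
    """Copy one stream-level cursor into a per-partition state (hypothetical helper)."""
    return {
        "states": [
            {
                "partition": {"id": cohort_id, "parent_slice": {}},
                # Each partition gets its own copy of the legacy cursor.
                "cursor": dict(legacy_state),
            }
            for cohort_id in cohort_ids
        ]
    }


legacy = {"last_seen": "2030-01-01T00:00:00"}
state = to_per_partition_state(legacy, [4269289, 1343181])
```

Here `state` has the same shape as the new `stream_state` in the fixture: a `states` list with one `partition`/`cursor` pair per cohort.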
26 changes: 21 additions & 5 deletions airbyte-integrations/connectors/source-mixpanel/metadata.yaml
@@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerImageTag: 2.3.1
dockerImageTag: 3.0.0
dockerRepository: airbyte/source-mixpanel
documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel
githubIssueLabel: source-mixpanel
@@ -26,18 +26,34 @@ data:
registries:
cloud:
enabled: true
dockerImageTag: 2.2.0 # temporary pin due to a bug in 2.3.0 https://github.com/airbytehq/airbyte/pull/38106
oss:
enabled: true
dockerImageTag: 2.2.0 # temporary pin due to a bug in 2.3.0 https://github.com/airbytehq/airbyte/pull/38106
releaseStage: generally_available
releases:
breakingChanges:
3.0.0:
message:
In this release, state for CohortMembers is changed to per partition format.
Contributor
@lazebnyi @midavadim Don't we need to have an impact scope here?
- Let's change this message to be more customer-friendly. This information is good, but I think it belongs in the migration guide, not the breaking change warning.
- Please pick one format for the stream name, either CohortMembers or cohort_members.
- The user needs to understand what action to take. The general format is "x stream has changed for y reason. Please reset x stream. See migration guide for further details."

Collaborator
Updated

Key for CohortMembers stream was changed to new unique key based on
distinct_id and cohort_id fields since previous key was not unique and
didn't support possibility for user be in a few different cohorts.
Semi-incremental Cohorts, Cohort_members and Engage streams with client side
filtering extract data since user provided or default (1 year old) start_date.
upgradeDeadline: "2024-06-03"
2.0.0:
message: In this release, the default primary key for stream Export has been deleted, allowing users to select the key that best fits their data. Refreshing the source schema and resetting affected streams is necessary only if new primary keys are to be applied following the upgrade.
message:
In this release, the default primary key for stream Export has been
deleted, allowing users to select the key that best fits their data. Refreshing
the source schema and resetting affected streams is necessary only if new
primary keys are to be applied following the upgrade.
upgradeDeadline: "2023-11-30"
1.0.0:
message: In this release, the datetime field of stream engage has had its type changed from date-time to string due to inconsistent data from Mixpanel. Additionally, the primary key for stream export has been fixed to uniquely identify records. Users will need to refresh the source schema and reset affected streams after upgrading.
message:
In this release, the datetime field of stream engage has had its
type changed from date-time to string due to inconsistent data from Mixpanel.
Additionally, the primary key for stream export has been fixed to uniquely
identify records. Users will need to refresh the source schema and reset
affected streams after upgrading.
upgradeDeadline: "2023-10-31"
suggestedStreams:
streams:
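The 3.0.0 message above says the previous key was not unique because one user can belong to several cohorts. A small sketch with made-up records shows the effect; the record values and the `dedupe` helper are illustrative assumptions, not connector code:

```python
from typing import Any, Dict, List, Tuple


def dedupe(records: List[Dict[str, Any]], key_fields: Tuple[str, ...]) -> List[Dict[str, Any]]:
    """Keep the last record seen for each key, as a destination deduping on primary key might."""
    by_key: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
    for record in records:
        by_key[tuple(record[f] for f in key_fields)] = record
    return list(by_key.values())


records = [
    {"distinct_id": "user-1", "cohort_id": 4269289},
    {"distinct_id": "user-1", "cohort_id": 1343181},  # same user, second cohort
    {"distinct_id": "user-2", "cohort_id": 4269289},
]
old_key_rows = dedupe(records, ("distinct_id",))
new_key_rows = dedupe(records, ("distinct_id", "cohort_id"))
```

Keyed on `distinct_id` alone, the two `user-1` rows collapse into one; with the composite key all three memberships survive.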
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.3.1"
version = "3.0.0"
name = "source-mixpanel"
description = "Source implementation for Mixpanel."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -9,8 +9,6 @@
from airbyte_cdk.models import AirbyteMessage, SyncMode, Type
from airbyte_cdk.sources.declarative.extractors import DpathExtractor
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import LegacyToPerPartitionStateMigration
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement
@@ -261,6 +259,8 @@ class EngagePaginationStrategy(PageIncrement):
page - incremental page number
"""

_total = 0

def next_page_token(self, response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
"""
Determines page and subpage numbers for the `items` stream
@@ -281,6 +281,10 @@ def next_page_token(self, response, last_records: List[Mapping[str, Any]]) -> Op
self._total = None
return None

def reset(self) -> None:
super().reset()
self._total = 0
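
The `reset` override matters when one strategy object is reused across partitions. The full `EngagePaginationStrategy` logic is not visible in this diff, so the following is only a sketch with a hypothetical `CountingPageStrategy` that keeps a running record count in the same way:

```python
from typing import Optional


class CountingPageStrategy:
    """Hypothetical stand-in for a page-increment strategy that tracks records seen."""

    def __init__(self) -> None:
        self._page = 0
        self._total = 0

    def next_page(self, total_available: int, records_on_page: int) -> Optional[int]:
        """Return the next page number, or None when everything has been read."""
        self._total += records_on_page
        if records_on_page and self._total < total_available:
            self._page += 1
            return self._page
        return None

    def reset(self) -> None:
        # Without this, the count from the previous partition leaks into the next one.
        self._page = 0
        self._total = 0


strategy = CountingPageStrategy()
first = strategy.next_page(total_available=3, records_on_page=2)   # more pages remain
second = strategy.next_page(total_available=3, records_on_page=1)  # partition exhausted
strategy.reset()
after_reset = strategy.next_page(total_available=3, records_on_page=2)
```

If `reset` did not clear `_total`, the first call for the next partition would already see a count at or above the total and stop after a single page.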


class EngageJsonFileSchemaLoader(JsonFileSchemaLoader):
"""Engage schema combines static and dynamic approaches"""
@@ -129,12 +129,14 @@ definitions:
record_selector:
$ref: "#/definitions/selector_empty_dpath"
record_filter:
condition: "{{ record['created'] >= stream_state.created if stream_state.created else true }}"
condition: "{{ record.created >= stream_interval.start_time }}"
incremental_sync:
type: DatetimeBasedCursor
cursor_field: created
cursor_datetime_formats:
- "%Y-%m-%dT%H:%M:%S"
- "%Y-%m-%d %H:%M:%S"
- "%Y-%m-%dT%H:%M:%SZ"
- "%Y-%m-%dT%H:%M:%S%z"
datetime_format: "%Y-%m-%d %H:%M:%S"
start_datetime:
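
The `cursor_datetime_formats` list above lets a stored cursor value be read in any of several layouts. A plain-Python sketch of the try-each-format idea, using only `datetime.strptime`; `parse_cursor` is a hypothetical helper and the CDK's own parser may differ in detail:

```python
from datetime import datetime
from typing import Sequence

CURSOR_FORMATS = (
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%dT%H:%M:%SZ",
    "%Y-%m-%dT%H:%M:%S%z",
)


def parse_cursor(value: str, formats: Sequence[str] = CURSOR_FORMATS) -> datetime:
    """Try each format in order and return the first successful parse."""
    for fmt in formats:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError(f"No matching datetime format for {value!r}")


parsed = [
    parse_cursor(v)
    for v in ("2030-01-01T00:00:00", "2030-01-01 00:00:00", "2030-01-01T00:00:00Z")
]
```

A state value written in an older layout can still be parsed this way, while `datetime_format` continues to control how new values are written.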
@@ -173,7 +175,7 @@ definitions:
record_selector:
$ref: "#/definitions/selector"
record_filter:
condition: "{{ record['$properties']['$last_seen'] >= stream_state.last_seen if stream_state.last_seen else true }}"
condition: "{{ record['$properties']['$last_seen'] >= stream_interval.start_time }}"
incremental_sync:
type: DatetimeBasedCursor
cursor_field: last_seen
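
The new `record_filter` condition drops records older than the start of the current interval on the client side. A rough plain-Python equivalent, assuming both values are ISO-8601 strings in the same layout so that string comparison follows chronological order; `filter_since` and the sample records are illustrative only:

```python
from typing import Any, Dict, List


def filter_since(records: List[Dict[str, Any]], start_time: str) -> List[Dict[str, Any]]:
    """Keep records whose $last_seen is at or after start_time.

    Assumes same-format ISO-8601 strings, so lexicographic order
    matches chronological order.
    """
    return [r for r in records if r["$properties"]["$last_seen"] >= start_time]


records = [
    {"$distinct_id": "a", "$properties": {"$last_seen": "2023-12-31T23:59:59"}},
    {"$distinct_id": "b", "$properties": {"$last_seen": "2024-01-01T00:00:00"}},
]
kept = filter_since(records, "2024-01-01T00:00:00")
```

Unlike the old condition, which fell back to `true` when no state existed, this comparison always applies a lower bound.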
@@ -191,14 +193,17 @@ definitions:
fields:
- path:
- browser_version
value: "{{ record.browser_version | string }}"
value: "{{ record.browser_version | string if record.browser_version else '' }}"
schema_loader:
type: CustomSchemaLoader
class_name: "source_mixpanel.components.EngageJsonFileSchemaLoader"
file_path: "./source_mixpanel/schemas/{{ parameters['name'] }}.json"

cohort_members_stream:
$ref: "#/definitions/engage_stream"
primary_key:
- distinct_id
- cohort_id
$parameters:
name: cohort_members
path: 2.0/engage
@@ -210,6 +215,10 @@ definitions:
http_method: POST
paginator:
$ref: "#/definitions/paginator"
record_selector:
$ref: "#/definitions/selector"
record_filter:
condition: "{{ record['$properties']['$last_seen'] >= stream_interval.start_time }}"
partition_router:
class_name: "source_mixpanel.components.CohortMembersSubstreamPartitionRouter"
parent_stream_configs:
@@ -232,7 +241,7 @@ definitions:
fields:
- path:
- browser_version
value: "{{ record.browser_version | string }}"
value: "{{ record.browser_version | string if record.browser_version else '' }}"

# No API docs! build based on singer source
revenue_stream:
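
The updated `browser_version` template adds a guard for missing values. How the interpolation engine rendered a missing value under the old template is not shown in this diff; the sketch below is only a plain-Python analogue of the new expression, with a hypothetical helper name:

```python
from typing import Any, Optional


def browser_version_as_string(value: Optional[Any]) -> str:
    """Stringify the value, mapping missing or falsy values to an empty string."""
    return str(value) if value else ""


values = [browser_version_as_string(v) for v in (118, "17.4", None)]
```

In plain Python, `str(None)` yields the text `"None"`, which is the kind of placeholder the guard avoids. Note that the guard also maps other falsy values, such as `0`, to an empty string.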
@@ -2,7 +2,6 @@
from .engage import EngageSchema
from .export import Export, ExportSchema


__all__ = [
"IncrementalMixpanelStream",
"MixpanelStream",
@@ -142,9 +142,9 @@ def test_streams_string_date(requests_mock, config_raw):
),
)
def test_config_validation(config, success, expected_error_message, requests_mock):
requests_mock.get("https://mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1}])
requests_mock.get("https://mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1}])
requests_mock.get("https://eu.mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1}])
requests_mock.get("https://mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1, 'created':'2021-02-11T00:00:00Z'}])
requests_mock.get("https://mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1, 'created':'2021-02-11T00:00:00Z'}])
requests_mock.get("https://eu.mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1, 'created':'2021-02-11T00:00:00Z'}])
try:
is_success, message = SourceMixpanel().check_connection(None, config)
except AirbyteTracedException as e: