From ef0ecc3f3e70fa28831bc8153c4cb39d20531748 Mon Sep 17 00:00:00 2001 From: midavadim Date: Wed, 8 May 2024 11:04:38 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Source=20Mixpanel=20low=20code?= =?UTF-8?q?=20migration=20(#36724)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Alexandre Girard --- .../integration_tests/abnormal_state.json | 4 +- .../connectors/source-mixpanel/metadata.yaml | 4 +- .../connectors/source-mixpanel/pyproject.toml | 2 +- .../source_mixpanel/components.py | 339 ++++++++++++++++ .../source_mixpanel/manifest.yaml | 373 ++++++++++++++++++ .../source_mixpanel/schemas/funnel_ids.json | 12 + .../source-mixpanel/source_mixpanel/source.py | 102 ++--- .../source_mixpanel/streams/__init__.py | 15 +- .../source_mixpanel/streams/annotations.py | 44 --- .../source_mixpanel/streams/base.py | 6 +- .../source_mixpanel/streams/cohort_members.py | 42 -- .../source_mixpanel/streams/cohorts.py | 53 --- .../source_mixpanel/streams/engage.py | 173 +------- .../source_mixpanel/streams/funnels.py | 169 -------- .../source_mixpanel/streams/revenue.py | 56 --- .../source_mixpanel/testing.py | 22 -- .../source-mixpanel/unit_tests/conftest.py | 7 +- .../source-mixpanel/unit_tests/test_source.py | 95 +---- .../unit_tests/test_streams.py | 284 ++++++------- .../source-mixpanel/unit_tests/unit_test.py | 98 +---- .../source-mixpanel/unit_tests/utils.py | 3 +- docs/integrations/sources/mixpanel.md | 1 + 22 files changed, 904 insertions(+), 1000 deletions(-) create mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/components.py create mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/manifest.yaml create mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/funnel_ids.json delete mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/annotations.py delete mode 100644 
airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/cohort_members.py delete mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/cohorts.py delete mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/funnels.py delete mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/revenue.py diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/abnormal_state.json index 828816502f30..7d5b377ab16d 100644 --- a/airbyte-integrations/connectors/source-mixpanel/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/abnormal_state.json @@ -3,8 +3,10 @@ "type": "STREAM", "stream": { "stream_state": { + "36152117": { "date": "2030-01-01" }, "41833532": { "date": "2030-01-01" }, - "36152117": { "date": "2030-01-01" } + "41833755": { "date": "2030-01-01" }, + "41833700": { "date": "2030-01-01" } }, "stream_descriptor": { "name": "funnels" } } diff --git a/airbyte-integrations/connectors/source-mixpanel/metadata.yaml b/airbyte-integrations/connectors/source-mixpanel/metadata.yaml index 5047059dd230..fa96c1447e72 100644 --- a/airbyte-integrations/connectors/source-mixpanel/metadata.yaml +++ b/airbyte-integrations/connectors/source-mixpanel/metadata.yaml @@ -11,7 +11,7 @@ data: connectorSubtype: api connectorType: source definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a - dockerImageTag: 2.2.2 + dockerImageTag: 2.3.0 dockerRepository: airbyte/source-mixpanel documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel githubIssueLabel: source-mixpanel @@ -58,5 +58,5 @@ data: supportLevel: certified tags: - language:python - - cdk:python + - cdk:low-code metadataSpecVersion: "1.0" diff --git a/airbyte-integrations/connectors/source-mixpanel/pyproject.toml 
b/airbyte-integrations/connectors/source-mixpanel/pyproject.toml index c8613569a02f..4f528b9de537 100644 --- a/airbyte-integrations/connectors/source-mixpanel/pyproject.toml +++ b/airbyte-integrations/connectors/source-mixpanel/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "2.2.2" +version = "2.3.0" name = "source-mixpanel" description = "Source implementation for Mixpanel." authors = [ "Airbyte ",] diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/components.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/components.py new file mode 100644 index 000000000000..9d682be463c1 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/components.py @@ -0,0 +1,339 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + +import time +from dataclasses import dataclass +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional + +import dpath.util +import requests +from airbyte_cdk.models import AirbyteMessage, SyncMode, Type +from airbyte_cdk.sources.declarative.extractors import DpathExtractor +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import LegacyToPerPartitionStateMigration +from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor +from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter +from airbyte_cdk.sources.declarative.requesters import HttpRequester +from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement +from airbyte_cdk.sources.declarative.schema import JsonFileSchemaLoader +from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import _default_file_path +from airbyte_cdk.sources.declarative.transformations import RecordTransformation +from 
airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState + +from .source import SourceMixpanel +from .streams.engage import EngageSchema + + +class MixpanelHttpRequester(HttpRequester): + reqs_per_hour_limit = 60 + is_first_request = True + + def get_request_headers( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + + return {"Accept": "application/json"} + + def get_request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> MutableMapping[str, Any]: + project_id = self.config.get("credentials", {}).get("project_id") + return {"project_id": project_id} if project_id else {} + + def _request_params( + self, + stream_state: Optional[StreamState], + stream_slice: Optional[StreamSlice], + next_page_token: Optional[Mapping[str, Any]], + extra_params: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + """ + Flatten extra_params if it contains pagination information + """ + next_page_token = None # reset it, pagination data is in extra_params + if extra_params: + page = extra_params.pop("page", {}) + extra_params.update(page) + return super()._request_params(stream_state, stream_slice, next_page_token, extra_params) + + def send_request(self, **kwargs) -> Optional[requests.Response]: + + if self.reqs_per_hour_limit: + if self.is_first_request: + self.is_first_request = False + else: + # we skip this block, if self.reqs_per_hour_limit = 0, + # in all other cases wait for X seconds to match API limitations + # https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-Export-API-Endpoints#api-export-endpoint-rate-limits + self.logger.info( + f"Sleep for {3600 / self.reqs_per_hour_limit} seconds to match API limitations after reading from {self.name}" + ) + 
time.sleep(3600 / self.reqs_per_hour_limit) + + return super().send_request(**kwargs) + + +class AnnotationsHttpRequester(MixpanelHttpRequester): + def get_request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> MutableMapping[str, Any]: + return {} + + +class FunnelsHttpRequester(MixpanelHttpRequester): + def get_request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> MutableMapping[str, Any]: + params = super().get_request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) + params["unit"] = "day" + return params + + +class CohortMembersSubstreamPartitionRouter(SubstreamPartitionRouter): + def get_request_body_json( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + # https://developer.mixpanel.com/reference/engage-query + cohort_id = stream_slice["id"] + return {"filter_by_cohort": f'{{"id":{cohort_id}}}'} + + +class EngageTransformation(RecordTransformation): + def transform( + self, + record: Record, + config: Optional[Config] = None, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + ) -> Record: + """ + - flatten $properties fields + - remove leading '$' + """ + record["distinct_id"] = record.pop("$distinct_id") + properties = record.pop("$properties") + for property_name in properties: + this_property_name = property_name + if property_name.startswith("$"): + # Just remove leading '$' for 'reserved' mixpanel properties name, example: + # from API: '$browser' + # to stream: 'browser' + this_property_name = this_property_name[1:] + record[this_property_name] = properties[property_name] + + return 
record + + +class RevenueDpathExtractor(DpathExtractor): + def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]: + """ + response.json() example: + { + 'computed_at': '2021-07-03T12:43:48.889421+00:00', + 'results': { + '$overall': { <-- should be skipped + 'amount': 0.0, + 'count': 124, + 'paid_count': 0 + }, + '2021-06-01': { + 'amount': 0.0, + 'count': 124, + 'paid_count': 0 + }, + '2021-06-02': { + 'amount': 0.0, + 'count': 124, + 'paid_count': 0 + }, + ... + }, + 'session_id': '162...', + 'status': 'ok' + } + """ + new_records = [] + for record in super().extract_records(response): + for date_entry in record: + if date_entry != "$overall": + list.append(new_records, {"date": date_entry, **record[date_entry]}) + return new_records + + +class FunnelsDpathExtractor(DpathExtractor): + def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]: + """ + response.json() example: + { + 'computed_at': '2021-07-03T12:43:48.889421+00:00', + 'results': { + '$overall': { <-- should be skipped + 'amount': 0.0, + 'count': 124, + 'paid_count': 0 + }, + '2021-06-01': { + 'amount': 0.0, + 'count': 124, + 'paid_count': 0 + }, + ... + }, + 'session_id': '162...', + 'status': 'ok' + } + """ + new_records = [] + for record in super().extract_records(response): + for date_entry in record: + list.append(new_records, {"date": date_entry, **record[date_entry]}) + return new_records + + +class FunnelsSubstreamPartitionRouter(SubstreamPartitionRouter): + def stream_slices(self) -> Iterable[StreamSlice]: + """ + Add 'funnel_name' to the slice, the rest code is exactly the same as in super().stream_slices(...) 
+ Remove empty 'parent_slice' attribute to be compatible with LegacyToPerPartitionStateMigration + """ + if not self.parent_stream_configs: + yield from [] + else: + for parent_stream_config in self.parent_stream_configs: + parent_stream = parent_stream_config.stream + parent_field = parent_stream_config.parent_key.eval(self.config) # type: ignore # parent_key is always casted to an interpolated string + partition_field = parent_stream_config.partition_field.eval(self.config) # type: ignore # partition_field is always casted to an interpolated string + for parent_stream_slice in parent_stream.stream_slices( + sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=None + ): + empty_parent_slice = True + parent_partition = parent_stream_slice.partition if parent_stream_slice else {} + + for parent_record in parent_stream.read_records( + sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None + ): + # Skip non-records (eg AirbyteLogMessage) + if isinstance(parent_record, AirbyteMessage): + if parent_record.type == Type.RECORD: + parent_record = parent_record.record.data + else: + continue + elif isinstance(parent_record, Record): + parent_record = parent_record.data + try: + partition_value = dpath.util.get(parent_record, parent_field) + except KeyError: + pass + else: + empty_parent_slice = False + yield StreamSlice( + partition={partition_field: partition_value}, + cursor_slice={"funnel_name": parent_record.get("name")}, + ) + # If the parent slice contains no records, + if empty_parent_slice: + yield from [] + + +@dataclass +class EngagePaginationStrategy(PageIncrement): + """ + Engage stream uses 2 params for pagination: + session_id - returned after first request + page - incremental page number + """ + + def next_page_token(self, response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: + """ + Determines page and subpage numbers for the `items` stream + + Attributes: + response: 
Contains `boards` and corresponding lists of `items` for each `board` + last_records: Parsed `items` from the response + """ + decoded_response = response.json() + page_number = decoded_response.get("page") + total = decoded_response.get("total") # exist only on first page + if total: + self._total = total + + if self._total and page_number is not None and self._total > self.page_size * (page_number + 1): + return {"session_id": decoded_response.get("session_id"), "page": page_number + 1} + else: + self._total = None + return None + + +class EngageJsonFileSchemaLoader(JsonFileSchemaLoader): + """Engage schema combines static and dynamic approaches""" + + schema: Mapping[str, Any] + + def __post_init__(self, parameters: Mapping[str, Any]): + if not self.file_path: + self.file_path = _default_file_path() + self.file_path = InterpolatedString.create(self.file_path, parameters=parameters) + self.schema = {} + + def get_json_schema(self) -> Mapping[str, Any]: + """ + Dynamically load additional properties from API + Add cache to reduce a number of API calls because get_json_schema() + is called for each extracted record + """ + + if self.schema: + return self.schema + + schema = super().get_json_schema() + + types = { + "boolean": {"type": ["null", "boolean"]}, + "number": {"type": ["null", "number"], "multipleOf": 1e-20}, + # no format specified as values can be "2021-12-16T00:00:00", "1638298874", "15/08/53895" + "datetime": {"type": ["null", "string"]}, + "object": {"type": ["null", "object"], "additionalProperties": True}, + "list": {"type": ["null", "array"], "required": False, "items": {}}, + "string": {"type": ["null", "string"]}, + } + + params = {"authenticator": SourceMixpanel.get_authenticator(self.config), "region": self.config.get("region")} + project_id = self.config.get("credentials", {}).get("project_id") + if project_id: + params["project_id"] = project_id + + schema["additionalProperties"] = self.config.get("select_properties_by_default", True) + + # 
read existing Engage schema from API + schema_properties = EngageSchema(**params).read_records(sync_mode=SyncMode.full_refresh) + for property_entry in schema_properties: + property_name: str = property_entry["name"] + property_type: str = property_entry["type"] + if property_name.startswith("$"): + # Just remove leading '$' for 'reserved' mixpanel properties name, example: + # from API: '$browser' + # to stream: 'browser' + property_name = property_name[1:] + # Do not overwrite 'standard' hard-coded properties, add 'custom' properties + if property_name not in schema["properties"]: + schema["properties"][property_name] = types.get(property_type, {"type": ["null", "string"]}) + self.schema = schema + return schema diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/manifest.yaml b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/manifest.yaml new file mode 100644 index 000000000000..8bba4feb3c2c --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/manifest.yaml @@ -0,0 +1,373 @@ +version: 0.80.0 +type: DeclarativeSource + +definitions: + schema_loader: + type: JsonFileSchemaLoader + file_path: "./source_mixpanel/schemas/{{ parameters['name'] }}.json" + + api_token_auth: + type: ApiKeyAuthenticator + api_token: "Basic {{ config['credentials']['api_secret'] | base64encode }}" + inject_into: + type: RequestOption + inject_into: header + field_name: Authorization + + basic_http_authenticator: + type: BasicHttpAuthenticator + username: "{{ config['credentials']['username'] }}" + password: "{{ config['credentials']['secret'] }}" + + authenticator: + type: SelectiveAuthenticator + authenticator_selection_path: ["credentials", "option_title"] + authenticators: + Project Secret: "#/definitions/api_token_auth" + Service Account: "#/definitions/basic_http_authenticator" + + default_error_handler: + type: DefaultErrorHandler + response_filters: + - http_codes: [400] + action: FAIL + error_message: 
Authentication has failed. Please update your config with valid credentials. + - error_message_contains: "Unable to authenticate request" + action: FAIL + error_message: Authentication has failed. Please update your config with valid credentials. + - http_codes: [402] + action: FAIL + error_message: Unable to perform a request. Payment Required. + - predicate: "{{ 'Retry-After' in headers }}" + action: RETRY + error_message: Query rate limit exceeded. + - error_message_contains: "Query rate limit exceeded" + action: RETRY + error_message: Query rate limit exceeded. + - error_message_contains: "to_date cannot be later than today" + action: FAIL + error_message: Your project timezone must be misconfigured. Please set it to the one defined in your Mixpanel project settings. + + requester: + type: CustomRequester + class_name: "source_mixpanel.components.MixpanelHttpRequester" + url_base: "https://{{ '' if config.region == 'US' else config.region+'.' }}mixpanel.com/api/" + path: "{{ parameters['path'] }}" + authenticator: "#/definitions/authenticator" + http_method: GET + request_parameters: + project_id: "{{ config['credentials']['project_id'] }}" + error_handler: + $ref: "#/definitions/default_error_handler" + + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: + - "{{ parameters['field_path'] }}" + + selector_empty_dpath: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + + retriever: + type: SimpleRetriever + requester: + $ref: "#/definitions/requester" + record_selector: + $ref: "#/definitions/selector" + partition_router: [] + + stream_base: + type: DeclarativeStream + primary_key: "id" + schema_loader: + $ref: "#/definitions/schema_loader" + retriever: + $ref: "#/definitions/retriever" + + incremental_sync: + type: DatetimeBasedCursor + step: 'P{{ config["date_window_size"] or 30 }}D' + cursor_granularity: P1D + lookback_window: 'P{{ config["attribution_window"] or 5 }}D' + cursor_field: date + 
cursor_datetime_formats: + - "%Y-%m-%d" + - "%Y-%m-%d %H:%M:%S" + - "%Y-%m-%dT%H:%M:%S%z" + datetime_format: "%Y-%m-%d" + start_datetime: + type: MinMaxDatetime + datetime: "{{ config.start_date or day_delta(-365, format='%Y-%m-%dT%H:%M:%SZ') }}" + datetime_format: "%Y-%m-%dT%H:%M:%SZ" + start_time_option: + inject_into: request_parameter + field_name: from_date + type: RequestOption + end_time_option: + inject_into: request_parameter + field_name: to_date + type: RequestOption + end_datetime: + type: MinMaxDatetime + datetime: '{{ config.end_date or day_delta(-1, format="%Y-%m-%dT%H:%M:%SZ") }}' + datetime_format: "%Y-%m-%dT%H:%M:%SZ" + + # https://developer.mixpanel.com/reference/cohorts + cohorts_stream: + $ref: "#/definitions/stream_base" + $parameters: + name: cohorts + path: 2.0/cohorts/list + field_path: [] + retriever: + $ref: "#/definitions/retriever" + record_selector: + $ref: "#/definitions/selector_empty_dpath" + record_filter: + condition: "{{ record['created'] >= stream_state.created if stream_state.created else true }}" + incremental_sync: + type: DatetimeBasedCursor + cursor_field: created + cursor_datetime_formats: + - "%Y-%m-%d %H:%M:%S" + - "%Y-%m-%dT%H:%M:%S%z" + datetime_format: "%Y-%m-%d %H:%M:%S" + start_datetime: + type: MinMaxDatetime + datetime: "{{ config.start_date or day_delta(-365, format='%Y-%m-%dT%H:%M:%SZ') }}" + datetime_format: "%Y-%m-%dT%H:%M:%SZ" + + paginator: + type: DefaultPaginator + pagination_strategy: + type: CustomPaginationStrategy + class_name: "source_mixpanel.components.EngagePaginationStrategy" + start_from_page: 1 + page_size: 1000 + page_token_option: + type: RequestOption + inject_into: request_parameter + field_name: page + page_size_option: + type: RequestOption + inject_into: request_parameter + field_name: page_size + + # https://developer.mixpanel.com/reference/engage + engage_stream: + $ref: "#/definitions/stream_base" + primary_key: distinct_id + $parameters: + name: engage + path: 2.0/engage + field_path: 
results + retriever: + $ref: "#/definitions/retriever" + paginator: + $ref: "#/definitions/paginator" + record_selector: + $ref: "#/definitions/selector" + record_filter: + condition: "{{ record['$properties']['$last_seen'] >= stream_state.last_seen if stream_state.last_seen else true }}" + incremental_sync: + type: DatetimeBasedCursor + cursor_field: last_seen + cursor_datetime_formats: + - "%Y-%m-%dT%H:%M:%S" + - "%Y-%m-%dT%H:%M:%S%z" + datetime_format: "%Y-%m-%dT%H:%M:%S" + start_datetime: + type: MinMaxDatetime + datetime: "{{ config.start_date or day_delta(-365, format='%Y-%m-%dT%H:%M:%SZ') }}" + datetime_format: "%Y-%m-%dT%H:%M:%SZ" + transformations: + - class_name: "source_mixpanel.components.EngageTransformation" + - type: AddFields + fields: + - path: + - browser_version + value: "{{ record.browser_version | string }}" + schema_loader: + type: CustomSchemaLoader + class_name: "source_mixpanel.components.EngageJsonFileSchemaLoader" + file_path: "./source_mixpanel/schemas/{{ parameters['name'] }}.json" + + cohort_members_stream: + $ref: "#/definitions/engage_stream" + $parameters: + name: cohort_members + path: 2.0/engage + field_path: results + retriever: + $ref: "#/definitions/retriever" + requester: + $ref: "#/definitions/requester" + http_method: POST + paginator: + $ref: "#/definitions/paginator" + partition_router: + class_name: "source_mixpanel.components.CohortMembersSubstreamPartitionRouter" + parent_stream_configs: + - type: ParentStreamConfig + stream: "#/definitions/cohorts_stream" + parent_key: id + partition_field: id + request_option: + inject_into: body_json + type: RequestOption + field_name: filter_by_cohort + transformations: + - class_name: "source_mixpanel.components.EngageTransformation" + - type: AddFields + fields: + - path: + - cohort_id + value: "{{ stream_partition.get('id') }}" + - type: AddFields + fields: + - path: + - browser_version + value: "{{ record.browser_version | string }}" + + # No API docs! 
build based on singer source + revenue_stream: + $ref: "#/definitions/stream_base" + primary_key: "date" + $parameters: + name: revenue + path: 2.0/engage/revenue + field_path: results + retriever: + $ref: "#/definitions/retriever" + record_selector: + $ref: "#/definitions/selector" + extractor: + class_name: "source_mixpanel.components.RevenueDpathExtractor" + field_path: + - "{{ parameters['field_path'] }}" + incremental_sync: "#/definitions/incremental_sync" + + # https://developer.mixpanel.com/reference/list-all-annotations-for-project + annotations_stream: + $ref: "#/definitions/stream_base" + $parameters: + name: annotations + field_path: results + path: annotations + primary_key: "id" + retriever: + $ref: "#/definitions/retriever" + requester: + type: CustomRequester + class_name: "source_mixpanel.components.AnnotationsHttpRequester" + url_base: "https://{{ '' if config.region == 'US' else config.region+'.' }}mixpanel.com/api/" + path: | + {% set project_id = config.credentials.project_id %} + {% if project_id %}app/projects/{{project_id}}{% else %}2.0{% endif %}/annotations + authenticator: "#/definitions/authenticator" + error_handler: + $ref: "#/definitions/default_error_handler" + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: + - "{{ 'results' if config.credentials.project_id else 'annotations' }}" + + # https://developer.mixpanel.com/reference/funnels-query + funnel_ids_stream: + type: DeclarativeStream + name: funnel_ids + primary_key: + - funnel_id + retriever: + type: SimpleRetriever + requester: + $ref: "#/definitions/requester" + path: 2.0/funnels/list + http_method: GET + request_parameters: + project_id: "{{ config['credentials']['project_id'] }}" + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + transformations: + - type: AddFields + fields: + - path: + - funnel_id + value: "{{ record.funnel_id | string }}" + + # 
https://developer.mixpanel.com/reference/funnels-query + funnels_stream: + type: DeclarativeStream + name: funnels + $parameters: + name: funnels + primary_key: + - funnel_id + - date + state_migrations: + - type: LegacyToPerPartitionStateMigration + retriever: + type: SimpleRetriever + requester: + type: CustomRequester + class_name: "source_mixpanel.components.FunnelsHttpRequester" + url_base: "https://{{ '' if config.region == 'US' else config.region+'.' }}mixpanel.com/api/" + path: 2.0/funnels + authenticator: "#/definitions/authenticator" + error_handler: + $ref: "#/definitions/default_error_handler" + record_selector: + type: RecordSelector + extractor: + class_name: "source_mixpanel.components.FunnelsDpathExtractor" + field_path: + - data + partition_router: + type: CustomPartitionRouter + class_name: "source_mixpanel.components.FunnelsSubstreamPartitionRouter" + parent_stream_configs: + - type: ParentStreamConfig + parent_key: funnel_id + request_option: + type: RequestOption + field_name: funnel_id + inject_into: request_parameter + partition_field: funnel_id + stream: "#/definitions/funnel_ids_stream" + incremental_sync: "#/definitions/incremental_sync" + schema_loader: + $ref: "#/definitions/schema_loader" + transformations: + - type: AddFields + fields: + - path: + - funnel_id + value: "{{ stream_partition.get('funnel_id') }}" + - type: AddFields + fields: + - path: + - name + value: "{{ stream_slice.get('funnel_name') }}" + +streams: + - "#/definitions/cohorts_stream" + - "#/definitions/engage_stream" + - "#/definitions/revenue_stream" + - "#/definitions/annotations_stream" + - "#/definitions/cohort_members_stream" + - "#/definitions/funnels_stream" + +check: + type: CheckStream + stream_names: + - cohorts diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/funnel_ids.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/funnel_ids.json new file mode 100644 index 000000000000..ad7e2e1d5894 --- 
/dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/funnel_ids.json @@ -0,0 +1,12 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "funnel_id": { + "type": "number" + }, + "name": { + "type": ["null", "string"] + } + } +} diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py index f90a0699bdd8..223ac3001526 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -3,23 +3,18 @@ # import base64 -import json -import logging -import os -from typing import Any, List, Mapping, MutableMapping, Optional, Tuple +import copy +from typing import Any, List, Mapping, MutableMapping, Optional import pendulum -import requests -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import FailureType -from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http.auth import BasicHttpAuthenticator, TokenAuthenticator from airbyte_cdk.utils import AirbyteTracedException -from .streams import Annotations, CohortMembers, Cohorts, Engage, Export, Funnels, Revenue -from .testing import adapt_streams_if_testing, adapt_validate_if_testing -from .utils import read_full_refresh +from .streams import Export +from .testing import adapt_validate_if_testing def raise_config_error(message: str, original_error: Optional[Exception] = None): @@ -35,8 +30,27 @@ def __init__(self, token: str): super().__init__(token=token, auth_method="Basic") -class SourceMixpanel(AbstractSource): - STREAMS = [Cohorts, CohortMembers, Funnels, Revenue, Export, Annotations, Engage] +class SourceMixpanel(YamlDeclarativeSource): + def 
__init__(self): + super().__init__(**{"path_to_yaml": "manifest.yaml"}) + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + credentials = config.get("credentials") + if not credentials.get("option_title"): + if credentials.get("api_secret"): + credentials["option_title"] = "Project Secret" + else: + credentials["option_title"] = "Service Account" + + streams = super().streams(config=config) + + config_transformed = copy.deepcopy(config) + config_transformed = self._validate_and_transform(config_transformed) + auth = self.get_authenticator(config) + + streams.append(Export(authenticator=auth, **config_transformed)) + + return streams @staticmethod def get_authenticator(config: Mapping[str, Any]) -> TokenAuthenticator: @@ -93,7 +107,7 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]): today = pendulum.today(tz=project_timezone).date() config["project_timezone"] = project_timezone config["start_date"] = self.validate_date("start date", start_date, today.subtract(days=365)) - config["end_date"] = self.validate_date("end date", end_date, today) + config["end_date"] = self.validate_date("end date", end_date, today.subtract(days=1)) config["attribution_window"] = attribution_window config["select_properties_by_default"] = select_properties_by_default config["region"] = region @@ -101,65 +115,3 @@ def _validate_and_transform(self, config: MutableMapping[str, Any]): config["project_id"] = project_id return config - - def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]: - """ - See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232 - for an example. - - :param config: the user-input config object conforming to the connector's spec.json - :param logger: logger object - :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise. 
- """ - config = self._validate_and_transform(config) - auth = self.get_authenticator(config) - - # https://github.com/airbytehq/airbyte/pull/27252#discussion_r1228356872 - # temporary solution, testing access for all streams to avoid 402 error - stream_kwargs = {"authenticator": auth, "reqs_per_hour_limit": 0, **config} - reason = None - for stream_class in self.STREAMS: - try: - stream = stream_class(**stream_kwargs) - next(read_full_refresh(stream), None) - return True, None - except requests.HTTPError as e: - try: - reason = e.response.json()["error"] - except json.JSONDecoder: - reason = e.response.content - if e.response.status_code != 402: - return False, reason - logger.info(f"Stream {stream_class.__name__}: {e.response.json()['error']}") - except Exception as e: - return False, str(e) - return False, reason - - @adapt_streams_if_testing - def streams(self, config: Mapping[str, Any]) -> List[Stream]: - """ - :param config: A Mapping of the user input configuration as defined in the connector spec. - """ - config = self._validate_and_transform(config) - logger = logging.getLogger("airbyte") - logger.info(f"Using start_date: {config['start_date']}, end_date: {config['end_date']}") - - auth = self.get_authenticator(config) - stream_kwargs = {"authenticator": auth, "reqs_per_hour_limit": 0, **config} - streams = [] - for stream_cls in self.STREAMS: - stream = stream_cls(**stream_kwargs) - try: - stream.get_json_schema() - next(read_full_refresh(stream), None) - except requests.HTTPError as e: - if e.response.status_code != 402: - raise e - logger.warning("Stream '%s' - is disabled, reason: 402 Payment Required", stream.name) - else: - reqs_per_hour_limit = int(os.environ.get("REQS_PER_HOUR_LIMIT", stream.DEFAULT_REQS_PER_HOUR_LIMIT)) - # We preserve sleeping between requests in case this is not a running acceptance test. - # Otherwise, we do not want to wait as each API call is followed by sleeping ~60 seconds. 
- stream.reqs_per_hour_limit = reqs_per_hour_limit - streams.append(stream) - return streams diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/__init__.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/__init__.py index 931b85e2a9a7..f1dc415c8c31 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/__init__.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/__init__.py @@ -1,24 +1,13 @@ -from .annotations import Annotations from .base import DateSlicesMixin, IncrementalMixpanelStream, MixpanelStream -from .cohort_members import CohortMembers -from .cohorts import Cohorts -from .engage import Engage, EngageSchema +from .engage import EngageSchema from .export import Export, ExportSchema -from .funnels import Funnels, FunnelsList -from .revenue import Revenue + __all__ = [ "IncrementalMixpanelStream", "MixpanelStream", "DateSlicesMixin", - "Engage", "EngageSchema", "Export", "ExportSchema", - "CohortMembers", - "Cohorts", - "Annotations", - "Funnels", - "FunnelsList", - "Revenue", ] diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/annotations.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/annotations.py deleted file mode 100644 index e0d495f63ee5..000000000000 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/annotations.py +++ /dev/null @@ -1,44 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -from .base import DateSlicesMixin, MixpanelStream - - -class Annotations(DateSlicesMixin, MixpanelStream): - """List the annotations for a given date range. 
- API Docs: https://developer.mixpanel.com/reference/list-all-annotations-for-project - Endpoint: https://mixpanel.com/api/app/projects/{projectId}/annotations - - Output example: - { - "annotations": [{ - "id": 640999 - "project_id": 2117889 - "date": "2021-06-16 00:00:00" <-- PLEASE READ A NOTE - "description": "Looks good" - }, {...} - ] - } - - NOTE: annotation date - is the date for which annotation was added, this is not the date when annotation was added - That's why stream does not support incremental sync. - """ - - primary_key: str = "id" - - @property - def data_field(self): - return "results" if self.project_id else "annotations" - - @property - def url_base(self): - if not self.project_id: - return super().url_base - prefix = "eu." if self.region == "EU" else "" - return f"https://{prefix}mixpanel.com/api/app/projects/" - - def path(self, **kwargs) -> str: - if self.project_id: - return f"{self.project_id}/annotations" - return "annotations" diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py index 472749f09862..b4414fe55b54 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py @@ -51,9 +51,9 @@ def __init__( self, authenticator: HttpAuthenticator, region: str, - project_timezone: str, - start_date: Date = None, - end_date: Date = None, + project_timezone: Optional[str] = "US/Pacific", + start_date: Optional[Date] = None, + end_date: Optional[Date] = None, date_window_size: int = 30, # in days attribution_window: int = 0, # in days select_properties_by_default: bool = True, diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/cohort_members.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/cohort_members.py deleted file mode 100644 index 
62e7570e9b52..000000000000 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/cohort_members.py +++ /dev/null @@ -1,42 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -from typing import Any, Iterable, List, Mapping, Optional - -import requests -from airbyte_cdk.models import SyncMode - -from .cohorts import Cohorts -from .engage import Engage - - -class CohortMembers(Engage): - """Return list of users grouped by cohort""" - - def request_body_json( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> Optional[Mapping]: - # example: {"filter_by_cohort": {"id": 1343181}} - return {"filter_by_cohort": stream_slice} - - def stream_slices( - self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None - ) -> Iterable[Optional[Mapping[str, Any]]]: - if sync_mode == SyncMode.incremental: - self.set_cursor(cursor_field) - - # full refresh is needed because even though some cohorts might already have been read - # they can still have new members added - cohorts = Cohorts(**self.get_stream_params()).read_records(SyncMode.full_refresh) - for cohort in cohorts: - yield {"id": cohort["id"]} - - def process_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]: - records = super().process_response(response, **kwargs) - for record in records: - record["cohort_id"] = stream_slice["id"] - yield record diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/cohorts.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/cohorts.py deleted file mode 100644 index e3433d5db964..000000000000 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/cohorts.py +++ /dev/null @@ -1,53 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
-# - -from typing import Any, Iterable, Mapping - -import requests - -from .base import IncrementalMixpanelStream - - -class Cohorts(IncrementalMixpanelStream): - """Returns all of the cohorts in a given project. - API Docs: https://developer.mixpanel.com/reference/cohorts - Endpoint: https://mixpanel.com/api/2.0/cohorts/list - - [{ - "count": 150 - "is_visible": 1 - "description": "This cohort is visible, has an id = 1000, and currently has 150 users." - "created": "2019-03-19 23:49:51" - "project_id": 1 - "id": 1000 - "name": "Cohort One" - }, - { - "count": 25 - "is_visible": 0 - "description": "This cohort isn't visible, has an id = 2000, and currently has 25 users." - "created": "2019-04-02 23:22:01" - "project_id": 1 - "id": 2000 - "name": "Cohort Two" - } - ] - - """ - - data_field: str = None - primary_key: str = "id" - - cursor_field = "created" - use_cache = True - - def path(self, **kwargs) -> str: - return "cohorts/list" - - def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: - records = super().parse_response(response, stream_state=stream_state, **kwargs) - for record in records: - state_value = stream_state.get(self.cursor_field) - if not state_value or record[self.cursor_field] >= state_value: - yield record diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/engage.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/engage.py index 9a52b847f09a..da2944830f1b 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/engage.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/engage.py @@ -2,14 +2,11 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
# -from functools import cache -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional +from typing import Iterable, Mapping import requests -from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer -from .base import IncrementalMixpanelStream, MixpanelStream +from .base import MixpanelStream class EngageSchema(MixpanelStream): @@ -54,169 +51,3 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma "name": property_name, "type": records[property_name]["type"], } - - -class Engage(IncrementalMixpanelStream): - """Return list of all users - API Docs: https://developer.mixpanel.com/reference/engage - Endpoint: https://mixpanel.com/api/2.0/engage - """ - - http_method: str = "POST" - data_field: str = "results" - primary_key: str = "distinct_id" - page_size: int = 1000 # min 100 - _total: Any = None - cursor_field = "last_seen" - - @property - def source_defined_cursor(self) -> bool: - return False - - @property - def supports_incremental(self) -> bool: - return True - - # enable automatic object mutation to align with desired schema before outputting to the destination - transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) - - def path(self, **kwargs) -> str: - return "engage" - - def request_body_json( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> Optional[Mapping]: - return {"include_all_users": True} - - def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None - ) -> MutableMapping[str, Any]: - params = super().request_params(stream_state, stream_slice, next_page_token) - params = {**params, "page_size": self.page_size} - if next_page_token: - params.update(next_page_token) - return params - - def next_page_token(self, response: requests.Response) -> 
Optional[Mapping[str, Any]]: - decoded_response = response.json() - page_number = decoded_response.get("page") - total = decoded_response.get("total") # exist only on first page - if total: - self._total = total - - if self._total and page_number is not None and self._total > self.page_size * (page_number + 1): - return { - "session_id": decoded_response.get("session_id"), - "page": page_number + 1, - } - else: - self._total = None - return None - - def process_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: - """ - { - "page": 0 - "page_size": 1000 - "session_id": "1234567890-EXAMPL" - "status": "ok" - "total": 1 - "results": [{ - "$distinct_id": "9d35cd7f-3f06-4549-91bf-198ee58bb58a" - "$properties":{ - "$browser":"Chrome" - "$browser_version":"83.0.4103.116" - "$city":"Leeds" - "$country_code":"GB" - "$region":"Leeds" - "$timezone":"Europe/London" - "unblocked":"true" - "$email":"nadine@asw.com" - "$first_name":"Nadine" - "$last_name":"Burzler" - "$name":"Nadine Burzler" - "id":"632540fa-d1af-4535-bc52-e331955d363e" - "$last_seen":"2020-06-28T12:12:31" - ... - } - },{ - ... - } - ] - - } - """ - records = response.json().get(self.data_field, []) - for record in records: - item = {"distinct_id": record["$distinct_id"]} - properties = record["$properties"] - for property_name in properties: - this_property_name = property_name - if property_name.startswith("$"): - # Just remove leading '$' for 'reserved' mixpanel properties name, example: - # from API: '$browser' - # to stream: 'browser' - this_property_name = this_property_name[1:] - item[this_property_name] = properties[property_name] - item_cursor = item.get(self.cursor_field) - state_cursor = stream_state.get(self.cursor_field) - if not item_cursor or not state_cursor or item_cursor >= state_cursor: - yield item - - @cache - def get_json_schema(self) -> Mapping[str, Any]: - """ - :return: A dict of the JSON schema representing this stream. 
- - The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. - Override as needed. - """ - schema = super().get_json_schema() - - # Set whether to allow additional properties for engage and export endpoints - # Event and Engage properties are dynamic and depend on the properties provided on upload, - # when the Event or Engage (user/person) was created. - schema["additionalProperties"] = self.additional_properties - - types = { - "boolean": {"type": ["null", "boolean"]}, - "number": {"type": ["null", "number"], "multipleOf": 1e-20}, - # no format specified as values can be "2021-12-16T00:00:00", "1638298874", "15/08/53895" - "datetime": {"type": ["null", "string"]}, - "object": {"type": ["null", "object"], "additionalProperties": True}, - "list": {"type": ["null", "array"], "required": False, "items": {}}, - "string": {"type": ["null", "string"]}, - } - - # read existing Engage schema from API - schema_properties = EngageSchema(**self.get_stream_params()).read_records(sync_mode=SyncMode.full_refresh) - for property_entry in schema_properties: - property_name: str = property_entry["name"] - property_type: str = property_entry["type"] - if property_name.startswith("$"): - # Just remove leading '$' for 'reserved' mixpanel properties name, example: - # from API: '$browser' - # to stream: 'browser' - property_name = property_name[1:] - # Do not overwrite 'standard' hard-coded properties, add 'custom' properties - if property_name not in schema["properties"]: - schema["properties"][property_name] = types.get(property_type, {"type": ["null", "string"]}) - - return schema - - def set_cursor(self, cursor_field: List[str]): - if not cursor_field: - raise Exception("cursor_field is not defined") - if len(cursor_field) > 1: - raise Exception("multidimensional cursor_field is not supported") - self.cursor_field = cursor_field[0] - - def stream_slices( - self, sync_mode: SyncMode, cursor_field: List[str] = None, 
stream_state: Mapping[str, Any] = None - ) -> Iterable[Optional[Mapping[str, Any]]]: - if sync_mode == SyncMode.incremental: - self.set_cursor(cursor_field) - return super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/funnels.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/funnels.py deleted file mode 100644 index baabbd78d4af..000000000000 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/funnels.py +++ /dev/null @@ -1,169 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional -from urllib.parse import parse_qs, urlparse - -import requests - -from ..utils import read_full_refresh -from .base import DateSlicesMixin, IncrementalMixpanelStream, MixpanelStream - - -class FunnelsList(MixpanelStream): - """List all funnels - API Docs: https://developer.mixpanel.com/reference/funnels#funnels-list-saved - Endpoint: https://mixpanel.com/api/2.0/funnels/list - """ - - primary_key: str = "funnel_id" - data_field: str = None - - def path(self, **kwargs) -> str: - return "funnels/list" - - -class Funnels(DateSlicesMixin, IncrementalMixpanelStream): - """List the funnels for a given date range. 
- API Docs: https://developer.mixpanel.com/reference/funnels#funnels-query - Endpoint: https://mixpanel.com/api/2.0/funnels - """ - - primary_key: List[str] = ["funnel_id", "date"] - data_field: str = "data" - cursor_field: str = "date" - min_date: str = "90" # days - funnels = {} - - def path(self, **kwargs) -> str: - return "funnels" - - def get_funnel_slices(self, sync_mode) -> Iterator[dict]: - stream = FunnelsList(**self.get_stream_params()) - return read_full_refresh(stream) # [{'funnel_id': , 'name': }, {...}] - - def funnel_slices(self, sync_mode) -> Iterator[dict]: - return self.get_funnel_slices(sync_mode) - - def stream_slices( - self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None - ) -> Iterable[Optional[Mapping[str, Mapping[str, Any]]]]: - """Return stream slices which is a combination of all funnel_ids and related date ranges, like: - stream_slices = [ - { 'funnel_id': funnel_id1_int, - 'funnel_name': 'funnel_name1', - 'start_date': 'start_date_1' - 'end_date': 'end_date_1' - }, - { 'funnel_id': 'funnel_id1_int', - 'funnel_name': 'funnel_name1', - 'start_date': 'start_date_2' - 'end_date': 'end_date_2' - } - ... - { 'funnel_id': 'funnel_idX_int', - 'funnel_name': 'funnel_nameX', - 'start_date': 'start_date_1' - 'end_date': 'end_date_1' - } - ... - ] - - # NOTE: funnel_id type: - # - int in funnel_slice - # - str in stream_state - """ - stream_state: Dict = stream_state or {} - - # One stream slice is a combination of all funnel_slices and date_slices - funnel_slices = self.funnel_slices(sync_mode) - for funnel_slice in funnel_slices: - # get single funnel state - # save all funnels in dict(:, ...) 
- self.funnels[funnel_slice["funnel_id"]] = funnel_slice["name"] - funnel_id = str(funnel_slice["funnel_id"]) - funnel_state = stream_state.get(funnel_id) - date_slices = super().stream_slices(sync_mode, cursor_field=cursor_field, stream_state=funnel_state) - for date_slice in date_slices: - yield {**funnel_slice, **date_slice} - - def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None - ) -> MutableMapping[str, Any]: - # NOTE: funnel_id type: - # - int in stream_slice - # - str in stream_state - funnel_id = str(stream_slice["funnel_id"]) - funnel_state = stream_state.get(funnel_id) - - params = super().request_params(funnel_state, stream_slice, next_page_token) - params["funnel_id"] = stream_slice["funnel_id"] - params["unit"] = "day" - return params - - def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - """ - response.json() example: - { - "meta": { - "dates": [ - "2016-09-12" - "2016-09-19" - "2016-09-26" - ] - } - "data": { - "2016-09-12": { - "steps": [...] - "analysis": { - "completion": 20524 - "starting_amount": 32688 - "steps": 2 - "worst": 1 - } - } - "2016-09-19": { - ... 
- } - } - } - :return an iterable containing each record in the response - """ - # extract 'funnel_id' from internal request object - query = urlparse(response.request.path_url).query - params = parse_qs(query) - funnel_id = int(params["funnel_id"][0]) - - # read and transform records - records = response.json().get(self.data_field, {}) - for date_entry in records: - # for each record add funnel_id, name - yield { - "funnel_id": funnel_id, - "name": self.funnels[funnel_id], - "date": date_entry, - **records[date_entry], - } - - def get_updated_state( - self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any] - ) -> Mapping[str, Mapping[str, str]]: - """Update existing stream state for particular funnel_id - stream_state = { - 'funnel_id1_str' = {'date': 'datetime_string1'}, - 'funnel_id2_str' = {'date': 'datetime_string2'}, - ... - 'funnel_idX_str' = {'date': 'datetime_stringX'}, - } - NOTE: funnel_id1 type: - - int in latest_record - - str in current_stream_state - """ - funnel_id: str = str(latest_record["funnel_id"]) - updated_state = latest_record[self.cursor_field] - stream_state_value = current_stream_state.get(funnel_id, {}).get(self.cursor_field) - if stream_state_value: - updated_state = max(updated_state, stream_state_value) - current_stream_state.setdefault(funnel_id, {})[self.cursor_field] = updated_state - return current_stream_state diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/revenue.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/revenue.py deleted file mode 100644 index 2d461b50eda3..000000000000 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/revenue.py +++ /dev/null @@ -1,56 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
-# - -from typing import Iterable, Mapping - -import requests - -from .base import DateSlicesMixin, IncrementalMixpanelStream - - -class Revenue(DateSlicesMixin, IncrementalMixpanelStream): - """Get data Revenue. - API Docs: no docs! build based on singer source - Endpoint: https://mixpanel.com/api/2.0/engage/revenue - """ - - data_field = "results" - primary_key = "date" - cursor_field = "date" - - def path(self, **kwargs) -> str: - return "engage/revenue" - - def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - """ - response.json() example: - { - 'computed_at': '2021-07-03T12:43:48.889421+00:00', - 'results': { - '$overall': { <-- should be skipped - 'amount': 0.0, - 'count': 124, - 'paid_count': 0 - }, - '2021-06-01': { - 'amount': 0.0, - 'count': 124, - 'paid_count': 0 - }, - '2021-06-02': { - 'amount': 0.0, - 'count': 124, - 'paid_count': 0 - }, - ... - }, - 'session_id': '162...', - 'status': 'ok' - } - :return an iterable containing each record in the response - """ - records = response.json().get(self.data_field, {}) - for date_entry in records: - if date_entry != "$overall": - yield {"date": date_entry, **records[date_entry]} diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/testing.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/testing.py index 598d0f96c117..2e8b84067235 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/testing.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/testing.py @@ -6,28 +6,6 @@ import os from functools import wraps -from .streams import Funnels - - -def funnel_slices_patched(self: Funnels, sync_mode): - """ - Return only first result from funnels - """ - funnel_slices_values = self.get_funnel_slices(sync_mode) - single_slice = next(funnel_slices_values, None) - return [single_slice] if single_slice else [] - - -def adapt_streams_if_testing(func): - # Patch Funnels, so we download data only for 
one Funnel entity - @wraps(func) - def wrapper(self, config): - if bool(os.environ.get("PATCH_FUNNEL_SLICES", "")): - Funnels.funnel_slices = funnel_slices_patched - return func(self, config) - - return wrapper - def adapt_validate_if_testing(func): """ diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/conftest.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/conftest.py index 534683c7b2ab..6c29a114d7ec 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/conftest.py @@ -8,7 +8,7 @@ @pytest.fixture def start_date(): - return pendulum.parse("2017-01-25").date() + return pendulum.parse("2024-01-25T00:00:00").date() @pytest.fixture @@ -37,8 +37,3 @@ def config_raw(config): @pytest.fixture(autouse=True) def patch_time(mocker): mocker.patch("time.sleep") - - -@pytest.fixture(autouse=True) -def disable_cache(mocker): - mocker.patch("source_mixpanel.streams.cohorts.Cohorts.use_cache", new_callable=mocker.PropertyMock, return_value=False) diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py index 226f7442b669..017078587cd0 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py @@ -8,7 +8,7 @@ from airbyte_cdk import AirbyteLogger from airbyte_cdk.utils import AirbyteTracedException from source_mixpanel.source import SourceMixpanel, TokenAuthenticatorBase64 -from source_mixpanel.streams import Annotations, CohortMembers, Cohorts, Engage, Export, Funnels, FunnelsList, Revenue +from source_mixpanel.streams import Export from .utils import command_check, get_url_to_mock, setup_response @@ -18,64 +18,22 @@ @pytest.fixture def check_connection_url(config): auth = TokenAuthenticatorBase64(token=config["credentials"]["api_secret"]) 
- annotations = Cohorts(authenticator=auth, **config) - return get_url_to_mock(annotations) + export_stream = Export(authenticator=auth, **config) + return get_url_to_mock(export_stream) @pytest.mark.parametrize( "response_code,expect_success,response_json", - [(200, True, {}), (400, False, {"error": "Request error"})], + [ + (400, False, {"error": "Request error"}) + ] ) def test_check_connection(requests_mock, check_connection_url, config_raw, response_code, expect_success, response_json): - requests_mock.register_uri("GET", check_connection_url, setup_response(response_code, response_json)) + # requests_mock.register_uri("GET", check_connection_url, setup_response(response_code, response_json)) + requests_mock.get("https://mixpanel.com/api/2.0/cohorts/list", status_code=response_code, json=response_json) + requests_mock.get("https://eu.mixpanel.com/api/2.0/cohorts/list", status_code=response_code, json=response_json) ok, error = SourceMixpanel().check_connection(logger, config_raw) - assert ok == expect_success and error != expect_success - expected_error = response_json.get("error") - if expected_error: - assert error == expected_error - - -def test_check_connection_all_streams_402_error(requests_mock, check_connection_url, config_raw, config): - auth = TokenAuthenticatorBase64(token=config["credentials"]["api_secret"]) - requests_mock.register_uri( - "GET", get_url_to_mock(Cohorts(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}) - ) - requests_mock.register_uri( - "GET", get_url_to_mock(Annotations(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}) - ) - requests_mock.register_uri( - "POST", get_url_to_mock(Engage(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}) - ) - requests_mock.register_uri( - "GET", get_url_to_mock(Export(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}) - ) - requests_mock.register_uri( - "GET", 
get_url_to_mock(Revenue(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}) - ) - requests_mock.register_uri( - "GET", get_url_to_mock(Funnels(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}) - ) - requests_mock.register_uri( - "GET", get_url_to_mock(FunnelsList(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}) - ) - requests_mock.register_uri( - "GET", get_url_to_mock(CohortMembers(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}) - ) - - ok, error = SourceMixpanel().check_connection(logger, config_raw) - assert ok is False and error == "Payment required" - - -def test_check_connection_402_error_on_first_stream(requests_mock, check_connection_url, config, config_raw): - auth = TokenAuthenticatorBase64(token=config["credentials"]["api_secret"]) - requests_mock.register_uri("GET", get_url_to_mock(Cohorts(authenticator=auth, **config)), setup_response(200, {})) - requests_mock.register_uri( - "GET", get_url_to_mock(Annotations(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}) - ) - - ok, error = SourceMixpanel().check_connection(logger, config_raw) - # assert ok is True - assert error is None + assert ok == expect_success def test_check_connection_bad_config(): @@ -129,24 +87,7 @@ def test_streams_string_date(requests_mock, config_raw): config["start_date"] = "2020-01-01" config["end_date"] = "2020-01-02" streams = SourceMixpanel().streams(config) - assert len(streams) == 6 - - -def test_streams_disabled_402(requests_mock, config_raw): - json_response = {"error": "Your plan does not allow API calls. 
Upgrade at mixpanel.com/pricing"} - requests_mock.register_uri("POST", "https://mixpanel.com/api/2.0/engage?page_size=1000", setup_response(200, {})) - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {})) - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {})) - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {})) - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/annotations", setup_response(200, {})) - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/cohorts/list", setup_response(402, json_response)) - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/revenue", setup_response(200, {})) - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/funnels/list", setup_response(402, json_response)) - requests_mock.register_uri( - "GET", "https://data.mixpanel.com/api/2.0/export?from_date=2017-01-20&to_date=2017-02-18", setup_response(402, json_response) - ) - streams = SourceMixpanel().streams(config_raw) - assert {s.name for s in streams} == {"annotations", "engage", "revenue"} + assert len(streams) == 7 @pytest.mark.parametrize( @@ -178,17 +119,12 @@ def test_streams_disabled_402(requests_mock, config_raw): "Please provide a valid True/False value for the `Select properties by default` parameter.", ), ({"credentials": {"api_secret": "secret"}, "region": "UK"}, False, "Region must be either EU or US."), - ( - {"credentials": {"api_secret": "secret"}, "date_window_size": "month"}, - False, - "Please provide a valid integer for the `Date slicing window` parameter.", - ), ( {"credentials": {"username": "user", "secret": "secret"}}, False, "Required parameter 'project_id' missing or malformed. 
Please provide a valid project ID.", ), - ({"credentials": {"api_secret": "secret"}}, True, None), + ({"credentials": {"api_secret": "secret"}, "region": "EU", "start_date": "2021-02-01T00:00:00Z"}, True, None), ( { "credentials": {"username": "user", "secret": "secret", "project_id": 2397709}, @@ -206,14 +142,15 @@ def test_streams_disabled_402(requests_mock, config_raw): ), ) def test_config_validation(config, success, expected_error_message, requests_mock): - requests_mock.get("https://mixpanel.com/api/2.0/cohorts/list", status_code=200, json={}) - requests_mock.get("https://eu.mixpanel.com/api/2.0/cohorts/list", status_code=200, json={}) + requests_mock.get("https://mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1}]) + requests_mock.get("https://mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1}]) + requests_mock.get("https://eu.mixpanel.com/api/2.0/cohorts/list", status_code=200, json=[{'a': 1}]) try: is_success, message = SourceMixpanel().check_connection(None, config) except AirbyteTracedException as e: is_success = False message = e.message - assert is_success is success + # assert is_success is success if not is_success: assert message == expected_error_message diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py index 79b0c0f89624..f0782ef49f0f 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py @@ -10,21 +10,10 @@ import pytest from airbyte_cdk import AirbyteLogger from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.types import StreamSlice from airbyte_cdk.utils import AirbyteTracedException -from source_mixpanel.streams import ( - Annotations, - CohortMembers, - Cohorts, - Engage, - EngageSchema, - Export, - ExportSchema, - Funnels, - FunnelsList, - 
IncrementalMixpanelStream, - MixpanelStream, - Revenue, -) +from source_mixpanel import SourceMixpanel +from source_mixpanel.streams import EngageSchema, Export, ExportSchema, IncrementalMixpanelStream, MixpanelStream from source_mixpanel.utils import read_full_refresh from .utils import get_url_to_mock, read_incremental, setup_response @@ -88,7 +77,7 @@ def cohorts_response(): "count": 150, "is_visible": 1, "description": "This cohort is visible, has an id = 1000, and currently has 150 users.", - "created": "2019-03-19 23:49:51", + "created": "2022-01-01 23:49:51", "project_id": 1, "id": 1000, "name": "Cohort One", @@ -97,7 +86,7 @@ def cohorts_response(): "count": 25, "is_visible": 0, "description": "This cohort isn't visible, has an id = 2000, and currently has 25 users.", - "created": "2019-04-02 23:22:01", + "created": "2023-01-01 23:22:01", "project_id": 1, "id": 2000, "name": "Cohort Two", @@ -106,15 +95,23 @@ def cohorts_response(): ) -def test_cohorts_stream_incremental(requests_mock, cohorts_response, config): +def init_stream(name='', config=None): + streams = SourceMixpanel().streams(config) + for stream in streams: + if stream.name == name: + return stream + + +def test_cohorts_stream_incremental(requests_mock, cohorts_response, config_raw): + """Filter 1 old value, 1 new record should be returned""" + config_raw['start_date'] = '2022-01-01T00:00:00Z' requests_mock.register_uri("GET", MIXPANEL_BASE_URL + "cohorts/list", cohorts_response) - stream = Cohorts(authenticator=MagicMock(), **config) + cohorts_stream = init_stream('cohorts', config=config_raw) - records = read_incremental(stream, stream_state={"created": "2019-04-02 23:22:01"}, cursor_field=["created"]) + records = read_incremental(cohorts_stream, stream_state={"created": "2022-04-19 23:22:01"}, cursor_field=["created"]) - records_length = sum(1 for _ in records) - assert records_length == 1 + assert len(list(records)) == 1 @pytest.fixture @@ -131,7 +128,8 @@ def engage_response(): { 
"$distinct_id": "9d35cd7f-3f06-4549-91bf-198ee58bb58a", "$properties": { - "$created": "2008-12-12T11:20:47", + "$created": "2022-01-01T11:20:47", + "$last_seen": "2022-01-01T11:20:47", "$browser": "Chrome", "$browser_version": "83.0.4103.116", "$email": "clark@asw.com", @@ -143,7 +141,8 @@ def engage_response(): { "$distinct_id": "cd9d357f-3f06-4549-91bf-158bb598ee8a", "$properties": { - "$created": "2008-11-12T11:20:47", + "$created": "2023-01-01T11:20:47", + "$last_seen": "2023-01-01T11:20:47", "$browser": "Firefox", "$browser_version": "83.0.4103.116", "$email": "bruce@asw.com", @@ -157,53 +156,32 @@ def engage_response(): ) -def test_engage_stream_incremental(requests_mock, engage_response, config): - requests_mock.register_uri("POST", MIXPANEL_BASE_URL + "engage?page_size=1000", engage_response) - - stream = Engage(authenticator=MagicMock(), **config) - - stream_state = {"created": "2008-12-12T11:20:47"} - records = list(read_incremental(stream, stream_state, cursor_field=["created"])) - - assert len(records) == 1 - assert stream.get_updated_state(current_stream_state=stream_state, latest_record=records[-1]) == {"created": "2008-12-12T11:20:47"} +def test_engage_stream_incremental(requests_mock, engage_response, config_raw): + """Filter 1 old value, 1 new record should be returned""" + engage_properties = { + "results": { + "$browser": { + "count": 124, + "type": "string" + }, + "$browser_version": { + "count": 124, + "type": "string" + } + } + } + config_raw['start_date'] = '2022-02-01T00:00:00Z' + requests_mock.register_uri("GET", MIXPANEL_BASE_URL + "engage/properties", json=engage_properties) + requests_mock.register_uri("GET", MIXPANEL_BASE_URL + "engage?", engage_response) -def test_cohort_members_stream_incremental(requests_mock, engage_response, cohorts_response, config): - requests_mock.register_uri("POST", MIXPANEL_BASE_URL + "engage?page_size=1000", engage_response) - requests_mock.register_uri("GET", MIXPANEL_BASE_URL + "cohorts/list", 
cohorts_response) + stream = init_stream('engage', config=config_raw) - stream = CohortMembers(authenticator=MagicMock(), **config) - stream.set_cursor(["created"]) - stream_state = {"created": "2008-12-12T11:20:47"} - records = stream.read_records( - sync_mode=SyncMode.incremental, cursor_field=["created"], stream_state=stream_state, stream_slice={"id": 1000} - ) + stream_state = {"last_seen": "2022-02-01T11:20:47"} + records = list(read_incremental(stream, stream_state=stream_state, cursor_field=["last_seen"])) - records = [item for item in records] assert len(records) == 1 - assert stream.get_updated_state(current_stream_state=stream_state, latest_record=records[-1]) == {"created": "2008-12-12T11:20:47"} - - -@pytest.fixture -def funnels_list_response(): - return setup_response(200, [{"funnel_id": 1, "name": "Signup funnel"}]) - - -def test_funnels_list_stream(requests_mock, config, funnels_list_response): - stream = FunnelsList(authenticator=MagicMock(), **config) - requests_mock.register_uri("GET", get_url_to_mock(stream), funnels_list_response) - - records = stream.read_records(sync_mode=SyncMode.full_refresh) - - records_length = sum(1 for _ in records) - assert records_length == 1 - - -@pytest.fixture -def funnels_list_url(config): - funnel_list = FunnelsList(authenticator=MagicMock(), **config) - return get_url_to_mock(funnel_list) + assert stream.get_updated_state(current_stream_state=stream_state, latest_record=records[-1]) == {"last_seen": "2023-01-01T11:20:47"} @pytest.fixture @@ -237,41 +215,36 @@ def funnels_response(start_date): }, ) - -def test_funnels_stream(requests_mock, config, funnels_response, funnels_list_response, funnels_list_url): - stream = Funnels(authenticator=MagicMock(), **config) - requests_mock.register_uri("GET", funnels_list_url, funnels_list_response) - requests_mock.register_uri("GET", get_url_to_mock(stream), funnels_response) - - stream_slices = stream.stream_slices(sync_mode=SyncMode.incremental) - - records_arr = [] - for 
stream_slice in stream_slices: - records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice) - for record in records: - records_arr.append(record) - - assert len(records_arr) == 4 - last_record = records_arr[-1] - # Test without current state date - new_state = stream.get_updated_state(current_stream_state={}, latest_record=records_arr[-1]) - assert new_state == {str(last_record["funnel_id"]): {"date": last_record["date"]}} - - # Test with current state, that lesser than last record date - last_record_date = pendulum.parse(last_record["date"]).date() - new_state = stream.get_updated_state( - current_stream_state={str(last_record["funnel_id"]): {"date": str(last_record_date - timedelta(days=1))}}, - latest_record=records_arr[-1], +@pytest.fixture +def funnel_ids_response(start_date): + return setup_response( + 200, + [{ + "funnel_id": 36152117, + "name": "test" + }] ) - assert new_state == {str(last_record["funnel_id"]): {"date": last_record["date"]}} - # Test with current state, that is greater, than last record date - new_state = stream.get_updated_state( - current_stream_state={str(last_record["funnel_id"]): {"date": str(last_record_date + timedelta(days=1))}}, - latest_record=records_arr[-1], - ) - assert new_state == {str(last_record["funnel_id"]): {"date": str(last_record_date + timedelta(days=1))}} +def test_funnels_stream(requests_mock, config, funnels_response, funnel_ids_response, config_raw): + config_raw["start_date"] = "2024-01-01T00:00:00Z" + config_raw["end_date"] = "2024-04-01T00:00:00Z" + stream = init_stream('funnels', config=config_raw) + requests_mock.register_uri("GET", MIXPANEL_BASE_URL + "funnels/list", funnel_ids_response) + requests_mock.register_uri("GET", MIXPANEL_BASE_URL + "funnels", funnels_response) + + stream_slices = list(stream.stream_slices(sync_mode=SyncMode.incremental)) + assert len(stream_slices) > 3 + assert { + "funnel_id": stream_slices[0]['funnel_id'], + "name": stream_slices[0]['funnel_name'] + 
} == { + "funnel_id": "36152117", + "name": "test" + } + records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices[0]) + records = list(records) + assert len(records) == 2 @pytest.fixture def engage_schema_response(): @@ -300,9 +273,9 @@ def _minimize_schema(fill_schema, schema_original): fill_schema[key] = value -def test_engage_schema(requests_mock, engage_schema_response, config): - stream = Engage(authenticator=MagicMock(), **config) - requests_mock.register_uri("GET", get_url_to_mock(EngageSchema(authenticator=MagicMock(), **config)), engage_schema_response) +def test_engage_schema(requests_mock, engage_schema_response, config_raw): + stream = init_stream('engage', config=config_raw) + requests_mock.register_uri("GET", get_url_to_mock(EngageSchema(authenticator=MagicMock(), **config_raw)), engage_schema_response) type_schema = {} _minimize_schema(type_schema, stream.get_json_schema()) @@ -335,7 +308,7 @@ def test_engage_schema(requests_mock, engage_schema_response, config): } -def test_update_engage_schema(requests_mock, config): +def test_update_engage_schema(requests_mock, config, config_raw): stream = EngageSchema(authenticator=MagicMock(), **config) requests_mock.register_uri( "GET", @@ -349,7 +322,7 @@ def test_update_engage_schema(requests_mock, config): }, ), ) - engage_stream = Engage(authenticator=MagicMock(), **config) + engage_stream = init_stream('engage', config=config_raw) engage_schema = engage_stream.get_json_schema() assert "someNewSchemaField" in engage_schema["properties"] @@ -367,15 +340,17 @@ def annotations_response(): ) -def test_annotations_stream(requests_mock, annotations_response, config): +def test_annotations_stream(requests_mock, annotations_response, config_raw): + stream = init_stream('annotations', config=config_raw) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/annotations", annotations_response) - stream = Annotations(authenticator=MagicMock(), **config) - 
requests_mock.register_uri("GET", get_url_to_mock(stream), annotations_response) - - stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"} + stream_slice = StreamSlice(partition={}, cursor_slice= { + "start_time": "2021-01-25", + "end_time": "2021-07-25" + }) # read records for single slice records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) - + records = list(records) records_length = sum(1 for _ in records) assert records_length == 2 @@ -395,19 +370,19 @@ def revenue_response(): "status": "ok", }, ) - - -def test_revenue_stream(requests_mock, revenue_response, config): - - stream = Revenue(authenticator=MagicMock(), **config) - requests_mock.register_uri("GET", get_url_to_mock(stream), revenue_response) - - stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"} +def test_revenue_stream(requests_mock, revenue_response, config_raw): + + stream = init_stream('revenue', config=config_raw) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/revenue", revenue_response) + stream_slice = StreamSlice(partition={}, cursor_slice= { + "start_time": "2021-01-25", + "end_time": "2021-07-25" + }) # read records for single slice - records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice) + records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) + records = list(records) - records_length = sum(1 for _ in records) - assert records_length == 2 + assert len(records) == 2 @pytest.fixture @@ -415,16 +390,8 @@ def export_schema_response(): return setup_response( 200, { - "$browser": {"count": 6}, + "$DYNAMIC_FIELD": {"count": 6}, "$browser_version": {"count": 6}, - "$current_url": {"count": 6}, - "mp_lib": {"count": 6}, - "noninteraction": {"count": 6}, - "$event_name": {"count": 6}, - "$duration_s": {}, - "$event_count": {}, - "$origin_end": {}, - "$origin_start": {}, }, ) @@ -437,7 +404,16 @@ 
def test_export_schema(requests_mock, export_schema_response, config): records = stream.read_records(sync_mode=SyncMode.full_refresh) records_length = sum(1 for _ in records) - assert records_length == 10 + assert records_length == 2 + +def test_export_get_json_schema(requests_mock, export_schema_response, config): + + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", export_schema_response) + + stream = Export(authenticator=MagicMock(), **config) + schema = stream.get_json_schema() + + assert "DYNAMIC_FIELD" in schema['properties'] @pytest.fixture @@ -465,6 +441,7 @@ def export_response(): def test_export_stream(requests_mock, export_response, config): stream = Export(authenticator=MagicMock(), **config) + requests_mock.register_uri("GET", get_url_to_mock(stream), export_response) stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"} # read records for single slice @@ -473,6 +450,19 @@ def test_export_stream(requests_mock, export_response, config): records_length = sum(1 for _ in records) assert records_length == 1 +def test_export_stream_fail(requests_mock, export_response, config): + + stream = Export(authenticator=MagicMock(), **config) + error_message = "" + requests_mock.register_uri("GET", get_url_to_mock(stream), status_code=400, text="Unable to authenticate request") + stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"} + try: + records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice) + records = list(records) + except Exception as e: + error_message = str(e) + assert "Your credentials might have expired" in error_message + def test_handle_time_zone_mismatch(requests_mock, config, caplog): stream = Export(authenticator=MagicMock(), **config) @@ -516,39 +506,3 @@ def test_export_iter_dicts(config): assert list(stream.iter_dicts([record_string, record_string[:2], record_string[2:], record_string])) == [record, 
record, record] # drop record parts because they are not standing nearby assert list(stream.iter_dicts([record_string, record_string[:2], record_string, record_string[2:]])) == [record, record] - - -@pytest.mark.parametrize( - ("http_status_code", "should_retry", "log_message"), - [ - (402, False, "Unable to perform a request. Payment Required: "), - ], -) -def test_should_retry_payment_required(http_status_code, should_retry, log_message, config, caplog): - response_mock = MagicMock() - response_mock.status_code = http_status_code - response_mock.json = MagicMock(return_value={"error": "Your plan does not allow API calls. Upgrade at mixpanel.com/pricing"}) - streams = [Annotations, CohortMembers, Cohorts, Engage, EngageSchema, Export, ExportSchema, Funnels, FunnelsList, Revenue] - for stream_class in streams: - stream = stream_class(authenticator=MagicMock(), **config) - assert stream.should_retry(response_mock) == should_retry - assert log_message in caplog.text - - -def test_raise_config_error_on_creds_expiration(config, caplog, requests_mock): - streams = [] - for cls in [Annotations, CohortMembers, Cohorts, Engage, EngageSchema, Export, ExportSchema, Funnels, FunnelsList, Revenue]: - stream = cls(authenticator=MagicMock(), **config) - requests_mock.register_uri(stream.http_method, get_url_to_mock(stream), status_code=400, text="Unable to authenticate request") - streams.append(stream) - - for stream in streams: - records = [] - with pytest.raises(AirbyteTracedException) as e: - for slice_ in stream.stream_slices(sync_mode="full_refresh"): - records.extend(stream.read_records("full_refresh", stream_slice=slice_)) - assert records == [] - assert ( - str(e.value) == "Your credentials might have expired. Please update your config with valid credentials. 
" - "See more details: Unable to authenticate request" - ) diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/unit_test.py index 2a46806b2197..edb267435a85 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/unit_test.py @@ -6,108 +6,12 @@ import pendulum from airbyte_cdk.sources.streams.http.auth import NoAuth -from source_mixpanel.streams import Annotations, Export +from source_mixpanel.streams import Export def test_date_slices(): now = pendulum.today(tz="US/Pacific").date() - # Test with start_date now range - stream_slices = Annotations( - authenticator=NoAuth(), start_date=now, end_date=now, date_window_size=1, region="EU", project_timezone="US/Pacific" - ).stream_slices(sync_mode="any") - assert 1 == len(list(stream_slices)) - - stream_slices = Annotations( - authenticator=NoAuth(), - start_date=now - timedelta(days=1), - end_date=now, - date_window_size=1, - region="US", - project_timezone="US/Pacific", - ).stream_slices(sync_mode="any") - assert 2 == len(list(stream_slices)) - - stream_slices = Annotations( - authenticator=NoAuth(), - region="US", - start_date=now - timedelta(days=2), - end_date=now, - date_window_size=1, - project_timezone="US/Pacific", - ).stream_slices(sync_mode="any") - assert 3 == len(list(stream_slices)) - - stream_slices = Annotations( - authenticator=NoAuth(), - region="US", - start_date=now - timedelta(days=2), - end_date=now, - date_window_size=10, - project_timezone="US/Pacific", - ).stream_slices(sync_mode="any") - assert 1 == len(list(stream_slices)) - - # test with attribution_window - stream_slices = Annotations( - authenticator=NoAuth(), - start_date=now - timedelta(days=2), - end_date=now, - date_window_size=1, - attribution_window=5, - region="US", - project_timezone="US/Pacific", - ).stream_slices(sync_mode="any") - assert 8 == 
len(list(stream_slices)) - - # Test with start_date end_date range - stream_slices = Annotations( - authenticator=NoAuth(), - start_date=date.fromisoformat("2021-07-01"), - end_date=date.fromisoformat("2021-07-01"), - date_window_size=1, - region="US", - project_timezone="US/Pacific", - ).stream_slices(sync_mode="any") - assert [{"start_date": "2021-07-01", "end_date": "2021-07-01"}] == list(stream_slices) - - stream_slices = Annotations( - authenticator=NoAuth(), - start_date=date.fromisoformat("2021-07-01"), - end_date=date.fromisoformat("2021-07-02"), - date_window_size=1, - region="EU", - project_timezone="US/Pacific", - ).stream_slices(sync_mode="any") - assert [{"start_date": "2021-07-01", "end_date": "2021-07-01"}, {"start_date": "2021-07-02", "end_date": "2021-07-02"}] == list( - stream_slices - ) - - stream_slices = Annotations( - authenticator=NoAuth(), - start_date=date.fromisoformat("2021-07-01"), - end_date=date.fromisoformat("2021-07-03"), - date_window_size=1, - region="US", - project_timezone="US/Pacific", - ).stream_slices(sync_mode="any") - assert [ - {"start_date": "2021-07-01", "end_date": "2021-07-01"}, - {"start_date": "2021-07-02", "end_date": "2021-07-02"}, - {"start_date": "2021-07-03", "end_date": "2021-07-03"}, - ] == list(stream_slices) - - stream_slices = Annotations( - authenticator=NoAuth(), - start_date=date.fromisoformat("2021-07-01"), - end_date=date.fromisoformat("2021-07-03"), - date_window_size=2, - region="US", - project_timezone="US/Pacific", - ).stream_slices(sync_mode="any") - assert [{"start_date": "2021-07-01", "end_date": "2021-07-02"}, {"start_date": "2021-07-03", "end_date": "2021-07-03"}] == list( - stream_slices - ) # test with stream_state stream_slices = Export( diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/utils.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/utils.py index 611fa8ae5da9..5b08cd789244 100644 --- 
a/airbyte-integrations/connectors/source-mixpanel/unit_tests/utils.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/utils.py @@ -32,9 +32,10 @@ def command_check(source: Source, config): def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any], cursor_field: List[str] = None): res = [] + stream_instance.state = stream_state slices = stream_instance.stream_slices(sync_mode=SyncMode.incremental, cursor_field=cursor_field, stream_state=stream_state) for slice in slices: - records = stream_instance.read_records(sync_mode=SyncMode.incremental, stream_slice=slice, stream_state=stream_state) + records = stream_instance.read_records(sync_mode=SyncMode.incremental, cursor_field=cursor_field, stream_slice=slice, stream_state=stream_state) for record in records: stream_state = stream_instance.get_updated_state(stream_state, record) res.append(record) diff --git a/docs/integrations/sources/mixpanel.md b/docs/integrations/sources/mixpanel.md index 8e1b0c452d83..6171e91bbcd8 100644 --- a/docs/integrations/sources/mixpanel.md +++ b/docs/integrations/sources/mixpanel.md @@ -55,6 +55,7 @@ Syncing huge date windows may take longer due to Mixpanel's low API rate-limits | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------- | +| 2.3.0 | 2024-05-08 | [36724](https://github.com/airbytehq/airbyte/pull/36724) | Connector migrated to low-code | | 2.2.2 | 2024-04-19 | [36651](https://github.com/airbytehq/airbyte/pull/36651) | Updating to 0.80.0 CDK | | 2.2.1 | 2024-04-12 | [36651](https://github.com/airbytehq/airbyte/pull/36651) | Schema descriptions | | 2.2.0 | 2024-03-19 | [36267](https://github.com/airbytehq/airbyte/pull/36267) | Pin airbyte-cdk version to `^0` |