diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 856be97ae6197..dd4f72314461d 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -3141,6 +3141,7 @@ components: sourceCatalogId: type: string format: uuid + nullable: true WebBackendConnectionCreate: type: object required: @@ -3221,6 +3222,7 @@ components: sourceCatalogId: type: string format: uuid + nullable: true WebBackendConnectionUpdate: type: object required: @@ -3307,6 +3309,7 @@ components: sourceCatalogId: type: string format: uuid + nullable: true ConnectionSearch: type: object properties: @@ -3597,6 +3600,7 @@ components: sourceDefinedCursor: description: If the source defines the cursor field, then any other cursor field inputs will be ignored. If it does not, either the user_provided one is used, or the default one is used as a backup. type: boolean + nullable: true defaultCursorField: description: Path to the field that will be used to determine if a record is new or modified since the last sync. If not provided by the source, the end user will have to specify the comparable themselves. type: array @@ -3611,6 +3615,7 @@ components: type: string namespace: type: string + nullable: true description: Optional Source-defined namespace. Airbyte streams from the same sources should have the same namespace. Currently only used by JDBC destinations to determine what schema to write to. StreamJsonSchema: type: object diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 4b45a8cae41a2..6f18ec170c8f3 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -455,6 +455,7 @@ public void testDiscoverSourceSchema() throws ApiException { .name(STREAM_NAME) .namespace("public") .jsonSchema(jsonSchema) + .sourceDefinedCursor(null) .defaultCursorField(Collections.emptyList()) .sourceDefinedPrimaryKey(Collections.emptyList()) .supportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)); diff --git a/octavia-cli/README.md b/octavia-cli/README.md index c1e8ea05dd6b4..1730fcefa2840 100644 --- a/octavia-cli/README.md +++ b/octavia-cli/README.md @@ -362,8 +362,9 @@ You can disable telemetry by setting the `OCTAVIA_ENABLE_TELEMETRY` environment ## Changelog -| Version | Date | Description | PR | -|---------|------------|-------------------|----------------------------------------------------------| -| 0.36.2 | 2022-04-15 | Improve telemetry | [#12072](https://github.com/airbytehq/airbyte/issues/11896)| -| 0.35.68 | 2022-04-12 | Add telemetry | [#11896](https://github.com/airbytehq/airbyte/issues/11896)| -| 0.35.61 | 2022-04-07 | Alpha release | [EPIC](https://github.com/airbytehq/airbyte/issues/10704)| +| Version | Date | Description | PR | +|---------|------------|-------------------------------------|----------------------------------------------------------| +| 0.36.11 | 2022-05-05 | Use snake case in connection fields | [#12133](https://github.com/airbytehq/airbyte/pull/12133)| +| 0.36.2 | 2022-04-15 | Improve telemetry | [#12072](https://github.com/airbytehq/airbyte/issues/11896)| +| 0.35.68 | 2022-04-12 | Add telemetry | [#11896](https://github.com/airbytehq/airbyte/issues/11896)| +| 0.35.61 | 2022-04-07 | Alpha release | 
[EPIC](https://github.com/airbytehq/airbyte/issues/10704)| diff --git a/octavia-cli/integration_tests/configurations/connections/poke_to_pg/configuration.yaml b/octavia-cli/integration_tests/configurations/connections/poke_to_pg/configuration.yaml index e4ddd7c8da74f..6c6941393aadc 100644 --- a/octavia-cli/integration_tests/configurations/connections/poke_to_pg/configuration.yaml +++ b/octavia-cli/integration_tests/configurations/connections/poke_to_pg/configuration.yaml @@ -1,38 +1,35 @@ -# Configuration for connection poke_to_g +# Configuration for connection poke_to_pg definition_type: connection -resource_name: poke_to_g +resource_name: poke_to_pg source_id: TO_UPDATE_FROM_TEST destination_id: TO_UPDATE_FROM_TEST # EDIT THE CONFIGURATION BELOW! configuration: - sourceId: TO_UPDATE_FROM_TEST # REQUIRED | string - destinationId: TO_UPDATE_FROM_TEST # REQUIRED | string status: active # REQUIRED | string | Allowed values: active, inactive, deprecated - name: poke_to_g # OPTIONAL | string | Optional name of the connection - namespaceDefinition: source # OPTIONAL | string | Allowed values: source, destination, customformat - namespaceFormat: "${SOURCE_NAMESPACE}" # OPTIONAL | string | Used when namespaceDefinition is 'customformat'. If blank then behaves like namespaceDefinition = 'destination'. If "${SOURCE_NAMESPACE}" then behaves like namespaceDefinition = 'source'. + namespace_definition: source # OPTIONAL | string | Allowed values: source, destination, customformat + namespace_format: "${SOURCE_NAMESPACE}" # OPTIONAL | string | Used when namespaceDefinition is 'customformat'. If blank then behaves like namespaceDefinition = 'destination'. If "${SOURCE_NAMESPACE}" then behaves like namespaceDefinition = 'source'. prefix: "" # REQUIRED | Prefix that will be prepended to the name of each stream when it is written to the destination - resourceRequirements: # OPTIONAL | object | Resource requirements to run workers (blank for unbounded allocations) + resource_requirements: # OPTIONAL | object | Resource requirements to run workers (blank for unbounded allocations) cpu_limit: "" # OPTIONAL cpu_request: "" # OPTIONAL memory_limit: "" # OPTIONAL memory_request: "" # OPTIONAL schedule: # OPTIONAL | object - timeUnit: hours # REQUIRED | string | Allowed values: minutes, hours, days, weeks, months + time_unit: hours # REQUIRED | string | Allowed values: minutes, hours, days, weeks, months units: 1 # REQUIRED | integer - syncCatalog: # OPTIONAL | object | 🚨 ONLY edit streams.config, streams.stream should not be edited as schema cannot be changed. + sync_catalog: # OPTIONAL | object | 🚨 ONLY edit streams.config, streams.stream should not be edited as schema cannot be changed. 
streams: - config: - aliasName: pokemon - cursorField: [] - destinationSyncMode: append - primaryKey: [] + alias_name: pokemon + cursor_field: [] + destination_sync_mode: append + primary_key: [] selected: true - syncMode: full_refresh + sync_mode: full_refresh stream: - defaultCursorField: [] - jsonSchema: + default_cursor_field: [] + json_schema: $schema: http://json-schema.org/draft-07/schema# properties: abilities: @@ -358,7 +355,7 @@ configuration: type: object name: pokemon namespace: null - sourceDefinedCursor: null - sourceDefinedPrimaryKey: [] - supportedSyncModes: + source_defined_cursor: null + source_defined_primary_key: [] + supported_sync_modes: - full_refresh diff --git a/octavia-cli/integration_tests/conftest.py b/octavia-cli/integration_tests/conftest.py index 73e384ef10373..76c6ae70b791f 100644 --- a/octavia-cli/integration_tests/conftest.py +++ b/octavia-cli/integration_tests/conftest.py @@ -112,15 +112,13 @@ def updated_connection_configuration_and_path(octavia_test_project_directory, so local_configuration = yaml.safe_load(dumb_local_configuration_file) local_configuration["source_id"] = source.resource_id local_configuration["destination_id"] = destination.resource_id - local_configuration["configuration"]["sourceId"] = source.resource_id - local_configuration["configuration"]["destinationId"] = destination.resource_id with open(edited_path, "w") as updated_configuration_file: yaml.dump(local_configuration, updated_configuration_file) return local_configuration, edited_path @pytest.fixture(scope="session") -def connection(api_client, workspace_id, octavia_test_project_directory, source, destination): +def connection(api_client, workspace_id, octavia_test_project_directory, source, destination, connection_state_path): configuration, configuration_path = updated_connection_configuration_and_path(octavia_test_project_directory, source, destination) connection = Connection(api_client, workspace_id, configuration, configuration_path) yield connection diff --git a/octavia-cli/integration_tests/test_apply/test_resources.py b/octavia-cli/integration_tests/test_apply/test_resources.py index b8dc140b49028..a56292cac78f9 100644 --- a/octavia-cli/integration_tests/test_apply/test_resources.py +++ b/octavia-cli/integration_tests/test_apply/test_resources.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
# + import pytest pytestmark = pytest.mark.integration @@ -10,23 +11,25 @@ def test_source_lifecycle(source): assert not source.was_created source.create() - source.state = source._get_state_from_file() + source.state = source._get_state_from_file(source.configuration_path) assert source.was_created assert not source.get_diff_with_remote_resource() - source.local_configuration["configuration"]["pokemon_name"] = "snorlax" + source.raw_configuration["configuration"]["pokemon_name"] = "snorlax" + source.configuration = source._deserialize_raw_configuration() assert 'changed from "ditto" to "snorlax"' in source.get_diff_with_remote_resource() source.update() assert not source.get_diff_with_remote_resource() - assert source.catalog["streams"][0]["config"]["aliasName"] == "pokemon" + assert source.catalog["streams"][0]["config"]["alias_name"] == "pokemon" def test_destination_lifecycle(destination): assert not destination.was_created destination.create() - destination.state = destination._get_state_from_file() + destination.state = destination._get_state_from_file(destination.configuration_path) assert destination.was_created assert not destination.get_diff_with_remote_resource() - destination.local_configuration["configuration"]["host"] = "foo" + destination.raw_configuration["configuration"]["host"] = "foo" + destination.configuration = destination._deserialize_raw_configuration() assert 'changed from "localhost" to "foo"' in destination.get_diff_with_remote_resource() destination.update() assert not destination.get_diff_with_remote_resource() @@ -37,10 +40,11 @@ def test_connection_lifecycle(source, destination, connection): assert destination.was_created assert not connection.was_created connection.create() - connection.state = connection._get_state_from_file() + connection.state = connection._get_state_from_file(connection.configuration_path) assert connection.was_created assert not connection.get_diff_with_remote_resource() - connection.local_configuration["configuration"]["status"] = "inactive" + connection.raw_configuration["configuration"]["status"] = "inactive" + connection.configuration = connection._deserialize_raw_configuration() assert 'changed from "active" to "inactive"' in connection.get_diff_with_remote_resource() connection.update() assert not connection.get_diff_with_remote_resource() diff --git a/octavia-cli/integration_tests/test_generate/expected_rendered_yaml/connection/expected.yaml b/octavia-cli/integration_tests/test_generate/expected_rendered_yaml/connection/expected.yaml index 9ba155cc10ded..f23d68f79d199 100644 --- a/octavia-cli/integration_tests/test_generate/expected_rendered_yaml/connection/expected.yaml +++ b/octavia-cli/integration_tests/test_generate/expected_rendered_yaml/connection/expected.yaml @@ -6,22 +6,19 @@ destination_id: my_destination_id # EDIT THE CONFIGURATION BELOW! configuration: - sourceId: my_source_id # REQUIRED | string - destinationId: my_destination_id # REQUIRED | string status: active # REQUIRED | string | Allowed values: active, inactive, deprecated - name: my_new_connection # OPTIONAL | string | Optional name of the connection - namespaceDefinition: source # OPTIONAL | string | Allowed values: source, destination, customformat - namespaceFormat: "${SOURCE_NAMESPACE}" # OPTIONAL | string | Used when namespaceDefinition is 'customformat'. If blank then behaves like namespaceDefinition = 'destination'. If "${SOURCE_NAMESPACE}" then behaves like namespaceDefinition = 'source'. 
+ namespace_definition: source # OPTIONAL | string | Allowed values: source, destination, customformat + namespace_format: "${SOURCE_NAMESPACE}" # OPTIONAL | string | Used when namespaceDefinition is 'customformat'. If blank then behaves like namespaceDefinition = 'destination'. If "${SOURCE_NAMESPACE}" then behaves like namespaceDefinition = 'source'. prefix: "" # REQUIRED | Prefix that will be prepended to the name of each stream when it is written to the destination - resourceRequirements: # OPTIONAL | object | Resource requirements to run workers (blank for unbounded allocations) + resource_requirements: # OPTIONAL | object | Resource requirements to run workers (blank for unbounded allocations) cpu_limit: "" # OPTIONAL cpu_request: "" # OPTIONAL memory_limit: "" # OPTIONAL memory_request: "" # OPTIONAL schedule: # OPTIONAL | object - timeUnit: hours # REQUIRED | string | Allowed values: minutes, hours, days, weeks, months + time_unit: hours # REQUIRED | string | Allowed values: minutes, hours, days, weeks, months units: 1 # REQUIRED | integer - syncCatalog: # OPTIONAL | object | 🚨 ONLY edit streams.config, streams.stream should not be edited as schema cannot be changed. + sync_catalog: # OPTIONAL | object | 🚨 ONLY edit streams.config, streams.stream should not be edited as schema cannot be changed. streams: - config: aliasName: aliasMock diff --git a/octavia-cli/octavia_cli/apply/resources.py b/octavia-cli/octavia_cli/apply/resources.py index 664a9746c4637..90a7901ab6d31 100644 --- a/octavia-cli/octavia_cli/apply/resources.py +++ b/octavia-cli/octavia_cli/apply/resources.py @@ -5,33 +5,36 @@ import abc import os import time +from copy import deepcopy from pathlib import Path -from typing import Any, Callable, Optional, Union +from typing import Callable, Optional, Set, Union import airbyte_api_client import yaml from airbyte_api_client.api import connection_api, destination_api, source_api from airbyte_api_client.model.airbyte_catalog import AirbyteCatalog +from airbyte_api_client.model.airbyte_stream import AirbyteStream +from airbyte_api_client.model.airbyte_stream_and_configuration import AirbyteStreamAndConfiguration +from airbyte_api_client.model.airbyte_stream_configuration import AirbyteStreamConfiguration from airbyte_api_client.model.connection_create import ConnectionCreate from airbyte_api_client.model.connection_id_request_body import ConnectionIdRequestBody from airbyte_api_client.model.connection_read import ConnectionRead -from airbyte_api_client.model.connection_read_list import ConnectionReadList -from airbyte_api_client.model.connection_search import ConnectionSearch +from airbyte_api_client.model.connection_schedule import ConnectionSchedule from airbyte_api_client.model.connection_status import ConnectionStatus from airbyte_api_client.model.connection_update import ConnectionUpdate from airbyte_api_client.model.destination_create import DestinationCreate from airbyte_api_client.model.destination_id_request_body import DestinationIdRequestBody from airbyte_api_client.model.destination_read import DestinationRead -from airbyte_api_client.model.destination_read_list import DestinationReadList -from airbyte_api_client.model.destination_search import DestinationSearch +from airbyte_api_client.model.destination_sync_mode import DestinationSyncMode from airbyte_api_client.model.destination_update import DestinationUpdate +from airbyte_api_client.model.namespace_definition_type import NamespaceDefinitionType +from airbyte_api_client.model.resource_requirements import 
ResourceRequirements from airbyte_api_client.model.source_create import SourceCreate from airbyte_api_client.model.source_discover_schema_request_body import SourceDiscoverSchemaRequestBody from airbyte_api_client.model.source_id_request_body import SourceIdRequestBody from airbyte_api_client.model.source_read import SourceRead -from airbyte_api_client.model.source_read_list import SourceReadList -from airbyte_api_client.model.source_search import SourceSearch from airbyte_api_client.model.source_update import SourceUpdate +from airbyte_api_client.model.sync_mode import SyncMode from click import ClickException from .diff_helpers import compute_diff, hash_config @@ -58,7 +61,7 @@ def __init__(self, configuration_path: str, resource_id: str, generation_timesta configuration_path (str): Path to the configuration this state relates to. resource_id (str): Id of the resource the state relates to. generation_timestamp (int): State generation timestamp. - configuration_hash (str): Checksum of the configuration file. + configuration_hash (str): Hash of the loaded configuration file. """ self.configuration_path = configuration_path self.resource_id = resource_id @@ -80,19 +83,18 @@ def _save(self) -> None: yaml.dump(self.as_dict(), state_file) @classmethod - def create(cls, configuration_path: str, configuration: dict, resource_id: str) -> "ResourceState": + def create(cls, configuration_path: str, configuration_hash: str, resource_id: str) -> "ResourceState": """Create a state for a resource configuration. Args: configuration_path (str): Path to the YAML file defining the resource. - configuration (dict): Configuration object that will be hashed. + configuration_hash (str): Hash of the loaded configuration fie. resource_id (str): UUID of the resource. Returns: ResourceState: state representing the resource. """ generation_timestamp = int(time.time()) - configuration_hash = hash_config(configuration) state = ResourceState(configuration_path, resource_id, generation_timestamp, configuration_hash) state._save() return state @@ -157,14 +159,14 @@ def update_function_name( @property @abc.abstractmethod - def search_function_name( + def get_function_name( self, ): # pragma: no cover pass @property @abc.abstractmethod - def search_payload( + def get_payload( self, ): # pragma: no cover pass @@ -191,101 +193,95 @@ def resource_type( pass def __init__( - self, api_client: airbyte_api_client.ApiClient, workspace_id: str, local_configuration: dict, configuration_path: str + self, api_client: airbyte_api_client.ApiClient, workspace_id: str, raw_configuration: dict, configuration_path: str ) -> None: """Create a BaseResource object. Args: api_client (airbyte_api_client.ApiClient): the Airbyte API client. workspace_id (str): the workspace id. - local_configuration (dict): The local configuration describing the resource. + raw_configuration (dict): The local configuration describing the resource. configuration_path (str): The path to the local configuration describing the resource with YAML. 
""" self._create_fn = getattr(self.api, self.create_function_name) self._update_fn = getattr(self.api, self.update_function_name) - self._search_fn = getattr(self.api, self.search_function_name) - self.workspace_id = workspace_id - self.local_configuration = local_configuration + self._get_fn = getattr(self.api, self.get_function_name) + self.configuration_path = configuration_path - self.api_instance = self.api(api_client) - self.state = self._get_state_from_file() - self.local_file_changed = True if self.state is None else hash_config(self.local_configuration) != self.state.configuration_hash + self.state = self._get_state_from_file(configuration_path) + self.configuration_hash = hash_config( + raw_configuration + ) # Hash as early as possible to limit risks of raw_configuration downstream mutations. - @property - def remote_resource(self): - return self._get_remote_resource() + self.local_file_changed = True if self.state is None else self.configuration_hash != self.state.configuration_hash - def _get_comparable_configuration( - self, - ) -> Union[SourceRead, DestinationRead, dict]: # pragma: no cover - """Get the object to which local configuration will be compared to. + self.raw_configuration = raw_configuration + self.configuration = self._deserialize_raw_configuration() + self.api_instance = self.api(api_client) + self.workspace_id = workspace_id + self.resource_name = raw_configuration["resource_name"] - Raises: - NonExistingResourceError: Raised if the remote resource does not exists. + def _deserialize_raw_configuration(self): + """Deserialize a raw configuration into another object and perform extra validation if needed. + The base implementation does nothing except extracting the configuration field and returning a copy of it. Returns: - Union[SourceRead, DestinationRead, dict]: The comparable configuration + dict: Deserialized configuration """ - if not self.was_created: - raise NonExistingResourceError("Can't find a comparable configuration as the remote resource does not exists.") - else: - return self.remote_resource + return deepcopy(self.raw_configuration["configuration"]) - @property - def was_created(self): - return True if self.remote_resource else False - - def __getattr__(self, name: str) -> Any: - """Map attribute of the YAML config to the Resource object. + @staticmethod + def _check_for_invalid_configuration_keys(dict_to_check: dict, invalid_keys: Set[str], error_message: str): + """Utils function to check if a configuration dictionnary has legacy keys that were removed/renamed after an octavia update. Args: - name (str): Attribute name + dict_to_check (dict): The dictionnary for which keys should be checked + invalid_keys (Set[str]): The set of invalid keys we want to check the existence + error_message (str): The error message to display to the user Raises: - AttributeError: Raised if the attributed was not found in the local configuration. 
- - Returns: - [Any]: Attribute value + InvalidConfigurationError: Raised if an invalid key was found in the dict_to_check """ - if name in self.local_configuration: - return self.local_configuration.get(name) - raise AttributeError(f"{self.__class__.__name__}.{name} is invalid.") + invalid_keys = list(set(dict_to_check.keys()) & invalid_keys) + if invalid_keys: + raise InvalidConfigurationError(f"{error_message}: {', '.join(invalid_keys)}") + + @property + def remote_resource(self): + return self._get_remote_resource() if self.state else None - def _search(self, check_return_type=True) -> Union[SourceReadList, DestinationReadList, ConnectionReadList]: - """Run search of a resources on the remote Airbyte instance. + def _get_local_comparable_configuration(self) -> dict: + return self.raw_configuration["configuration"] + + @abc.abstractmethod + def _get_remote_comparable_configuration( + self, + ) -> dict: # pragma: no cover + raise NotImplementedError + + @property + def was_created(self): + return True if self.remote_resource else False + + def _get_remote_resource(self) -> Union[SourceRead, DestinationRead, ConnectionRead]: + """Retrieve a resources on the remote Airbyte instance. Returns: Union[SourceReadList, DestinationReadList, ConnectionReadList]: Search results """ - return self._search_fn(self.api_instance, self.search_payload, _check_return_type=check_return_type) + return self._get_fn(self.api_instance, self.get_payload) - def _get_state_from_file(self) -> Optional[ResourceState]: + @staticmethod + def _get_state_from_file(configuration_file: str) -> Optional[ResourceState]: """Retrieve a state object from a local YAML file if it exists. Returns: Optional[ResourceState]: the deserialized resource state if YAML file found. """ - expected_state_path = Path(os.path.join(os.path.dirname(self.configuration_path), "state.yaml")) + expected_state_path = Path(os.path.join(os.path.dirname(configuration_file), "state.yaml")) if expected_state_path.is_file(): return ResourceState.from_file(expected_state_path) - def _get_remote_resource(self) -> Optional[Union[SourceRead, DestinationRead, ConnectionRead]]: - """Find the remote resource on the Airbyte instance associated with the current resource. - - Raises: - DuplicateResourceError: raised if the search results return multiple resources. - - Returns: - Optional[Union[SourceRead, DestinationRead, ConnectionRead]]: The remote resource found. - """ - search_results = self._search().get(f"{self.resource_type}s", []) - if len(search_results) > 1: - raise DuplicateResourceError("Two or more ressources exist with the same name.") - if len(search_results) == 1: - return search_results[0] - else: - return None - def get_diff_with_remote_resource(self) -> str: """Compute the diff between current resource and the remote resource. 
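In the rewritten `BaseResource.__init__` above, the raw configuration is hashed as soon as it is loaded (before any downstream mutation) and `local_file_changed` is derived by comparing that hash with the one persisted in the resource's state. The following is a minimal, self-contained sketch of that change-detection flow; `hash_config` here is a simplified stand-in for octavia's `diff_helpers.hash_config` (whose real implementation is not part of this diff), and the sample data is purely illustrative.

```python
import hashlib
import json
from typing import Optional


def hash_config(raw_configuration: dict) -> str:
    # Simplified stand-in for octavia_cli.apply.diff_helpers.hash_config:
    # hash a canonical JSON dump of the raw configuration.
    return hashlib.sha256(json.dumps(raw_configuration, sort_keys=True).encode("utf-8")).hexdigest()


def local_file_changed(raw_configuration: dict, state_configuration_hash: Optional[str]) -> bool:
    # Hash as early as possible, then compare with the hash saved in state.yaml at the last apply.
    configuration_hash = hash_config(raw_configuration)
    return True if state_configuration_hash is None else configuration_hash != state_configuration_hash


raw_configuration = {"resource_name": "poke_to_pg", "configuration": {"status": "active"}}
print(local_file_changed(raw_configuration, None))                            # True: no state file yet
print(local_file_changed(raw_configuration, hash_config(raw_configuration)))  # False: unchanged since last apply
```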
@@ -297,35 +293,32 @@ def get_diff_with_remote_resource(self) -> str: """ if not self.was_created: raise NonExistingResourceError("Cannot compute diff with a non existing remote resource.") - current_config = self.configuration - remote_config = self._get_comparable_configuration() - diff = compute_diff(remote_config, current_config) + local_config = self._get_local_comparable_configuration() + remote_config = self._get_remote_comparable_configuration() + diff = compute_diff(remote_config, local_config) return diff.pretty() def _create_or_update( self, operation_fn: Callable, payload: Union[SourceCreate, SourceUpdate, DestinationCreate, DestinationUpdate, ConnectionCreate, ConnectionUpdate], - _check_return_type: bool = True, ) -> Union[SourceRead, DestinationRead]: """Wrapper to trigger create or update of remote resource. - Args: - operation_fn (Callable): The API function to run. - payload (Union[SourceCreate, SourceUpdate, DestinationCreate, DestinationUpdate]): The payload to send to create or update the resource. + Args: + operation_fn (Callable): The API function to run. + payload (Union[SourceCreate, SourceUpdate, DestinationCreate, DestinationUpdate]): The payload to send to create or update the resource. + . + Raises: + InvalidConfigurationError: Raised if the create or update payload is invalid. + ApiException: Raised in case of other API errors. - Kwargs: - _check_return_type (boolean): Whether to check the types returned in the API agains airbyte-api-client open api spec. - Raises: - InvalidConfigurationError: Raised if the create or update payload is invalid. - ApiException: Raised in case of other API errors. - - Returns: - Union[SourceRead, DestinationRead, ConnectionRead]: The created or updated resource. + Returns: + Union[SourceRead, DestinationRead, ConnectionRead]: The created or updated resource. """ try: - result = operation_fn(self.api_instance, payload, _check_return_type=_check_return_type) - return result, ResourceState.create(self.configuration_path, self.local_configuration, result[self.resource_id_field]) + result = operation_fn(self.api_instance, payload) + return result, ResourceState.create(self.configuration_path, self.configuration_hash, result[self.resource_id_field]) except airbyte_api_client.ApiException as api_error: if api_error.status == 422: # This API response error is really verbose, but it embodies all the details about why the config is not valid. 
@@ -374,13 +367,30 @@ def resource_id_request_body(self) -> Union[SourceIdRequestBody, DestinationIdRe return self.ResourceIdRequestBody(self.resource_id) -class Source(BaseResource): +class SourceAndDestination(BaseResource): + @property + def definition_id(self): + return self.raw_configuration["definition_id"] + + @property + def definition_image(self): + return self.raw_configuration["definition_image"] + + @property + def definition_version(self): + return self.raw_configuration["definition_version"] + + def _get_remote_comparable_configuration(self) -> dict: + return self.remote_resource.connection_configuration + + +class Source(SourceAndDestination): api = source_api.SourceApi create_function_name = "create_source" resource_id_field = "source_id" ResourceIdRequestBody = SourceIdRequestBody - search_function_name = "search_sources" + get_function_name = "get_source" update_function_name = "update_source" resource_type = "source" @@ -389,11 +399,13 @@ def create_payload(self): return SourceCreate(self.definition_id, self.configuration, self.workspace_id, self.resource_name) @property - def search_payload(self): - if self.state is None: - return SourceSearch(source_definition_id=self.definition_id, workspace_id=self.workspace_id, name=self.resource_name) - else: - return SourceSearch(source_definition_id=self.definition_id, workspace_id=self.workspace_id, source_id=self.state.resource_id) + def get_payload(self) -> Optional[SourceIdRequestBody]: + """Defines the payload to retrieve the remote source if a state exists. + Returns: + SourceIdRequestBody: The SourceIdRequestBody payload. + """ + if self.state is not None: + return SourceIdRequestBody(self.state.resource_id) @property def update_payload(self): @@ -403,10 +415,6 @@ def update_payload(self): name=self.resource_name, ) - def _get_comparable_configuration(self): - comparable_configuration = super()._get_comparable_configuration() - return comparable_configuration.connection_configuration - @property def source_discover_schema_request_body(self) -> SourceDiscoverSchemaRequestBody: """Creates SourceDiscoverSchemaRequestBody from resource id. @@ -428,17 +436,16 @@ def catalog(self) -> AirbyteCatalog: Returns: AirbyteCatalog: The catalog issued by schema discovery. """ - schema = self.api_instance.discover_schema_for_source(self.source_discover_schema_request_body, _check_return_type=False) + schema = self.api_instance.discover_schema_for_source(self.source_discover_schema_request_body) return schema.catalog -class Destination(BaseResource): - +class Destination(SourceAndDestination): api = destination_api.DestinationApi create_function_name = "create_destination" resource_id_field = "destination_id" ResourceIdRequestBody = DestinationIdRequestBody - search_function_name = "search_destinations" + get_function_name = "get_destination" update_function_name = "update_destination" resource_type = "destination" @@ -452,17 +459,13 @@ def create_payload(self) -> DestinationCreate: return DestinationCreate(self.workspace_id, self.resource_name, self.definition_id, self.configuration) @property - def search_payload(self) -> DestinationSearch: - """Defines the payload to search the remote resource. Search by resource name if no state found, otherwise search by resource id found in the state. + def get_payload(self) -> Optional[DestinationRead]: + """Defines the payload to retrieve the remote destination if a state exists. 
Returns: - DestinationSearch: The DestinationSearch model instance + DestinationRead: The DestinationRead model instance """ - if self.state is None: - return DestinationSearch(destination_definition_id=self.definition_id, workspace_id=self.workspace_id, name=self.resource_name) - else: - return DestinationSearch( - destination_definition_id=self.definition_id, workspace_id=self.workspace_id, destination_id=self.state.resource_id - ) + if self.state is not None: + return DestinationIdRequestBody(self.state.resource_id) @property def update_payload(self) -> DestinationUpdate: @@ -477,10 +480,6 @@ def update_payload(self) -> DestinationUpdate: name=self.resource_name, ) - def _get_comparable_configuration(self) -> DestinationRead: - comparable_configuration = super()._get_comparable_configuration() - return comparable_configuration.connection_configuration - class Connection(BaseResource): APPLY_PRIORITY = 1 # Set to 1 to create connection after source or destination. @@ -488,36 +487,53 @@ class Connection(BaseResource): create_function_name = "create_connection" resource_id_field = "connection_id" ResourceIdRequestBody = ConnectionIdRequestBody - search_function_name = "search_connections" + get_function_name = "get_connection" update_function_name = "update_connection" resource_type = "connection" + def _deserialize_raw_configuration(self): + """Deserialize a raw configuration into another dict and perform serialization if needed. + In this implementation we cast raw types to Airbyte API client models types for validation. + Args: + raw_configuration (dict): The raw configuration + + Returns: + dict: Deserialized connection configuration + """ + configuration = super()._deserialize_raw_configuration() + self._check_for_legacy_connection_configuration_keys(configuration) + configuration["sync_catalog"] = self._create_configured_catalog(configuration["sync_catalog"]) + configuration["namespace_definition"] = NamespaceDefinitionType(configuration["namespace_definition"]) + configuration["schedule"] = ConnectionSchedule(**configuration["schedule"]) + configuration["resource_requirements"] = ResourceRequirements(**configuration["resource_requirements"]) + configuration["status"] = ConnectionStatus(configuration["status"]) + return configuration + + @property + def source_id(self): + return self.raw_configuration["source_id"] + @property - def status(self) -> ConnectionStatus: - return ConnectionStatus(self.local_configuration["configuration"]["status"]) + def destination_id(self): + return self.raw_configuration["destination_id"] @property def create_payload(self) -> ConnectionCreate: """Defines the payload to create the remote connection. - Disable snake case parameter usage with _spec_property_naming=True Returns: ConnectionCreate: The ConnectionCreate model instance """ - return ConnectionCreate(**self.configuration, _check_type=False, _spec_property_naming=True) + return ConnectionCreate(name=self.resource_name, source_id=self.source_id, destination_id=self.destination_id, **self.configuration) @property - def search_payload(self) -> ConnectionSearch: - """Defines the payload to search the remote connection. Search by connection name if no state found, otherwise search by connection id found in the state. + def get_payload(self) -> Optional[ConnectionIdRequestBody]: + """Defines the payload to retrieve the remote connection if a state exists. Returns: - ConnectionSearch: The ConnectionSearch model instance + ConnectionIdRequestBody: The ConnectionIdRequestBody payload. 
""" - if self.state is None: - return ConnectionSearch( - source_id=self.source_id, destination_id=self.destination_id, name=self.resource_name, status=self.status - ) - else: - return ConnectionSearch(connection_id=self.state.resource_id, source_id=self.source_id, destination_id=self.destination_id) + if self.state is not None: + return ConnectionIdRequestBody(self.state.resource_id) @property def update_payload(self) -> ConnectionUpdate: @@ -526,35 +542,72 @@ def update_payload(self) -> ConnectionUpdate: Returns: ConnectionUpdate: The DestinationUpdate model instance. """ - return ConnectionUpdate( - connection_id=self.resource_id, - sync_catalog=self.configuration["syncCatalog"], - status=self.configuration["status"], - namespace_definition=self.configuration["namespaceDefinition"], - namespace_format=self.configuration["namespaceFormat"], - prefix=self.configuration["prefix"], - schedule=self.configuration["schedule"], - resource_requirements=self.configuration["resourceRequirements"], - _check_type=False, - ) + return ConnectionUpdate(connection_id=self.resource_id, **self.configuration) def create(self) -> dict: - return self._create_or_update( - self._create_fn, self.create_payload, _check_return_type=False - ) # Disable check_return_type as the returned payload does not match the open api spec. + return self._create_or_update(self._create_fn, self.create_payload) def update(self) -> dict: - return self._create_or_update( - self._update_fn, self.update_payload, _check_return_type=False - ) # Disable check_return_type as the returned payload does not match the open api spec. + return self._create_or_update(self._update_fn, self.update_payload) - def _search(self, check_return_type=True) -> dict: - return self._search_fn(self.api_instance, self.search_payload, _check_return_type=False) + @staticmethod + def _create_configured_catalog(sync_catalog: dict) -> AirbyteCatalog: + """Deserialize a sync_catalog represented as dict to an AirbyteCatalog. + + Args: + sync_catalog (dict): The sync catalog represented as a dict. + + Returns: + AirbyteCatalog: The configured catalog. + """ + streams_and_configurations = [] + for stream in sync_catalog["streams"]: + stream["stream"]["supported_sync_modes"] = [SyncMode(sm) for sm in stream["stream"]["supported_sync_modes"]] + stream["config"]["sync_mode"] = SyncMode(stream["config"]["sync_mode"]) + stream["config"]["destination_sync_mode"] = DestinationSyncMode(stream["config"]["destination_sync_mode"]) + + streams_and_configurations.append( + AirbyteStreamAndConfiguration( + stream=AirbyteStream(**stream["stream"]), config=AirbyteStreamConfiguration(**stream["config"]) + ) + ) + return AirbyteCatalog(streams_and_configurations) + + # TODO this check can be removed when all our active user are on >= 0.36.11 + def _check_for_legacy_connection_configuration_keys(self, configuration_to_check): + """We changed connection configuration keys from camelCase to snake_case in 0.36.11. + This function check if the connection configuration has some camelCase keys and display a meaningful error message. 
+ + Args: + configuration_to_check (dict): Configuration to validate + """ + error_message = ( + "The following keys should be in snake_case since version 0.36.10, please edit or regenerate your connection configuration" + ) + self._check_for_invalid_configuration_keys( + configuration_to_check, {"syncCatalog", "namespaceDefinition", "namespaceFormat", "resourceRequirements"}, error_message + ) + self._check_for_invalid_configuration_keys(configuration_to_check["schedule"], {"timeUnit"}, error_message) + for stream in configuration_to_check["sync_catalog"]["streams"]: + self._check_for_invalid_configuration_keys( + stream["stream"], + {"defaultCursorField", "jsonSchema", "sourceDefinedCursor", "sourceDefinedPrimaryKey", "supportedSyncModes"}, + error_message, + ) + self._check_for_invalid_configuration_keys( + stream["config"], {"aliasName", "cursorField", "destinationSyncMode", "primaryKey", "syncMode"}, error_message + ) - def _get_comparable_configuration(self) -> dict: - keys_to_filter_out = ["connectionId", "operationIds", "sourceCatalogId"] - comparable_configuration = super()._get_comparable_configuration() - return {k: v for k, v in comparable_configuration.items() if k not in keys_to_filter_out} + def _get_remote_comparable_configuration(self) -> dict: + keys_to_filter_out = [ + "name", + "source_id", + "destination_id", + "connection_id", + "operation_ids", + "source_catalog_id", + ] # We do not allow local editing of these keys + return {k: v for k, v in self.remote_resource.to_dict().items() if k not in keys_to_filter_out} def factory(api_client: airbyte_api_client.ApiClient, workspace_id: str, configuration_path: str) -> Union[Source, Destination, Connection]: @@ -572,12 +625,12 @@ def factory(api_client: airbyte_api_client.ApiClient, workspace_id: str, configu Union[Source, Destination, Connection]: The resource object created from the YAML config. 
""" with open(configuration_path, "r") as f: - local_configuration = yaml.load(f, EnvVarLoader) - if local_configuration["definition_type"] == "source": - return Source(api_client, workspace_id, local_configuration, configuration_path) - if local_configuration["definition_type"] == "destination": - return Destination(api_client, workspace_id, local_configuration, configuration_path) - if local_configuration["definition_type"] == "connection": - return Connection(api_client, workspace_id, local_configuration, configuration_path) + raw_configuration = yaml.load(f, EnvVarLoader) + if raw_configuration["definition_type"] == "source": + return Source(api_client, workspace_id, raw_configuration, configuration_path) + if raw_configuration["definition_type"] == "destination": + return Destination(api_client, workspace_id, raw_configuration, configuration_path) + if raw_configuration["definition_type"] == "connection": + return Connection(api_client, workspace_id, raw_configuration, configuration_path) else: - raise NotImplementedError(f"Resource {local_configuration['definition_type']} was not yet implemented") + raise NotImplementedError(f"Resource {raw_configuration['definition_type']} was not yet implemented") diff --git a/octavia-cli/octavia_cli/generate/renderers.py b/octavia-cli/octavia_cli/generate/renderers.py index 0b81adfd25708..0d84550eaf590 100644 --- a/octavia-cli/octavia_cli/generate/renderers.py +++ b/octavia-cli/octavia_cli/generate/renderers.py @@ -243,7 +243,6 @@ def __init__(self, connection_name: str, source: resources.Source, destination: @staticmethod def catalog_to_yaml(catalog: dict) -> str: """Convert the source catalog to a YAML string. - Convert camel case to snake case. Args: catalog (dict): Source's catalog. diff --git a/octavia-cli/octavia_cli/generate/templates/connection.yaml.j2 b/octavia-cli/octavia_cli/generate/templates/connection.yaml.j2 index 8ddb33caca27e..d711d0d2f8a09 100644 --- a/octavia-cli/octavia_cli/generate/templates/connection.yaml.j2 +++ b/octavia-cli/octavia_cli/generate/templates/connection.yaml.j2 @@ -6,20 +6,17 @@ destination_id: {{ destination_id }} # EDIT THE CONFIGURATION BELOW! configuration: - sourceId: {{ source_id }} # REQUIRED | string - destinationId: {{ destination_id }} # REQUIRED | string status: active # REQUIRED | string | Allowed values: active, inactive, deprecated - name: {{ connection_name }} # OPTIONAL | string | Optional name of the connection - namespaceDefinition: source # OPTIONAL | string | Allowed values: source, destination, customformat - namespaceFormat: "${SOURCE_NAMESPACE}" # OPTIONAL | string | Used when namespaceDefinition is 'customformat'. If blank then behaves like namespaceDefinition = 'destination'. If "${SOURCE_NAMESPACE}" then behaves like namespaceDefinition = 'source'. + namespace_definition: source # OPTIONAL | string | Allowed values: source, destination, customformat + namespace_format: "${SOURCE_NAMESPACE}" # OPTIONAL | string | Used when namespaceDefinition is 'customformat'. If blank then behaves like namespaceDefinition = 'destination'. If "${SOURCE_NAMESPACE}" then behaves like namespaceDefinition = 'source'. 
prefix: "" # REQUIRED | Prefix that will be prepended to the name of each stream when it is written to the destination - resourceRequirements: # OPTIONAL | object | Resource requirements to run workers (blank for unbounded allocations) + resource_requirements: # OPTIONAL | object | Resource requirements to run workers (blank for unbounded allocations) cpu_limit: "" # OPTIONAL cpu_request: "" # OPTIONAL memory_limit: "" # OPTIONAL memory_request: "" # OPTIONAL schedule: # OPTIONAL | object - timeUnit: hours # REQUIRED | string | Allowed values: minutes, hours, days, weeks, months + time_unit: hours # REQUIRED | string | Allowed values: minutes, hours, days, weeks, months units: 1 # REQUIRED | integer - syncCatalog: # OPTIONAL | object | 🚨 ONLY edit streams.config, streams.stream should not be edited as schema cannot be changed. + sync_catalog: # OPTIONAL | object | 🚨 ONLY edit streams.config, streams.stream should not be edited as schema cannot be changed. {{ catalog | indent(4)}} diff --git a/octavia-cli/unit_tests/test_apply/test_resources.py b/octavia-cli/unit_tests/test_apply/test_resources.py index ba3accd05bb80..248121096a4a6 100644 --- a/octavia-cli/unit_tests/test_apply/test_resources.py +++ b/octavia-cli/unit_tests/test_apply/test_resources.py @@ -6,6 +6,11 @@ import pytest from airbyte_api_client import ApiException +from airbyte_api_client.model.airbyte_catalog import AirbyteCatalog +from airbyte_api_client.model.connection_schedule import ConnectionSchedule +from airbyte_api_client.model.connection_status import ConnectionStatus +from airbyte_api_client.model.namespace_definition_type import NamespaceDefinitionType +from airbyte_api_client.model.resource_requirements import ResourceRequirements from octavia_cli.apply import resources, yaml_loaders @@ -45,9 +50,8 @@ def test_save(self, mocker, state): def test_create(self, mocker): mocker.patch.object(resources.time, "time", mocker.Mock(return_value=0)) - mocker.patch.object(resources, "hash_config", mocker.Mock(return_value="my_hash")) mocker.patch.object(resources.ResourceState, "_save") - state = resources.ResourceState.create("config_path", {"my": "config"}, "resource_id") + state = resources.ResourceState.create("config_path", "my_hash", "resource_id") assert isinstance(state, resources.ResourceState) resources.ResourceState._save.assert_called_once() assert state.configuration_path == "config_path" @@ -75,7 +79,14 @@ def test_from_file(self, mocker): @pytest.fixture def local_configuration(): - return {"exotic_attribute": "foo", "configuration": {"foo": "bar"}, "resource_name": "bar", "definition_id": "bar"} + return { + "exotic_attribute": "foo", + "configuration": {"foo": "bar"}, + "resource_name": "bar", + "definition_id": "bar", + "definition_image": "fooo", + "definition_version": "barrr", + } class TestBaseResource: @@ -86,25 +97,24 @@ def patch_base_class(self, mocker): mocker.patch.object(resources.BaseResource, "create_function_name", "create_resource") mocker.patch.object(resources.BaseResource, "resource_id_field", "resource_id") mocker.patch.object(resources.BaseResource, "ResourceIdRequestBody") - mocker.patch.object(resources.BaseResource, "search_function_name", "search_resource") mocker.patch.object(resources.BaseResource, "update_function_name", "update_resource") + mocker.patch.object(resources.BaseResource, "get_function_name", "get_resource") mocker.patch.object(resources.BaseResource, "resource_type", "universal_resource") mocker.patch.object(resources.BaseResource, "api") def 
test_init_no_remote_resource(self, mocker, patch_base_class, mock_api_client, local_configuration): mocker.patch.object(resources.BaseResource, "_get_state_from_file", mocker.Mock(return_value=None)) - mocker.patch.object(resources.BaseResource, "_get_remote_resource", mocker.Mock(return_value=False)) mocker.patch.object(resources, "hash_config") resource = resources.BaseResource(mock_api_client, "workspace_id", local_configuration, "bar.yaml") assert resource.APPLY_PRIORITY == 0 assert resource.workspace_id == "workspace_id" - assert resource.local_configuration == local_configuration + assert resource.raw_configuration == local_configuration assert resource.configuration_path == "bar.yaml" assert resource.api_instance == resource.api.return_value resource.api.assert_called_with(mock_api_client) assert resource.state == resource._get_state_from_file.return_value - assert resource.remote_resource == resource._get_remote_resource.return_value - assert resource.was_created == resource._get_remote_resource.return_value + assert resource.remote_resource is None + assert resource.was_created is False assert resource.local_file_changed is True assert resource.resource_id is None @@ -137,34 +147,11 @@ def test_init_with_remote_resource_changed(self, mocker, patch_base_class, mock_ def resource(self, patch_base_class, mock_api_client, local_configuration): return resources.BaseResource(mock_api_client, "workspace_id", local_configuration, "bar.yaml") - def test_get_attr(self, resource, local_configuration): - assert resource.exotic_attribute == local_configuration["exotic_attribute"] - with pytest.raises(AttributeError): - resource.wrong_attribute - - def test_search(self, resource): - search_results = resource._search() - assert search_results == resource._search_fn.return_value - resource._search_fn.assert_called_with(resource.api_instance, resource.search_payload, _check_return_type=True) - - @pytest.mark.parametrize( - "search_results,expected_error,expected_output", - [ - ([], None, None), - (["foo"], None, "foo"), - (["foo", "bar"], resources.DuplicateResourceError, None), - ], - ) - def test_get_remote_resource(self, resource, mocker, search_results, expected_error, expected_output): - mock_search_results = mocker.Mock(return_value=search_results) - mocker.patch.object(resource, "_search", mocker.Mock(return_value=mocker.Mock(get=mock_search_results))) - if expected_error is None: - remote_resource = resource._get_remote_resource() - assert remote_resource == expected_output - else: - with pytest.raises(expected_error): - remote_resource = resource._get_remote_resource() - resource._search.return_value.get.assert_called_with("universal_resources", []) + def test_get_remote_resource(self, resource, mocker): + mocker.patch.object(resource, "_get_fn") + remote_resource = resource._get_remote_resource() + assert remote_resource == resource._get_fn.return_value + resource._get_fn.assert_called_with(resource.api_instance, resource.get_payload) @pytest.mark.parametrize( "state_path_is_file", @@ -175,7 +162,7 @@ def test_get_state_from_file(self, mocker, resource, state_path_is_file): mock_expected_state_path = mocker.Mock(is_file=mocker.Mock(return_value=state_path_is_file)) mocker.patch.object(resources, "Path", mocker.Mock(return_value=mock_expected_state_path)) mocker.patch.object(resources, "ResourceState") - state = resource._get_state_from_file() + state = resource._get_state_from_file(resource.configuration_path) resources.os.path.dirname.assert_called_with(resource.configuration_path) 
resources.os.path.join.assert_called_with(resources.os.path.dirname.return_value, "state.yaml") resources.Path.assert_called_with(resources.os.path.join.return_value) @@ -204,13 +191,13 @@ def test_resource_id_request_body(self, mocker, resource_id, resource): [True, False], ) def test_get_diff_with_remote_resource(self, patch_base_class, mocker, mock_api_client, local_configuration, was_created): - mocker.patch.object(resources.BaseResource, "_get_comparable_configuration") + mocker.patch.object(resources.BaseResource, "_get_remote_comparable_configuration") mocker.patch.object(resources.BaseResource, "was_created", was_created) resource = resources.BaseResource(mock_api_client, "workspace_id", local_configuration, "bar.yaml") mocker.patch.object(resources, "compute_diff") if was_created: diff = resource.get_diff_with_remote_resource() - resources.compute_diff.assert_called_with(resource._get_comparable_configuration.return_value, resource.configuration) + resources.compute_diff.assert_called_with(resource._get_remote_comparable_configuration.return_value, resource.configuration) assert diff == resources.compute_diff.return_value.pretty.return_value else: with pytest.raises(resources.NonExistingResourceError): @@ -224,7 +211,7 @@ def test_create_or_update(self, mocker, resource): result, state = resource._create_or_update(operation_fn, payload) assert result == expected_results assert state == resources.ResourceState.create.return_value - resources.ResourceState.create.assert_called_with(resource.configuration_path, resource.local_configuration, "resource_id") + resources.ResourceState.create.assert_called_with(resource.configuration_path, resource.configuration_hash, "resource_id") @pytest.mark.parametrize( "response_status,expected_error", @@ -247,18 +234,45 @@ def test_update(self, mocker, resource): resource._create_or_update.assert_called_with(resource._update_fn, resource.update_payload) @pytest.mark.parametrize( - "was_created", - [True, False], + "configuration, invalid_keys, expect_error", + [ + ({"valid_key": "foo", "invalidKey": "bar"}, {"invalidKey"}, True), + ({"valid_key": "foo", "invalidKey": "bar", "secondInvalidKey": "bar"}, {"invalidKey", "secondInvalidKey"}, True), + ({"valid_key": "foo", "validKey": "bar"}, {"invalidKey"}, False), + ], ) - def test_get_comparable_configuration(self, patch_base_class, mocker, mock_api_client, local_configuration, was_created): - mocker.patch.object(resources.BaseResource, "was_created", was_created) - mocker.patch.object(resources.BaseResource, "remote_resource") - resource = resources.BaseResource(mock_api_client, "workspace_id", local_configuration, "bar.yaml") - if not was_created: - with pytest.raises(resources.NonExistingResourceError): - resource._get_comparable_configuration() + def test__check_for_invalid_configuration_keys(self, configuration, invalid_keys, expect_error): + if not expect_error: + result = resources.BaseResource._check_for_invalid_configuration_keys(configuration, invalid_keys, "You have some invalid keys") + assert result is None else: - assert resource._get_comparable_configuration() == resource.remote_resource + with pytest.raises(resources.InvalidConfigurationError, match="You have some invalid keys: ") as error_info: + resources.BaseResource._check_for_invalid_configuration_keys(configuration, invalid_keys, "You have some invalid keys") + assert all([invalid_key in str(error_info) for invalid_key in invalid_keys]) + + +class TestSourceAndDestination: + @pytest.fixture + def 
patch_source_and_destination(self, mocker): + mocker.patch.object(resources.SourceAndDestination, "__abstractmethods__", set()) + mocker.patch.object(resources.SourceAndDestination, "api") + mocker.patch.object(resources.SourceAndDestination, "create_function_name", "create") + mocker.patch.object(resources.SourceAndDestination, "update_function_name", "update") + mocker.patch.object(resources.SourceAndDestination, "get_function_name", "get") + mocker.patch.object(resources.SourceAndDestination, "_get_state_from_file", mocker.Mock(return_value=None)) + mocker.patch.object(resources, "hash_config") + + def test_init(self, patch_source_and_destination, mocker, mock_api_client, local_configuration): + assert resources.SourceAndDestination.__base__ == resources.BaseResource + resource = resources.SourceAndDestination(mock_api_client, "workspace_id", local_configuration, "bar.yaml") + assert resource.definition_id == local_configuration["definition_id"] + assert resource.definition_image == local_configuration["definition_image"] + assert resource.definition_version == local_configuration["definition_version"] + + def test_get_remote_comparable_configuration(self, patch_source_and_destination, mocker, mock_api_client, local_configuration): + mocker.patch.object(resources.Source, "remote_resource") + resource = resources.Source(mock_api_client, "workspace_id", local_configuration, "bar.yaml") + assert resource._get_remote_comparable_configuration() == resource.remote_resource.connection_configuration class TestSource: @@ -267,14 +281,13 @@ class TestSource: [None, resources.ResourceState("config_path", "resource_id", 123, "abc")], ) def test_init(self, mocker, mock_api_client, local_configuration, state): - assert resources.Source.__base__ == resources.BaseResource + assert resources.Source.__base__ == resources.SourceAndDestination mocker.patch.object(resources.Source, "resource_id", "foo") source = resources.Source(mock_api_client, "workspace_id", local_configuration, "bar.yaml") mocker.patch.object(source, "state", state) assert source.api == resources.source_api.SourceApi assert source.create_function_name == "create_source" assert source.resource_id_field == "source_id" - assert source.search_function_name == "search_sources" assert source.update_function_name == "update_source" assert source.resource_type == "source" assert source.APPLY_PRIORITY == 0 @@ -285,22 +298,9 @@ def test_init(self, mocker, mock_api_client, local_configuration, state): source_id=source.resource_id, connection_configuration=source.configuration, name=source.resource_name ) if state is None: - assert source.search_payload == resources.SourceSearch( - source_definition_id=source.definition_id, workspace_id=source.workspace_id, name=source.resource_name - ) + assert source.get_payload is None else: - assert source.search_payload == resources.SourceSearch( - source_definition_id=source.definition_id, workspace_id=source.workspace_id, source_id=source.state.resource_id - ) - - def test_get_comparable_configuration(self, mocker, mock_api_client, local_configuration): - mock_base_comparable_configuration = mocker.Mock() - mocker.patch.object(resources.BaseResource, "_get_comparable_configuration", mock_base_comparable_configuration) - mocker.patch.object(resources.Source, "was_created", True) - mocker.patch.object(resources.Source, "remote_resource") - - resource = resources.Source(mock_api_client, "workspace_id", local_configuration, "bar.yaml") - assert resource._get_comparable_configuration() == 
mock_base_comparable_configuration.return_value.connection_configuration
+        assert source.get_payload == resources.SourceIdRequestBody(state.resource_id)
 
     @pytest.mark.parametrize(
         "resource_id",
@@ -324,9 +324,7 @@ def test_catalog(self, mocker, mock_api_client, local_configuration):
         source.api_instance = mocker.Mock()
         catalog = source.catalog
         assert catalog == source.api_instance.discover_schema_for_source.return_value.catalog
-        source.api_instance.discover_schema_for_source.assert_called_with(
-            source.source_discover_schema_request_body, _check_return_type=False
-        )
+        source.api_instance.discover_schema_for_source.assert_called_with(source.source_discover_schema_request_body)
 
 
 class TestDestination:
@@ -335,14 +333,13 @@ class TestDestination:
         [None, resources.ResourceState("config_path", "resource_id", 123, "abc")],
     )
     def test_init(self, mocker, mock_api_client, local_configuration, state):
-        assert resources.Destination.__base__ == resources.BaseResource
+        assert resources.Destination.__base__ == resources.SourceAndDestination
         mocker.patch.object(resources.Destination, "resource_id", "foo")
         destination = resources.Destination(mock_api_client, "workspace_id", local_configuration, "bar.yaml")
         mocker.patch.object(destination, "state", state)
         assert destination.api == resources.destination_api.DestinationApi
         assert destination.create_function_name == "create_destination"
         assert destination.resource_id_field == "destination_id"
-        assert destination.search_function_name == "search_destinations"
         assert destination.update_function_name == "update_destination"
         assert destination.resource_type == "destination"
         assert destination.APPLY_PRIORITY == 0
@@ -353,24 +350,9 @@ def test_init(self, mocker, mock_api_client, local_configuration, state):
             destination_id=destination.resource_id, connection_configuration=destination.configuration, name=destination.resource_name
         )
         if state is None:
-            assert destination.search_payload == resources.DestinationSearch(
-                destination_definition_id=destination.definition_id, workspace_id=destination.workspace_id, name=destination.resource_name
-            )
+            assert destination.get_payload is None
         else:
-            assert destination.search_payload == resources.DestinationSearch(
-                destination_definition_id=destination.definition_id,
-                workspace_id=destination.workspace_id,
-                destination_id=destination.state.resource_id,
-            )
-
-    def test_get_comparable_configuration(self, mocker, mock_api_client, local_configuration):
-        mock_base_comparable_configuration = mocker.Mock()
-        mocker.patch.object(resources.BaseResource, "_get_comparable_configuration", mock_base_comparable_configuration)
-        mocker.patch.object(resources.Destination, "was_created", True)
-        mocker.patch.object(resources.Destination, "remote_resource")
-
-        resource = resources.Destination(mock_api_client, "workspace_id", local_configuration, "bar.yaml")
-        assert resource._get_comparable_configuration() == mock_base_comparable_configuration.return_value.connection_configuration
+            assert destination.get_payload == resources.DestinationIdRequestBody(state.resource_id)
 
 
 class TestConnection:
@@ -382,40 +364,117 @@ def connection_configuration(self):
             "source_id": "my_source",
             "destination_id": "my_destination",
             "configuration": {
-                "sourceId": "my_source",
-                "destinationId": "my_destination",
-                "namespaceDefinition": "customformat",
-                "namespaceFormat": "foo",
+                "namespace_definition": "customformat",
+                "namespace_format": "foo",
                 "prefix": "foo",
-                "syncCatalog": {
+                "sync_catalog": {
                     "streams": [
                         {
                             "stream": {
                                 "name": "name_example",
-                                "jsonSchema": {},
-                                "supportedSyncModes": ["incremental"],
-                                "sourceDefinedCursor": True,
-                                "defaultCursorField": ["default_cursor_field"],
-                                "sourceDefinedPrimary_key": [["string_example"]],
+                                "json_schema": {},
+                                "supported_sync_modes": ["incremental"],
+                                "source_defined_cursor": True,
+                                "default_cursor_field": ["default_cursor_field"],
+                                "source_defined_primary_key": [["string_example"]],
                                 "namespace": "namespace_example",
                             },
                             "config": {
-                                "syncMode": "incremental",
-                                "cursorField": ["cursor_field_example"],
-                                "destinationSyncMode": "append_dedup",
-                                "primaryKey": [["string_example"]],
-                                "aliasName": "alias_name_example",
+                                "sync_mode": "incremental",
+                                "cursor_field": ["cursor_field_example"],
+                                "destination_sync_mode": "append_dedup",
+                                "primary_key": [["string_example"]],
+                                "alias_name": "alias_name_example",
                                 "selected": True,
                             },
                         }
                     ]
                 },
-                "schedule": {"units": 1, "time_units": "days"},
+                "schedule": {"units": 1, "time_unit": "days"},
                 "status": "active",
-                "resourceRequirements": {"cpu_request": "foo", "cpu_limit": "foo", "memory_request": "foo", "memory_limit": "foo"},
+                "resource_requirements": {"cpu_request": "foo", "cpu_limit": "foo", "memory_request": "foo", "memory_limit": "foo"},
             },
         }
 
+    @pytest.fixture
+    def legacy_connection_configurations(self):
+        return [
+            {
+                "definition_type": "connection",
+                "resource_name": "my_connection",
+                "source_id": "my_source",
+                "destination_id": "my_destination",
+                "configuration": {
+                    "namespaceDefinition": "customformat",
+                    "namespaceFormat": "foo",
+                    "prefix": "foo",
+                    "syncCatalog": {
+                        "streams": [
+                            {
+                                "stream": {
+                                    "name": "name_example",
+                                    "json_schema": {},
+                                    "supported_sync_modes": ["incremental"],
+                                    "source_defined_cursor": True,
+                                    "default_cursor_field": ["default_cursor_field"],
+                                    "source_defined_primary_key": [["string_example"]],
+                                    "namespace": "namespace_example",
+                                },
+                                "config": {
+                                    "sync_mode": "incremental",
+                                    "cursor_field": ["cursor_field_example"],
+                                    "destination_sync_mode": "append_dedup",
+                                    "primary_key": [["string_example"]],
+                                    "alias_name": "alias_name_example",
+                                    "selected": True,
+                                },
+                            }
+                        ]
+                    },
+                    "schedule": {"units": 1, "time_unit": "days"},
+                    "status": "active",
+                    "resourceRequirements": {"cpu_request": "foo", "cpu_limit": "foo", "memory_request": "foo", "memory_limit": "foo"},
+                },
+            },
+            {
+                "definition_type": "connection",
+                "resource_name": "my_connection",
+                "source_id": "my_source",
+                "destination_id": "my_destination",
+                "configuration": {
+                    "namespace_definition": "customformat",
+                    "namespace_format": "foo",
+                    "prefix": "foo",
+                    "sync_catalog": {
+                        "streams": [
+                            {
+                                "stream": {
+                                    "name": "name_example",
+                                    "jsonSchema": {},
+                                    "supportedSyncModes": ["incremental"],
+                                    "sourceDefinedCursor": True,
+                                    "defaultCursorField": ["default_cursor_field"],
+                                    "sourceDefinedPrimary_key": [["string_example"]],
+                                    "namespace": "namespace_example",
+                                },
+                                "config": {
+                                    "syncMode": "incremental",
+                                    "cursorField": ["cursor_field_example"],
+                                    "destinationSyncMode": "append_dedup",
+                                    "primaryKey": [["string_example"]],
+                                    "aliasName": "alias_name_example",
+                                    "selected": True,
+                                },
+                            }
+                        ]
+                    },
+                    "schedule": {"units": 1, "time_unit": "days"},
+                    "status": "active",
+                    "resource_requirements": {"cpu_request": "foo", "cpu_limit": "foo", "memory_request": "foo", "memory_limit": "foo"},
+                },
+            },
+        ]
+
     @pytest.mark.parametrize(
         "state",
         [None, resources.ResourceState("config_path", "resource_id", 123, "abc")],
@@ -428,70 +487,112 @@ def test_init(self, mocker, mock_api_client, state, connection_configuration):
         assert connection.api == resources.connection_api.ConnectionApi
         assert connection.create_function_name == "create_connection"
         assert connection.resource_id_field == "connection_id"
-        assert connection.search_function_name == "search_connections"
         assert connection.update_function_name == "update_connection"
         assert connection.resource_type == "connection"
         assert connection.APPLY_PRIORITY == 1
         assert connection.create_payload == resources.ConnectionCreate(
-            **connection.configuration, _check_type=False, _spec_property_naming=True
-        )
-        assert connection.update_payload == resources.ConnectionUpdate(
-            connection_id=connection.resource_id,
-            sync_catalog=connection.configuration["syncCatalog"],
-            status=connection.configuration["status"],
-            namespace_definition=connection.configuration["namespaceDefinition"],
-            namespace_format=connection.configuration["namespaceFormat"],
-            prefix=connection.configuration["prefix"],
-            schedule=connection.configuration["schedule"],
-            resource_requirements=connection.configuration["resourceRequirements"],
-            _check_type=False,
+            name=connection.resource_name,
+            source_id=connection.source_id,
+            destination_id=connection.destination_id,
+            **connection.configuration,
         )
+        assert connection.update_payload == resources.ConnectionUpdate(connection_id=connection.resource_id, **connection.configuration)
         if state is None:
-            assert connection.search_payload == resources.ConnectionSearch(
-                source_id=connection.source_id,
-                destination_id=connection.destination_id,
-                name=connection.resource_name,
-                status=resources.ConnectionStatus(
-                    connection_configuration["configuration"]["status"],
-                ),
-            )
+            assert connection.get_payload is None
         else:
-            assert connection.search_payload == resources.ConnectionSearch(
-                connection_id=connection.state.resource_id, source_id=connection.source_id, destination_id=connection.destination_id
-            )
+            assert connection.get_payload == resources.ConnectionIdRequestBody(state.resource_id)
 
-    def test_get_comparable_configuration(self, mocker, mock_api_client, connection_configuration):
-        mock_base_comparable_configuration = mocker.Mock(
-            return_value={"foo": "bar", "connectionId": "should be popped", "operationIds": "should be popped"}
+    def test_get_remote_comparable_configuration(self, mocker, mock_api_client, connection_configuration):
+        mocker.patch.object(
+            resources.Connection,
+            "remote_resource",
+            mocker.Mock(
+                to_dict=mocker.Mock(
+                    return_value={
+                        "name": "foo",
+                        "source_id": "bar",
+                        "destination_id": "fooo",
+                        "connection_id": "baar",
+                        "operation_ids": "foooo",
+                        "foo": "bar",
+                    }
+                )
+            ),
        )
-        mocker.patch.object(resources.BaseResource, "_get_comparable_configuration", mock_base_comparable_configuration)
-        mocker.patch.object(resources.Connection, "was_created", True)
-        mocker.patch.object(resources.Connection, "remote_resource")
-
         resource = resources.Connection(mock_api_client, "workspace_id", connection_configuration, "bar.yaml")
-        assert resource._get_comparable_configuration() == {"foo": "bar"}
-
-    def test__search(self, mocker, mock_api_client, connection_configuration):
-        resource = resources.Connection(mock_api_client, "workspace_id", connection_configuration, "bar.yaml")
-        mocker.patch.object(resource, "_search_fn")
-        search_results = resource._search()
-        assert search_results == resource._search_fn.return_value
-        resource._search_fn.assert_called_with(resource.api_instance, resource.search_payload, _check_return_type=False)
+        assert resource._get_remote_comparable_configuration() == {"foo": "bar"}
+        resource.remote_resource.to_dict.assert_called_once()
 
     def test_create(self, mocker, mock_api_client, connection_configuration):
         mocker.patch.object(resources.Connection, "_create_or_update")
         resource = resources.Connection(mock_api_client, "workspace_id", connection_configuration, "bar.yaml")
         create_result = resource.create()
         assert create_result == resource._create_or_update.return_value
-        resource._create_or_update.assert_called_with(resource._create_fn, resource.create_payload, _check_return_type=False)
+        resource._create_or_update.assert_called_with(resource._create_fn, resource.create_payload)
 
     def test_update(self, mocker, mock_api_client, connection_configuration):
         mocker.patch.object(resources.Connection, "_create_or_update")
         resource = resources.Connection(mock_api_client, "workspace_id", connection_configuration, "bar.yaml")
+        resource.state = mocker.Mock(resource_id="foo")
         update_result = resource.update()
         assert update_result == resource._create_or_update.return_value
-        resource._create_or_update.assert_called_with(resource._update_fn, resource.update_payload, _check_return_type=False)
+        resource._create_or_update.assert_called_with(resource._update_fn, resource.update_payload)
+
+    def test__deserialize_raw_configuration(self, mock_api_client, connection_configuration):
+        resource = resources.Connection(mock_api_client, "workspace_id", connection_configuration, "bar.yaml")
+        configuration = resource._deserialize_raw_configuration()
+        assert isinstance(configuration["sync_catalog"], AirbyteCatalog)
+        assert configuration["namespace_definition"] == NamespaceDefinitionType(
+            connection_configuration["configuration"]["namespace_definition"]
+        )
+        assert configuration["schedule"] == ConnectionSchedule(**connection_configuration["configuration"]["schedule"])
+        assert configuration["resource_requirements"] == ResourceRequirements(
+            **connection_configuration["configuration"]["resource_requirements"]
+        )
+        assert configuration["status"] == ConnectionStatus(connection_configuration["configuration"]["status"])
+        assert list(configuration.keys()) == [
+            "namespace_definition",
+            "namespace_format",
+            "prefix",
+            "sync_catalog",
+            "schedule",
+            "status",
+            "resource_requirements",
+        ]
+
+    def test__create_configured_catalog(self, mock_api_client, connection_configuration):
+        resource = resources.Connection(mock_api_client, "workspace_id", connection_configuration, "bar.yaml")
+        created_catalog = resource._create_configured_catalog(connection_configuration["configuration"]["sync_catalog"])
+        stream, config = (
+            connection_configuration["configuration"]["sync_catalog"]["streams"][0]["stream"],
+            connection_configuration["configuration"]["sync_catalog"]["streams"][0]["config"],
+        )
+
+        assert len(created_catalog.streams) == len(connection_configuration["configuration"]["sync_catalog"]["streams"])
+        assert created_catalog.streams[0].stream.name == stream["name"]
+        assert created_catalog.streams[0].stream.json_schema == stream["json_schema"]
+        assert created_catalog.streams[0].stream.supported_sync_modes == stream["supported_sync_modes"]
+        assert created_catalog.streams[0].stream.source_defined_cursor == stream["source_defined_cursor"]
+        assert created_catalog.streams[0].stream.namespace == stream["namespace"]
+        assert created_catalog.streams[0].stream.source_defined_primary_key == stream["source_defined_primary_key"]
+        assert created_catalog.streams[0].stream.default_cursor_field == stream["default_cursor_field"]
+
+        assert created_catalog.streams[0].config.sync_mode == config["sync_mode"]
+        assert created_catalog.streams[0].config.cursor_field == config["cursor_field"]
+        assert created_catalog.streams[0].config.destination_sync_mode == config["destination_sync_mode"]
+        assert created_catalog.streams[0].config.primary_key == config["primary_key"]
+        assert created_catalog.streams[0].config.alias_name == config["alias_name"]
+        assert created_catalog.streams[0].config.selected == config["selected"]
+
+    def test__check_for_legacy_connection_configuration_keys(
+        self, mock_api_client, connection_configuration, legacy_connection_configurations
+    ):
+        resource = resources.Connection(mock_api_client, "workspace_id", connection_configuration, "bar.yaml")
+        assert resource._check_for_legacy_connection_configuration_keys(connection_configuration["configuration"]) is None
+        for legacy_configuration in legacy_connection_configurations:
+            with pytest.raises(resources.InvalidConfigurationError):
+                resource._check_for_legacy_connection_configuration_keys(legacy_configuration["configuration"])
 
     @pytest.mark.parametrize(