diff --git a/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md
index 8d72333c0e74..c5fa6e820878 100644
--- a/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md
+++ b/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md
@@ -1,5 +1,11 @@
 # Changelog
 
+## 3.8.0
+
+Add `TestDiscovery.test_primary_keys_data_type`, which validates that primary keys are not of type `array` or `object` in the discovered catalog.
+Add `validate_primary_keys_data_type` to `TestBasicRead.test_read`, which validates that primary keys in records are not of type `array` or `object`
+and ensures that at least one part of each primary key is non-null.
+
 ## 3.7.0
 
 Add `validate_state_messages` to TestBasicRead.test_read:: Validate that all states contain neither legacy state emissions nor missing source stats in the state message.
@@ -34,7 +40,7 @@ Add `test_certified_connector_has_allowed_hosts` and `test_certified_connector_h
 
 ## 3.2.0
 
-Add TestBasicRead.test_all_supported_file_types_present, which validates that all supported file types are present in the sandbox account for certified file-based connectors.
+Add `TestBasicRead.test_all_supported_file_types_present`, which validates that all supported file types are present in the sandbox account for certified file-based connectors.
 
 ## 3.1.0
 
diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/config.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/config.py
index 8b6fa45eee32..b609706b8f12 100644
--- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/config.py
+++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/config.py
@@ -83,6 +83,7 @@ class DiscoveryTestConfig(BaseConfig):
     backward_compatibility_tests_config: BackwardCompatibilityTestsConfig = Field(
         description="Configuration for the backward compatibility tests.", default=BackwardCompatibilityTestsConfig()
     )
+    validate_primary_keys_data_type: bool = Field(True, description="Ensure correct primary key data types")
 
 
 class ExpectedRecordsConfig(BaseModel):
@@ -168,6 +169,7 @@ class BasicReadTestConfig(BaseConfig):
     validate_schema: bool = Field(True, description="Ensure that records match the schema of the corresponding stream")
     validate_stream_statuses: bool = Field(None, description="Ensure that all streams emit status messages")
     validate_state_messages: bool = Field(True, description="Ensure that state messages emitted as expected")
+    validate_primary_keys_data_type: bool = Field(True, description="Ensure correct primary key data types")
     fail_on_extra_columns: bool = Field(True, description="Fail if extra top-level properties (i.e. columns) are detected in records.")
     # TODO: remove this field after https://github.com/airbytehq/airbyte/issues/8312 is done
     validate_data_points: bool = Field(
diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py
index 50e495c32285..3eb8686b9a77 100644
--- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py
+++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py
@@ -2,7 +2,6 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
 
-import functools
 import json
 import logging
 import re
@@ -39,7 +38,6 @@
 )
 from connector_acceptance_test.base import BaseTest
 from connector_acceptance_test.config import (
-    AllowedHostsConfiguration,
     BasicReadTestConfig,
     Config,
     ConnectionTestConfig,
@@ -834,6 +832,34 @@ def test_catalog_has_supported_data_types(self, discovered_catalog: Mapping[str,
                     f"Found unsupported type/format combination {type_format_combination} in {stream_name} stream on property {parent_path}"
                 )
 
+    def test_primary_keys_data_type(self, inputs: DiscoveryTestConfig, discovered_catalog: Mapping[str, Any]):
+        if not inputs.validate_primary_keys_data_type:
+            pytest.skip("Primary keys data type validation is disabled in config.")
+
+        forbidden_primary_key_data_types: Set[str] = {"object", "array"}
+        errors: List[str] = []
+
+        for stream_name, stream in discovered_catalog.items():
+            if not stream.source_defined_primary_key:
+                continue
+
+            for primary_key_part in stream.source_defined_primary_key:
+                # A nested key part like ["data", "id"] resolves to "data/properties/id" inside the schema's "properties".
+                primary_key_path = "/properties/".join(primary_key_part)
+                try:
+                    primary_key_definition = dpath.util.get(stream.json_schema["properties"], primary_key_path)
+                except KeyError:
+                    errors.append(f"Stream {stream_name} does not have defined primary key in schema")
+                    continue
+
+                # "type" may declare several types (e.g. ["null", "object"]); any forbidden entry fails the check.
+                data_type = set(primary_key_definition.get("type", []))
+
+                if data_type.intersection(forbidden_primary_key_data_types):
+                    errors.append(f"Stream {stream_name} contains primary key with forbidden type of {data_type}")
+
+        assert not errors, "\n".join(errors)
+
 
 def primary_keys_for_records(streams, records):
     streams_with_primary_key = [stream for stream in streams if stream.stream.source_defined_primary_key]
@@ -1003,6 +1027,10 @@ def should_validate_stream_statuses_fixture(self, inputs: BasicReadTestConfig, i
     def should_validate_state_messages_fixture(self, inputs: BasicReadTestConfig):
         return inputs.validate_state_messages
 
+    @pytest.fixture(name="should_validate_primary_keys_data_type")
+    def should_validate_primary_keys_data_type_fixture(self, inputs: BasicReadTestConfig):
+        return inputs.validate_primary_keys_data_type
+
     @pytest.fixture(name="should_fail_on_extra_columns")
     def should_fail_on_extra_columns_fixture(self, inputs: BasicReadTestConfig):
         # TODO (Ella): enforce this param once all connectors are passing
@@ -1056,6 +1084,7 @@ async def test_read(
         should_validate_data_points: Boolean,
         should_validate_stream_statuses: Boolean,
         should_validate_state_messages: Boolean,
+        should_validate_primary_keys_data_type: Boolean,
         should_fail_on_extra_columns: Boolean,
         empty_streams: Set[EmptyStreamConfiguration],
         ignored_fields: Optional[Mapping[str, List[IgnoredFieldsConfiguration]]],
@@ -1078,11 +1107,9 @@ async def test_read(
         self._validate_schema(records=records, configured_catalog=configured_catalog)
         self._validate_empty_streams(records=records, configured_catalog=configured_catalog, allowed_empty_streams=empty_streams)
-        for pks, record in primary_keys_for_records(streams=configured_catalog.streams, records=records):
-            for pk_path, pk_value in pks.items():
-                assert (
-                    pk_value is not None
-                ), f"Primary key subkeys {repr(pk_path)} have null values or not present in {record.stream} stream records."
+ + if should_validate_primary_keys_data_type: + self._validate_primary_keys_data_type(streams=configured_catalog.streams, records=records) # TODO: remove this condition after https://github.com/airbytehq/airbyte/issues/8312 is done if should_validate_data_points: @@ -1286,10 +1313,27 @@ def _validate_state_messages(state_messages: List[AirbyteMessage], configured_ca # Check if stats are of the correct type and present in state message assert isinstance(state.sourceStats, AirbyteStateStats), "Source stats should be in state message." + @staticmethod + def _validate_primary_keys_data_type(streams: List[ConfiguredAirbyteStream], records: List[AirbyteRecordMessage]): + data_types_mapping = {"dict": "object", "list": "array"} + for primary_keys, record in primary_keys_for_records(streams=streams, records=records): + stream_name = record.stream + non_nullable_key_part_found = False + for primary_key_path, primary_key_value in primary_keys.items(): + if primary_key_value is not None: + non_nullable_key_part_found = True + + assert not isinstance(primary_key_value, (list, dict)), ( + f"Stream {stream_name} contains primary key with forbidden type " + f"of '{data_types_mapping.get(primary_key_value.__class__.__name__)}'" + ) + + assert non_nullable_key_part_found, f"Stream {stream_name} contains primary key with null values in all its parts" + @pytest.mark.default_timeout(TEN_MINUTES) class TestConnectorAttributes(BaseTest): - # Overide from BaseTest! + # Override from BaseTest! # Used so that this is not part of the mandatory high strictness test suite yet MANDATORY_FOR_TEST_STRICTNESS_LEVELS = [] diff --git a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py index d883df76add5..d1dca6097bbd 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py +++ b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py @@ -6,7 +6,6 @@ from unittest.mock import MagicMock, patch import pytest -from _pytest.outcomes import Failed from airbyte_protocol.models import ( AirbyteErrorTraceMessage, AirbyteLogMessage, @@ -32,6 +31,7 @@ from connector_acceptance_test.config import ( BasicReadTestConfig, Config, + DiscoveryTestConfig, ExpectedRecordsConfig, FileTypesConfig, IgnoredFieldsConfiguration, @@ -555,6 +555,227 @@ def test_catalog_has_supported_data_types(discovered_catalog, expectation): t.test_catalog_has_supported_data_types(discovered_catalog) +@pytest.mark.parametrize( + "discovered_catalog, expectation", + [ + ( + { + "test_stream_1": AirbyteStream.parse_obj( + { + "name": "test_stream_1", + "json_schema": {"properties": {"id": {"type": ["string"]}}}, + "supported_sync_modes": ["full_refresh"], + "source_defined_primary_key": None, + }, + ), + }, + does_not_raise(), + ), + ( + { + "test_stream_1": AirbyteStream.parse_obj( + { + "name": "test_stream_1", + "json_schema": {"properties": {"id": {"type": ["string"]}}}, + "supported_sync_modes": ["full_refresh"], + "source_defined_primary_key": [["id"]], + }, + ), + }, + does_not_raise(), + ), + ( + { + "stream_1": AirbyteStream.parse_obj( + { + "name": "stream_1", + "json_schema": { + "properties": { + "data": {"type": ["object"], "properties": {"id": {"type": ["string"]}}}, + }, + }, + "supported_sync_modes": ["full_refresh"], + "source_defined_primary_key": [["data", "id"]], + }, + ), + }, + does_not_raise(), + ), + ( + { + "stream_1": AirbyteStream.parse_obj( + { + "name": "stream_1", + 
"json_schema": {"properties": {"id": {"type": ["string"]}, "timestamp": {"type": ["integer"]}}}, + "supported_sync_modes": ["full_refresh"], + "source_defined_primary_key": [["timestamp"], ["id"]], + }, + ), + }, + does_not_raise(), + ), + ( + { + "stream_1": AirbyteStream.parse_obj( + { + "name": "stream_1", + "json_schema": {"properties": {"id": {"type": ["string"]}}}, + "supported_sync_modes": ["full_refresh"], + "source_defined_primary_key": [["timestamp"]], + }, + ), + }, + pytest.raises(AssertionError, match="Stream stream_1 does not have defined primary key in schema"), + ), + ( + { + "stream_1": AirbyteStream.parse_obj( + { + "name": "stream_1", + "json_schema": {"properties": {"id": {"type": ["object"]}}}, + "supported_sync_modes": ["full_refresh"], + "source_defined_primary_key": [["id"]], + }, + ), + }, + pytest.raises( + AssertionError, match="Stream stream_1 contains primary key with forbidden type of {'object'}" + ), + ), + ( + { + "stream_1": AirbyteStream.parse_obj( + { + "name": "stream_1", + "json_schema": { + "properties": { + "data": {"type": ["object"], "properties": {"id": {"type": ["object"]}}}, + }, + }, + "supported_sync_modes": ["full_refresh"], + "source_defined_primary_key": [["data", "id"]], + }, + ), + }, + pytest.raises( + AssertionError, match="Stream stream_1 contains primary key with forbidden type of {'object'}" + ), + ), + ( + { + "stream_1": AirbyteStream.parse_obj( + { + "name": "stream_1", + "json_schema": {"properties": {"id": {"type": ["object"]}, "timestamp": {"type": ["integer"]}}}, + "supported_sync_modes": ["full_refresh"], + "source_defined_primary_key": [["timestamp"], ["id"]], + }, + ), + }, + pytest.raises( + AssertionError, match="Stream stream_1 contains primary key with forbidden type of {'object'}" + ), + ), + ( + { + "stream_1": AirbyteStream.parse_obj( + { + "name": "stream_1", + "json_schema": {"properties": {"id": {"type": ["array"]}}}, + "supported_sync_modes": ["full_refresh"], + "source_defined_primary_key": [["id"]], + }, + ), + }, + pytest.raises( + AssertionError, match="Stream stream_1 contains primary key with forbidden type of {'array'}" + ), + ), + ( + { + "stream_1": AirbyteStream.parse_obj( + { + "name": "stream_1", + "json_schema": { + "properties": { + "data": {"type": ["object"], "properties": {"id": {"type": ["array"]}}}, + }, + }, + "supported_sync_modes": ["full_refresh"], + "source_defined_primary_key": [["data", "id"]], + }, + ), + }, + pytest.raises( + AssertionError, match="Stream stream_1 contains primary key with forbidden type of {'array'}" + ), + ), + ( + { + "stream_1": AirbyteStream.parse_obj( + { + "name": "stream_1", + "json_schema": {"properties": {"id": {"type": ["array"]}, "timestamp": {"type": ["integer"]}}}, + "supported_sync_modes": ["full_refresh"], + "source_defined_primary_key": [["timestamp"], ["id"]], + }, + ), + }, + pytest.raises( + AssertionError, match="Stream stream_1 contains primary key with forbidden type of {'array'}" + ), + ), + ( + { + "stream_1": AirbyteStream.parse_obj( + { + "name": "stream_1", + "json_schema": {"properties": {"id": {"type": ["object", "null"]}}}, + "supported_sync_modes": ["full_refresh"], + "source_defined_primary_key": [["timestamp"], ["id"]], + }, + ), + "stream_2": AirbyteStream.parse_obj( + { + "name": "stream_2", + "json_schema": {"properties": {"id": {"type": ["array"]}}}, + "supported_sync_modes": ["full_refresh"], + "source_defined_primary_key": [["id"]], + }, + ), + }, + pytest.raises( + AssertionError, + match=( + "Stream stream_1 does not have 
defined primary key in schema\n " + "Stream stream_1 contains primary key with forbidden type of .*\n " + "Stream stream_2 contains primary key with forbidden type of {'array'}" + ), + ), + ), + ], + ids=[ + "when_no_source_defined_primary_key_then_pass", + "when_correct_data_type_then_pass", + "when_nested_key_correct_data_type_then_pass", + "when_composite_key_correct_data_type_then_pass", + "when_no_key_found_in_schema_then_fail", + "when_data_type_is_object_then_fail", + "when_nested_key_data_type_is_object_then_fail", + "when_composite_key_data_type_is_object_then_fail", + "when_data_type_is_array_then_fail", + "when_nested_key_data_type_is_array_then_fail", + "when_composite_key_data_type_is_array_then_fail", + "when_multiple_errors_found_then_fail_all_errors_reported", + ], +) +def test_primary_keys_data_type(discovered_catalog, expectation): + t = test_core.TestDiscovery() + inputs = DiscoveryTestConfig() + with expectation: + t.test_primary_keys_data_type(inputs, discovered_catalog) + + @pytest.mark.parametrize( "test_strictness_level, configured_catalog_path", [ @@ -667,7 +888,7 @@ def test_configured_catalog_fixture(mocker, test_strictness_level, configured_ca None, does_not_raise(), ), - # Expected and Actual records are not equal but we ignore fast changing field + # Expected and Actual records are not equal, but we ignore fast changing field ( {"type": "object"}, {"test_stream": [IgnoredFieldsConfiguration(name="fast_changing_field/*/field", bypass_reason="test")]}, @@ -677,7 +898,7 @@ def test_configured_catalog_fixture(mocker, test_strictness_level, configured_ca None, does_not_raise(), ), - # Expected and Actual records are not equal but we ignore fast changing field (for case when exact_order=True) + # Expected and Actual records are not equal, but we ignore fast changing field (for case when exact_order=True) ( {"type": "object"}, {"test_stream": [IgnoredFieldsConfiguration(name="fast_changing_field/*/field", bypass_reason="test")]}, @@ -717,7 +938,7 @@ def test_configured_catalog_fixture(mocker, test_strictness_level, configured_ca [["primary_key"]], does_not_raise(), ), - # Match by primary key when non primary key field values differ + # Match by primary key when non-primary key field values differ ( {"type": "object"}, {}, @@ -789,6 +1010,7 @@ async def test_read(mocker, schema, ignored_fields, expect_records_config, recor should_validate_data_points=False, should_validate_stream_statuses=False, should_validate_state_messages=False, + should_validate_primary_keys_data_type=False, should_fail_on_extra_columns=False, empty_streams=set(), expected_records_by_stream=expected_records_by_stream, @@ -1489,6 +1711,7 @@ async def test_read_validate_async_output_stream_statuses(mocker): should_validate_data_points=False, should_validate_stream_statuses=True, should_validate_state_messages=False, + should_validate_primary_keys_data_type=False, should_fail_on_extra_columns=False, empty_streams=set(), expected_records_by_stream={}, @@ -1590,6 +1813,7 @@ async def test_read_validate_stream_statuses_exceptions(mocker, output): should_validate_data_points=False, should_validate_stream_statuses=True, should_validate_state_messages=False, + should_validate_primary_keys_data_type=False, should_fail_on_extra_columns=False, empty_streams=set(), expected_records_by_stream={}, @@ -1703,19 +1927,19 @@ async def test_all_supported_file_types_present(mocker, file_types_found, should config = BasicReadTestConfig(config_path="config_path", file_types=FileTypesConfig(skip_test=False)) if 
should_fail: - with pytest.raises(AssertionError) as e: + with pytest.raises(AssertionError): await t.test_all_supported_file_types_present(certified_file_based_connector=True, inputs=config) else: await t.test_all_supported_file_types_present(certified_file_based_connector=True, inputs=config) + @pytest.mark.parametrize( ("state_message_params", "should_fail"), ( - ({"type": AirbyteStateType.STREAM, "sourceStats": AirbyteStateStats(recordCount=1.0)}, False), - ({"type": AirbyteStateType.STREAM}, True), - ({"type": AirbyteStateType.LEGACY}, True), - ({}, True), # Case where state was not emitted - + ({"type": AirbyteStateType.STREAM, "sourceStats": AirbyteStateStats(recordCount=1.0)}, False), + ({"type": AirbyteStateType.STREAM}, True), + ({"type": AirbyteStateType.LEGACY}, True), + ({}, True), # Case where state was not emitted ), ) async def test_read_validate_async_output_state_messages(mocker, state_message_params, should_fail): @@ -1784,13 +2008,14 @@ async def test_read_validate_async_output_state_messages(mocker, state_message_p should_validate_data_points=False, should_validate_stream_statuses=True, should_validate_state_messages=True, + should_validate_primary_keys_data_type=False, should_fail_on_extra_columns=False, empty_streams=set(), expected_records_by_stream={}, docker_runner=docker_runner_mock, ignored_fields=None, detailed_logger=MagicMock(), - certified_file_based_connector=False + certified_file_based_connector=False, ) else: await t.test_read( @@ -1801,11 +2026,125 @@ async def test_read_validate_async_output_state_messages(mocker, state_message_p should_validate_data_points=False, should_validate_stream_statuses=True, should_validate_state_messages=True, + should_validate_primary_keys_data_type=False, + should_fail_on_extra_columns=False, + empty_streams=set(), + expected_records_by_stream={}, + docker_runner=docker_runner_mock, + ignored_fields=None, + detailed_logger=MagicMock(), + certified_file_based_connector=False, + ) + + +@pytest.mark.parametrize( + ("primary_key", "record", "expectation"), + ( + ([["id"]], {"id": "1"}, does_not_raise()), + ([["data", "id"]], {"data": {"id": "1"}}, does_not_raise()), + ([["id"], ["timestamp"]], {"id": "1", "timestamp": 1}, does_not_raise()), + (None, {"id": "1"}, does_not_raise()), + ( + [["id"]], + {"id": {"_id": "1"}}, + pytest.raises(AssertionError, match="Stream stream_1 contains primary key with forbidden type of 'object'"), + ), + ( + [["data", "id"]], + {"data": {"id": {"_id": "1"}}}, + pytest.raises(AssertionError, match="Stream stream_1 contains primary key with forbidden type of 'object'"), + ), + ( + [["id"], ["timestamp"]], + {"id": {"_id": "1"}, "timestamp": 1}, + pytest.raises(AssertionError, match="Stream stream_1 contains primary key with forbidden type of 'object'"), + ), + ( + [["id"]], + {"id": ["1"]}, + pytest.raises(AssertionError, match="Stream stream_1 contains primary key with forbidden type of 'array'"), + ), + ( + [["data", "id"]], + {"data": {"id": ["1"]}}, + pytest.raises(AssertionError, match="Stream stream_1 contains primary key with forbidden type of 'array'"), + ), + ( + [["id"], ["timestamp"]], + {"id": ["1"], "timestamp": 1}, + pytest.raises(AssertionError, match="Stream stream_1 contains primary key with forbidden type of 'array'"), + ), + ( + [["id"]], + {"id": None}, + pytest.raises(AssertionError, match="Stream stream_1 contains primary key with null values in all"), + ), + ( + [["data", "id"]], + {"data": {"id": None}}, + pytest.raises(AssertionError, match="Stream stream_1 contains 
primary key with null values in all"), + ), + ( + [["id"], ["timestamp"]], + {"id": None, "timestamp": None}, + pytest.raises(AssertionError, match="Stream stream_1 contains primary key with null values in all"), + ), + ([["id"], ["timestamp"]], {"id": None, "timestamp": 1}, does_not_raise()), + ), + ids=( + "when_correct_data_type_then_pass", + "when_nested_key_correct_data_type_then_pass", + "when_composite_key_correct_data_type_then_pass", + "when_no_key_defined_then_pass", + "when_data_type_is_object_then_fail", + "when_nested_key_data_type_is_object_then_fail", + "when_composite_key_data_type_is_object_then_fail", + "when_data_type_is_array_then_fail", + "when_nested_key_data_type_is_array_then_fail", + "when_composite_key_data_type_is_array_then_fail", + "when_key_is_null_then_fail", + "when_nested_key_is_null_then_fail", + "when_composite_key_all_sub_keys_are_null_then_fail", + "when_composite_key_one_sub_key_is_null_then_pass", + ), +) +async def test_read_validate_primary_keys_data_type(mocker, primary_key, record, expectation): + stream_name = "stream_1" + configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name=stream_name, + json_schema={}, + supported_sync_modes=["full_refresh"], + source_defined_primary_key=primary_key, + ), + sync_mode="full_refresh", + destination_sync_mode="overwrite", + ), + ], + ) + stream_output = [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream=stream_name, data=record, emitted_at=1)), + ] + docker_runner_mock = mocker.MagicMock(call_read=mocker.AsyncMock(return_value=stream_output)) + + t = test_core.TestBasicRead() + with expectation: + await t.test_read( + connector_config=None, + configured_catalog=configured_catalog, + expect_records_config=_DEFAULT_RECORD_CONFIG, + should_validate_schema=False, + should_validate_data_points=False, + should_validate_stream_statuses=False, + should_validate_state_messages=False, + should_validate_primary_keys_data_type=True, should_fail_on_extra_columns=False, empty_streams=set(), expected_records_by_stream={}, docker_runner=docker_runner_mock, ignored_fields=None, detailed_logger=MagicMock(), - certified_file_based_connector=False + certified_file_based_connector=False, ) diff --git a/docs/connector-development/testing-connectors/connector-acceptance-tests-reference.md b/docs/connector-development/testing-connectors/connector-acceptance-tests-reference.md index 998126731c59..2f06431a5cb4 100644 --- a/docs/connector-development/testing-connectors/connector-acceptance-tests-reference.md +++ b/docs/connector-development/testing-connectors/connector-acceptance-tests-reference.md @@ -170,13 +170,14 @@ Verifies when a `discover` operation is run on the connector using the given con Additional tests are validating the backward compatibility of the discovered catalog compared to the catalog of the previous connector version. If no previous connector version is found (by default the test looks for a docker image with the same name but with the `latest` tag), this test is skipped. These backward compatibility tests can be bypassed by changing the value of the `backward_compatibility_tests_config.disable_for_version` input in `acceptance-test-config.yml` (see below). 
-| Input | Type | Default | Note |
-| :--------------------------------------------------------------- | :----- | :------------------------------------------ | :-------------------------------------------------------------------------------------------------------------------- |
-| `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration |
-| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog |
-| `timeout_seconds` | int | 30 | Test execution timeout in seconds |
-| `backward_compatibility_tests_config.previous_connector_version` | string | `latest` | Previous connector version to use for backward compatibility tests (expects a version following semantic versioning). |
-| `backward_compatibility_tests_config.disable_for_version` | string | None | Disable the backward compatibility test for a specific version (expects a version following semantic versioning). |
+| Input | Type | Default | Note |
+|:-----------------------------------------------------------------|:--------|:--------------------------------------------|:----------------------------------------------------------------------------------------------------------------------|
+| `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration |
+| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog |
+| `timeout_seconds` | int | 30 | Test execution timeout in seconds |
+| `backward_compatibility_tests_config.previous_connector_version` | string | `latest` | Previous connector version to use for backward compatibility tests (expects a version following semantic versioning). |
+| `backward_compatibility_tests_config.disable_for_version` | string | None | Disable the backward compatibility test for a specific version (expects a version following semantic versioning). |
+| `validate_primary_keys_data_type` | boolean | True | Verify that primary key data types are correct |
 
 ## Test Basic Read
 
@@ -184,7 +185,7 @@ Configuring all streams in the input catalog to full refresh mode verifies that
 Set `validate_data_points=True` if possible. This validation is going to be enabled by default and won't be configurable in future releases.
 
 | Input | Type | Default | Note |
-| :---------------------------------------------- | :--------------- | :------------------------------------------ | :----------------------------------------------------------------------------------------------------------- |
+|:------------------------------------------------|:-----------------|:--------------------------------------------|:-------------------------------------------------------------------------------------------------------------|
 | `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration |
 | `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog |
 | `empty_streams` | array of objects | \[\] | List of streams that might be empty with a `bypass_reason` |
@@ -193,6 +194,9 @@ Set `validate_data_points=True` if possible. This validation is going to be enab
 | `ignored_fields[stream][0].name` | string | | Name of the ignored field |
 | `ignored_fields[stream][0].bypass_reason` | string | None | Reason why this field is ignored |
 | `validate_schema` | boolean | True | Verify that structure and types of records matches the schema from discovery command |
+| `validate_stream_statuses` | boolean | False | Ensure that all streams emit status messages |
+| `validate_state_messages` | boolean | True | Ensure that state messages are emitted as expected |
+| `validate_primary_keys_data_type` | boolean | True | Verify that primary key data types are correct |
 | `fail_on_extra_columns` | boolean | True | Fail schema validation if undeclared columns are found in records. Only relevant when `validate_schema=True` |
 | `validate_data_points` | boolean | False | Validate that all fields in all streams contained at least one data point |
 | `timeout_seconds` | int | 5\*60 | Test execution timeout in seconds |
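
For connectors picking up 3.8.0, a minimal sketch of how the new input might be toggled in `acceptance-test-config.yml`; it assumes the standard `acceptance_tests` layout described in this reference, and the connector image name and file paths are illustrative:

```yaml
connector_image: airbyte/source-example:dev  # illustrative image name
acceptance_tests:
  discovery:
    tests:
      - config_path: secrets/config.json
        # Skips TestDiscovery.test_primary_keys_data_type for this connector.
        validate_primary_keys_data_type: false
  basic_read:
    tests:
      - config_path: secrets/config.json
        configured_catalog_path: integration_tests/configured_catalog.json
        # Skips the primary key type and null checks inside TestBasicRead.test_read.
        validate_primary_keys_data_type: false
```

Leaving the input unset keeps both validations enabled, since `validate_primary_keys_data_type` defaults to `True` in both `DiscoveryTestConfig` and `BasicReadTestConfig`.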