Skip to content

Commit

Permalink
✨CAT: add test_primary_keys_data_type (#38755)
Browse files Browse the repository at this point in the history
Co-authored-by: Serhii Lazebnyi <serhii.lazebnyi@globallogic.com>
  • Loading branch information
Anton Karpets and lazebnyi authored Jun 11, 2024
1 parent 9f0ce4f commit 04e776f
Show file tree
Hide file tree
Showing 5 changed files with 424 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 3.8.0

Add `TestDiscovery.test_primary_keys_data_type`, which validates that primary keys are not of type `array` or `object` in discovered catalog.
Add `validate_primary_keys_data_type` to `TestBasicRead.test_read`, which validates that primary keys are not of type `array` or `object` in records
and ensures that at least one primary key is non-null when there is a composite primary key.

## 3.7.0

Add `validate_state_messages` to TestBasicRead.test_read:: Validate that all states contain neither legacy state emissions nor missing source stats in the state message.
Expand Down Expand Up @@ -34,7 +40,7 @@ Add `test_certified_connector_has_allowed_hosts` and `test_certified_connector_h

## 3.2.0

Add TestBasicRead.test_all_supported_file_types_present, which validates that all supported file types are present in the sandbox account for certified file-based connectors.
Add `TestBasicRead.test_all_supported_file_types_present`, which validates that all supported file types are present in the sandbox account for certified file-based connectors.

## 3.1.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class DiscoveryTestConfig(BaseConfig):
backward_compatibility_tests_config: BackwardCompatibilityTestsConfig = Field(
description="Configuration for the backward compatibility tests.", default=BackwardCompatibilityTestsConfig()
)
validate_primary_keys_data_type: bool = Field(True, description="Ensure correct primary keys data type")


class ExpectedRecordsConfig(BaseModel):
Expand Down Expand Up @@ -168,6 +169,7 @@ class BasicReadTestConfig(BaseConfig):
validate_schema: bool = Field(True, description="Ensure that records match the schema of the corresponding stream")
validate_stream_statuses: bool = Field(None, description="Ensure that all streams emit status messages")
validate_state_messages: bool = Field(True, description="Ensure that state messages emitted as expected")
validate_primary_keys_data_type: bool = Field(True, description="Ensure correct primary keys data type")
fail_on_extra_columns: bool = Field(True, description="Fail if extra top-level properties (i.e. columns) are detected in records.")
# TODO: remove this field after https://github.com/airbytehq/airbyte/issues/8312 is done
validate_data_points: bool = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import functools
import json
import logging
import re
Expand Down Expand Up @@ -39,7 +38,6 @@
)
from connector_acceptance_test.base import BaseTest
from connector_acceptance_test.config import (
AllowedHostsConfiguration,
BasicReadTestConfig,
Config,
ConnectionTestConfig,
Expand Down Expand Up @@ -834,6 +832,32 @@ def test_catalog_has_supported_data_types(self, discovered_catalog: Mapping[str,
f"Found unsupported type/format combination {type_format_combination} in {stream_name} stream on property {parent_path}"
)

def test_primary_keys_data_type(self, inputs: DiscoveryTestConfig, discovered_catalog: Mapping[str, Any]):
if not inputs.validate_primary_keys_data_type:
pytest.skip("Primary keys data type validation is disabled in config.")

forbidden_primary_key_data_types: Set[str] = {"object", "array"}
errors: List[str] = []

for stream_name, stream in discovered_catalog.items():
if not stream.source_defined_primary_key:
continue

for primary_key_part in stream.source_defined_primary_key:
primary_key_path = "/properties/".join(primary_key_part)
try:
primary_key_definition = dpath.util.get(stream.json_schema["properties"], primary_key_path)
except KeyError:
errors.append(f"Stream {stream_name} does not have defined primary key in schema")
continue

data_type = set(primary_key_definition.get("type", []))

if data_type.intersection(forbidden_primary_key_data_types):
errors.append(f"Stream {stream_name} contains primary key with forbidden type of {data_type}")

assert not errors, "\n".join(errors)


def primary_keys_for_records(streams, records):
streams_with_primary_key = [stream for stream in streams if stream.stream.source_defined_primary_key]
Expand Down Expand Up @@ -1003,6 +1027,10 @@ def should_validate_stream_statuses_fixture(self, inputs: BasicReadTestConfig, i
def should_validate_state_messages_fixture(self, inputs: BasicReadTestConfig):
return inputs.validate_state_messages

@pytest.fixture(name="should_validate_primary_keys_data_type")
def should_validate_primary_keys_data_type_fixture(self, inputs: BasicReadTestConfig):
return inputs.validate_primary_keys_data_type

@pytest.fixture(name="should_fail_on_extra_columns")
def should_fail_on_extra_columns_fixture(self, inputs: BasicReadTestConfig):
# TODO (Ella): enforce this param once all connectors are passing
Expand Down Expand Up @@ -1056,6 +1084,7 @@ async def test_read(
should_validate_data_points: Boolean,
should_validate_stream_statuses: Boolean,
should_validate_state_messages: Boolean,
should_validate_primary_keys_data_type: Boolean,
should_fail_on_extra_columns: Boolean,
empty_streams: Set[EmptyStreamConfiguration],
ignored_fields: Optional[Mapping[str, List[IgnoredFieldsConfiguration]]],
Expand All @@ -1078,11 +1107,9 @@ async def test_read(
self._validate_schema(records=records, configured_catalog=configured_catalog)

self._validate_empty_streams(records=records, configured_catalog=configured_catalog, allowed_empty_streams=empty_streams)
for pks, record in primary_keys_for_records(streams=configured_catalog.streams, records=records):
for pk_path, pk_value in pks.items():
assert (
pk_value is not None
), f"Primary key subkeys {repr(pk_path)} have null values or not present in {record.stream} stream records."

if should_validate_primary_keys_data_type:
self._validate_primary_keys_data_type(streams=configured_catalog.streams, records=records)

# TODO: remove this condition after https://github.com/airbytehq/airbyte/issues/8312 is done
if should_validate_data_points:
Expand Down Expand Up @@ -1286,10 +1313,27 @@ def _validate_state_messages(state_messages: List[AirbyteMessage], configured_ca
# Check if stats are of the correct type and present in state message
assert isinstance(state.sourceStats, AirbyteStateStats), "Source stats should be in state message."

@staticmethod
def _validate_primary_keys_data_type(streams: List[ConfiguredAirbyteStream], records: List[AirbyteRecordMessage]):
data_types_mapping = {"dict": "object", "list": "array"}
for primary_keys, record in primary_keys_for_records(streams=streams, records=records):
stream_name = record.stream
non_nullable_key_part_found = False
for primary_key_path, primary_key_value in primary_keys.items():
if primary_key_value is not None:
non_nullable_key_part_found = True

assert not isinstance(primary_key_value, (list, dict)), (
f"Stream {stream_name} contains primary key with forbidden type "
f"of '{data_types_mapping.get(primary_key_value.__class__.__name__)}'"
)

assert non_nullable_key_part_found, f"Stream {stream_name} contains primary key with null values in all its parts"


@pytest.mark.default_timeout(TEN_MINUTES)
class TestConnectorAttributes(BaseTest):
# Overide from BaseTest!
# Override from BaseTest!
# Used so that this is not part of the mandatory high strictness test suite yet
MANDATORY_FOR_TEST_STRICTNESS_LEVELS = []

Expand Down
Loading

0 comments on commit 04e776f

Please sign in to comment.