Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 193 additions & 1 deletion src/check_datapackage/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from dataclasses import dataclass, field
from functools import reduce
from types import TracebackType
from typing import Any, Callable, Iterator, Optional
from typing import Any, Callable, Iterator, Optional, cast

from jsonpath import findall, resolve
from jsonschema import Draft7Validator, FormatChecker, ValidationError

from check_datapackage.config import Config
Expand All @@ -16,8 +17,10 @@
from check_datapackage.exclusion import exclude
from check_datapackage.extensions import apply_extensions
from check_datapackage.internals import (
PropertyField,
_filter,
_flat_map,
_get_fields_at_jsonpath,
_map,
)
from check_datapackage.issue import Issue
Expand Down Expand Up @@ -126,6 +129,7 @@ class for more details, especially about the default values.
_set_should_fields_to_required(schema)

issues = _check_object_against_json_schema(properties, schema)
issues += _check_keys(properties, _map(issues, lambda issue: issue.jsonpath))
issues += apply_extensions(properties, config.extensions)
issues = exclude(issues, config.exclusions, properties)
issues = sorted(set(issues))
Expand All @@ -136,6 +140,194 @@ class for more details, especially about the default values.
return issues


def _check_keys(properties: dict[str, Any], issue_jsonpaths: list[str]) -> list[Issue]:
"""Check that primary and foreign keys exist."""
# Primary keys
issues = []
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented in #218


# Foreign keys
resources_with_fk = _get_fields_at_jsonpath(
"$.resources[?(length(@.schema.foreignKeys) > 0)]",
properties,
)
resources_with_fk = _only_resources_with_correct_property_at(
"schema.foreignKeys", resources_with_fk, issue_jsonpaths
)
issues += _flat_map(
resources_with_fk,
lambda resource: _check_foreign_keys(resource, properties),
)
return issues


def _only_resources_with_correct_property_at(
jsonpath: str, resources: list[PropertyField], issue_jsonpaths: list[str]
) -> list[PropertyField]:
"""Filter out resources that have an error at or under the given `jsonpath`."""
return _filter(
resources,
lambda resource: not _filter(
issue_jsonpaths,
lambda path: f"{resource.jsonpath}.{jsonpath}" in path,
),
)


def _check_foreign_keys(
resource: PropertyField, properties: dict[str, Any]
) -> list[Issue]:
"""Check that foreign key source and destination fields exist."""
# Safe, as only FKs of the correct type here
foreign_keys = cast(
list[dict[str, Any]], resolve("/schema/foreignKeys", resource.value)
)
foreign_keys_diff_resource = _filter(
foreign_keys,
lambda fk: "resource" in fk["reference"] and fk["reference"]["resource"] != "",
)
foreign_keys_same_resource = _filter(
foreign_keys, lambda fk: fk not in foreign_keys_diff_resource
)

issues = _flat_map(foreign_keys, lambda fk: _check_fk_source_fields(fk, resource))
issues += _flat_map(
foreign_keys_same_resource,
lambda fk: _check_fk_dest_fields_same_resource(fk, resource),
)
issues += _flat_map(
foreign_keys_diff_resource,
lambda fk: _check_fk_dest_fields_diff_resource(fk, resource, properties),
)

return issues


def _key_fields_as_str_list(key_fields: Any) -> list[str]:
"""Returns the list representation of primary and foreign key fields.

Key fields can be represented either as a string (containing one field name)
or a list of strings.

The input should contain a correctly typed `key_fields` object.
"""
if not isinstance(key_fields, list):
key_fields = [key_fields]
return cast(list[str], key_fields)


def _get_unknown_key_fields(
key_fields: list[str], properties: dict[str, Any], resource_path: str = ""
) -> str:
"""Return the key fields that don't exist on the specified resource."""
known_fields = findall(f"{resource_path}schema.fields[*].name", properties)
unknown_fields = _filter(key_fields, lambda field: field not in known_fields)
unknown_fields = _map(unknown_fields, lambda field: f"{field!r}")
return ", ".join(unknown_fields)


def _check_fk_source_fields(
foreign_key: dict[str, Any], resource: PropertyField
) -> list[Issue]:
"""Check that foreign key source fields exist and have the correct number."""
issues = []
source_fields = resolve("/fields", foreign_key)
source_field_list = _key_fields_as_str_list(source_fields)
unknown_fields = _get_unknown_key_fields(source_field_list, resource.value)
if unknown_fields:
issues.append(
Issue(
jsonpath=f"{resource.jsonpath}.schema.foreignKeys.fields",
type="foreign-key-source-fields",
message=(
"No fields found in resource for foreign key source fields: "
f"{unknown_fields}."
),
instance=source_fields,
)
)

dest_fields = _key_fields_as_str_list(resolve("/reference/fields", foreign_key))
if len(source_field_list) != len(dest_fields):
issues.append(
Issue(
jsonpath=f"{resource.jsonpath}.schema.foreignKeys.fields",
type="foreign-key-source-fields",
message=(
"The number of foreign key source fields must be the same as "
"the number of foreign key destination fields."
),
instance=source_fields,
)
)
return issues


def _check_fk_dest_fields_same_resource(
foreign_key: dict[str, Any],
resource: PropertyField,
) -> list[Issue]:
"""Check that foreign key destination fields exist on the same resource."""
dest_fields = resolve("/reference/fields", foreign_key)
dest_field_list = _key_fields_as_str_list(dest_fields)
unknown_fields = _get_unknown_key_fields(dest_field_list, resource.value)
if not unknown_fields:
return []

return [
Issue(
jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.fields",
type="foreign-key-destination-fields",
message=(
"No fields found in resource for foreign key "
f"destination fields: {unknown_fields}."
),
instance=dest_fields,
)
]


def _check_fk_dest_fields_diff_resource(
foreign_key: dict[str, Any], resource: PropertyField, properties: dict[str, Any]
) -> list[Issue]:
"""Check that foreign key destination fields exist on the destination resource."""
dest_fields = resolve("/reference/fields", foreign_key)
dest_field_list = _key_fields_as_str_list(dest_fields)
# Safe, as only keys of the correct type here
dest_resource_name = cast(str, resolve("/reference/resource", foreign_key))

dest_resource_path = f"resources[?(@.name == '{dest_resource_name}')]"
if not findall(dest_resource_path, properties):
return [
Issue(
jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.resource",
type="foreign-key-destination-resource",
message=(
f"The destination resource {dest_resource_name!r} of this foreign "
"key doesn't exist in the package."
),
instance=dest_resource_name,
)
]

unknown_fields = _get_unknown_key_fields(
dest_field_list, properties, f"{dest_resource_path}."
)
if not unknown_fields:
return []

return [
Issue(
jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.fields",
type="foreign-key-destination-fields",
message=(
f"No fields found in destination resource {dest_resource_name!r} "
f"for foreign key destination fields: {unknown_fields}."
),
instance=dest_fields,
)
]


def _set_should_fields_to_required(schema: dict[str, Any]) -> dict[str, Any]:
"""Set 'SHOULD' fields to 'REQUIRED' in the schema."""
should_fields = ("name", "id", "licenses")
Expand Down
Loading