-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: Object-oriented JSON-schema loading
- Loading branch information
Showing
30 changed files
with
776 additions
and
328 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
"""Working with JSON schemas.""" | ||
|
||
from . import from_file as from_file # noqa |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
"""Loading Schema from JSON files.""" | ||
|
||
from abc import ABC, abstractmethod | ||
from typing import Any, Dict | ||
|
||
Schema = Dict[str, Any] | ||
|
||
|
||
class SchemaCollection(ABC): | ||
@abstractmethod | ||
def get_schema_by_name(self, name: str) -> Schema: | ||
... | ||
|
||
@abstractmethod | ||
def get_schemas(self) -> Dict[str, Schema]: | ||
... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from . import Schema as Schema # noqa | ||
from .facade import Facade as Facade # noqa | ||
from .loader import Loader as Loader # noqa | ||
from .loader import LoaderError as LoaderError # noqa | ||
from .resolver import Resolver as Resolver # noqa |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
from dataclasses import dataclass | ||
|
||
from . import Schema, SchemaCollection | ||
|
||
|
||
@dataclass | ||
class Facade: | ||
collection: SchemaCollection | ||
|
||
def campaign(self) -> Schema: | ||
return self.collection.get_schema_by_name("Campaign.json") | ||
|
||
def campaign_flat(self) -> Schema: | ||
return self.collection.get_schema_by_name("Campaign_Flat.json") | ||
|
||
def campaign_level_reports(self) -> Schema: | ||
return self.collection.get_schema_by_name("Row.json") | ||
|
||
def campaign_level_reports_extended_spend_row(self) -> Schema: | ||
return self.collection.get_schema_by_name("ExtendedSpendRow_campaignId.json") | ||
|
||
def campaign_level_reports_extended_spend_row_flat(self) -> Schema: | ||
return self.collection.get_schema_by_name( | ||
"ExtendedSpendRow_campaignId_Flat.json" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import json | ||
from dataclasses import InitVar, dataclass, field | ||
from pathlib import Path | ||
from typing import Dict, Union | ||
|
||
from . import Schema, SchemaCollection | ||
|
||
JSON = [".json"] | ||
|
||
|
||
@dataclass | ||
class Loader(SchemaCollection): | ||
path: Path = field(init=False) | ||
_schemas: Dict[str, Schema] = field(init=False, default_factory=dict) | ||
|
||
schemas_directory: InitVar[Union[str, Path]] | ||
|
||
def __post_init__(self, schemas_directory: Union[str, Path]) -> None: | ||
if isinstance(schemas_directory, str): | ||
schemas_directory = Path(schemas_directory) | ||
|
||
self.path = schemas_directory | ||
|
||
def get_schema_by_name(self, name: str) -> Schema: | ||
return self.schemas[name] | ||
|
||
@property | ||
def schemas(self) -> Dict[str, Schema]: | ||
if not self._schemas: | ||
self._schemas = load_json_files(self.path) | ||
|
||
return self._schemas | ||
|
||
def get_schemas(self) -> Dict[str, Schema]: | ||
return self.schemas | ||
|
||
|
||
def load_json_files(directory: Path) -> Dict[str, Schema]: | ||
if not directory.exists(): | ||
raise LoaderError("path {} does not exist".format(directory)) | ||
|
||
if not directory.is_dir(): | ||
raise LoaderError("path {} is not a directory".format(directory)) | ||
|
||
json_files: Dict[str, Schema] = {} | ||
for file in directory.iterdir(): | ||
if not file.is_file(): | ||
continue | ||
|
||
if file.suffix not in JSON: | ||
continue | ||
|
||
with open(file) as stream: | ||
json_files[file.name] = json.load(stream) | ||
|
||
if not json_files: | ||
raise LoaderError( | ||
"directory {} does not contain any JSON files".format(directory) | ||
) | ||
|
||
return json_files | ||
|
||
|
||
class LoaderError(Exception): | ||
"""Schema loading failed""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
from dataclasses import dataclass | ||
from typing import Any, Dict, Optional | ||
|
||
from singer.transform import RefResolver | ||
from singer.transform import ( | ||
_resolve_schema_references as singer_resolve_schema_references, | ||
) | ||
|
||
from . import Schema, SchemaCollection | ||
|
||
DEFS = "$defs" | ||
|
||
|
||
@dataclass | ||
class Resolver(SchemaCollection): | ||
collection: SchemaCollection | ||
|
||
def get_schema_by_name(self, name: str) -> Schema: | ||
schema = self.collection.get_schema_by_name(name) | ||
return self.resolve(schema) | ||
|
||
def resolve(self, schema: Schema) -> Schema: | ||
schema = resolve_schema_references(schema, self.parent_schemas) | ||
schema.pop(DEFS, None) | ||
return schema | ||
|
||
@property | ||
def parent_schemas(self) -> Dict[str, Schema]: | ||
return self.collection.get_schemas() | ||
|
||
def get_schemas(self) -> Dict[str, Schema]: | ||
resolved: Dict[str, Schema] = {} | ||
|
||
for key, schema in self.parent_schemas.items(): | ||
resolved[key] = self.resolve(schema) | ||
|
||
return resolved | ||
|
||
|
||
def resolve_schema_references( | ||
schema: Schema, refs: Optional[Dict[str, Schema]] = None | ||
) -> Dict[str, Any]: | ||
"""resolve_schema_references is a re-implementation of the same function from | ||
singer.transform. It allows resolution of "allOf" schema element. "allOf" element | ||
is missing from provided implementation for reasons unknown. | ||
""" | ||
|
||
refs = refs or {} | ||
return _resolve_schema_references(schema, RefResolver("", schema, store=refs)) | ||
|
||
|
||
def _resolve_schema_references(schema: Schema, resolver: RefResolver) -> Schema: | ||
if "allOf" in schema: | ||
for i, element in enumerate(schema["allOf"]): | ||
schema["allOf"][i] = _resolve_schema_references(element, resolver) | ||
|
||
return singer_resolve_schema_references(schema, resolver) |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.