From 95d8b820d3f2ba3dd957d979c3ad4a158cec44ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Thu, 26 Sep 2024 14:55:53 -0600 Subject: [PATCH] Implement schema discovery --- samples/sample_tap_csv/client.py | 15 +++++ singer_sdk/contrib/filesystem/stream.py | 73 +++++++++++++++++++------ singer_sdk/contrib/filesystem/tap.py | 26 +++------ tests/samples/test_tap_csv.py | 12 +--- 4 files changed, 79 insertions(+), 47 deletions(-) diff --git a/samples/sample_tap_csv/client.py b/samples/sample_tap_csv/client.py index 296558a4f..b0ff13923 100644 --- a/samples/sample_tap_csv/client.py +++ b/samples/sample_tap_csv/client.py @@ -12,6 +12,21 @@ class CSVStream(FileStream): """CSV stream class.""" + def get_schema(self, path: str) -> dict[str, t.Any]: + with self.filesystem.open(path, mode="r") as file: + reader = csv.DictReader( + file, + delimiter=self.config["delimiter"], + quotechar=self.config["quotechar"], + escapechar=self.config.get("escapechar"), + doublequote=self.config["doublequote"], + lineterminator=self.config["lineterminator"], + ) + return { + "type": "object", + "properties": {key: {"type": "string"} for key in reader.fieldnames}, + } + def read_file(self, path: str) -> t.Iterable[Record]: with self.filesystem.open(path, mode="r") as file: reader = csv.DictReader( diff --git a/singer_sdk/contrib/filesystem/stream.py b/singer_sdk/contrib/filesystem/stream.py index 2d2d0b618..4fba49376 100644 --- a/singer_sdk/contrib/filesystem/stream.py +++ b/singer_sdk/contrib/filesystem/stream.py @@ -3,9 +3,12 @@ from __future__ import annotations import abc +import enum +import functools import typing as t from singer_sdk import Stream +from singer_sdk.exceptions import ConfigValidationError from singer_sdk.helpers._util import utc_now from singer_sdk.streams.core import REPLICATION_INCREMENTAL @@ -17,22 +20,23 @@ from singer_sdk.helpers.types import Context, Record from singer_sdk.tap_base import Tap - SDC_META_FILEPATH = "_sdc_path" SDC_META_MODIFIED_AT = "_sdc_modified_at" +class ReadMode(str, enum.Enum): + """Sync mode for the tap.""" + + one_stream_per_file = "one_stream_per_file" + merge = "merge" + + class FileStream(Stream, metaclass=abc.ABCMeta): """Abstract base class for file streams.""" - BASE_SCHEMA: t.ClassVar[dict[str, t.Any]] = { - "type": ["object"], - "properties": { - SDC_META_FILEPATH: {"type": "string"}, - SDC_META_MODIFIED_AT: {"type": ["string", "null"], "format": "date-time"}, - }, - "required": [], - "additionalProperties": {"type": "string"}, + SDC_PROPERTIES: t.ClassVar[dict[str, dict]] = { + SDC_META_FILEPATH: {"type": "string"}, + SDC_META_MODIFIED_AT: {"type": ["string", "null"], "format": "date-time"}, } def __init__( @@ -40,34 +44,58 @@ def __init__( tap: Tap, name: str, *, + filepaths: t.Sequence[str], filesystem: fsspec.AbstractFileSystem, - partitions: list[dict[str, t.Any]] | None = None, ) -> None: """Create a new FileStream instance. Args: tap: The tap for this stream. name: The name of the stream. + filepaths: List of file paths to read. filesystem: The filesystem implementation object to use. - partitions: List of partitions for this stream. + mode: The read mode for the stream. + + Raises: + ConfigValidationError: If no file paths are provided. """ - # TODO(edgarmondragon): Build schema from file. - super().__init__(tap, self.BASE_SCHEMA, name) + if not filepaths: + msg = "Configuration error" + raise ConfigValidationError(msg, errors=["No file paths provided"]) + + self._filepaths = filepaths + self.filesystem = filesystem + + super().__init__(tap, schema=None, name=name) # TODO(edgarrmondragon): Make this None if the filesytem does not support it. self.replication_key = SDC_META_MODIFIED_AT - self.filesystem = filesystem self._sync_start_time = utc_now() - self._partitions = partitions or [] + self._partitions = [{SDC_META_FILEPATH: path} for path in self._filepaths] @property def partitions(self) -> list[dict[str, t.Any]]: """Return the list of partitions for this stream.""" return self._partitions - @abc.abstractmethod - def read_file(self, path: str) -> t.Iterable[Record]: - """Return a generator of records from the file.""" + def _get_full_schema(self) -> dict[str, t.Any]: + """Return the full schema for the stream. + + Args: + context: Stream partition or context dictionary. + + Returns: + The full schema for the stream. + """ + path: str = self._filepaths[0] + schema = self.get_schema(path) + schema["properties"].update(self.SDC_PROPERTIES) + return schema + + @functools.cached_property + def schema(self) -> dict[str, t.Any]: + """Return the schema for the stream.""" + return self._get_full_schema() def get_records( self, @@ -109,4 +137,13 @@ def get_records( for record in self.read_file(path): record[SDC_META_MODIFIED_AT] = mtime or self._sync_start_time + record[SDC_META_FILEPATH] = path yield record + + @abc.abstractmethod + def read_file(self, path: str) -> t.Iterable[Record]: + """Return a generator of records from the file.""" + + @abc.abstractmethod + def get_schema(self, path: str) -> dict[str, t.Any]: + """Return the schema for the file.""" diff --git a/singer_sdk/contrib/filesystem/tap.py b/singer_sdk/contrib/filesystem/tap.py index b0c25647e..19d14c211 100644 --- a/singer_sdk/contrib/filesystem/tap.py +++ b/singer_sdk/contrib/filesystem/tap.py @@ -2,7 +2,6 @@ from __future__ import annotations -import enum import functools import os import typing as t @@ -12,18 +11,10 @@ import singer_sdk.typing as th from singer_sdk import Tap -from singer_sdk.contrib.filesystem.stream import SDC_META_FILEPATH, FileStream +from singer_sdk.contrib.filesystem.stream import FileStream, ReadMode DEFAULT_MERGE_STREAM_NAME = "files" - -class ReadMode(str, enum.Enum): - """Sync mode for the tap.""" - - one_stream_per_file = "one_stream_per_file" - merge = "merge" - - BASE_CONFIG_SCHEMA = th.PropertiesList( th.Property( "filesystem", @@ -144,26 +135,23 @@ def discover_streams(self) -> list: self.default_stream_class( tap=self, name=file_path_to_stream_name(member), + filepaths=[os.path.join(path, member)], # noqa: PTH118 filesystem=self.fs, - partitions=[{SDC_META_FILEPATH: os.path.join(path, member)}], # noqa: PTH118 ) for member in os.listdir(path) if member.endswith(self.valid_extensions) ] # Merge - contexts = [ - { - SDC_META_FILEPATH: os.path.join(path, member), # noqa: PTH118 - } - for member in os.listdir(path) - if member.endswith(self.valid_extensions) - ] return [ self.default_stream_class( tap=self, name=self.config["stream_name"], + filepaths=[ + os.path.join(path, member) # noqa: PTH118 + for member in os.listdir(path) + if member.endswith(self.valid_extensions) + ], filesystem=self.fs, - partitions=contexts, ) ] diff --git a/tests/samples/test_tap_csv.py b/tests/samples/test_tap_csv.py index b75bb5609..cb16e0e0e 100644 --- a/tests/samples/test_tap_csv.py +++ b/tests/samples/test_tap_csv.py @@ -19,9 +19,7 @@ class TestCSVMerge(_TestCSVMerge): - @pytest.mark.xfail(reason="Schema generation not implemented", strict=True) - def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str): - super().test_tap_stream_record_schema_matches_transformed_catalog(stream) + pass _TestCSVOneStreamPerFile = get_tap_test_class( @@ -35,9 +33,7 @@ def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str) class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile): - @pytest.mark.xfail(reason="Schema generation not implemented", strict=True) - def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str): - super().test_tap_stream_record_schema_matches_transformed_catalog(stream) + pass # Three days into the future. @@ -80,10 +76,6 @@ def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str) class TestCSVOneStreamPerFileIncremental(_TestCSVOneStreamPerFileIncremental): - @pytest.mark.xfail(reason="Schema generation not implemented", strict=True) - def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str): - super().test_tap_stream_record_schema_matches_transformed_catalog(stream) - @pytest.mark.xfail(reason="No records are extracted", strict=True) def test_tap_stream_transformed_catalog_schema_matches_record(self, stream: str): super().test_tap_stream_transformed_catalog_schema_matches_record(stream)