Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add sql-datatype to the SDK discovery and catalog #1872

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
16 changes: 14 additions & 2 deletions singer_sdk/_singerlib/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ class StreamMetadata(Metadata):
schema_name: str | None = None


@dataclass
class SqlMetadata(Metadata):
    """Metadata entry that additionally records a property's SQL data type."""

    # String description of the SQL data type (e.g. "VARCHAR(length=15)"),
    # or None when no SQL data type is known for the property.
    sql_datatype: str | None = None


# List SqlMetadata too, so the alias names every metadata entry class.
AnyMetadata: TypeAlias = t.Union[Metadata, StreamMetadata, SqlMetadata]


Expand Down Expand Up @@ -167,6 +172,7 @@ def get_standard_metadata(
valid_replication_keys: list[str] | None = None,
replication_method: str | None = None,
selected_by_default: bool | None = None,
sql_datatypes: dict | None = None,
) -> MetadataMapping:
"""Get default metadata for a stream.

Expand All @@ -177,6 +183,7 @@ def get_standard_metadata(
valid_replication_keys: Stream valid replication keys.
replication_method: Stream replication method.
selected_by_default: Whether the stream is selected by default.
sql_datatypes: SQL datatypes present in the stream.

Returns:
Metadata mapping.
Expand All @@ -196,14 +203,19 @@ def get_standard_metadata(
root.schema_name = schema_name

for field_name in schema.get("properties", {}):
if sql_datatypes is None:
entry = Metadata()
else:
entry = SqlMetadata(sql_datatype=sql_datatypes.get(field_name))

if (
key_properties
and field_name in key_properties
or (valid_replication_keys and field_name in valid_replication_keys)
):
entry = Metadata(inclusion=Metadata.InclusionType.AUTOMATIC)
entry.inclusion = Metadata.InclusionType.AUTOMATIC
else:
entry = Metadata(inclusion=Metadata.InclusionType.AVAILABLE)
entry.inclusion = Metadata.InclusionType.AVAILABLE

mapping[("properties", field_name)] = entry

Expand Down
67 changes: 54 additions & 13 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,33 @@
view_names = []
return [(t, False) for t in table_names] + [(v, True) for v in view_names]

def discover_catalog_entry_sql_datatype(
    self,
    data_type: sqlalchemy.types.TypeEngine,
) -> str:
    """Return the SQL datatype as a string to use in the catalog.

    The string is the type's class name followed by whichever of ``length``,
    ``scale`` and ``precision`` are set on the type, in that order. An
    attribute counts as set when it exists and is not ``None``, so meaningful
    zero values such as ``scale=0`` are kept.

    Args:
        data_type: Given data type as sqlalchemy.types.TypeEngine.

    Returns:
        A string describing the given data type, for example
        "VARCHAR(length=15)".
    """
    datatype_attributes = ("length", "scale", "precision")

    described = []
    for attribute in datatype_attributes:
        value = getattr(data_type, attribute, None)
        # Compare against None instead of truthiness: 0 is a valid setting
        # (e.g. NUMERIC(10, 0)) and must not be dropped from the description.
        if value is not None:
            described.append(f"{attribute}={value}")

    return f"{type(data_type).__name__}({', '.join(described)})"

# TODO: maybe this should be split into smaller parts?
def discover_catalog_entry(
self,
Expand Down Expand Up @@ -441,21 +468,34 @@

# Initialize columns list
table_schema = th.PropertiesList()
for column_def in inspected.get_columns(table_name, schema=schema_name):
column_name = column_def["name"]
is_nullable = column_def.get("nullable", False)
jsonschema_type: dict = self.to_jsonschema_type(
t.cast(sqlalchemy.types.TypeEngine, column_def["type"]),
)
table_schema.append(
th.Property(
name=column_name,
wrapped=th.CustomType(jsonschema_type),
required=not is_nullable,
),
)
with warnings.catch_warnings(record=True) as inspection_warnings:
for column_def in inspected.get_columns(table_name, schema=schema_name):
column_name = column_def["name"]
is_nullable = column_def.get("nullable", False)
jsonschema_type: dict = self.to_jsonschema_type(
t.cast(sqlalchemy.types.TypeEngine, column_def["type"]),
)
table_schema.append(
th.Property(
name=column_name,
wrapped=th.CustomType(jsonschema_type),
required=not is_nullable,
),
)
if len(inspection_warnings) > 0:
for line in inspection_warnings:
expanded_msg: str = (

Check warning on line 487 in singer_sdk/connectors/sql.py

View check run for this annotation

Codecov / codecov/patch

singer_sdk/connectors/sql.py#L487

Added line #L487 was not covered by tests
Comment on lines +471 to +487
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For these bits, what do you think about using the built-in logging.captureWarnings?

We'd probably want to call it whenever a plugin's CLI is invoked, so in the body of

@classmethod
def invoke(
cls,
*,
about: bool = False,
about_format: str | None = None,
**kwargs: t.Any, # noqa: ARG003
) -> None:
"""Invoke the plugin.
Args:
about: Display package metadata and settings.
about_format: Specify output style for `--about`.
kwargs: Plugin keyword arguments.
"""
if about:
cls.print_about(about_format)
sys.exit(0)

f"Discovery warning: {line.message} in '{unique_stream_id}'"
)
self.logger.info(expanded_msg)

Check warning on line 490 in singer_sdk/connectors/sql.py

View check run for this annotation

Codecov / codecov/patch

singer_sdk/connectors/sql.py#L490

Added line #L490 was not covered by tests
schema = table_schema.to_dict()

sql_datatypes = {}
for column_def in inspected.get_columns(table_name, schema=schema_name):
sql_datatypes[
str(column_def["name"])
] = self.discover_catalog_entry_sql_datatype(column_def["type"])
Comment on lines +494 to +497
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you feel comfortable adding tests for this? A good place might be in tests/core/test_connector_sql.py since this is a connector method. I think it could even be parametrized with many examples of column types.


# Initialize available replication methods
addl_replication_methods: list[str] = [""] # By default an empty list.
# Notes regarding replication methods:
Expand All @@ -480,6 +520,7 @@
replication_method=replication_method,
key_properties=key_properties,
valid_replication_keys=None, # Must be defined by user
sql_datatypes=sql_datatypes,
),
database=None, # Expects single-database context
row_count=None,
Expand Down