SQLToGCSOperators: Add support for dumping JSON values in CSV #26875

@sleepy-tiger

Description

If the output format is CSV, any dict object returned from the database (for example, the value of a Postgres JSON column) is not dumped to a string and is kept as a dict object.
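To see why the dict should be dumped first, consider how Python's standard csv module handles a non-string cell: csv.writer converts it with str(), so a dict ends up as its Python repr (single-quoted keys), which is not valid JSON. This is a minimal sketch assuming the CSV file is written with csv.writer; the sample row is made up for illustration.

```python
import csv
import io
import json

# Hypothetical row: an integer id plus the value of a Postgres json/jsonb column.
row = [1, {"key": "value", "n": 2}]


def write_row(cells):
    """Write one CSV row to an in-memory buffer and return the text."""
    buf = io.StringIO()
    csv.writer(buf).writerow(cells)
    return buf.getvalue()


# Dict left as-is: csv.writer falls back to str(), i.e. the Python repr.
raw_line = write_row(row)

# Dict dumped first: the cell holds JSON text, and csv doubles the inner quotes.
dumped_line = write_row([json.dumps(v) if isinstance(v, dict) else v for v in row])

# Read the second cell back from each line.
raw_cell = next(csv.reader(io.StringIO(raw_line)))[1]
dumped_cell = next(csv.reader(io.StringIO(dumped_line)))[1]

print(raw_cell)     # {'key': 'value', 'n': 2}   (json.loads rejects this)
print(dumped_cell)  # {"key": "value", "n": 2}   (valid JSON)
```

Only the dumped cell survives a round trip through json.loads, which is what a downstream consumer expecting a JSON column needs.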

Use case/motivation

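Currently, if export_format is CSV and a Postgres column is defined with the json or jsonb data type, its values cannot be dumped to strings. The base-class method convert_types declares stringify_dict=False and always passes it explicitly to the abstract method convert_type, so stringify_dict is effectively hardcoded to False. As a result, a subclass such as PostgresToGCSOperator cannot customise stringify_dict: the stringify_dict=True default declared on its convert_type never takes effect.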
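The mechanism can be reproduced without Airflow: a default argument only applies when the caller omits that argument, and convert_types never omits it. The classes below (BaseSketch, PostgresSketch) are hypothetical, heavily simplified stand-ins for the two methods quoted in this issue, not Airflow's code.

```python
import json


class BaseSketch:
    """Hypothetical stand-in for BaseSQLToGCSOperator."""

    def convert_types(self, row, stringify_dict=False):
        # The keyword is forwarded explicitly, so whatever default the
        # subclass declares on convert_type is never consulted.
        return [self.convert_type(value, stringify_dict=stringify_dict) for value in row]

    def convert_type(self, value, stringify_dict):
        raise NotImplementedError


class PostgresSketch(BaseSketch):
    """Hypothetical stand-in for PostgresToGCSOperator: only the dict branch is kept."""

    def convert_type(self, value, stringify_dict=True):
        if stringify_dict and isinstance(value, dict):
            return json.dumps(value)
        return value


op = PostgresSketch()
print(op.convert_type({"a": 1}))     # {"a": 1}    subclass default True applies
print(op.convert_types([{"a": 1}]))  # [{'a': 1}]  base default False wins
```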

Method convert_types in the base class BaseSQLToGCSOperator:

def convert_types(self, schema, col_type_dict, row, stringify_dict=False) -> list:
    """Convert values from DBAPI to output-friendly formats."""
    return [
        self.convert_type(value, col_type_dict.get(name), stringify_dict=stringify_dict)
        for name, value in zip(schema, row)
    ]
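One possible fix is to treat "not specified" differently from False in convert_types, forwarding stringify_dict only when the caller sets it so that each subclass's own default applies otherwise. This is a sketch under that assumption; the None sentinel and the class names are hypothetical, and callers that want dicts kept as-is would still pass stringify_dict=False explicitly.

```python
import json


class BaseSketch:
    """Hypothetical base class: forward stringify_dict only when it is set."""

    def convert_types(self, schema, col_type_dict, row, stringify_dict=None):
        # None means "not specified": let the subclass's convert_type default apply.
        kwargs = {} if stringify_dict is None else {"stringify_dict": stringify_dict}
        return [
            self.convert_type(value, col_type_dict.get(name), **kwargs)
            for name, value in zip(schema, row)
        ]

    def convert_type(self, value, schema_type, stringify_dict=False):
        raise NotImplementedError


class PostgresSketch(BaseSketch):
    """Keeps only the dict branch of PostgresToGCSOperator.convert_type."""

    def convert_type(self, value, schema_type, stringify_dict=True):
        if stringify_dict and isinstance(value, dict):
            return json.dumps(value)
        return value


op = PostgresSketch()
schema, types = ["id", "payload"], {"id": "INTEGER", "payload": "STRING"}
print(op.convert_types(schema, types, [1, {"a": 1}]))  # [1, '{"a": 1}']
print(op.convert_types(schema, types, [1, {"a": 1}], stringify_dict=False))  # [1, {'a': 1}]
```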

Method convert_type in the subclass PostgresToGCSOperator:

# Module-level imports this method relies on:
import datetime
import json
import time
from decimal import Decimal

import pendulum


def convert_type(self, value, schema_type, stringify_dict=True):
    """
    Takes a value from Postgres, and converts it to a value that's safe for
    JSON/Google Cloud Storage/BigQuery.
    Timezone aware Datetime are converted to UTC seconds.
    Unaware Datetime, Date and Time are converted to ISO formatted strings.
    Decimals are converted to floats.

    :param value: Postgres column value.
    :param schema_type: BigQuery data type.
    :param stringify_dict: Specify whether to convert dict to string.
    """
    if isinstance(value, datetime.datetime):
        iso_format_value = value.isoformat()
        if value.tzinfo is None:
            return iso_format_value
        return pendulum.parse(iso_format_value).float_timestamp
    if isinstance(value, datetime.date):
        return value.isoformat()
    if isinstance(value, datetime.time):
        formatted_time = time.strptime(str(value), "%H:%M:%S")
        time_delta = datetime.timedelta(
            hours=formatted_time.tm_hour, minutes=formatted_time.tm_min, seconds=formatted_time.tm_sec
        )
        return str(time_delta)
    if stringify_dict and isinstance(value, dict):
        return json.dumps(value)
    if isinstance(value, Decimal):
        return float(value)
    return value
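Another way to deliver what this issue asks for is an opt-in option on the operator that the CSV path forwards to convert_types instead of relying on the hardcoded False. The sketch below assumes a hypothetical constructor parameter named stringify_dict and a simplified, hypothetical CsvExportSketch class; it is not Airflow's implementation and keeps only the dict and Decimal branches of convert_type.

```python
import csv
import io
import json
from decimal import Decimal


class CsvExportSketch:
    """Hypothetical, simplified operator whose CSV path honours an opt-in flag."""

    def __init__(self, stringify_dict=False):
        # Hypothetical option; off by default so existing CSV output is unchanged.
        self.stringify_dict = stringify_dict

    def convert_type(self, value, schema_type, stringify_dict=True):
        # Only the dict and Decimal branches of PostgresToGCSOperator.convert_type.
        if stringify_dict and isinstance(value, dict):
            return json.dumps(value)
        if isinstance(value, Decimal):
            return float(value)
        return value

    def convert_types(self, schema, col_type_dict, row, stringify_dict=False):
        return [
            self.convert_type(value, col_type_dict.get(name), stringify_dict=stringify_dict)
            for name, value in zip(schema, row)
        ]

    def write_csv(self, schema, col_type_dict, rows):
        buf = io.StringIO()
        writer = csv.writer(buf)
        writer.writerow(schema)
        for row in rows:
            # Forward the operator-level flag rather than a hardcoded False.
            converted = self.convert_types(
                schema, col_type_dict, row, stringify_dict=self.stringify_dict
            )
            writer.writerow(converted)
        return buf.getvalue()


schema = ["id", "price", "payload"]
types = {"id": "INTEGER", "price": "FLOAT", "payload": "STRING"}
rows = [(1, Decimal("9.5"), {"a": 1})]

print(CsvExportSketch().write_csv(schema, types, rows))
print(CsvExportSketch(stringify_dict=True).write_csv(schema, types, rows))
```

Keeping the default at False preserves the current CSV output, while stringify_dict=True writes each dict as a JSON string that the csv module quotes and escapes.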

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct
