Closed
Labels
kind:feature (Feature Requests)
Description
When the output format is CSV, any "dict" object returned from the database (for example, the value of a Postgres JSON column) is not dumped to a string; it is kept as a dict object.
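To illustrate why this matters, here is a minimal sketch using only the standard library, not the operator itself. The sample row and the helper `write_csv` are hypothetical. It assumes the dict reaches a `csv.writer` unchanged, in which case the writer falls back to `str()` and the cell holds a Python repr rather than JSON.

```python
import csv
import io
import json

# Hypothetical row: the second value stands in for a Postgres json/jsonb column.
row = [1, {"key": "value"}]


def write_csv(values):
    """Write one row with csv.writer and return the resulting line."""
    buf = io.StringIO()
    csv.writer(buf).writerow(values)
    return buf.getvalue().strip()


# The dict is passed through as-is, so csv.writer calls str() on it.
raw_line = write_csv(row)

# The dict is serialized with json.dumps before it is written.
dumped_line = write_csv(
    [json.dumps(v) if isinstance(v, dict) else v for v in row]
)

print(raw_line)
print(dumped_line)
```

The first line contains a single-quoted Python repr that a JSON parser rejects, while the second contains a quoted CSV field that parses back into the original object.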
Use case/motivation
Currently, if export_format is CSV and a Postgres column is defined with the json or jsonb data type, the stringify_dict parameter is effectively fixed at False: convert_types in the base class defaults it to False and passes it explicitly to convert_type. As a result, stringify_dict cannot be customised in a subclass such as PostgresToGCSOperator, even though that subclass's convert_type declares stringify_dict=True as its default.
Function convert_types in base class BaseSQLToGCSOperator:
def convert_types(self, schema, col_type_dict, row, stringify_dict=False) -> list:
    """Convert values from DBAPI to output-friendly formats."""
    return [
        self.convert_type(value, col_type_dict.get(name), stringify_dict=stringify_dict)
        for name, value in zip(schema, row)
    ]
Function convert_type in subclass PostgresToGCSOperator:
def convert_type(self, value, schema_type, stringify_dict=True):
    """
    Takes a value from Postgres, and converts it to a value that's safe for
    JSON/Google Cloud Storage/BigQuery.
    Timezone aware Datetime are converted to UTC seconds.
    Unaware Datetime, Date and Time are converted to ISO formatted strings.
    Decimals are converted to floats.

    :param value: Postgres column value.
    :param schema_type: BigQuery data type.
    :param stringify_dict: Specify whether to convert dict to string.
    """
    if isinstance(value, datetime.datetime):
        iso_format_value = value.isoformat()
        if value.tzinfo is None:
            return iso_format_value
        return pendulum.parse(iso_format_value).float_timestamp
    if isinstance(value, datetime.date):
        return value.isoformat()
    if isinstance(value, datetime.time):
        formatted_time = time.strptime(str(value), "%H:%M:%S")
        time_delta = datetime.timedelta(
            hours=formatted_time.tm_hour, minutes=formatted_time.tm_min, seconds=formatted_time.tm_sec
        )
        return str(time_delta)
    if stringify_dict and isinstance(value, dict):
        return json.dumps(value)
    if isinstance(value, Decimal):
        return float(value)
    return value
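The interaction between the two methods can be reproduced outside Airflow with stand-in classes. The sketch below is not the Airflow implementation: `BaseSketch`, `PostgresSketch`, and `ConfigurableSketch` are hypothetical names, and the constructor-level `stringify_dict` option is only one possible way to make the flag customisable. It shows that the subclass default of True is never used, because the base method always forwards its own value.

```python
import json


class BaseSketch:
    """Stand-in for the base class; mirrors the convert_types shown above."""

    def convert_types(self, schema, col_type_dict, row, stringify_dict=False):
        # stringify_dict is always passed explicitly, so the default declared
        # on a subclass's convert_type never applies.
        return [
            self.convert_type(value, col_type_dict.get(name), stringify_dict=stringify_dict)
            for name, value in zip(schema, row)
        ]

    def convert_type(self, value, schema_type, stringify_dict=False):
        raise NotImplementedError


class PostgresSketch(BaseSketch):
    """Stand-in for the Postgres subclass, reduced to the dict branch."""

    def convert_type(self, value, schema_type, stringify_dict=True):
        if stringify_dict and isinstance(value, dict):
            return json.dumps(value)
        return value


class ConfigurableSketch(PostgresSketch):
    """Hypothetical variant: the flag is an instance-level setting."""

    def __init__(self, stringify_dict=True):
        self.stringify_dict = stringify_dict

    def convert_types(self, schema, col_type_dict, row, stringify_dict=None):
        # Fall back to the instance setting when the caller does not choose.
        if stringify_dict is None:
            stringify_dict = self.stringify_dict
        return super().convert_types(schema, col_type_dict, row, stringify_dict=stringify_dict)


schema = ["id", "payload"]
col_types = {"id": "INTEGER", "payload": "STRING"}
row = (1, {"a": 1})

current = PostgresSketch().convert_types(schema, col_types, row)
proposed = ConfigurableSketch(stringify_dict=True).convert_types(schema, col_types, row)

print(current)   # the payload is still a dict
print(proposed)  # the payload is a JSON string
```

Keeping the explicit `stringify_dict` argument on `convert_types` means existing callers that pass a value are unaffected; only calls that omit it pick up the instance setting.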
Related issues
No response
Are you willing to submit a PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct