1 change: 1 addition & 0 deletions ingestion/src/metadata/readers/dataframe/dsv.py
@@ -72,6 +72,7 @@ def read_from_pandas(
         chunksize=CHUNKSIZE,
         storage_options=storage_options,
         compression=compression,
+        encoding_errors="ignore",
     ) as reader:
         for chunks in reader:
             chunk_list.append(chunks)
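For context on the one-line dsv.py change: encoding_errors is a standard pandas.read_csv keyword (pandas >= 1.3) that controls how undecodable bytes are handled, and "ignore" drops them instead of raising UnicodeDecodeError. A minimal, self-contained sketch of the behavior; the byte string and column names are made up for illustration:

import io

import pandas as pd

# A UTF-8 payload with one invalid byte (0xFF); the header and values are hypothetical.
raw = io.BytesIO(b"name\xff,value\nacme\xff,1\n")

# With the default encoding_errors="strict", pandas would raise UnicodeDecodeError here.
# With "ignore", the bad bytes are dropped and the rest of the file is read normally.
df = pd.read_csv(raw, encoding="utf-8", encoding_errors="ignore")

print(df.columns.tolist())  # ['name', 'value']
print(df.iloc[0].tolist())  # ['acme', 1]

The trade-off is that the affected bytes are silently discarded rather than surfacing a decode error, which fits the ingestion use case of reading partially corrupted DSV files.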
18 changes: 15 additions & 3 deletions ingestion/src/metadata/readers/dataframe/parquet.py
@@ -172,8 +172,12 @@ def _(self, _: GCSConfig, key: str, bucket_name: str) -> DatalakeColumnWrapper:
             )
             return dataframe_to_chunks(dataframe_response)

-        except Exception:
+        except Exception as exc:
             # Fallback to regular reading if size check fails
+            logger.warning(
+                f"Error reading parquet file from GCS '{file_path}': {exc}. "
+                f"Falling back to regular reading"
+            )
             file = gcs.open(file_path)
             parquet_file = ParquetFile(file)
             dataframe_response = parquet_file.read().to_pandas(
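The fallback path in the GCS hunk above (ParquetFile(file).read().to_pandas()) is plain pyarrow: wrap an open file handle in pyarrow.parquet.ParquetFile, materialize the whole table, and convert it to pandas. A local-file sketch of that same call chain, with a hypothetical temp path standing in for the gcs.open(file_path) handle:

import pandas as pd
from pyarrow.parquet import ParquetFile

# Write a small hypothetical parquet file so the sketch is runnable locally.
pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]}).to_parquet("/tmp/example.parquet")

# The fallback shown above, minus the GCS filesystem: open a handle, wrap it, read it all.
with open("/tmp/example.parquet", "rb") as file:
    parquet_file = ParquetFile(file)
    dataframe = parquet_file.read().to_pandas()

print(dataframe.shape)  # (3, 2)

The Azure and local readers in the next two hunks apply the same warn-then-fallback shape, but fall back to pd.read_parquet instead.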
@@ -275,8 +279,12 @@ def _(self, _: AzureConfig, key: str, bucket_name: str) -> DatalakeColumnWrapper:
             )
             return dataframe_to_chunks(dataframe)

-        except Exception:
+        except Exception as exc:
             # Fallback to regular pandas reading if size check or batching fails
+            logger.warning(
+                f"Error reading parquet file from Azure '{account_url}': {exc}. "
+                f"Falling back to pandas reading"
+            )
             dataframe = pd.read_parquet(account_url, storage_options=storage_options)
             return dataframe_to_chunks(dataframe)

@@ -305,8 +313,12 @@ def _(
             dataframe = pd.read_parquet(key)
             return dataframe_to_chunks(dataframe)

-        except Exception:
+        except Exception as exc:
             # Fallback to regular pandas reading if size check fails
+            logger.warning(
+                f"Error reading parquet file from local path '{key}': {exc}. "
+                f"Falling back to pandas reading"
+            )
             dataframe = pd.read_parquet(key)
             return dataframe_to_chunks(dataframe)

32 changes: 26 additions & 6 deletions ingestion/src/metadata/utils/datalake/datalake_utils.py
@@ -72,6 +72,7 @@ def fetch_dataframe(
             f"[{config_source.__class__.__name__}] due to: [{err}]"
         )
     except Exception as err:
+        logger.debug(traceback.format_exc())
         logger.error(
             f"Error fetching file [{bucket_name}/{key}] using [{config_source.__class__.__name__}] due to: [{err}]"
         )
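The new line in fetch_dataframe follows a common logging split: keep the short, actionable message at ERROR and push the full stack trace to DEBUG, so the trace is available when debug logging is enabled without flooding normal runs. A minimal sketch of that split; the failure and object key are made up:

import logging
import traceback

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

try:
    # Hypothetical failure while fetching "my-bucket/sample.csv".
    raise FileNotFoundError("my-bucket/sample.csv")
except Exception as err:
    # Full traceback only at DEBUG; concise summary at ERROR.
    logger.debug(traceback.format_exc())
    logger.error(f"Error fetching file [my-bucket/sample.csv] due to: [{err}]")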
@@ -300,8 +301,12 @@ def fetch_col_types(cls, data_frame, column_name):
                 data_type = max(parsed_object_datatype_list)
                 # Determine the data type of the parsed object

-        except (ValueError, SyntaxError):
+        except (ValueError, SyntaxError) as exc:
             # Handle any exceptions that may occur
+            logger.debug(
+                f"ValueError/SyntaxError while parsing column '{column_name}' datatype: {exc}. "
+                f"Falling back to string."
+            )
             data_type = "string"

         data_type = cls._data_formats.get(
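In fetch_col_types, when a sampled value cannot be parsed into a Python object, the column type now degrades to "string" with a DEBUG note instead of silently. A hedged sketch of that branch, assuming the parse step uses something like ast.literal_eval (the parsing call itself is outside this hunk, so the helper below is illustrative only):

import ast

def infer_cell_type(raw_value: str) -> str:
    # Illustrative stand-in for the per-value parsing inside fetch_col_types.
    try:
        parsed = ast.literal_eval(raw_value)
        return type(parsed).__name__
    except (ValueError, SyntaxError) as exc:
        # Same fallback shape as the hunk above: log the reason, then default.
        print(f"Falling back to string: {exc}")
        return "string"

print(infer_cell_type("{'a': 1}"))  # dict
print(infer_cell_type("foo"))       # string (ValueError: not a literal)
print(infer_cell_type("{'a': 1"))   # string (SyntaxError: unclosed brace)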
@@ -378,10 +383,13 @@ def get_children(cls, json_column) -> List[Dict]:
         json_column = cast(Series, json_column)
         try:
             json_column = json_column.apply(json.loads)
-        except TypeError:
+        except TypeError as exc:
             # if values are not strings, we will assume they are already json objects
             # based on the read class logic
-            pass
+            logger.debug(
+                f"TypeError while parsing JSON column children: {exc}. "
+                f"Assuming values are already JSON objects."
+            )
         json_structure = cls.unique_json_structure(json_column.values.tolist())

         return cls.construct_json_column_children(json_structure)
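The get_children change replaces a silent pass with a DEBUG log. The TypeError it catches is the one json.loads raises when the Series already holds parsed objects (dicts) rather than JSON strings, as the existing comment notes. A small sketch of both paths; the sample data is hypothetical:

import json

from pandas import Series

# Already-parsed values: json.loads(dict) raises TypeError, so the values are used as-is.
json_column = Series([{"a": 1}, {"a": 2}])
try:
    json_column = json_column.apply(json.loads)
except TypeError as exc:
    print(f"Assuming values are already JSON objects: {exc}")

# String values: apply(json.loads) succeeds and yields the same structure.
string_column = Series(['{"a": 1}', '{"a": 2}'])
print(string_column.apply(json.loads).tolist())  # [{'a': 1}, {'a': 2}]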
@@ -453,8 +461,12 @@ def get_columns(self):
             try:
                 item_field = column.type.value_field
                 parsed_column["arrayDataType"] = self._get_pq_data_type(item_field)
-            except AttributeError:
+            except AttributeError as exc:
                 # if the value field is not specified, we will set it to UNKNOWN
+                logger.debug(
+                    f"Could not extract array item type for column '{column.name}': {exc}. "
+                    f"Setting arrayDataType to UNKNOWN."
+                )
                 parsed_column["arrayDataType"] = DataType.UNKNOWN

             if parsed_column["dataType"] == DataType.BINARY:
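In get_columns, column.type.value_field is a pyarrow ListType attribute carrying the element field of a list type; one way the AttributeError above can occur is when the column's type is not a list type and has no value_field. A short sketch of both outcomes using plain pyarrow types:

import pyarrow as pa

# A list type exposes .value_field, which carries the element type.
list_type = pa.list_(pa.int64())
print(list_type.value_field.type)  # int64

# A non-list type has no .value_field, so probing it raises AttributeError.
try:
    pa.int64().value_field
except AttributeError as exc:
    print(f"Setting arrayDataType to UNKNOWN: {exc}")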
@@ -592,8 +604,12 @@ def _parse_iceberg_delta_schema(self, data: dict) -> List[Column]:
                     if isinstance(type_str, str)
                     else DataType.STRING
                 )
-            except (ValueError, AttributeError):
+            except (ValueError, AttributeError) as exc:
                 # If the type is not recognized, default to STRING
+                logger.debug(
+                    f"Unrecognized data type '{type_str}' for column '{column_name}': {exc}. "
+                    f"Defaulting to STRING."
+                )
                 data_type = DataType.STRING

             column = Column(
@@ -644,7 +660,11 @@ def _parse_struct_fields(self, fields: list) -> List[dict]:
                     if isinstance(type_str, str)
                     else DataType.STRING
                 )
-            except (ValueError, AttributeError):
+            except (ValueError, AttributeError) as exc:
+                logger.debug(
+                    f"Unrecognized data type '{type_str}' for nested field '{child_name}': {exc}. "
+                    f"Defaulting to STRING."
+                )
                 data_type = DataType.STRING

             child = {
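Both schema-parsing hunks above catch the same pair of exceptions while mapping a raw type string onto the DataType enum; the head of the conditional is outside the hunk, so the sketch below assumes it constructs DataType from the upper-cased string and uses a stand-in enum rather than OpenMetadata's generated DataType (member names are assumptions):

from enum import Enum

class DataType(Enum):
    # Stand-in for the generated OpenMetadata enum; members chosen for illustration.
    STRING = "STRING"
    INT = "INT"

def parse_type(type_str) -> DataType:
    try:
        # Assumed shape of the conditional whose tail appears in the hunks above.
        return DataType(type_str.upper()) if isinstance(type_str, str) else DataType.STRING
    except (ValueError, AttributeError) as exc:
        print(f"Defaulting to STRING: {exc}")
        return DataType.STRING

print(parse_type("int"))       # DataType.INT
print(parse_type("geometry"))  # DataType.STRING (ValueError: not a valid DataType)

An unknown type name raises ValueError from the enum lookup; the AttributeError in the same except clause is also caught, and both now leave a DEBUG trace instead of failing silently.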