-
-
Notifications
You must be signed in to change notification settings - Fork 309
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhancement: drop invalid rows on validate with new param #1189
Changes from 30 commits
72b3eed
40f851f
e18aa6c
de6e211
a9c8a40
2e210fa
b626cb8
212fdff
096afbb
5acb3dd
8b709de
e66abbd
91e6250
c2b6e6e
5905b19
f86f279
5efc041
87bce7c
fa24980
039fd1c
7686b07
478fc5e
bf80ef2
1458f6b
b5de710
5935b32
0b2f6fb
3180d31
95c4413
2140396
0a304e9
39072ff
94394f9
1f14cca
abc0324
75b3cc7
9dfba4e
e721458
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,6 +55,42 @@ def validate( | |
except SchemaError as exc: | ||
error_handler.collect_error(exc.reason_code, exc) | ||
|
||
# run the core checks | ||
error_handler = self.run_checks_and_handle_errors( | ||
error_handler, | ||
schema, | ||
check_obj, | ||
head, | ||
tail, | ||
sample, | ||
random_state, | ||
) | ||
|
||
if lazy and error_handler.collected_errors: | ||
if hasattr(schema, "drop_invalid") and schema.drop_invalid: | ||
check_obj = self.drop_invalid_data(check_obj, error_handler) | ||
return check_obj | ||
else: | ||
raise SchemaErrors( | ||
schema=schema, | ||
schema_errors=error_handler.collected_errors, | ||
data=check_obj, | ||
) | ||
|
||
return check_obj | ||
|
||
def run_checks_and_handle_errors( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @cosmicBboy this method could be moved into the parent class to remove the duplication, but I'm not sure this would be the right move. They are quite different implementations, and I don't want to abstract it to the parent for some vain DRYness 😅 edit: I will move |
||
self, | ||
error_handler, | ||
schema, | ||
check_obj, | ||
head, | ||
tail, | ||
sample, | ||
random_state, | ||
): | ||
"""Run checks on schema""" | ||
# pylint: disable=too-many-locals | ||
field_obj_subsample = self.subsample( | ||
check_obj if is_field(check_obj) else check_obj[schema.name], | ||
head, | ||
|
@@ -71,14 +107,15 @@ def validate( | |
random_state, | ||
) | ||
|
||
# run the core checks | ||
for core_check, args in ( | ||
core_checks = [ | ||
(self.check_name, (field_obj_subsample, schema)), | ||
(self.check_nullable, (field_obj_subsample, schema)), | ||
(self.check_unique, (field_obj_subsample, schema)), | ||
(self.check_dtype, (field_obj_subsample, schema)), | ||
(self.run_checks, (check_obj_subsample, schema)), | ||
): | ||
] | ||
|
||
for core_check, args in core_checks: | ||
results = core_check(*args) | ||
if isinstance(results, CoreCheckResult): | ||
results = [results] | ||
|
@@ -106,13 +143,7 @@ def validate( | |
original_exc=result.original_exc, | ||
) | ||
|
||
if lazy and error_handler.collected_errors: | ||
raise SchemaErrors( | ||
schema=schema, | ||
schema_errors=error_handler.collected_errors, | ||
data=check_obj, | ||
) | ||
return check_obj | ||
return error_handler | ||
|
||
def coerce_dtype( | ||
self, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
"""Backend implementation for pandas schema components.""" | ||
# pylint: disable=too-many-locals | ||
|
||
import traceback | ||
from copy import copy, deepcopy | ||
|
@@ -51,10 +52,10 @@ def validate( | |
"method.", | ||
) | ||
|
||
def validate_column(check_obj, column_name): | ||
def validate_column(check_obj, column_name, return_check_obj=False): | ||
try: | ||
# pylint: disable=super-with-arguments | ||
super(ColumnBackend, self).validate( | ||
validated_check_obj = super(ColumnBackend, self).validate( | ||
check_obj, | ||
copy(schema).set_name(column_name), | ||
head=head, | ||
|
@@ -64,6 +65,10 @@ def validate_column(check_obj, column_name): | |
lazy=lazy, | ||
inplace=inplace, | ||
) | ||
|
||
if return_check_obj: | ||
return validated_check_obj | ||
|
||
except SchemaErrors as err: | ||
for err in err.schema_errors: | ||
error_handler.collect_error( | ||
|
@@ -95,14 +100,24 @@ def validate_column(check_obj, column_name): | |
check_obj[column_name].iloc[:, [i]], column_name | ||
) | ||
else: | ||
validate_column(check_obj, column_name) | ||
if hasattr(schema, "drop_invalid") and schema.drop_invalid: | ||
# replace the check_obj with the validated check_obj | ||
check_obj = validate_column( | ||
check_obj, column_name, return_check_obj=True | ||
) | ||
else: | ||
validate_column(check_obj, column_name) | ||
|
||
if lazy and error_handler.collected_errors: | ||
raise SchemaErrors( | ||
schema=schema, | ||
schema_errors=error_handler.collected_errors, | ||
data=check_obj, | ||
) | ||
if hasattr(schema, "drop_invalid") and schema.drop_invalid: | ||
check_obj = self.drop_invalid_data(check_obj, error_handler) | ||
return check_obj | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. weird, it seems like the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The hasattr? That was just some defensive code as there was a recent issue with the new default attr not being available on pickled schemas. Can drop if you think it is overkill There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, it's okay to have that in there, it's just that codecov is complaining that this part of the code wasn't executed, meaning that lines 113 weren't executed during CI. But it seems like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah no it doesn't appear so locally! Codecov saving us 😆 This was my bad. So this code in this file: if is_table(check_obj[column_name]):
for i in range(check_obj[column_name].shape[1]):
validate_column(
check_obj[column_name].iloc[:, [i]], column_name
)
else:
if hasattr(schema, "drop_invalid") and schema.drop_invalid:
# replace the check_obj with the validated check_obj
check_obj = validate_column(
check_obj, column_name, return_check_obj=True
)
else:
validate_column(check_obj, column_name)
if lazy and error_handler.collected_errors:
if hasattr(schema, "drop_invalid") and schema.drop_invalid: # the line in question!
check_obj = self.drop_invalid_data(check_obj, error_handler)
return check_obj
else:
raise SchemaErrors(
schema=schema,
schema_errors=error_handler.collected_errors,
data=check_obj,
) Raises an error on I've removed these lines and pushed the changes👌 |
||
else: | ||
raise SchemaErrors( | ||
schema=schema, | ||
schema_errors=error_handler.collected_errors, | ||
data=check_obj, | ||
) | ||
|
||
return check_obj | ||
|
||
|
@@ -381,16 +396,8 @@ def validate( | |
otherwise creates a copy of the data. | ||
:returns: validated DataFrame or Series. | ||
""" | ||
# pylint: disable=too-many-locals | ||
if schema.coerce: | ||
try: | ||
check_obj.index = self.coerce_dtype( | ||
check_obj.index, schema=schema # type: ignore [arg-type] | ||
) | ||
except SchemaErrors as err: | ||
if lazy: | ||
raise | ||
raise err.schema_errors[0] from err | ||
check_obj.index = self.__coerce_index(check_obj, schema, lazy) | ||
|
||
# Prevent data type coercion when the validate method is called because | ||
# it leads to some weird behavior when calling coerce_dtype within the | ||
|
@@ -419,32 +426,9 @@ def validate( | |
): | ||
columns[name] = column.set_name(name) | ||
schema_copy.columns = columns | ||
|
||
def to_dataframe(multiindex): | ||
""" | ||
Emulate the behavior of pandas.MultiIndex.to_frame, but preserve | ||
duplicate index names if they exist. | ||
""" | ||
# NOTE: this is a hack to support pyspark.pandas | ||
if type(multiindex).__module__.startswith("pyspark.pandas"): | ||
df = multiindex.to_frame() | ||
else: | ||
df = pd.DataFrame( | ||
{ | ||
i: multiindex.get_level_values(i) | ||
for i in range(multiindex.nlevels) | ||
} | ||
) | ||
df.columns = [ | ||
i if name is None else name | ||
for i, name in enumerate(multiindex.names) | ||
] | ||
df.index = multiindex | ||
return df | ||
|
||
try: | ||
validation_result = super().validate( | ||
to_dataframe(check_obj.index), | ||
self.__to_dataframe(check_obj.index), | ||
schema_copy, | ||
head=head, | ||
tail=tail, | ||
|
@@ -480,3 +464,36 @@ def to_dataframe(multiindex): | |
|
||
assert is_table(validation_result) | ||
return check_obj | ||
|
||
def __to_dataframe(self, multiindex): | ||
""" | ||
Emulate the behavior of pandas.MultiIndex.to_frame, but preserve | ||
duplicate index names if they exist. | ||
""" | ||
# NOTE: this is a hack to support pyspark.pandas | ||
if type(multiindex).__module__.startswith("pyspark.pandas"): | ||
df = multiindex.to_frame() | ||
else: | ||
df = pd.DataFrame( | ||
{ | ||
i: multiindex.get_level_values(i) | ||
for i in range(multiindex.nlevels) | ||
} | ||
) | ||
df.columns = [ | ||
i if name is None else name | ||
for i, name in enumerate(multiindex.names) | ||
] | ||
df.index = multiindex | ||
return df | ||
|
||
def __coerce_index(self, check_obj, schema, lazy): | ||
"""Coerce index""" | ||
try: | ||
return self.coerce_dtype( | ||
check_obj.index, schema=schema # type: ignore [arg-type] | ||
) | ||
except SchemaErrors as err: | ||
if lazy: | ||
raise | ||
raise err.schema_errors[0] from err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To guard against a repeat of #1188