Skip to content

Commit

Permalink
[SPARK-46975][PS] Support dedicated fallback methods
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Support dedicated fallback methods

### Why are the changes needed?
apply this protocol: `new_key = f"_{key}_fallback"`, so that we can switch to a separate fallback implementation for a missing method if needed

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#45026 from zhengruifeng/mv_to_fallback.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Xinrong Meng <xinrong@apache.org>
  • Loading branch information
zhengruifeng authored and xinrong-meng committed Feb 23, 2024
1 parent 06c741a commit d20650b
Showing 1 changed file with 36 additions and 13 deletions.
49 changes: 36 additions & 13 deletions python/pyspark/pandas/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -13567,15 +13567,14 @@ def _index_normalized_frame(level: int, psser_or_psdf: DataFrameOrSeries) -> "Da

return psdf

def _fall_back_frame(self, method: str) -> Callable:
def _internal_fall_back_function(*inputs: Any, **kwargs: Any) -> "DataFrame":
def _build_fallback_method(self, method: str) -> Callable:
def _internal_fallback_function(*args: Any, **kwargs: Any) -> "DataFrame":
log_advice(
f"`{method}` is executed in fallback mode. It loads partial data into the "
f"driver's memory to infer the schema, and loads all data into one executor's "
f"memory to compute. It should only be used if the pandas DataFrame is expected "
f"to be small."
)

input_df = self.copy()
index_names = input_df.index.names

Expand All @@ -13595,7 +13594,7 @@ def compute_function(pdf: pd.DataFrame): # type: ignore[no-untyped-def]
pdf = pdf.drop(columns=[tmp_agg_column_name])
pdf = pdf.set_index(tmp_idx_column_name, drop=True)
pdf = pdf.sort_index()
pdf = getattr(pdf, method)(*inputs, **kwargs)
pdf = getattr(pdf, method)(*args, **kwargs)
pdf[tmp_idx_column_name] = pdf.index
return pdf.reset_index(drop=True)

Expand All @@ -13605,20 +13604,44 @@ def compute_function(pdf: pd.DataFrame): # type: ignore[no-untyped-def]

return output_df

return _internal_fall_back_function
return _internal_fallback_function

def _asfreq_fallback(self, *args: Any, **kwargs: Any) -> "DataFrame":
_f = self._build_fallback_method("asfreq")
return _f(*args, **kwargs)

def _asof_fallback(self, *args: Any, **kwargs: Any) -> "DataFrame":
_f = self._build_fallback_method("asof")
return _f(*args, **kwargs)

def _convert_dtypes_fallback(self, *args: Any, **kwargs: Any) -> "DataFrame":
_f = self._build_fallback_method("convert_dtypes")
return _f(*args, **kwargs)

def _infer_objects_fallback(self, *args: Any, **kwargs: Any) -> "DataFrame":
_f = self._build_fallback_method("infer_objects")
return _f(*args, **kwargs)

def _set_axis_fallback(self, *args: Any, **kwargs: Any) -> "DataFrame":
_f = self._build_fallback_method("set_axis")
return _f(*args, **kwargs)

def _to_feather_fallback(self, *args: Any, **kwargs: Any) -> None:
_f = self._build_fallback_driver_method("to_feather")
return _f(*args, **kwargs)

def _to_stata_fallback(self, *args: Any, **kwargs: Any) -> None:
_f = self._build_fallback_driver_method("to_stata")
return _f(*args, **kwargs)

def __getattr__(self, key: str) -> Any:
if key.startswith("__"):
raise AttributeError(key)
if hasattr(MissingPandasLikeDataFrame, key):
if key in [
"asfreq",
"asof",
"convert_dtypes",
"infer_objects",
"set_axis",
] and get_option("compute.pandas_fallback"):
return self._fall_back_frame(key)
if get_option("compute.pandas_fallback"):
new_key = f"_{key}_fallback"
if hasattr(self, new_key):
return getattr(self, new_key)

property_or_func = getattr(MissingPandasLikeDataFrame, key)
if isinstance(property_or_func, property):
Expand Down

0 comments on commit d20650b

Please sign in to comment.