diff --git a/modin/data_management/factories/dispatcher.py b/modin/data_management/factories/dispatcher.py
index d59eccbefab..89ead53688d 100644
--- a/modin/data_management/factories/dispatcher.py
+++ b/modin/data_management/factories/dispatcher.py
@@ -176,6 +176,13 @@ def read_csv(cls, **kwargs):
     def read_csv_glob(cls, **kwargs):
         return cls.__factory._read_csv_glob(**kwargs)
 
+    @classmethod
+    @_inherit_docstrings(
+        factories.ExperimentalPandasOnRayFactory._read_pickle_distributed
+    )
+    def read_pickle_distributed(cls, **kwargs):
+        return cls.__factory._read_pickle_distributed(**kwargs)
+
     @classmethod
     @_inherit_docstrings(factories.BaseFactory._read_json)
     def read_json(cls, **kwargs):
diff --git a/modin/data_management/factories/factories.py b/modin/data_management/factories/factories.py
index e94769866ce..ff35b1d6ca7 100644
--- a/modin/data_management/factories/factories.py
+++ b/modin/data_management/factories/factories.py
@@ -491,6 +491,15 @@ def prepare(cls):
     def _read_csv_glob(cls, **kwargs):
         return cls.io_cls.read_csv_glob(**kwargs)
 
+    @classmethod
+    @doc(
+        _doc_io_method_raw_template,
+        source="Pickle files",
+        params=_doc_io_method_kwargs_params,
+    )
+    def _read_pickle_distributed(cls, **kwargs):
+        return cls.io_cls.read_pickle_distributed(**kwargs)
+
 
 @doc(_doc_factory_class, backend_name="experimental PandasOnPython")
 class ExperimentalPandasOnPythonFactory(ExperimentalBaseFactory, PandasOnPythonFactory):
diff --git a/modin/experimental/pandas/__init__.py b/modin/experimental/pandas/__init__.py
index efe74e316b5..251a7dc153b 100644
--- a/modin/experimental/pandas/__init__.py
+++ b/modin/experimental/pandas/__init__.py
@@ -40,9 +40,15 @@
 # in the user code
 from .numpy_wrap import _CAUGHT_NUMPY  # noqa F401
 from modin.pandas import *  # noqa F401, F403
-from .io_exp import read_sql, read_csv_glob  # noqa F401
+from .io_exp import (  # noqa F401
+    read_sql,
+    read_csv_glob,
+    read_pickle_distributed,
+    to_pickle_distributed,
+)
 import warnings
 
+setattr(DataFrame, "to_pickle_distributed", to_pickle_distributed)  # noqa: F405
 
 warnings.warn(
     "Thank you for using the Modin Experimental pandas API."
diff --git a/modin/experimental/pandas/io_exp.py b/modin/experimental/pandas/io_exp.py
index 8e0d414723f..116117d8942 100644
--- a/modin/experimental/pandas/io_exp.py
+++ b/modin/experimental/pandas/io_exp.py
@@ -15,11 +15,12 @@
 
 import inspect
 import pathlib
-from typing import Union, IO, AnyStr, Callable, Optional
+import pickle
+from typing import Any, Union, IO, AnyStr, Callable, Optional
 
 import pandas
 import pandas._libs.lib as lib
-from pandas._typing import StorageOptions
+from pandas._typing import CompressionOptions, FilePathOrBuffer, StorageOptions
 
 from . import DataFrame
 from modin.config import IsExperimental, Engine
@@ -220,3 +221,75 @@ def _read(**kwargs) -> DataFrame:
 
 
 read_csv_glob = _make_parser_func(sep=",")
+
+
+def read_pickle_distributed(
+    filepath_or_buffer: FilePathOrBuffer,
+    compression: Optional[str] = "infer",
+    storage_options: StorageOptions = None,
+):
+    """
+    Load pickled pandas object from file.
+
+    A list of files can be passed as `filepath_or_buffer`; in that case the
+    number of partitions is equal to the number of input files.
+
+    Parameters
+    ----------
+    filepath_or_buffer : str, path object or file-like object
+        File path, URL, or buffer the pickled object will be loaded from.
+        URLs are accepted and are not limited to S3 and GCS.
+    compression : {'infer', 'gzip', 'bz2', 'zip', 'xz', None}, default 'infer'
+        If 'infer' and `filepath_or_buffer` is path-like, detect compression from
+        the following extensions: '.gz', '.bz2', '.zip', or '.xz' (otherwise no
+        compression). If 'infer' and `filepath_or_buffer` is not path-like, use
+        None (= no decompression).
+    storage_options : dict, optional
+        Extra options that make sense for a particular storage connection, e.g.
+        host, port, username, password, etc., if using a URL that will be parsed by
+        fsspec, e.g., starting “s3://”, “gcs://”. An error will be raised if providing
+        this argument with a non-fsspec URL. See the fsspec and backend storage
+        implementation docs for the set of allowed keys and values.
+
+    Returns
+    -------
+    unpickled : modin.pandas.DataFrame
+        DataFrame constructed from the unpickled data.
+    """
+    Engine.subscribe(_update_engine)
+    assert IsExperimental.get(), "This only works in experimental mode"
+    # Capture the call arguments before any other locals are defined.
+    _, _, _, kwargs = inspect.getargvalues(inspect.currentframe())
+    # Imported locally, as in `to_pickle_distributed` below.
+    from modin.data_management.factories.dispatcher import FactoryDispatcher
+
+    return DataFrame(
+        query_compiler=FactoryDispatcher.read_pickle_distributed(**kwargs)
+    )
+
+
+def to_pickle_distributed(
+    obj: Any,
+    filepath_or_buffer: FilePathOrBuffer,
+    compression: CompressionOptions = "infer",
+    protocol: int = pickle.HIGHEST_PROTOCOL,
+    storage_options: StorageOptions = None,
+):
+    """
+    Pickle (serialize) the given object to file.
+
+    If `obj` is a Modin DataFrame, its query compiler is extracted and the
+    write is dispatched to the selected execution engine.
+    """
+    from modin.data_management.factories.dispatcher import FactoryDispatcher
+
+    Engine.subscribe(_update_engine)
+    if isinstance(obj, DataFrame):
+        obj = obj._query_compiler
+    return FactoryDispatcher.to_pickle_distributed(
+        obj,
+        filepath_or_buffer=filepath_or_buffer,
+        compression=compression,
+        protocol=protocol,
+        storage_options=storage_options,
+    )
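Below is a minimal usage sketch of the API added by this diff. It is illustrative only: the file name `frame.pkl` is hypothetical, the Ray engine is assumed (matching `ExperimentalPandasOnRayFactory` above), and experimental mode is enabled explicitly because `read_pickle_distributed` asserts `IsExperimental.get()`.

    import modin.config as cfg

    # Assumptions: Ray is the configured engine and experimental mode is on.
    cfg.IsExperimental.put(True)
    cfg.Engine.put("Ray")

    # The experimental __init__ above also attaches to_pickle_distributed
    # to DataFrame via setattr.
    import modin.experimental.pandas as pd

    df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    df.to_pickle_distributed("frame.pkl")  # hypothetical output path
    # Per the read_pickle_distributed docstring, a list of files may also be
    # passed; the result then has one partition per input file.
    df2 = pd.read_pickle_distributed("frame.pkl")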