Skip to content

Commit

Permalink
FEAT-modin-project#2627: fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
  • Loading branch information
anmyachev committed Aug 24, 2021
1 parent 8501f2c commit 826f478
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 3 deletions.
7 changes: 7 additions & 0 deletions modin/data_management/factories/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@ def read_csv(cls, **kwargs):
def read_csv_glob(cls, **kwargs):
return cls.__factory._read_csv_glob(**kwargs)

@classmethod
@_inherit_docstrings(
    factories.ExperimentalPandasOnRayFactory._read_pickle_distributed
)
def read_pickle_distributed(cls, **kwargs):
    # No docstring is written here: the ``_inherit_docstrings`` decorator above
    # is presumably what supplies it (confirm against the decorator's definition).
    active_factory = cls.__factory
    # Every keyword argument is handed to the factory untouched.
    return active_factory._read_pickle_distributed(**kwargs)

@classmethod
@_inherit_docstrings(factories.BaseFactory._read_json)
def read_json(cls, **kwargs):
Expand Down
9 changes: 9 additions & 0 deletions modin/data_management/factories/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,15 @@ def prepare(cls):
def _read_csv_glob(cls, **kwargs):
return cls.io_cls.read_csv_glob(**kwargs)

@classmethod
@doc(
    _doc_io_method_raw_template,
    source="Pickle files",
    params=_doc_io_method_kwargs_params,
)
def _read_pickle_distributed(cls, **kwargs):
    # Only comments are used in the body: the ``doc`` decorator above is given
    # the documentation template, and a literal docstring here could alter
    # what it produces (assumption - verify against ``doc``).
    io_backend = cls.io_cls
    # Delegate to the IO class with the keyword arguments unchanged.
    return io_backend.read_pickle_distributed(**kwargs)


@doc(_doc_factory_class, backend_name="experimental PandasOnPython")
class ExperimentalPandasOnPythonFactory(ExperimentalBaseFactory, PandasOnPythonFactory):
Expand Down
8 changes: 7 additions & 1 deletion modin/experimental/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@
# in the user code
from .numpy_wrap import _CAUGHT_NUMPY # noqa F401
from modin.pandas import * # noqa F401, F403
from .io_exp import read_sql, read_csv_glob # noqa F401
from .io_exp import ( # noqa F401
read_sql,
read_csv_glob,
read_pickle_distributed,
to_pickle_distributed,
)
import warnings

# Attach the distributed pickle writer to ``DataFrame`` so it can be called as
# a method; the instance is then received as the function's first argument.
DataFrame.to_pickle_distributed = to_pickle_distributed  # noqa: F405

warnings.warn(
"Thank you for using the Modin Experimental pandas API."
Expand Down
65 changes: 63 additions & 2 deletions modin/experimental/pandas/io_exp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@

import inspect
import pathlib
from typing import Union, IO, AnyStr, Callable, Optional
import pickle
from typing import Any, Union, IO, AnyStr, Callable, Optional

import pandas
import pandas._libs.lib as lib
from pandas._typing import StorageOptions
from pandas._typing import CompressionOptions, FilePathOrBuffer, StorageOptions

from . import DataFrame
from modin.config import IsExperimental, Engine
Expand Down Expand Up @@ -220,3 +221,63 @@ def _read(**kwargs) -> DataFrame:


# Module-level reader produced by ``_make_parser_func``, called with "," as
# the ``sep`` argument.
read_csv_glob = _make_parser_func(sep=",")


def read_pickle_distributed(
    filepath_or_buffer: FilePathOrBuffer,
    compression: Optional[str] = "infer",
    storage_options: StorageOptions = None,
) -> DataFrame:
    """
    Load pickled pandas object from file(s) into a Modin ``DataFrame``.

    A list of files can be passed as the input parameter.

    Parameters
    ----------
    filepath_or_buffer : str, path object or file-like object
        File path, URL, or buffer where the pickled object will be loaded from.
        Accept URL. URL is not limited to S3 and GCS.
    compression : {'infer', 'gzip', 'bz2', 'zip', 'xz', None}, default: 'infer'
        If 'infer' and `filepath_or_buffer` is path-like, then detect compression
        from the following extensions: '.gz', '.bz2', '.zip', or '.xz' (otherwise
        no compression). If 'infer' and `filepath_or_buffer` is not path-like,
        then use None (= no decompression).
    storage_options : dict, optional
        Extra options that make sense for a particular storage connection, e.g.
        host, port, username, password, etc., if using a URL that will be parsed
        by fsspec, e.g., starting "s3://", "gcs://". An error will be raised if
        providing this argument with a non-fsspec URL. See the fsspec and backend
        storage implementation docs for the set of allowed keys and values.

    Returns
    -------
    DataFrame
        Frame built around the query compiler returned by
        ``FactoryDispatcher.read_pickle_distributed``.

    Raises
    ------
    AssertionError
        If experimental mode is not enabled.

    Notes
    -----
    The number of partitions is equal to the number of input files.
    """
    Engine.subscribe(_update_engine)
    assert IsExperimental.get(), "This only works in experimental mode"
    # Forward the arguments explicitly instead of harvesting them from the
    # current stack frame: the same three keys reach the dispatcher, without
    # depending on frame introspection or on which locals happen to exist.
    query_compiler = FactoryDispatcher.read_pickle_distributed(
        filepath_or_buffer=filepath_or_buffer,
        compression=compression,
        storage_options=storage_options,
    )
    return DataFrame(query_compiler=query_compiler)


def to_pickle_distributed(
    obj: Any,
    filepath_or_buffer: FilePathOrBuffer,
    compression: CompressionOptions = "infer",
    protocol: int = pickle.HIGHEST_PROTOCOL,
    storage_options: StorageOptions = None,
):
    """
    Hand an object to ``FactoryDispatcher.to_pickle_distributed`` for pickling.

    Parameters
    ----------
    obj : Any
        Object to write. A ``DataFrame`` is replaced by its ``_query_compiler``
        before dispatching; any other value is passed through as is.
    filepath_or_buffer : FilePathOrBuffer
        Destination, forwarded unchanged to the dispatcher.
    compression : CompressionOptions, default: 'infer'
        Compression setting, forwarded unchanged to the dispatcher.
    protocol : int, default: pickle.HIGHEST_PROTOCOL
        Pickle protocol number, forwarded unchanged to the dispatcher.
    storage_options : StorageOptions, optional
        Storage connection options, forwarded unchanged to the dispatcher.

    Returns
    -------
    object
        Whatever ``FactoryDispatcher.to_pickle_distributed`` returns.
    """
    from modin.data_management.factories.dispatcher import FactoryDispatcher

    Engine.subscribe(_update_engine)
    # Unwrap a DataFrame to its query compiler; leave other objects untouched.
    payload = obj._query_compiler if isinstance(obj, DataFrame) else obj
    writer_options = dict(
        filepath_or_buffer=filepath_or_buffer,
        compression=compression,
        protocol=protocol,
        storage_options=storage_options,
    )
    return FactoryDispatcher.to_pickle_distributed(payload, **writer_options)

0 comments on commit 826f478

Please sign in to comment.