Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#4017: Fix OmniSci engine enabling for IO functions #4037

Merged: 2 commits were merged on Jan 24, 2022.
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,6 @@ jobs:
run:
shell: bash -l {0}
env:
MODIN_EXPERIMENTAL: "True"
MODIN_ENGINE: "native"
MODIN_STORAGE_FORMAT: "omnisci"
name: Test OmniSci storage format, Python 3.7
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
import pytest
import re

-from modin.config import IsExperimental, Engine, StorageFormat
+from modin.config import StorageFormat
 from modin.pandas.test.utils import io_ops_bad_exc, default_to_pandas_ignore_string
 from .utils import eval_io, ForceOmnisciImport, set_execution_mode, run_and_compare
 from pandas.core.dtypes.common import is_list_like

-IsExperimental.put(True)
-Engine.put("native")
 StorageFormat.put("omnisci")

import modin.pandas as pd
Expand Down
36 changes: 18 additions & 18 deletions modin/pandas/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ def _read(**kwargs):
-------
modin.pandas.DataFrame
"""
Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
squeeze = kwargs.pop("squeeze", False)
pd_obj = FactoryDispatcher.read_csv(**kwargs)
# This happens when `read_csv` returns a TextFileReader object for iterating through
Expand Down Expand Up @@ -209,9 +209,9 @@ def read_parquet(
use_nullable_dtypes: bool = False,
**kwargs,
):
Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(
query_compiler=FactoryDispatcher.read_parquet(
path=path,
Expand Down Expand Up @@ -246,9 +246,9 @@ def read_json(
):
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())

Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_json(**kwargs))


Expand All @@ -271,9 +271,9 @@ def read_gbq(
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())
kwargs.update(kwargs.pop("kwargs", {}))

Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_gbq(**kwargs))


Expand All @@ -297,9 +297,9 @@ def read_html(
):
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())

Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_html(**kwargs))


Expand All @@ -308,9 +308,9 @@ def read_clipboard(sep=r"\s+", **kwargs): # pragma: no cover
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())
kwargs.update(kwargs.pop("kwargs", {}))

Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_clipboard(**kwargs))


Expand Down Expand Up @@ -345,9 +345,9 @@ def read_excel(
):
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())

Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
intermediate = FactoryDispatcher.read_excel(**kwargs)
if isinstance(intermediate, (OrderedDict, dict)):
parsed = type(intermediate)()
Expand Down Expand Up @@ -375,9 +375,9 @@ def read_hdf(
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())
kwargs.update(kwargs.pop("kwargs", {}))

Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_hdf(**kwargs))


Expand All @@ -390,9 +390,9 @@ def read_feather(
):
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())

Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_feather(**kwargs))


Expand All @@ -413,9 +413,9 @@ def read_stata(
):
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())

Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_stata(**kwargs))


Expand All @@ -430,9 +430,9 @@ def read_sas(
): # pragma: no cover
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())

Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_sas(**kwargs))


Expand All @@ -444,9 +444,9 @@ def read_pickle(
):
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())

Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_pickle(**kwargs))


Expand All @@ -463,9 +463,9 @@ def read_sql(
):
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())

Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
if kwargs.get("chunksize") is not None:
ErrorMessage.default_to_pandas("Parameters provided [chunksize]")
df_gen = pandas.read_sql(**kwargs)
Expand All @@ -483,10 +483,10 @@ def read_fwf(
infer_nrows=100,
**kwds,
):
Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher
from pandas.io.parsers.base_parser import parser_defaults

Engine.subscribe(_update_engine)
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())
kwargs.update(kwargs.pop("kwds", {}))
target_kwargs = parser_defaults.copy()
Expand Down Expand Up @@ -515,9 +515,9 @@ def read_sql_table(
):
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())

Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_sql_table(**kwargs))


Expand All @@ -534,9 +534,9 @@ def read_sql_query(
):
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())

Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_sql_query(**kwargs))


Expand All @@ -546,9 +546,9 @@ def read_spss(
usecols: Union[Sequence[str], type(None)] = None,
convert_categoricals: bool = True,
):
Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(
query_compiler=FactoryDispatcher.read_spss(path, usecols, convert_categoricals)
)
Expand All @@ -562,9 +562,9 @@ def to_pickle(
protocol: int = pickle.HIGHEST_PROTOCOL,
storage_options: StorageOptions = None,
):
Engine.subscribe(_update_engine)
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
if isinstance(obj, DataFrame):
obj = obj._query_compiler
return FactoryDispatcher.to_pickle(
Expand Down