Skip to content

Commit

Permalink
FIX-#4017: correct engine subscribe process
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Myskov <alexander.myskov@intel.com>
  • Loading branch information
amyskov committed Jan 24, 2022
1 parent cb4e727 commit 63e07dd
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 23 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,6 @@ jobs:
run:
shell: bash -l {0}
env:
MODIN_EXPERIMENTAL: "True"
MODIN_ENGINE: "native"
MODIN_STORAGE_FORMAT: "omnisci"
name: Test OmniSci storage format, Python 3.7
steps:
Expand Down
2 changes: 2 additions & 0 deletions modin/core/execution/dispatching/factories/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from modin.config import Engine, StorageFormat, IsExperimental
from modin.core.execution.dispatching.factories import factories
from modin.utils import get_current_execution, _inherit_docstrings
from modin.pandas import _update_engine


class FactoryNotFoundError(AttributeError):
Expand Down Expand Up @@ -294,5 +295,6 @@ def to_parquet(cls, *args, **kwargs):
return cls.__factory._to_parquet(*args, **kwargs)


Engine.subscribe(_update_engine)
Engine.subscribe(FactoryDispatcher._update_factory)
StorageFormat.subscribe(FactoryDispatcher._update_factory)
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
import pytest
import re

from modin.config import IsExperimental, Engine, StorageFormat
from modin.config import StorageFormat
from modin.pandas.test.utils import io_ops_bad_exc, default_to_pandas_ignore_string
from .utils import eval_io, ForceOmnisciImport, set_execution_mode, run_and_compare
from pandas.core.dtypes.common import is_list_like

IsExperimental.put(True)
Engine.put("native")
StorageFormat.put("omnisci")

import modin.pandas as pd
Expand Down
18 changes: 0 additions & 18 deletions modin/pandas/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def _read(**kwargs):
"""
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
squeeze = kwargs.pop("squeeze", False)
pd_obj = FactoryDispatcher.read_csv(**kwargs)
# This happens when `read_csv` returns a TextFileReader object for iterating through
Expand Down Expand Up @@ -211,7 +210,6 @@ def read_parquet(
):
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(
query_compiler=FactoryDispatcher.read_parquet(
path=path,
Expand Down Expand Up @@ -248,7 +246,6 @@ def read_json(

from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_json(**kwargs))


Expand All @@ -273,7 +270,6 @@ def read_gbq(

from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_gbq(**kwargs))


Expand All @@ -299,7 +295,6 @@ def read_html(

from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_html(**kwargs))


Expand All @@ -310,7 +305,6 @@ def read_clipboard(sep=r"\s+", **kwargs): # pragma: no cover

from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_clipboard(**kwargs))


Expand Down Expand Up @@ -347,7 +341,6 @@ def read_excel(

from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
intermediate = FactoryDispatcher.read_excel(**kwargs)
if isinstance(intermediate, (OrderedDict, dict)):
parsed = type(intermediate)()
Expand Down Expand Up @@ -377,7 +370,6 @@ def read_hdf(

from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_hdf(**kwargs))


Expand All @@ -392,7 +384,6 @@ def read_feather(

from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_feather(**kwargs))


Expand All @@ -415,7 +406,6 @@ def read_stata(

from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_stata(**kwargs))


Expand All @@ -432,7 +422,6 @@ def read_sas(

from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_sas(**kwargs))


Expand All @@ -446,7 +435,6 @@ def read_pickle(

from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_pickle(**kwargs))


Expand All @@ -465,7 +453,6 @@ def read_sql(

from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
if kwargs.get("chunksize") is not None:
ErrorMessage.default_to_pandas("Parameters provided [chunksize]")
df_gen = pandas.read_sql(**kwargs)
Expand All @@ -486,7 +473,6 @@ def read_fwf(
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher
from pandas.io.parsers.base_parser import parser_defaults

Engine.subscribe(_update_engine)
_, _, _, kwargs = inspect.getargvalues(inspect.currentframe())
kwargs.update(kwargs.pop("kwds", {}))
target_kwargs = parser_defaults.copy()
Expand Down Expand Up @@ -517,7 +503,6 @@ def read_sql_table(

from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_sql_table(**kwargs))


Expand All @@ -536,7 +521,6 @@ def read_sql_query(

from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(query_compiler=FactoryDispatcher.read_sql_query(**kwargs))


Expand All @@ -548,7 +532,6 @@ def read_spss(
):
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
return DataFrame(
query_compiler=FactoryDispatcher.read_spss(path, usecols, convert_categoricals)
)
Expand All @@ -564,7 +547,6 @@ def to_pickle(
):
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

Engine.subscribe(_update_engine)
if isinstance(obj, DataFrame):
obj = obj._query_compiler
return FactoryDispatcher.to_pickle(
Expand Down

0 comments on commit 63e07dd

Please sign in to comment.