Skip to content
49 changes: 49 additions & 0 deletions modin/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import pytest
import os


def pytest_addoption(parser):
    """Register the ``--simulate-cloud`` command line option.

    Parameters
    ----------
    parser
        Object exposing ``addoption``; the option is registered on it with a
        stored string value that defaults to ``"off"``.
    """
    option_settings = {
        "action": "store",
        "default": "off",
        "help": "simulate cloud for testing: off|normal|experimental",
    }
    parser.addoption("--simulate-cloud", **option_settings)


@pytest.fixture(scope="session", autouse=True)
def simulate_cloud(request):
    """Optionally run the whole test session against a locally spawned cluster.

    The behaviour is selected by the ``--simulate-cloud`` option (compared
    case-insensitively):

    * ``off`` - nothing is set up, the fixture just yields;
    * ``normal`` / ``experimental`` - ``MODIN_EXPERIMENTAL`` is set to
      ``"True"`` in this process, a cluster is created with
      ``create_cluster("local", __spawner__="local")`` and kept open while the
      session runs, and ``set_env`` is teleported through the connection to
      set ``MODIN_EXPERIMENTAL`` to ``"True"`` (``experimental``) or
      ``"False"`` (``normal``).

    Raises
    ------
    ValueError
        If the option holds any other value.
    """
    mode = request.config.getoption("--simulate-cloud").lower()
    if mode not in ("off", "normal", "experimental"):
        raise ValueError(f"Unsupported --simulate-cloud mode: {mode}")
    if mode == "off":
        # no simulation requested, run the session untouched
        yield
        return

    # NOTE(review): set before importing modin.experimental.cloud - presumably
    # the import depends on this variable; confirm before reordering
    os.environ["MODIN_EXPERIMENTAL"] = "True"

    from modin.experimental.cloud import create_cluster, get_connection

    def set_env(mode):
        # self-contained on purpose: the function is teleported, so it cannot
        # rely on names imported by this module
        import os

        os.environ["MODIN_EXPERIMENTAL"] = {"experimental": "True"}.get(
            mode, "False"
        )

    with create_cluster("local", __spawner__="local"):
        remote_set_env = get_connection().teleport(set_env)
        remote_set_env(mode)
        # the cluster context stays open for the duration of the test session
        yield
4 changes: 4 additions & 0 deletions modin/data_management/factories/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,7 @@ def wrap(*a, _original=getattr(self.__io_cls, name), **kw):

class ExperimentalPandasOnCloudrayFactory(ExperimentalRemoteFactory):
    """Remote factory that wraps ``PandasOnRayFactory``."""

    # factory that this remote factory delegates to
    wrapped_factory = PandasOnRayFactory


class ExperimentalPandasOnCloudpythonFactory(ExperimentalRemoteFactory):
    """Remote factory that wraps ``PandasOnPythonFactory``."""

    # factory that this remote factory delegates to
    # NOTE(review): presumably selected for the "Cloudpython" engine with
    # "Pandas" partitioning (see LocalCluster) - confirm the lookup convention
    wrapped_factory = PandasOnPythonFactory
7 changes: 5 additions & 2 deletions modin/experimental/cloud/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class BaseCluster:

target_engine = None
target_partition = None
Connector = Connection

def __init__(
self,
Expand Down Expand Up @@ -140,7 +141,7 @@ def spawn(self, wait=False):
if wait:
# cluster is ready now
if self.connection is None:
self.connection = Connection(
self.connection = self.Connector(
self._get_connection_details(), self._get_main_python()
)

Expand Down Expand Up @@ -261,7 +262,7 @@ def create(

Using SOCKS proxy requires Ray newer than 0.8.6, which might need to be installed manually.
"""
if not isinstance(provider, Provider):
if not isinstance(provider, Provider) and __spawner__ != "local":
provider = Provider(
name=provider,
credentials_file=credentials,
Expand All @@ -277,6 +278,8 @@ def create(
)
if __spawner__ == "rayscale":
from .rayscale import RayCluster as Spawner
elif __spawner__ == "local":
from .local_cluster import LocalCluster as Spawner
else:
raise ValueError(f"Unknown spawner: {__spawner__}")
instance = Spawner(
Expand Down
38 changes: 22 additions & 16 deletions modin/experimental/cloud/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import os
import random
import time
import tempfile
import sys

from .base import ClusterError, ConnectionDetails, _get_ssh_proxy_command

Expand All @@ -41,8 +43,8 @@ def __init__(self, details: ConnectionDetails, main_python: str, log_rpyc=None):
self.proc = None

# find where rpyc_classic is located
locator = self.__run(
self.__build_sshcmd(details),
locator = self._run(
self._build_sshcmd(details),
[
main_python,
"-c",
Expand All @@ -67,14 +69,14 @@ def __init__(self, details: ConnectionDetails, main_python: str, log_rpyc=None):
cmd = [
main_python,
rpyc_classic,
"--port",
str(self.rpyc_port),
]
if log_rpyc:
cmd.extend(["--logfile", "/tmp/rpyc.log"])
cmd.extend(["--logfile", f"{tempfile.gettempdir()}/rpyc.log"])
for _ in range(self.tries):
proc = self.__run(
self.__build_sshcmd(details, forward_port=port), cmd, capture_out=False
proc = self._run(
self._build_sshcmd(details, forward_port=port),
cmd + ["--port", str(port)],
capture_out=False,
)
if self.__wait_noexc(proc, 1) is None:
# started successfully
Expand All @@ -101,17 +103,22 @@ def get(cls):

return cls.__current.__connection

@staticmethod
def _get_service():
    """Return the service class handed to ``rpyc.connect_stream``.

    This is a hook: subclasses can override it to supply another service
    class (``LocalConnection`` in ``local_cluster.py`` does so). The import
    happens at call time rather than at module import time.
    """
    from .rpyc_proxy import WrappingService

    return WrappingService

def __try_connect(self):
import rpyc
from .rpyc_proxy import WrappingService

try:
stream = rpyc.SocketStream.connect(
host="127.0.0.1", port=self.rpyc_port, nodelay=True, keepalive=True
)
self.__connection = rpyc.connect_stream(
stream,
WrappingService,
self._get_service(),
config={"sync_request_timeout": RPYC_REQUEST_TIMEOUT},
)
except (ConnectionRefusedError, EOFError):
Expand All @@ -138,8 +145,9 @@ def deactivate(self):
if Connection.__current is self:
Connection.__current = None

def stop(self, sigint=signal.SIGINT):
# capture signal.SIGINT in closure so it won't get removed before __del__ is called
def stop(self, sigint=signal.SIGINT if sys.platform != "win32" else signal.SIGTERM):
# capture signal number in closure so it won't get removed before __del__ is called
# which might happen if connection is being destroyed during interpreter destruction
self.deactivate()
if self.proc and self.proc.poll() is None:
self.proc.send_signal(sigint)
Expand All @@ -152,7 +160,7 @@ def stop(self, sigint=signal.SIGINT):
def __del__(self):
    # make sure the connection is stopped when the object is finalized
    self.stop()

def __build_sshcmd(self, details: ConnectionDetails, forward_port: int = None):
def _build_sshcmd(self, details: ConnectionDetails, forward_port: int = None):
opts = [
("ConnectTimeout", "{}s".format(self.connect_timeout)),
("StrictHostKeyChecking", "no"),
Expand All @@ -173,15 +181,13 @@ def __build_sshcmd(self, details: ConnectionDetails, forward_port: int = None):
for oname, ovalue in opts:
cmdline.extend(["-o", f"{oname}={ovalue}"])
if forward_port:
cmdline.extend(
["-L", f"127.0.0.1:{forward_port}:127.0.0.1:{self.rpyc_port}"]
)
cmdline.extend(["-L", f"127.0.0.1:{forward_port}:127.0.0.1:{forward_port}"])
cmdline.append(f"{details.user_name}@{details.address}")

return cmdline

@staticmethod
def __run(sshcmd: list, cmd: list, capture_out: bool = True):
def _run(sshcmd: list, cmd: list, capture_out: bool = True):
redirect = subprocess.PIPE if capture_out else subprocess.DEVNULL
return subprocess.Popen(
sshcmd + [subprocess.list2cmdline(cmd)],
Expand Down
114 changes: 114 additions & 0 deletions modin/experimental/cloud/local_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import subprocess
import sys
import warnings

from .base import ConnectionDetails
from .cluster import BaseCluster
from .connection import Connection
from .rpyc_proxy import WrappingConnection, WrappingService, _TRACE_RPYC
from .tracing.tracing_connection import TracingWrappingConnection


class LocalWrappingConnection(
    TracingWrappingConnection if _TRACE_RPYC else WrappingConnection
):
    """Wrapping connection used for clusters running on the local machine.

    Derives from the tracing connection when ``_TRACE_RPYC`` is truthy and
    from the plain wrapping connection otherwise.
    """

    def _init_deliver(self):
        """Put the current modin checkout on the other side's ``sys.path``.

        ``ensure_modin`` is teleported and called with the path of the local
        ``modin/__init__.py`` before the parent's ``_init_deliver`` runs.
        """
        import modin

        def ensure_modin(modin_init):
            # imports are local because this function is teleported
            import os
            import sys

            package_dir = os.path.dirname(modin_init)
            modin_dir = os.path.abspath(os.path.join(package_dir, ".."))
            # make sure "import modin" will be taken from current modin,
            # not something potentially installed in the system
            if modin_dir in sys.path:
                return
            sys.path.insert(0, modin_dir)

        remote_ensure_modin = self.teleport(ensure_modin)
        remote_ensure_modin(modin.__file__)
        super()._init_deliver()


class LocalWrappingService(WrappingService):
    """``WrappingService`` variant that uses ``LocalWrappingConnection``."""

    # connection class for this service
    # NOTE(review): presumably consumed by rpyc when the service is connected
    # - verify against WrappingService
    _protocol = LocalWrappingConnection


class LocalConnection(Connection):
    """Connection that starts commands as local subprocesses instead of via ssh."""

    def _build_sshcmd(self, details: ConnectionDetails, forward_port: int = None):
        """Return an empty command prefix, as no ssh is involved locally."""
        return list()

    @staticmethod
    def _run(sshcmd: list, cmd: list, capture_out: bool = True):
        """Start ``cmd`` directly as a subprocess and return the ``Popen`` object.

        ``sshcmd`` must be empty. stdout and stderr are piped when
        ``capture_out`` is true and discarded otherwise; stdin is always
        detached.
        """
        assert not sshcmd, "LocalConnection does not support running things via ssh"
        if capture_out:
            output_target = subprocess.PIPE
        else:
            output_target = subprocess.DEVNULL
        popen_kwargs = {
            "stdin": subprocess.DEVNULL,
            "stdout": output_target,
            "stderr": output_target,
        }
        return subprocess.Popen(cmd, **popen_kwargs)

    @staticmethod
    def _get_service():
        """Return the service class to use for a local connection."""
        return LocalWrappingService


# Sentinel used as the default for the LocalCluster.__init__ arguments that are
# ignored; an identity check against it detects any explicitly passed value.
_UNUSED = object()


class LocalCluster(BaseCluster):
    """Cluster that lives on the local machine; spawning and destroying are no-ops.

    It targets the ``"Cloudpython"`` engine with ``"Pandas"`` partitioning and
    connects through ``LocalConnection``.
    """

    target_engine = "Cloudpython"
    target_partition = "Pandas"

    Connector = LocalConnection

    def __init__(
        self,
        provider,
        project_name=_UNUSED,
        cluster_name=_UNUSED,
        worker_count=_UNUSED,
        head_node_type=_UNUSED,
        worker_node_type=_UNUSED,
    ):
        """Create the cluster object; only ``provider == "local"`` is accepted.

        Every other argument is ignored (a warning is issued if any of them is
        passed) and fixed placeholder values are forwarded to the base class.
        """
        assert (
            provider == "local"
        ), "Local cluster can only be spawned with 'local' provider"
        ignored_args = (
            project_name,
            cluster_name,
            worker_count,
            head_node_type,
            worker_node_type,
        )
        if not all(arg is _UNUSED for arg in ignored_args):
            warnings.warn(
                "All parameters except 'provider' are ignored for LocalCluster, do not pass them"
            )
        super().__init__(provider, "test-project", "test-cluster", 1, "head", "worker")

    def _spawn(self, wait=False):
        """Do nothing: there is nothing to spawn for a local cluster."""

    def _destroy(self, wait=False):
        """Do nothing: there is nothing to destroy for a local cluster."""

    def _get_connection_details(self) -> ConnectionDetails:
        """Return a default-constructed ``ConnectionDetails``."""
        return ConnectionDetails()

    def _get_main_python(self) -> str:
        """Return the path of the currently running Python interpreter."""
        return sys.executable
7 changes: 6 additions & 1 deletion modin/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
_is_first_update = {}
dask_client = None
_NOINIT_ENGINES = {
"Python"
"Python",
} # engines that don't require initialization, useful for unit tests


Expand Down Expand Up @@ -158,6 +158,11 @@ def init_remote_ray():
import modin.data_management.factories.dispatcher # noqa: F401

num_cpus = remote_ray.cluster_resources()["CPU"]
elif publisher.get() == "Cloudpython":
from modin.experimental.cloud import get_connection

get_connection().modules["modin"].set_backends("Python")

elif publisher.get() not in _NOINIT_ENGINES:
raise ImportError("Unrecognized execution engine: {}.".format(publisher.get()))

Expand Down