From af6df7040ecd672850f24718eb06837f44e02213 Mon Sep 17 00:00:00 2001
From: Qing Wang
Date: Wed, 1 Mar 2023 14:26:51 +0800
Subject: [PATCH] Be compatible with lower versions of ray. (#72)

Make RayFed compatible with lower versions of Ray. In this PR we tested
it on Ray 1.13.0, which means RayFed can officially run on Ray 1.13.0.

The Ray version check compares only the numeric release components, so
pre-release builds such as 2.0.0rc1 are treated as 2.0.0 instead of
raising ValueError.
---
 .github/workflows/test_on_ray1.13.0.yml       | 37 ++++++++
 ...{ubuntu_basic.yml => test_on_ray2.0.0.yml} |  2 +-
 fed/_private/compatible_utils.py              | 84 +++++++++++++++++++
 fed/_private/constants.py                     |  2 +
 fed/api.py                                    | 14 ++--
 fed/utils.py                                  | 16 ++++
 tests/test_transport_proxy.py                 |  3 +-
 tests/test_transport_proxy_tls.py             |  3 +-
 8 files changed, 153 insertions(+), 8 deletions(-)
 create mode 100644 .github/workflows/test_on_ray1.13.0.yml
 rename .github/workflows/{ubuntu_basic.yml => test_on_ray2.0.0.yml} (97%)
 create mode 100644 fed/_private/compatible_utils.py

diff --git a/.github/workflows/test_on_ray1.13.0.yml b/.github/workflows/test_on_ray1.13.0.yml
new file mode 100644
index 0000000..2f228a6
--- /dev/null
+++ b/.github/workflows/test_on_ray1.13.0.yml
@@ -0,0 +1,37 @@
+name: test-on-ray1.13.0
+
+on:
+  push:
+    branches: [ main ]
+  pull_request:
+    branches: [ main ]
+
+jobs:
+  run-unit-tests:
+    timeout-minutes: 60
+    runs-on: ubuntu-latest
+    container: docker.io/library/ubuntu:latest
+
+    steps:
+    - uses: actions/checkout@v2
+
+    - name: Install bazel
+      run: |
+        apt-get update
+        apt-get install -yq wget gcc g++ python3.9 zlib1g-dev zip libuv1.dev
+        apt-get install -yq pip
+
+    - name: Install dependencies
+      run: |
+        python3 -m pip install virtualenv
+        python3 -m virtualenv -p python3 py3
+        . py3/bin/activate
+        which python
+        pip install pytest torch cloudpickle cryptography
+        pip install ray==1.13.0
+
+    - name: Build and test
+      run: |
+        . py3/bin/activate
+        python3 setup.py install
+        sh test.sh
diff --git a/.github/workflows/ubuntu_basic.yml b/.github/workflows/test_on_ray2.0.0.yml
similarity index 97%
rename from .github/workflows/ubuntu_basic.yml
rename to .github/workflows/test_on_ray2.0.0.yml
index d249f45..55d064e 100644
--- a/.github/workflows/ubuntu_basic.yml
+++ b/.github/workflows/test_on_ray2.0.0.yml
@@ -1,4 +1,4 @@
-name: ubuntu-basic
+name: test-on-ray2.0.0
 
 on:
   push:
diff --git a/fed/_private/compatible_utils.py b/fed/_private/compatible_utils.py
new file mode 100644
index 0000000..d921cf6
--- /dev/null
+++ b/fed/_private/compatible_utils.py
@@ -0,0 +1,84 @@
+# Copyright 2022 The RayFed Team
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+from itertools import zip_longest
+
+import ray
+import fed._private.constants as fed_constants
+
+
+def _parse_release_numbers(version):
+    """
+    Return the leading numeric components of a version string as ints.
+
+    Parsing stops at the first component that is not purely numeric, so
+    pre-release and dev suffixes are ignored, e.g. "2.0.0rc1" and
+    "2.0.0.dev0" both yield [2, 0, 0].
+    """
+    numbers = []
+    for part in version.split('.'):
+        matched = re.match(r'\d+', part)
+        if matched is None:
+            break
+        numbers.append(int(matched.group()))
+        if matched.end() != len(part):
+            # A suffix such as `rc1` ends the numeric release part.
+            break
+    return numbers
+
+
+def _compare_version_strings(version1, version2):
+    """
+    This utility function compares two version strings and returns
+    True if version1 is greater, and False if they're equal, and
+    False if version2 is greater.
+
+    Only the numeric release parts are compared, and missing components
+    count as 0, so "2.0" equals "2.0.0" and "2.0.0rc1" equals "2.0.0".
+    """
+    v1_numbers = _parse_release_numbers(version1)
+    v2_numbers = _parse_release_numbers(version2)
+
+    for num1, num2 in zip_longest(v1_numbers, v2_numbers, fillvalue=0):
+        if num1 != num2:
+            return num1 > num2
+
+    return False
+
+
+def _ray_version_less_than_2_0_0():
+    """ Whether the current ray version is less than 2.0.0.
+    """
+    return _compare_version_strings(
+        fed_constants.RAY_VERSION_2_0_0_STR, ray.__version__)
+
+
+def init_ray(address: str = None, **kwargs):
+    """A compatible API to init Ray.
+    """
+    if address == 'local' and _ray_version_less_than_2_0_0():
+        # Ignore the `local` when ray < 2.0.0
+        ray.init(**kwargs)
+    else:
+        ray.init(address=address, **kwargs)
+
+
+def get_gcs_address_from_ray_worker():
+    """A compatible API to get the gcs address from the ray worker module.
+    """
+    try:
+        return ray._private.worker._global_node.gcs_address
+    except AttributeError:
+        return ray.worker._global_node.gcs_address
diff --git a/fed/_private/constants.py b/fed/_private/constants.py
index d35d067..fabbfe1 100644
--- a/fed/_private/constants.py
+++ b/fed/_private/constants.py
@@ -24,3 +24,5 @@
 RAYFED_LOG_FMT = "%(asctime)s %(levelname)s %(filename)s:%(lineno)s [%(party)s] -- %(message)s" # noqa
 
 RAYFED_DATE_FMT = "%Y-%m-%d %H:%M:%S"
+
+RAY_VERSION_2_0_0_STR = "2.0.0"
diff --git a/fed/api.py b/fed/api.py
index e49ae9f..1e3bfe8 100644
--- a/fed/api.py
+++ b/fed/api.py
@@ -21,7 +21,8 @@
 import ray
 import ray.experimental.internal_kv as internal_kv
 from ray._private.gcs_utils import GcsClient
-from ray._private.inspect_util import is_cython
+import fed.utils as fed_utils
+import fed._private.compatible_utils as compatible_utils
 
 from fed._private.constants import (
     RAYFED_CLUSTER_KEY,
@@ -152,16 +153,18 @@ def init(
     assert cluster, "Cluster should be provided."
     assert party, "Party should be provided."
     assert party in cluster, f"Party {party} is not in cluster {cluster}."
-    ray.init(address=address, **kwargs)
+    compatible_utils.init_ray(address=address, **kwargs)
 
     tls_config = {} if tls_config is None else tls_config
     if tls_config:
         assert (
             'cert' in tls_config and 'key' in tls_config
         ), 'Cert or key are not in tls_config.'
     # A Ray private accessing, should be replaced in public API.
-    gcs_address = ray._private.worker._global_node.gcs_address
-    gcs_client = GcsClient(address=gcs_address, nums_reconnect_retry=10)
+
+    gcs_client = GcsClient(
+        address=compatible_utils.get_gcs_address_from_ray_worker(),
+        nums_reconnect_retry=10)
     internal_kv._initialize_internal_kv(gcs_client)
     internal_kv._internal_kv_put(RAYFED_CLUSTER_KEY, cloudpickle.dumps(cluster))
     internal_kv._internal_kv_put(RAYFED_PARTY_KEY, cloudpickle.dumps(party))
@@ -309,7 +312,8 @@ def remote(self, *cls_args, **cls_kwargs):
 # This is the decorator `@fed.remote`
 def remote(*args, **kwargs):
     def _make_fed_remote(function_or_class, **options):
-        if inspect.isfunction(function_or_class) or is_cython(function_or_class):
+        if (inspect.isfunction(function_or_class)
+                or fed_utils.is_cython(function_or_class)):
             return FedRemoteFunction(function_or_class).options(**options)
 
         if inspect.isclass(function_or_class):
diff --git a/fed/utils.py b/fed/utils.py
index 0085a0b..a83c1d6 100644
--- a/fed/utils.py
+++ b/fed/utils.py
@@ -122,3 +122,19 @@ def load_cert_config(cert_config):
         cert_chain = file.read()
 
     return ca_cert, private_key, cert_chain
+
+
+def is_cython(obj):
+    """Check if an object is a Cython function or method"""
+
+    # TODO(suo): We could split these into two functions, one for Cython
+    # functions and another for Cython methods.
+    # TODO(suo): There doesn't appear to be a Cython function 'type' we can
+    # check against via isinstance. Please correct me if I'm wrong.
+    def check_cython(x):
+        return type(x).__name__ == "cython_function_or_method"
+
+    # Check if function or method, respectively
+    return check_cython(obj) or (
+        hasattr(obj, "__func__") and check_cython(obj.__func__)
+    )
diff --git a/tests/test_transport_proxy.py b/tests/test_transport_proxy.py
index d13fcb3..3ca8981 100644
--- a/tests/test_transport_proxy.py
+++ b/tests/test_transport_proxy.py
@@ -18,6 +18,7 @@
 
 from fed.barriers import RecverProxyActor, send, start_send_proxy
 from fed.cleanup import wait_sending
+import fed._private.compatible_utils as compatible_utils
 
 
 def test_n_to_1_transport():
@@ -25,7 +26,7 @@ def test_n_to_1_transport():
     sending data to the target recver proxy, and there also have N receivers
     to `get_data` from Recver proxy at that time.
     """
-    ray.init(address='local')
+    compatible_utils.init_ray(address='local')
     NUM_DATA = 10
     SERVER_ADDRESS = "127.0.0.1:12344"
     recver_proxy_actor = RecverProxyActor.options(
diff --git a/tests/test_transport_proxy_tls.py b/tests/test_transport_proxy_tls.py
index 3e4ff81..c360856 100644
--- a/tests/test_transport_proxy_tls.py
+++ b/tests/test_transport_proxy_tls.py
@@ -18,6 +18,7 @@
 import ray
 import cloudpickle
 import ray.experimental.internal_kv as internal_kv
+import fed._private.compatible_utils as compatible_utils
 
 from fed.barriers import RecverProxyActor, send, start_send_proxy
 from fed.cleanup import wait_sending
@@ -29,7 +30,7 @@ def test_n_to_1_transport():
     sending data to the target recver proxy, and there also have N receivers
     to `get_data` from Recver proxy at that time.
     """
-    ray.init(address='local')
+    compatible_utils.init_ray(address='local')
     cert_dir = os.path.join(
         os.path.dirname(os.path.abspath(__file__)),
         "/tmp/rayfed/test-certs/"