From b05f87d7b27eb73e8612c249d9d93d0678aea73e Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 16 Feb 2021 10:40:55 -0800 Subject: [PATCH] [Object Spilling] Share the same S3 session for smart_open spilling. (#13904) --- python/ray/external_storage.py | 19 ++++++++++++++----- python/ray/tests/test_object_spilling.py | 9 +++++++-- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/python/ray/external_storage.py b/python/ray/external_storage.py index 138561f432e2..0d24513d1a46 100644 --- a/python/ray/external_storage.py +++ b/python/ray/external_storage.py @@ -326,16 +326,25 @@ def __init__(self, except ModuleNotFoundError as e: raise ModuleNotFoundError( "Smart open is chosen to be a object spilling " - "external storage, but smart_open " + "external storage, but smart_open and boto3 " f"is not downloaded. Original error: {e}") self.uri = uri.strip("/") self.prefix = prefix self.override_transport_params = override_transport_params or {} - # smart_open always seek to 0 if we don't set this argument. - # This will lead us to call a Object.get when it is not necessary, - # so defer seek and call seek before reading objects instead. - self.transport_params = {"defer_seek": True} + self.is_for_s3 = uri.startswith("s3") + + if self.is_for_s3: + import boto3 # noqa + # Setup boto3. It is essential because if we don't create boto + # session, smart_open will create a new session for every + # open call. + self.s3 = boto3.resource(service_name="s3") + + # smart_open always seek to 0 if we don't set this argument. + # This will lead us to call a Object.get when it is not necessary, + # so defer seek and call seek before reading objects instead. + self.transport_params = {"defer_seek": True, "resource": self.s3} self.transport_params.update(self.override_transport_params) def spill_objects(self, object_refs, owner_addresses) -> List[str]: diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index e0e3033d255a..1509feb375b2 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -14,8 +14,13 @@ from ray.test_utils import wait_for_condition, run_string_as_driver from ray.internal.internal_api import memory_summary +# -- Smart open param -- bucket_name = "object-spilling-test" + +# -- File system param -- spill_local_path = "/tmp/spill" + +# -- Spilling configs -- file_system_object_spilling_config = { "type": "filesystem", "params": { @@ -40,10 +45,10 @@ def create_object_spilling_config(request, tmp_path): + temp_folder = tmp_path / "spill" + temp_folder.mkdir() if (request.param["type"] == "filesystem" or request.param["type"] == "mock_distributed_fs"): - temp_folder = tmp_path / "spill" - temp_folder.mkdir() request.param["params"]["directory_path"] = str(temp_folder) return json.dumps(request.param), temp_folder