Skip to content

Commit

Permalink
[Object Spilling] Share the same S3 session for smart_open spilling. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
rkooo567 authored Feb 16, 2021
1 parent c43a642 commit b05f87d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
19 changes: 14 additions & 5 deletions python/ray/external_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,16 +326,25 @@ def __init__(self,
except ModuleNotFoundError as e:
raise ModuleNotFoundError(
"Smart open is chosen to be a object spilling "
"external storage, but smart_open "
"external storage, but smart_open and boto3 "
f"is not downloaded. Original error: {e}")

self.uri = uri.strip("/")
self.prefix = prefix
self.override_transport_params = override_transport_params or {}
# smart_open always seek to 0 if we don't set this argument.
# This will lead us to call a Object.get when it is not necessary,
# so defer seek and call seek before reading objects instead.
self.transport_params = {"defer_seek": True}
self.is_for_s3 = uri.startswith("s3")

if self.is_for_s3:
import boto3 # noqa
# Setup boto3. It is essential because if we don't create boto
# session, smart_open will create a new session for every
# open call.
self.s3 = boto3.resource(service_name="s3")

# smart_open always seek to 0 if we don't set this argument.
# This will lead us to call a Object.get when it is not necessary,
# so defer seek and call seek before reading objects instead.
self.transport_params = {"defer_seek": True, "resource": self.s3}
self.transport_params.update(self.override_transport_params)

def spill_objects(self, object_refs, owner_addresses) -> List[str]:
Expand Down
9 changes: 7 additions & 2 deletions python/ray/tests/test_object_spilling.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@
from ray.test_utils import wait_for_condition, run_string_as_driver
from ray.internal.internal_api import memory_summary

# -- Smart open param --
bucket_name = "object-spilling-test"

# -- File system param --
spill_local_path = "/tmp/spill"

# -- Spilling configs --
file_system_object_spilling_config = {
"type": "filesystem",
"params": {
Expand All @@ -40,10 +45,10 @@


def create_object_spilling_config(request, tmp_path):
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
if (request.param["type"] == "filesystem"
or request.param["type"] == "mock_distributed_fs"):
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
request.param["params"]["directory_path"] = str(temp_folder)
return json.dumps(request.param), temp_folder

Expand Down

0 comments on commit b05f87d

Please sign in to comment.