Skip to content

Commit

Permalink
Experimental utils for copying directories to other machines in the c… (
Browse files Browse the repository at this point in the history
ray-project#150)

* Experimental utils for copying directories to other machines in the cluster using Ray.

* Test copying directory functionality.

* Small fix.
  • Loading branch information
robertnishihara authored and pcmoritz committed Dec 23, 2016
1 parent 86b211f commit 8d90c9f
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 0 deletions.
1 change: 1 addition & 0 deletions lib/python/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# This is done by associating all child processes with a "job" object that imposes this behavior.
(lambda kernel32: (lambda job: (lambda n: kernel32.SetInformationJobObject(job, 9, "\0" * 17 + chr(0x8 | 0x4 | 0x20) + "\0" * (n - 18), n))(0x90 if ctypes.sizeof(ctypes.c_void_p) > ctypes.sizeof(ctypes.c_int) else 0x70) and kernel32.AssignProcessToJobObject(job, ctypes.c_void_p(kernel32.GetCurrentProcess())))(ctypes.c_void_p(kernel32.CreateJobObjectW(None, None))) if kernel32 is not None else None)(ctypes.windll.kernel32)

import ray.experimental
import ray.serialization
from ray.worker import register_class, error_info, init, connect, disconnect, get, put, wait, remote
from ray.worker import Reusable, reusables
Expand Down
5 changes: 5 additions & 0 deletions lib/python/ray/experimental/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from .utils import copy_directory
74 changes: 74 additions & 0 deletions lib/python/ray/experimental/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import io
import os
import tarfile
import sys

import ray

def tarred_directory_as_bytes(source_dir):
"""Tar a directory and return it as a byte string.
Args:
source_dir (str): The name of the directory to tar.
Returns:
A byte string representing the tarred file.
"""
# Get a BytesIO object.
string_file = io.BytesIO()
# Create an in-memory tarfile of the source directory.
with tarfile.open(mode="w:gz", fileobj=string_file) as tar:
tar.add(source_dir, arcname=os.path.basename(source_dir))
string_file.seek(0)
return string_file.read()

def tarred_bytes_to_directory(tarred_bytes, target_dir):
"""Take a byte string and untar it.
Args:
tarred_bytes (str): A byte string representing the tarred file. This should
be the output of tarred_directory_as_bytes.
target_dir (str): The directory to create the untarred files in.
"""
string_file = io.BytesIO(tarred_bytes)
with tarfile.open(fileobj=string_file) as tar:
tar.extractall(path=target_dir)

def copy_directory(source_dir, target_dir=None):
"""Copy a local directory to each machine in the Ray cluster.
Note that both source_dir and target_dir must have the same basename). For
example, source_dir can be /a/b/c and target_dir can be /d/e/c. In this case,
the directory /d/e will be added to the Python path of each worker.
Note that this method is not completely safe to use. For example, workers that
do not do the copying and only set their paths (only one worker per node does
the copying) may try to execute functions that use the files in the directory
being copied before the directory being copied has finished untarring.
Args:
source_dir (str): The directory to copy.
target_dir (str): The location to copy it to on the other machines. If this
is not provided, the source_dir will be used. If it is provided and is
different from source_dir, the source_dir also be copied to the target_dir
location on this machine.
"""
target_dir = source_dir if target_dir is None else target_dir
source_dir = os.path.abspath(source_dir)
target_dir = os.path.abspath(target_dir)
source_basename = os.path.basename(source_dir)
target_basename = os.path.basename(target_dir)
if source_basename != target_basename:
raise Exception("The source_dir and target_dir must have the same base name, {} != {}".format(source_basename, target_basename))
tarred_bytes = tarred_directory_as_bytes(source_dir)
def f(worker_info):
if worker_info["counter"] == 0:
tarred_bytes_to_directory(tarred_bytes, os.path.dirname(target_dir))
sys.path.append(os.path.dirname(target_dir))
# Run this function on all workers to copy the directory to all nodes and to
# add the directory to the Python path of each worker.
ray.worker.global_worker.run_function_on_all_workers(f)
50 changes: 50 additions & 0 deletions test/runtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
from __future__ import division
from __future__ import print_function

import os
import unittest
import ray
import numpy as np
import time
import shutil
import string
import sys
from collections import namedtuple
Expand Down Expand Up @@ -673,5 +675,53 @@ def use_foo():

ray.worker.cleanup()

class UtilsTest(unittest.TestCase):

def testCopyingDirectory(self):
# The functionality being tested here is really multi-node functionality,
# but this test just uses a single node.

ray.init(start_ray_local=True, num_workers=1)

source_text = "hello world"

temp_dir1 = os.path.join(os.path.dirname(__file__), "temp_dir1")
source_dir = os.path.join(temp_dir1, "dir")
source_file = os.path.join(source_dir, "file.txt")
temp_dir2 = os.path.join(os.path.dirname(__file__), "temp_dir2")
target_dir = os.path.join(temp_dir2, "dir")
target_file = os.path.join(target_dir, "file.txt")

def remove_temporary_files():
if os.path.exists(temp_dir1):
shutil.rmtree(temp_dir1)
if os.path.exists(temp_dir2):
shutil.rmtree(temp_dir2)

# Remove the relevant files if they are left over from a previous run of
# this test.
remove_temporary_files()

# Create the source files.
os.mkdir(temp_dir1)
os.mkdir(source_dir)
with open(source_file, "w") as f:
f.write(source_text)

# Copy the source directory to the target directory.
ray.experimental.copy_directory(source_dir, target_dir)
time.sleep(0.5)

# Check that the target files exist and are the same as the source files.
self.assertTrue(os.path.exists(target_dir))
self.assertTrue(os.path.exists(target_file))
with open(target_file, "r") as f:
self.assertEqual(f.read(), source_text)

# Remove the relevant files to clean up.
remove_temporary_files()

ray.worker.cleanup()

if __name__ == "__main__":
unittest.main(verbosity=2)

0 comments on commit 8d90c9f

Please sign in to comment.