Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create RemoteFunction class, remove FunctionProperties, simplify worker Python code. #2052

Merged
merged 10 commits into from
May 14, 2018
4 changes: 2 additions & 2 deletions python/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
e.args += (helpful_message, )
raise

from ray.local_scheduler import _config # noqa: E402
from ray.local_scheduler import ObjectID, _config # noqa: E402
from ray.worker import (error_info, init, connect, disconnect, get, put, wait,
remote, log_event, log_span, flush_log, get_gpu_ids,
get_webui_url,
Expand All @@ -68,7 +68,7 @@
"remote", "log_event", "log_span", "flush_log", "actor", "method",
"get_gpu_ids", "get_webui_url", "register_custom_serializer",
"SCRIPT_MODE", "WORKER_MODE", "PYTHON_MODE", "SILENT_MODE", "global_state",
"_config", "__version__"
"ObjectID", "_config", "__version__"
]

import ctypes # noqa: E402
Expand Down
379 changes: 165 additions & 214 deletions python/ray/actor.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ def dtypes(self):

if isinstance(self._dtypes_cache, list) and \
isinstance(self._dtypes_cache[0],
ray.local_scheduler.ObjectID):
ray.ObjectID):
self._dtypes_cache = pd.concat(ray.get(self._dtypes_cache))
self._dtypes_cache.index = self.columns

Expand Down
18 changes: 9 additions & 9 deletions python/ray/dataframe/index_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ def __init__(self, dfs=None, index=None, axis=0, lengths_oid=None,
self._cached_index = False

def _get__lengths(self):
if isinstance(self._lengths_cache, ray.local_scheduler.ObjectID) or \
if isinstance(self._lengths_cache, ray.ObjectID) or \
(isinstance(self._lengths_cache, list) and
isinstance(self._lengths_cache[0], ray.local_scheduler.ObjectID)):
isinstance(self._lengths_cache[0], ray.ObjectID)):
self._lengths_cache = ray.get(self._lengths_cache)
return self._lengths_cache

Expand All @@ -72,7 +72,7 @@ def _get__coord_df(self):
Since we may have had an index set before our coord_df was
materialized, we'll have to apply it to the newly materialized df
"""
if isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID):
if isinstance(self._coord_df_cache, ray.ObjectID):
self._coord_df_cache = ray.get(self._coord_df_cache)
if self._cached_index:
self._coord_df_cache.index = self._index_cache
Expand All @@ -89,7 +89,7 @@ def _set__coord_df(self, coord_df):
If the set _IndexMetadata is an OID instead (due to a copy or whatever
reason), we fall back relying on `_index_cache`.
"""
if not isinstance(coord_df, ray.local_scheduler.ObjectID):
if not isinstance(coord_df, ray.ObjectID):
self._index_cache = coord_df.index
self._coord_df_cache = coord_df

Expand All @@ -102,7 +102,7 @@ def _get_index(self):
_IndexMetadata object without a specified `index` parameter (See the
_IndexMetadata constructor for more details)
"""
if isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID):
if isinstance(self._coord_df_cache, ray.ObjectID):
return self._index_cache
else:
return self._coord_df_cache.index
Expand All @@ -119,7 +119,7 @@ def _set_index(self, new_index):
assert len(new_index) == len(self)

self._index_cache = new_index
if isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID):
if isinstance(self._coord_df_cache, ray.ObjectID):
self._cached_index = True
else:
self._coord_df_cache.index = new_index
Expand All @@ -140,7 +140,7 @@ def _get_index_cache(self):
if self._index_cache_validator is None:
self._index_cache_validator = pd.RangeIndex(len(self))
elif isinstance(self._index_cache_validator,
ray.local_scheduler.ObjectID):
ray.ObjectID):
self._index_cache_validator = ray.get(self._index_cache_validator)

return self._index_cache_validator
Expand Down Expand Up @@ -296,11 +296,11 @@ def squeeze(self, partition, index_within_partition):
def copy(self):
# TODO: Investigate copy-on-write wrapper for metadata objects
coord_df_copy = self._coord_df_cache
if not isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID):
if not isinstance(self._coord_df_cache, ray.ObjectID):
coord_df_copy = self._coord_df_cache.copy()

lengths_copy = self._lengths_cache
if not isinstance(self._lengths_cache, ray.local_scheduler.ObjectID):
if not isinstance(self._lengths_cache, ray.ObjectID):
lengths_copy = self._lengths_cache.copy()

index_copy = self._index_cache
Expand Down
15 changes: 7 additions & 8 deletions python/ray/experimental/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ def _object_table(self, object_id):
A dictionary with information about the object ID in question.
"""
# Allow the argument to be either an ObjectID or a hex string.
if not isinstance(object_id, ray.local_scheduler.ObjectID):
object_id = ray.local_scheduler.ObjectID(hex_to_binary(object_id))
if not isinstance(object_id, ray.ObjectID):
object_id = ray.ObjectID(hex_to_binary(object_id))

# Return information about a single object ID.
object_locations = self._execute_command(object_id,
Expand Down Expand Up @@ -297,7 +297,7 @@ def _task_table(self, task_id):
TaskExecutionDependencies.GetRootAsTaskExecutionDependencies(
task_table_message.ExecutionDependencies(), 0))
execution_dependencies = [
ray.local_scheduler.ObjectID(
ray.ObjectID(
execution_dependencies_message.ExecutionDependencies(i))
for i in range(
execution_dependencies_message.ExecutionDependenciesLength())
Expand Down Expand Up @@ -335,15 +335,15 @@ def task_table(self, task_id=None):
"""
self._check_connected()
if task_id is not None:
task_id = ray.local_scheduler.ObjectID(hex_to_binary(task_id))
task_id = ray.ObjectID(hex_to_binary(task_id))
return self._task_table(task_id)
else:
task_table_keys = self._keys(TASK_PREFIX + "*")
results = {}
for key in task_table_keys:
task_id_binary = key[len(TASK_PREFIX):]
results[binary_to_hex(task_id_binary)] = self._task_table(
ray.local_scheduler.ObjectID(task_id_binary))
ray.ObjectID(task_id_binary))
return results

def function_table(self, function_id=None):
Expand Down Expand Up @@ -628,8 +628,7 @@ def micros_rel(ts):
# modify it in place since we will use the original values later.
total_info = copy.copy(task_table[task_id]["TaskSpec"])
total_info["Args"] = [
oid.hex()
if isinstance(oid, ray.local_scheduler.ObjectID) else oid
oid.hex() if isinstance(oid, ray.ObjectID) else oid
for oid in task_t_info["TaskSpec"]["Args"]
]
total_info["ReturnObjectIDs"] = [
Expand Down Expand Up @@ -855,7 +854,7 @@ def micros_rel(ts):
args = task_table[task_id]["TaskSpec"]["Args"]
for arg in args:
# Don't visualize arguments that are not object IDs.
if isinstance(arg, ray.local_scheduler.ObjectID):
if isinstance(arg, ray.ObjectID):
object_info = self._object_table(arg)
# Don't visualize objects that were created by calls to
# put.
Expand Down
158 changes: 158 additions & 0 deletions python/ray/remote_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import copy
import hashlib
import inspect

import ray.signature

# Default parameters for remote functions.
# CPUs reserved per task invocation when the user does not specify num_cpus.
DEFAULT_REMOTE_FUNCTION_CPUS = 1
# Number of ObjectIDs a task returns when num_return_vals is not specified.
DEFAULT_REMOTE_FUNCTION_NUM_RETURN_VALS = 1
# Default cap on executions per worker; 0 presumably means no limit —
# TODO confirm against the worker's max_calls handling.
DEFAULT_REMOTE_FUNCTION_MAX_CALLS = 0


def in_ipython():
    """Detect whether this process is running inside IPython.

    Returns:
        True when the ``__IPYTHON__`` name (injected by IPython at
        startup) is defined, False in a regular Python interpreter.
    """
    try:
        __IPYTHON__  # noqa: F821  # Only exists inside IPython sessions.
    except NameError:
        return False
    return True


def compute_function_id(function):
    """Compute a function ID for a function.

    The ID is derived from the function's module, name, and (when it is
    available) its source code, so the same function definition hashes to
    the same ID across processes.

    Args:
        function: The actual function.

    Returns:
        A ray.ObjectID wrapping the 20-byte function ID.
    """
    function_id_hash = hashlib.sha1()
    # Include the function module and name in the hash.
    function_id_hash.update(function.__module__.encode("ascii"))
    function_id_hash.update(function.__name__.encode("ascii"))
    # If we are running a script or are in IPython, include the source code in
    # the hash. If we are in a regular Python interpreter we skip this part
    # because the source code is not accessible. If the function is a built-in
    # (e.g., Cython), the source code is not accessible.
    import __main__ as main
    if (hasattr(main, "__file__") or in_ipython()) \
            and inspect.isfunction(function):
        function_id_hash.update(inspect.getsource(function).encode("ascii"))
    # Compute the function ID.
    function_id = function_id_hash.digest()
    # A SHA-1 digest is 20 bytes, which matches the size of Ray IDs.
    assert len(function_id) == 20
    function_id = ray.ObjectID(function_id)

    return function_id


class RemoteFunction(object):
    """A remote function.

    This is a decorated function. It can be used to spawn tasks.

    Attributes:
        _function: The original function.
        _function_id: The ID of the function.
        _function_name: The module and function name.
        _num_cpus: The default number of CPUs to use for invocations of this
            remote function.
        _num_gpus: The default number of GPUs to use for invocations of this
            remote function.
        _resources: The default custom resource requirements for invocations
            of this remote function.
        _num_return_vals: The default number of return values for invocations
            of this remote function.
        _max_calls: The number of times a worker can execute this function
            before exiting.
        _function_signature: The function signature.
    """

    def __init__(self, function, num_cpus, num_gpus, resources,
                 num_return_vals, max_calls):
        self._function = function
        # TODO(rkn): We store the function ID as a string, so that
        # RemoteFunction objects can be pickled. We should undo this when
        # we allow ObjectIDs to be pickled.
        self._function_id = compute_function_id(self._function).id()
        self._function_name = (
            self._function.__module__ + '.' + self._function.__name__)
        # Fall back to the module-level defaults for any unspecified
        # resource arguments.
        self._num_cpus = (DEFAULT_REMOTE_FUNCTION_CPUS
                          if num_cpus is None else num_cpus)
        self._num_gpus = num_gpus
        self._resources = resources
        self._num_return_vals = (DEFAULT_REMOTE_FUNCTION_NUM_RETURN_VALS if
                                 num_return_vals is None else num_return_vals)
        self._max_calls = (DEFAULT_REMOTE_FUNCTION_MAX_CALLS
                          if max_calls is None else max_calls)

        ray.signature.check_signature_supported(self._function)
        self._function_signature = ray.signature.extract_signature(
            self._function)

        # Export the function.
        worker = ray.worker.get_global_worker()
        if worker.mode in [ray.worker.SCRIPT_MODE, ray.worker.SILENT_MODE]:
            self._export()
        elif worker.mode is None:
            # The worker is not connected yet; defer the export until
            # the connection is established.
            worker.cached_remote_functions_and_actors.append(
                ("remote_function", self))

    def __call__(self, *args, **kwargs):
        """Raise an error: remote functions must be invoked via .remote()."""
        raise Exception("Remote functions cannot be called directly. Instead "
                        "of running '{}()', try '{}.remote()'.".format(
                            self._function_name, self._function_name))

    def remote(self, *args, **kwargs):
        """This runs immediately when a remote function is called."""
        return self._submit(args=args, kwargs=kwargs)

    def _submit(self,
                args=None,
                kwargs=None,
                num_return_vals=None,
                num_cpus=None,
                num_gpus=None,
                resources=None):
        """An experimental alternate way to submit remote functions.

        Args:
            args: The positional arguments of the task.
            kwargs: The keyword arguments of the task.
            num_return_vals: Override for the default number of return
                values for this invocation.
            num_cpus: Override for the default number of CPUs.
            num_gpus: Override for the default number of GPUs.
            resources: Override for the default custom resources.

        Returns:
            A single ObjectID when the task has exactly one return value,
            a list of ObjectIDs when it has more than one, and None when
            it has none.
        """
        worker = ray.worker.get_global_worker()
        worker.check_connected()
        ray.worker.check_main_thread()
        kwargs = {} if kwargs is None else kwargs
        args = ray.signature.extend_args(self._function_signature, args,
                                         kwargs)

        if num_return_vals is None:
            num_return_vals = self._num_return_vals

        resources = ray.utils.resources_from_resource_arguments(
            self._num_cpus, self._num_gpus, self._resources, num_cpus,
            num_gpus, resources)
        if worker.mode == ray.worker.PYTHON_MODE:
            # In PYTHON_MODE, remote calls simply execute the function.
            # We copy the arguments to prevent the function call from
            # mutating them and to match the usual behavior of
            # immutable remote objects.
            result = self._function(*copy.deepcopy(args))
            return result
        object_ids = worker.submit_task(
            ray.ObjectID(self._function_id),
            args,
            num_return_vals=num_return_vals,
            resources=resources)
        if len(object_ids) == 1:
            return object_ids[0]
        elif len(object_ids) > 1:
            return object_ids

    def _export(self):
        """Export this remote function's definition to the workers."""
        worker = ray.worker.get_global_worker()
        worker.export_remote_function(
            ray.ObjectID(self._function_id), self._function_name,
            self._function, self._max_calls, self)
53 changes: 46 additions & 7 deletions python/ray/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from __future__ import print_function

import binascii
import collections
import hashlib
import numpy as np
import os
Expand Down Expand Up @@ -125,7 +124,7 @@ def decode(byte_str):


def binary_to_object_id(binary_object_id):
return ray.local_scheduler.ObjectID(binary_object_id)
return ray.ObjectID(binary_object_id)


def binary_to_hex(identifier):
Expand All @@ -139,11 +138,6 @@ def hex_to_binary(hex_identifier):
return binascii.unhexlify(hex_identifier)


FunctionProperties = collections.namedtuple(
"FunctionProperties", ["num_return_vals", "resources", "max_calls"])
"""FunctionProperties: A named tuple storing remote functions information."""


def get_cuda_visible_devices():
"""Get the device IDs in the CUDA_VISIBLE_DEVICES environment variable.

Expand All @@ -169,3 +163,48 @@ def set_cuda_visible_devices(gpu_ids):
gpu_ids: This is a list of integers representing GPU IDs.
"""
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in gpu_ids])


def resources_from_resource_arguments(default_num_cpus, default_num_gpus,
                                      default_resources, runtime_num_cpus,
                                      runtime_num_gpus, runtime_resources):
    """Merge default and call-time resource arguments for a task.

    Values supplied at invocation time (the ``runtime_*`` arguments) take
    precedence over the defaults declared when the function or actor
    method was defined.

    Args:
        default_num_cpus: The default number of CPUs required by this function
            or actor method.
        default_num_gpus: The default number of GPUs required by this function
            or actor method.
        default_resources: The default custom resources required by this
            function or actor method.
        runtime_num_cpus: The number of CPUs requested when the task was
            invoked.
        runtime_num_gpus: The number of GPUs requested when the task was
            invoked.
        runtime_resources: The custom resources requested when the task was
            invoked.

    Returns:
        A dictionary of the resource requirements for the task.

    Raises:
        ValueError: If the custom resources contain the reserved keys
            "CPU" or "GPU".
    """
    # Start from the highest-precedence custom resources available.
    if runtime_resources is not None:
        resources = dict(runtime_resources)
    elif default_resources is not None:
        resources = dict(default_resources)
    else:
        resources = {}

    # CPU and GPU counts are reserved keys set from the dedicated arguments.
    if any(key in resources for key in ("CPU", "GPU")):
        raise ValueError("The resources dictionary must not "
                         "contain the key 'CPU' or 'GPU'")

    assert default_num_cpus is not None
    resources["CPU"] = (runtime_num_cpus
                        if runtime_num_cpus is not None else default_num_cpus)

    # Only record a GPU requirement when one was specified at either level.
    num_gpus = (default_num_gpus
                if runtime_num_gpus is None else runtime_num_gpus)
    if num_gpus is not None:
        resources["GPU"] = num_gpus

    return resources
Loading