Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
e.args += (helpful_message,)
raise

from ray.local_scheduler import _config # noqa: E402
from ray.worker import (error_info, init, connect, disconnect,
get, put, wait, remote, log_event, log_span,
flush_log, get_gpu_ids, get_webui_url,
Expand All @@ -59,7 +60,7 @@
"remote", "log_event", "log_span", "flush_log", "actor",
"get_gpu_ids", "get_webui_url", "register_custom_serializer",
"SCRIPT_MODE", "WORKER_MODE", "PYTHON_MODE", "SILENT_MODE",
"global_state", "__version__"]
"global_state", "_config", "__version__"]

import ctypes # noqa: E402
# Windows only
Expand Down
5 changes: 3 additions & 2 deletions python/ray/local_scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

from ray.core.src.local_scheduler.liblocal_scheduler_library import (
Task, LocalSchedulerClient, ObjectID, check_simple_value, task_from_string,
task_to_string)
task_to_string, _config)
from .local_scheduler_services import start_local_scheduler

__all__ = ["Task", "LocalSchedulerClient", "ObjectID", "check_simple_value",
"task_from_string", "task_to_string", "start_local_scheduler"]
"task_from_string", "task_to_string", "start_local_scheduler",
"_config"]
6 changes: 2 additions & 4 deletions python/ray/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

# These variables must be kept in sync with the C codebase.
# common/common.h
HEARTBEAT_TIMEOUT_MILLISECONDS = 100
NUM_HEARTBEATS_TIMEOUT = 100
DB_CLIENT_ID_SIZE = 20
NIL_ID = b"\xff" * DB_CLIENT_ID_SIZE

Expand Down Expand Up @@ -580,7 +578,7 @@ def run(self):
plasma_manager_ids = list(self.live_plasma_managers.keys())
for plasma_manager_id in plasma_manager_ids:
if ((self.live_plasma_managers[plasma_manager_id]) >=
NUM_HEARTBEATS_TIMEOUT):
ray._config.num_heartbeats_timeout()):
log.warn("Timed out {}".format(PLASMA_MANAGER_CLIENT_TYPE))
# Remove the plasma manager from the managers whose
# heartbeats we're tracking.
Expand All @@ -599,7 +597,7 @@ def run(self):

# Wait for a heartbeat interval before processing the next round of
# messages.
time.sleep(HEARTBEAT_TIMEOUT_MILLISECONDS * 1e-3)
time.sleep(ray._config.heartbeat_timeout_milliseconds() * 1e-3)


if __name__ == "__main__":
Expand Down
29 changes: 15 additions & 14 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@
NIL_FUNCTION_ID = NIL_ID
NIL_ACTOR_ID = NIL_ID

# When performing ray.get, wait 1 second before attempting to reconstruct and
# fetch the object again.
GET_TIMEOUT_MILLISECONDS = 1000

# This must be kept in sync with the `error_types` array in
# common/state/error_table.h.
OBJECT_HASH_MISMATCH_ERROR_TYPE = b"object_hash_mismatch"
Expand Down Expand Up @@ -372,10 +368,11 @@ def retrieve_and_deserialize(self, object_ids, timeout, error_timeout=10):
# long time, if the store is blocked, it can block the manager
# as well as a consequence.
results = []
get_request_size = 10000
for i in range(0, len(object_ids), get_request_size):
for i in range(0, len(object_ids),
ray._config.worker_get_request_size()):
results += self.plasma_client.get(
object_ids[i:(i + get_request_size)],
object_ids[i:(i +
ray._config.worker_get_request_size())],
timeout,
self.serialization_context)
return results
Expand Down Expand Up @@ -420,12 +417,13 @@ def get_object(self, object_ids):
# Do an initial fetch for remote objects. We divide the fetch into
# smaller fetches so as to not block the manager for a prolonged period
# of time in a single call.
fetch_request_size = 10000
plain_object_ids = [plasma.ObjectID(object_id.id())
for object_id in object_ids]
for i in range(0, len(object_ids), fetch_request_size):
for i in range(0, len(object_ids),
ray._config.worker_fetch_request_size()):
self.plasma_client.fetch(
plain_object_ids[i:(i + fetch_request_size)])
plain_object_ids[i:(i +
ray._config.worker_fetch_request_size())])

# Get the objects. We initially try to get the objects immediately.
final_results = self.retrieve_and_deserialize(plain_object_ids, 0)
Expand All @@ -436,7 +434,7 @@ def get_object(self, object_ids):
if val is plasma.ObjectNotAvailable)
was_blocked = (len(unready_ids) > 0)
# Try reconstructing any objects we haven't gotten yet. Try to get them
# until at least GET_TIMEOUT_MILLISECONDS milliseconds passes, then
# until at least get_timeout_milliseconds milliseconds pass, then
# repeat.
while len(unready_ids) > 0:
for unready_id in unready_ids:
Expand All @@ -447,12 +445,15 @@ def get_object(self, object_ids):
# prolonged period of time in a single call.
object_ids_to_fetch = list(map(
plasma.ObjectID, unready_ids.keys()))
for i in range(0, len(object_ids_to_fetch), fetch_request_size):
for i in range(0, len(object_ids_to_fetch),
ray._config.worker_fetch_request_size()):
self.plasma_client.fetch(
object_ids_to_fetch[i:(i + fetch_request_size)])
object_ids_to_fetch[i:(
i + ray._config.worker_fetch_request_size())])
results = self.retrieve_and_deserialize(
object_ids_to_fetch,
max([GET_TIMEOUT_MILLISECONDS, int(0.01 * len(unready_ids))]))
max([ray._config.get_timeout_milliseconds(),
int(0.01 * len(unready_ids))]))
# Remove any entries for objects we received during this iteration
# so we don't retrieve the same object twice.
for i, val in enumerate(results):
Expand Down
8 changes: 1 addition & 7 deletions src/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,7 @@ extern "C" {
#include "plasma/common.h"
#include "arrow/util/macros.h"

/** The duration between heartbeats. These are sent by the plasma manager and
* local scheduler. */
#define HEARTBEAT_TIMEOUT_MILLISECONDS 100
/** If a component has not sent a heartbeat in the last NUM_HEARTBEATS_TIMEOUT
* heartbeat intervals, the global scheduler or monitor process will report it
* as dead to the db_client table. */
#define NUM_HEARTBEATS_TIMEOUT 100
#include "state/ray_config.h"

/** Definitions for Ray logging levels. */
#define RAY_COMMON_DEBUG 0
Expand Down
14 changes: 7 additions & 7 deletions src/common/io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ int connect_ipc_sock_retry(const char *socket_pathname,
int64_t timeout) {
/* Pick the default values if the user did not specify. */
if (num_retries < 0) {
num_retries = NUM_CONNECT_ATTEMPTS;
num_retries = RayConfig::instance().num_connect_attempts();
}
if (timeout < 0) {
timeout = CONNECT_TIMEOUT_MS;
timeout = RayConfig::instance().connect_timeout_milliseconds();
}

CHECK(socket_pathname);
Expand Down Expand Up @@ -163,10 +163,10 @@ int connect_inet_sock_retry(const char *ip_addr,
int64_t timeout) {
/* Pick the default values if the user did not specify. */
if (num_retries < 0) {
num_retries = NUM_CONNECT_ATTEMPTS;
num_retries = RayConfig::instance().num_connect_attempts();
}
if (timeout < 0) {
timeout = CONNECT_TIMEOUT_MS;
timeout = RayConfig::instance().connect_timeout_milliseconds();
}

CHECK(ip_addr);
Expand Down Expand Up @@ -251,7 +251,7 @@ int write_bytes(int fd, uint8_t *cursor, size_t length) {
}

int write_message(int fd, int64_t type, int64_t length, uint8_t *bytes) {
int64_t version = RAY_PROTOCOL_VERSION;
int64_t version = RayConfig::instance().ray_protocol_version();
int closed;
closed = write_bytes(fd, (uint8_t *) &version, sizeof(version));
if (closed) {
Expand Down Expand Up @@ -302,7 +302,7 @@ void read_message(int fd, int64_t *type, int64_t *length, uint8_t **bytes) {
if (closed) {
goto disconnected;
}
CHECK(version == RAY_PROTOCOL_VERSION);
CHECK(version == RayConfig::instance().ray_protocol_version());
closed = read_bytes(fd, (uint8_t *) type, sizeof(*type));
if (closed) {
goto disconnected;
Expand Down Expand Up @@ -359,7 +359,7 @@ int64_t read_vector(int fd, int64_t *type, std::vector<uint8_t> &buffer) {
if (closed) {
goto disconnected;
}
CHECK(version == RAY_PROTOCOL_VERSION);
CHECK(version == RayConfig::instance().ray_protocol_version());
int64_t length;
closed = read_bytes(fd, (uint8_t *) type, sizeof(*type));
if (closed) {
Expand Down
20 changes: 6 additions & 14 deletions src/common/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,6 @@

#include <vector>

#define RAY_PROTOCOL_VERSION 0x0000000000000000

/* Number of times we try binding to a socket. */
#define NUM_BIND_ATTEMPTS 5
#define BIND_TIMEOUT_MS 100
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NUM_BIND_ATTEMPTS and BIND_TIMEOUT_MS were not used anywhere.


/* Number of times we try connecting to a socket. */
#define NUM_CONNECT_ATTEMPTS 50
#define CONNECT_TIMEOUT_MS 100

struct aeEventLoop;
typedef aeEventLoop event_loop;

Expand Down Expand Up @@ -74,9 +64,10 @@ int connect_ipc_sock(const char *socket_pathname);
* @param socket_pathname The pathname for the socket.
* @param num_retries The number of times to retry the connection
* before exiting. If -1 is provided, then this defaults to
* NUM_CONNECT_ATTEMPTS.
* num_connect_attempts.
* @param timeout The number of milliseconds to wait in between
* retries. If -1 is provided, then this defaults to CONNECT_TIMEOUT_MS.
* retries. If -1 is provided, then this defaults to
* connect_timeout_milliseconds.
* @return A file descriptor for the socket, or -1 if an error occurred.
*/
int connect_ipc_sock_retry(const char *socket_pathname,
Expand All @@ -102,9 +93,10 @@ int connect_inet_sock(const char *ip_addr, int port);
* @param port The port number to connect to.
* @param num_retries The number of times to retry the connection
* before exiting. If -1 is provided, then this defaults to
* NUM_CONNECT_ATTEMPTS.
* num_connect_attempts.
* @param timeout The number of milliseconds to wait in between
* retries. If -1 is provided, then this defaults to CONNECT_TIMEOUT_MS.
* retries. If -1 is provided, then this defaults to
* connect_timeout_milliseconds.
* @return A file descriptor for the socket, or -1 if an error occurred.
*/
int connect_inet_sock_retry(const char *ip_addr,
Expand Down
29 changes: 17 additions & 12 deletions src/common/lib/python/common_extension.cc
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,6 @@ PyObject *PyTask_make(TaskSpec *task_spec, int64_t task_size) {

/* Define the methods for the module. */

#define SIZE_LIMIT 100
#define NUM_ELEMENTS_LIMIT 1000

#if PY_MAJOR_VERSION >= 3
#define PyInt_Check PyLong_Check
#endif
Expand All @@ -531,7 +528,7 @@ PyObject *PyTask_make(TaskSpec *task_spec, int64_t task_size) {
*/
int is_simple_value(PyObject *value, int *num_elements_contained) {
*num_elements_contained += 1;
if (*num_elements_contained >= NUM_ELEMENTS_LIMIT) {
if (*num_elements_contained >= RayConfig::instance().num_elements_limit()) {
return 0;
}
if (PyInt_Check(value) || PyLong_Check(value) || value == Py_False ||
Expand All @@ -540,21 +537,26 @@ int is_simple_value(PyObject *value, int *num_elements_contained) {
}
if (PyBytes_CheckExact(value)) {
*num_elements_contained += PyBytes_Size(value);
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
return (*num_elements_contained <
RayConfig::instance().num_elements_limit());
}
if (PyUnicode_CheckExact(value)) {
*num_elements_contained += PyUnicode_GET_SIZE(value);
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
return (*num_elements_contained <
RayConfig::instance().num_elements_limit());
}
if (PyList_CheckExact(value) && PyList_Size(value) < SIZE_LIMIT) {
if (PyList_CheckExact(value) &&
PyList_Size(value) < RayConfig::instance().size_limit()) {
for (Py_ssize_t i = 0; i < PyList_Size(value); ++i) {
if (!is_simple_value(PyList_GetItem(value, i), num_elements_contained)) {
return 0;
}
}
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
return (*num_elements_contained <
RayConfig::instance().num_elements_limit());
}
if (PyDict_CheckExact(value) && PyDict_Size(value) < SIZE_LIMIT) {
if (PyDict_CheckExact(value) &&
PyDict_Size(value) < RayConfig::instance().size_limit()) {
PyObject *key, *val;
Py_ssize_t pos = 0;
while (PyDict_Next(value, &pos, &key, &val)) {
Expand All @@ -563,15 +565,18 @@ int is_simple_value(PyObject *value, int *num_elements_contained) {
return 0;
}
}
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
return (*num_elements_contained <
RayConfig::instance().num_elements_limit());
}
if (PyTuple_CheckExact(value) && PyTuple_Size(value) < SIZE_LIMIT) {
if (PyTuple_CheckExact(value) &&
PyTuple_Size(value) < RayConfig::instance().size_limit()) {
for (Py_ssize_t i = 0; i < PyTuple_Size(value); ++i) {
if (!is_simple_value(PyTuple_GetItem(value, i), num_elements_contained)) {
return 0;
}
}
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
return (*num_elements_contained <
RayConfig::instance().num_elements_limit());
}
return 0;
}
Expand Down
2 changes: 0 additions & 2 deletions src/common/lib/python/common_extension.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ PyObject *check_simple_value(PyObject *self, PyObject *args);
PyObject *PyTask_to_string(PyObject *, PyObject *args);
PyObject *PyTask_from_string(PyObject *, PyObject *args);

PyObject *compute_put_id(PyObject *self, PyObject *args);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed because this was unused.

PyObject *PyTask_make(TaskSpec *task_spec, int64_t task_size);

#endif /* COMMON_EXTENSION_H */
Loading