[core] Small optimizations from accelerated DAG experiment (#41418)
This ports a few micro-optimizations from the accelerated DAG experiment branch (https://github.com/ray-project/ray/pull/40991/files). The optimizations help most for accelerated DAG programs, but they also modestly reduce get()/put() call overheads for normal Ray programs:

Before:
  55% of get() time in plasma:: code
  54% of put() time in plasma:: code

After:
  62% of get() time in plasma:: code
  76% of put() time in plasma:: code

(The plasma:: work itself is unchanged, so a higher share of time spent inside plasma:: after the change means the surrounding per-call overhead shrank.)
ericl authored Nov 30, 2023
1 parent 53d5e2e commit 2287bea
Showing 12 changed files with 58 additions and 28 deletions.
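For context, here is a minimal sketch (not from this commit) of how one might time the get()/put() round trips that these percentages refer to; attributing time to plasma:: frames additionally requires a native-aware profiler such as py-spy with --native.

# Illustrative micro-benchmark (assumptions: a local Ray install; the 1 MiB
# payload is large enough to be stored in plasma rather than inlined).
import time

import ray

ray.init()
payload = b"x" * (1024 * 1024)

start = time.perf_counter()
for _ in range(1000):
    ref = ray.put(payload)
    ray.get(ref)
elapsed = time.perf_counter() - start
print(f"mean put()+get() round trip: {elapsed / 1000 * 1e6:.1f} us")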
6 changes: 2 additions & 4 deletions python/ray/_private/auto_init_hook.py
@@ -4,13 +4,11 @@
 import threading
 
 auto_init_lock = threading.Lock()
+enable_auto_connect = os.environ.get("RAY_ENABLE_AUTO_CONNECT", "") != "0"
 
 
 def auto_init_ray():
-    if (
-        os.environ.get("RAY_ENABLE_AUTO_CONNECT", "") != "0"
-        and not ray.is_initialized()
-    ):
+    if enable_auto_connect and not ray.is_initialized():
         auto_init_lock.acquire()
         if not ray.is_initialized():
             ray.init()
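The win here comes from hoisting the os.environ lookup out of the per-call path: the environment is read once at import time, and every later auto_init_ray() call only checks a module-level bool. A rough standalone timing sketch of that difference (illustrative only, not from the commit):

# Compare an env-var lookup on every call vs. a bool cached at import time.
import os
import timeit

enable_auto_connect = os.environ.get("RAY_ENABLE_AUTO_CONNECT", "") != "0"

per_call = timeit.timeit(
    'os.environ.get("RAY_ENABLE_AUTO_CONNECT", "") != "0"', globals=globals()
)
cached = timeit.timeit("enable_auto_connect", globals=globals())
print(f"env lookup per call: {per_call:.3f}s  cached bool: {cached:.3f}s")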
12 changes: 11 additions & 1 deletion python/ray/_private/worker.py
@@ -470,6 +470,9 @@ def __init__(self):
         self._filter_logs_by_job = True
         # the debugger port for this worker
         self._debugger_port = None
+        # Cache the job id from initialize_job_config() to optimize lookups.
+        # This is on the critical path of ray.get()/put() calls.
+        self._cached_job_id = None
 
     @property
     def connected(self):
@@ -488,7 +491,9 @@ def load_code_from_local(self):
 
     @property
     def current_job_id(self):
-        if hasattr(self, "core_worker"):
+        if self._cached_job_id is not None:
+            return self._cached_job_id
+        elif hasattr(self, "core_worker"):
             return self.core_worker.get_current_job_id()
         return JobID.nil()
 
@@ -554,6 +559,10 @@ def set_debugger_port(self, port):
         worker_id = self.core_worker.get_worker_id()
         ray._private.state.update_worker_debugger_port(worker_id, port)
 
+    def set_cached_job_id(self, job_id):
+        """Set the cached job id to speed `current_job_id()`."""
+        self._cached_job_id = job_id
+
     @contextmanager
     def task_paused_by_debugger(self):
         """Use while the task is paused by debugger"""
@@ -1805,6 +1814,7 @@ def shutdown(_exiting_interpreter: bool = False):
     # TODO(rkn): Instead of manually resetting some of the worker fields, we
     # should simply set "global_worker" to equal "None" or something like that.
     global_worker.set_mode(None)
+    global_worker.set_cached_job_id(None)
 
 
 atexit.register(shutdown, True)
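The same idea drives this change: current_job_id sits on the hot path of every get()/put(), and returning a cached attribute avoids a hasattr() check plus a call into the Cython core worker on each access. A self-contained sketch of the pattern with a stand-in backend (hypothetical classes, not Ray code; the real savings come from skipping the cross-language call):

# Stand-in demonstration: a cached attribute beats delegating every access.
import timeit

class FakeCoreWorker:
    def get_current_job_id(self):
        return "02000000"

class Worker:
    def __init__(self):
        self.core_worker = FakeCoreWorker()
        self._cached_job_id = None

    @property
    def current_job_id(self):
        if self._cached_job_id is not None:
            return self._cached_job_id  # fast path, no delegation
        if hasattr(self, "core_worker"):
            return self.core_worker.get_current_job_id()
        return None  # stands in for JobID.nil()

w = Worker()
print("uncached:", timeit.timeit(lambda: w.current_job_id))
w._cached_job_id = "02000000"
print("cached:  ", timeit.timeit(lambda: w.current_job_id))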
6 changes: 5 additions & 1 deletion python/ray/_raylet.pyx
@@ -2469,10 +2469,14 @@ def maybe_initialize_job_config():
         for p in py_driver_sys_path:
             sys.path.insert(0, p)
 
+        # Cache and set the current job id.
+        job_id = core_worker.get_current_job_id()
+        ray._private.worker.global_worker.set_cached_job_id(job_id)
+
         # Record the task name via :task_name: magic token in the log file.
         # This is used for the prefix in driver logs `(task_name pid=123) ...`
         job_id_magic_token = "{}{}\n".format(
-            ray_constants.LOG_PREFIX_JOB_ID, core_worker.get_current_job_id().hex())
+            ray_constants.LOG_PREFIX_JOB_ID, job_id.hex())
         # Print on both .out and .err
         print(job_id_magic_token, end="")
         print(job_id_magic_token, file=sys.stderr, end="")
9 changes: 6 additions & 3 deletions python/ray/tests/conftest.py
@@ -703,12 +703,15 @@ def enable_pickle_debug():
 
 
 @pytest.fixture
-def set_enable_auto_connect(enable_auto_connect: str = "0"):
+def set_enable_auto_connect(enable_auto_connect: bool = False):
+    from ray._private import auto_init_hook
+
     try:
-        os.environ["RAY_ENABLE_AUTO_CONNECT"] = enable_auto_connect
+        old_value = auto_init_hook.enable_auto_connect
+        auto_init_hook.enable_auto_connect = enable_auto_connect
         yield enable_auto_connect
     finally:
-        del os.environ["RAY_ENABLE_AUTO_CONNECT"]
+        auto_init_hook.enable_auto_connect = old_value
 
 
 @pytest.fixture
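Since the env var is now read once at import time, the fixture patches the module-level flag (and restores it afterwards) instead of mutating os.environ. Its indirect-parametrization interface is unchanged, so tests only switch the values they pass. For readers unfamiliar with indirect=True, a generic pytest example (not Ray's conftest; the standard mechanism is request.param):

# Generic illustration of pytest indirect parametrization.
import pytest

@pytest.fixture
def set_flag(request):
    # With indirect=True, the value from parametrize() arrives as request.param.
    yield getattr(request, "param", False)

@pytest.mark.parametrize("set_flag", [True, False], indirect=True)
def test_flag(set_flag):
    assert isinstance(set_flag, bool)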
4 changes: 2 additions & 2 deletions python/ray/tests/test_actor.py
@@ -29,7 +29,7 @@
 pytest_timeout = None
 
 
-@pytest.mark.parametrize("set_enable_auto_connect", ["1", "0"], indirect=True)
+@pytest.mark.parametrize("set_enable_auto_connect", [True, False], indirect=True)
 def test_caching_actors(shutdown_only, set_enable_auto_connect):
     # Test defining actors before ray.init() has been called.
 
@@ -41,7 +41,7 @@ def __init__(self):
         def get_val(self):
             return 3
 
-    if set_enable_auto_connect == "0":
+    if not set_enable_auto_connect:
         # Check that we can't actually create actors before ray.init() has
         # been called.
         with pytest.raises(Exception):
6 changes: 3 additions & 3 deletions python/ray/tests/test_client.py
@@ -5,7 +5,7 @@
 import sys
 import threading
 import time
-from unittest.mock import Mock, patch
+from unittest.mock import Mock
 from typing import Type
 
 import numpy as np
@@ -671,8 +671,8 @@ def stop_server(server):
     time.sleep(3)
 
 
-@patch.dict(os.environ, {"RAY_ENABLE_AUTO_CONNECT": "0"})
-def test_client_gpu_ids(call_ray_start_shared):
+@pytest.mark.parametrize("set_enable_auto_connect", [True], indirect=True)
+def test_client_gpu_ids(call_ray_start_shared, set_enable_auto_connect):
     import ray
 
     with enable_client_mode():
4 changes: 3 additions & 1 deletion python/ray/tests/test_client_multi.py
@@ -204,7 +204,9 @@ def get(idx):
     # The following should be removed after
     # https://github.com/ray-project/ray/issues/20355
     # is fixed.
-    os.environ["RAY_ENABLE_AUTO_CONNECT"] = "0"
+    from ray._private import auto_init_hook
+
+    auto_init_hook.enable_auto_connect = False
     if os.environ.get("PARALLEL_CI"):
         sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
     else:
4 changes: 2 additions & 2 deletions python/ray/workflow/api.py
@@ -1,7 +1,6 @@
 import functools
 import logging
 from typing import Dict, Set, List, Tuple, Union, Optional, Any
-import os
 import time
 import uuid
 
@@ -101,8 +100,9 @@ def client_mode_wrap(func):
     @functools.wraps(func)
     def wrapper(*args, **kwargs):
         from ray._private.client_mode_hook import client_mode_should_convert
+        from ray._private.auto_init_hook import enable_auto_connect
 
-        if os.environ.get("RAY_ENABLE_AUTO_CONNECT", "") != "0":
+        if enable_auto_connect:
             _ensure_workflow_initialized()
 
         # `is_client_mode_enabled_by_default` is used for testing with
24 changes: 18 additions & 6 deletions src/ray/common/ray_object.cc
@@ -17,6 +17,10 @@
 #include "msgpack.hpp"
 
 namespace {
+
+static const std::string kObjectInPlasmaStr =
+    std::to_string(ray::rpc::ErrorType::OBJECT_IN_PLASMA);
+
 std::shared_ptr<ray::LocalMemoryBuffer> MakeBufferFromString(const uint8_t *data,
                                                              size_t data_size) {
   auto metadata = const_cast<uint8_t *>(data);
@@ -102,12 +106,20 @@ RayObject::RayObject(rpc::ErrorType error_type, const rpc::RayErrorInfo *ray_err
 }
 
 bool RayObject::IsException(rpc::ErrorType *error_type) const {
-  if (metadata_ == nullptr) {
+  // For performance, assume metadata of >2 chars (e.g., "PYTHON"), is not an error.
+  static_assert(ray::rpc::ErrorType_MAX < 100);
+  if (metadata_ == nullptr || metadata_->Size() > 2) {
     return false;
   }
   // TODO (kfstorm): metadata should be structured.
-  const std::string metadata(reinterpret_cast<const char *>(metadata_->Data()),
-                             metadata_->Size());
+  const std::string_view metadata(reinterpret_cast<const char *>(metadata_->Data()),
+                                  metadata_->Size());
+  if (metadata == kObjectInPlasmaStr) {
+    if (error_type) {
+      *error_type = rpc::ErrorType::OBJECT_IN_PLASMA;
+    }
+    return true;
+  }
   const auto error_type_descriptor = ray::rpc::ErrorType_descriptor();
  for (int i = 0; i < error_type_descriptor->value_count(); i++) {
     const auto error_type_number = error_type_descriptor->value(i)->number();
@@ -125,9 +137,9 @@ bool RayObject::IsInPlasmaError() const {
   if (metadata_ == nullptr) {
     return false;
   }
-  const std::string metadata(reinterpret_cast<const char *>(metadata_->Data()),
-                             metadata_->Size());
-  return metadata == std::to_string(ray::rpc::ErrorType::OBJECT_IN_PLASMA);
+  const std::string_view metadata(reinterpret_cast<const char *>(metadata_->Data()),
+                                  metadata_->Size());
+  return metadata == kObjectInPlasmaStr;
 }
 
 }  // namespace ray
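The fast path rests on a small invariant: error metadata is the decimal string of an ErrorType value, and the static_assert guarantees every such string is at most two characters, so longer metadata such as b"PYTHON" is rejected before any string comparison. A Python sketch of the same logic (hypothetical helper names; the enum bound is an assumption mirroring the static_assert):

from typing import Optional

OBJECT_IN_PLASMA = 4
ERROR_TYPE_MAX = 22  # assumption: all ErrorType values are < 100
assert ERROR_TYPE_MAX < 100  # mirrors the static_assert in ray_object.cc

_OBJECT_IN_PLASMA_STR = str(OBJECT_IN_PLASMA).encode()  # computed once: b"4"
_ALL_ERROR_STRS = {str(i).encode() for i in range(ERROR_TYPE_MAX + 1)}

def is_exception(metadata: Optional[bytes]) -> bool:
    # Fast path: non-error metadata like b"PYTHON" is longer than any
    # stringified ErrorType value, so it cannot match.
    if metadata is None or len(metadata) > 2:
        return False
    # The most common error on the hot path gets a dedicated comparison.
    if metadata == _OBJECT_IN_PLASMA_STR:
        return True
    return metadata in _ALL_ERROR_STRS

assert is_exception(b"4") and not is_exception(b"PYTHON")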
6 changes: 3 additions & 3 deletions src/ray/core_worker/reference_count.cc
@@ -393,12 +393,12 @@ void ReferenceCounter::RemoveLocalReferenceInternal(const ObjectID &object_id,
   RAY_CHECK(!object_id.IsNil());
   auto it = object_id_refs_.find(object_id);
   if (it == object_id_refs_.end()) {
-    RAY_LOG(WARNING) << "Tried to decrease ref count for nonexistent object ID: "
-                     << object_id;
+    RAY_LOG_EVERY_MS(WARNING, 5000)
+        << "Tried to decrease ref count for nonexistent object ID: " << object_id;
     return;
   }
   if (it->second.local_ref_count == 0) {
-    RAY_LOG(WARNING)
+    RAY_LOG_EVERY_MS(WARNING, 5000)
         << "Tried to decrease ref count for object ID that has count 0 " << object_id
         << ". This should only happen if ray.internal.free was called earlier.";
     return;
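RAY_LOG_EVERY_MS throttles a log statement to at most one emission per interval, which matters here because these warnings can fire once per ref-count operation on a hot path. A minimal Python sketch of the idea (hypothetical helper, not Ray's API):

import time

_last_emit = {}

def log_every_ms(key, interval_ms, message):
    # Emit at most one message per key per interval; silently drop the rest.
    now = time.monotonic()
    if (now - _last_emit.get(key, float("-inf"))) * 1000.0 >= interval_ms:
        _last_emit[key] = now
        print(f"WARNING: {message}")

for _ in range(3):
    log_every_ms("stale-ref", 5000, "Tried to decrease ref count ...")
# Only the first call prints; the other two fall inside the 5000 ms window.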
4 changes: 2 additions & 2 deletions src/ray/core_worker/store_provider/plasma_store_provider.cc
@@ -144,8 +144,8 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr<Buffer> &meta
             << "is full. Object size is " << data_size << " bytes.";
     status = Status::ObjectStoreFull(message.str());
   } else if (status.IsObjectExists()) {
-    RAY_LOG(WARNING) << "Trying to put an object that already existed in plasma: "
-                     << object_id << ".";
+    RAY_LOG_EVERY_MS(WARNING, 5000)
+        << "Trying to put an object that already existed in plasma: " << object_id << ".";
     status = Status::OK();
   } else {
     RAY_RETURN_NOT_OK(status);
1 change: 1 addition & 0 deletions src/ray/protobuf/common.proto
@@ -195,6 +195,7 @@ enum ErrorType {
   // Indicates that the object has been placed in plasma. This error shouldn't
   // ever be exposed to user code; it is only used internally to indicate the
   // result of a direct call has been placed in plasma.
+  // IMPORTANT: Keep the enum index "4" in sync with ray_object.cc.
   OBJECT_IN_PLASMA = 4;
   // Indicates that an object has been cancelled.
   TASK_CANCELLED = 5;
