Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Object Spilling] Remove retries and use a timer instead. #13175

Merged
merged 10 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Remove retries and use a timer instead.
  • Loading branch information
rkooo567 committed Jan 4, 2021
commit b4e4bc6a5f200fb2252f48a618471ed4dd0800d1
2 changes: 1 addition & 1 deletion python/ray/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def __init__(self,
raise Exception(
"Object pinning cannot be enabled if using LRU eviction.")
self._system_config["object_pinning_enabled"] = False
self._system_config["object_store_full_max_retries"] = -1
self._system_config["oom_grace_period_ns"] = -1
self._system_config["free_objects_period_milliseconds"] = 1000

# Set the internal config options for object reconstruction.
Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def get_default_fixure_system_config():
system_config = {
"object_timeout_milliseconds": 200,
"num_heartbeats_timeout": 10,
"object_store_full_max_retries": 3,
"oom_grace_period_ns": 1e9,
"object_store_full_delay_ms": 100,
}
return system_config
Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ def test_connect_with_disconnected_node(shutdown_only):
"num_cpus": 5,
"object_store_memory": 10**8,
"_system_config": {
"object_store_full_max_retries": 0
"oom_grace_period_ns": 0
}
}],
indirect=True)
Expand All @@ -1045,7 +1045,7 @@ def test_fill_object_store_exception(shutdown_only):
ray.init(
num_cpus=2,
object_store_memory=10**8,
_system_config={"object_store_full_max_retries": 0})
_system_config={"oom_grace_period_ns": 0})

@ray.remote
def expensive_task():
Expand Down
20 changes: 10 additions & 10 deletions python/ray/tests/test_object_spilling.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def test_spilling_not_done_for_pinned_object(tmp_path, shutdown_only):
_system_config={
"max_io_workers": 4,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"oom_grace_period_ns": 1e9,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
Expand Down Expand Up @@ -116,7 +116,7 @@ def is_dir_empty():
"object_store_memory": 75 * 1024 * 1024,
"_system_config": {
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"oom_grace_period_ns": 1e9,
"object_store_full_delay_ms": 100,
"max_io_workers": 4,
"object_spilling_config": json.dumps({
Expand Down Expand Up @@ -169,7 +169,7 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only):
_system_config={
"max_io_workers": 4,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"oom_grace_period_ns": 1e9,
"object_store_full_delay_ms": 100,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0
Expand Down Expand Up @@ -210,7 +210,7 @@ def test_spill_during_get(object_spilling_config, shutdown_only):
# NOTE(swang): Use infinite retries because the OOM timer can still
# get accidentally triggered when objects are released too slowly
# (see github.com/ray-project/ray/issues/12040).
"object_store_full_max_retries": -1,
"oom_grace_period_ns": -1,
"max_io_workers": 1,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
Expand Down Expand Up @@ -242,7 +242,7 @@ def test_spill_deadlock(object_spilling_config, shutdown_only):
_system_config={
"max_io_workers": 1,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"oom_grace_period_ns": 1e9,
"object_store_full_delay_ms": 100,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
Expand Down Expand Up @@ -276,7 +276,7 @@ def test_delete_objects(tmp_path, shutdown_only):
"max_io_workers": 1,
"min_spilling_size": 0,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"oom_grace_period_ns": 1e9,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
Expand Down Expand Up @@ -319,7 +319,7 @@ def test_delete_objects_delete_while_creating(tmp_path, shutdown_only):
"max_io_workers": 4,
"min_spilling_size": 0,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"oom_grace_period_ns": 1e9,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
Expand Down Expand Up @@ -369,7 +369,7 @@ def test_delete_objects_on_worker_failure(tmp_path, shutdown_only):
_system_config={
"max_io_workers": 4,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"oom_grace_period_ns": 1e9,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
Expand Down Expand Up @@ -445,7 +445,7 @@ def test_delete_objects_multi_node(tmp_path, ray_start_cluster):
"max_io_workers": 2,
"min_spilling_size": 20 * 1024 * 1024,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"oom_grace_period_ns": 1e9,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
Expand Down Expand Up @@ -519,7 +519,7 @@ def test_fusion_objects(tmp_path, shutdown_only):
_system_config={
"max_io_workers": 3,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"oom_grace_period_ns": 1e9,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/test_reference_counting_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
@pytest.fixture
def one_worker_100MiB(request):
config = {
"object_store_full_max_retries": 2,
"oom_grace_period_ns": 1e9,
"task_retry_delay_ms": 0,
"object_timeout_milliseconds": 1000,
}
Expand Down
7 changes: 4 additions & 3 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,6 @@ RAY_CONFIG(uint32_t, maximum_gcs_dead_node_cached_count, 1000)
/// The interval at which the gcs server will print debug info.
RAY_CONFIG(int64_t, gcs_dump_debug_log_interval_minutes, 1)

/// Maximum number of times to retry putting an object when the plasma store is full.
/// Can be set to -1 to enable unlimited retries.
RAY_CONFIG(int32_t, object_store_full_max_retries, 1000)
/// Duration to sleep after failing to put an object in plasma because it is full.
RAY_CONFIG(uint32_t, object_store_full_delay_ms, 10)

Expand Down Expand Up @@ -363,3 +360,7 @@ RAY_CONFIG(int64_t, min_spilling_size, 100 * 1024 * 1024)
/// When it is true, manual (force) spilling is not available.
/// TODO(sang): Fix it.
RAY_CONFIG(bool, automatic_object_deletion_enabled, true)

/// Grace period until we throw the OOM error to the application.
/// -1 means grace period is infinite.
RAY_CONFIG(int64_t, oom_grace_period_ns, 10e9)
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
23 changes: 7 additions & 16 deletions src/ray/object_manager/plasma/create_request_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,38 +83,32 @@ std::pair<PlasmaObject, PlasmaError> CreateRequestQueue::TryRequestImmediately(
bool CreateRequestQueue::ProcessRequest(std::unique_ptr<CreateRequest> &request) {
// Return an OOM error to the client if we have hit the maximum number of
// retries.
// TODO(sang): Delete this logic?
bool evict_if_full = evict_if_full_;
if (max_retries_ == 0) {
// If we cannot retry, then always evict on the first attempt.
evict_if_full = true;
} else if (num_retries_ > 0) {
// Always try to evict after the first attempt.
evict_if_full = true;
}
request->error = request->create_callback(evict_if_full, &request->result);
request->error =
request->create_callback(/*evict_if_full=*/evict_if_full_, &request->result);
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
return request->error != PlasmaError::OutOfMemory;
}

Status CreateRequestQueue::ProcessRequests() {
while (!queue_.empty()) {
auto request_it = queue_.begin();
auto create_ok = ProcessRequest(*request_it);
auto now = timer_func_();
if (create_ok) {
FinishRequest(request_it);
last_success_ns_ = now;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to assign success here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise it can OOM without any grace period when spilling is not invoked, since last_success_ns_ is never renewed in that case.

Then the code comment

> We need a grace period since (1) global GC takes a bit of time to kick in

is not reflected. Let me know if I am missing something!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we be setting last_success_ns_ in the else branch? It seems like we need another state like "waiting for OOM" so we know whether to set vs check the last_success_ns_.

And we should add a unit test for this case because I think it's currently broken:

  1. Add a create request that succeeds.
  2. More than the oom_grace_period later, add another create request that fails. Check that this doesn't raise OOM <-- I think this will throw OOM right now.

Copy link
Contributor

@ericl ericl Jan 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also set it if spilling is successful, so it's set whenever a creation/spill succeeds. If neither has succeeded for the grace period, OOM is raised.

So the case raised above is fine--- spilling is attempted in the second create case, and that resets the timer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that the spill reset is enough because it won't happen if the spill callback is unsuccessful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't want to reset it on OOM right? Otherwise, it will take a long time to OOM many objects (e.g., 10 seconds * N objects)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check code again (I reflected the comment.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also note: I am using -1 as the default value of oom_start_time_ns_ instead of 0, because otherwise tests will fail (since the initial fake clock time is 0).

Copy link
Contributor Author

@rkooo567 rkooo567 Jan 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, added a test case that checks if objects are successfully created after there was no new object created for a long time.

} else {
if (trigger_global_gc_) {
trigger_global_gc_();
}

if (spill_objects_callback_()) {
last_success_ns_ = timer_func_();
return Status::TransientObjectStoreFull("Waiting for spilling.");
} else if (num_retries_ < max_retries_ || max_retries_ == -1) {
} else if (oom_grace_period_ns_ == -1 // meaning "wait unlimitedly".
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
|| now - last_success_ns_ < oom_grace_period_ns_) {
// We need a grace period since (1) global GC takes a bit of time to
// kick in, and (2) there is a race between spilling finishing and space
// actually freeing up in the object store.
// If max_retries == -1, we retry infinitely.
num_retries_ += 1;
return Status::ObjectStoreFull("Waiting for grace period.");
} else {
// Raise OOM. In this case, the request will be marked as OOM.
Expand All @@ -135,9 +129,6 @@ void CreateRequestQueue::FinishRequest(
RAY_CHECK(it->second == nullptr);
it->second = std::move(request);
queue_.erase(request_it);

// Reset the number of retries since we are no longer trying this request.
num_retries_ = 0;
}

void CreateRequestQueue::RemoveDisconnectedClientRequests(
Expand Down
33 changes: 20 additions & 13 deletions src/ray/object_manager/plasma/create_request_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,18 @@ class CreateRequestQueue {
using CreateObjectCallback =
std::function<PlasmaError(bool evict_if_full, PlasmaObject *result)>;

CreateRequestQueue(int32_t max_retries, bool evict_if_full,
CreateRequestQueue(bool evict_if_full, int64_t oom_grace_period_ns,
ray::SpillObjectsCallback spill_objects_callback,
std::function<void()> trigger_global_gc)
: max_retries_(max_retries),
evict_if_full_(evict_if_full),
std::function<void()> trigger_global_gc,
std::function<int64_t()> timer_func)
: evict_if_full_(evict_if_full),
oom_grace_period_ns_(oom_grace_period_ns),
spill_objects_callback_(spill_objects_callback),
trigger_global_gc_(trigger_global_gc) {
RAY_LOG(DEBUG) << "Starting plasma::CreateRequestQueue with " << max_retries_
<< " retries on OOM, evict if full? " << (evict_if_full_ ? 1 : 0);
trigger_global_gc_(trigger_global_gc),
timer_func_(timer_func) {
RAY_LOG(DEBUG) << "Starting plasma::CreateRequestQueue with OOM grace period "
<< oom_grace_period_ns_ << " retries on OOM, evict if full? "
<< (evict_if_full_ ? 1 : 0);
}

/// Add a request to the queue. The caller should use the returned request ID
Expand Down Expand Up @@ -148,17 +151,15 @@ class CreateRequestQueue {
/// a request by retrying. Start at 1 because 0 means "do not retry".
uint64_t next_req_id_ = 1;

/// The maximum number of times to retry each request upon OOM.
const int32_t max_retries_;

/// The number of times the request at the head of the queue has been tried.
int32_t num_retries_ = 0;

/// On the first attempt to create an object, whether to evict from the
/// object store to make space. If the first attempt fails, then we will
/// always try to evict.
const bool evict_if_full_;

/// Grace period until we throw the OOM error to the application.
/// -1 means grace period is infinite.
const int64_t oom_grace_period_ns_ = 10e9;

/// A callback to trigger object spilling. It tries to spill objects upto max
/// throughput. It returns true if space is made by object spilling, and false if
/// there's no more space to be made.
Expand All @@ -168,6 +169,9 @@ class CreateRequestQueue {
/// full.
const std::function<void()> trigger_global_gc_;

/// A callback to return the current time in nano seconds.
const std::function<int64_t()> timer_func_;

/// Queue of object creation requests to respond to. Requests will be placed
/// on this queue if the object store does not have enough room at the time
/// that the client made the creation request, but space may be made through
Expand All @@ -189,6 +193,9 @@ class CreateRequestQueue {
/// Last time global gc was invoked in ms.
uint64_t last_global_gc_ms_;

/// Last successful object creation or spill invocation.
int64_t last_success_ns_ = 0;

friend class CreateRequestQueueTest;
};

Expand Down
5 changes: 3 additions & 2 deletions src/ray/object_manager/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string dire
usage_log_interval_ns_(RayConfig::instance().object_store_usage_log_interval_s() *
1e9),
create_request_queue_(
RayConfig::instance().object_store_full_max_retries(),
/*evict_if_full=*/RayConfig::instance().object_pinning_enabled(),
spill_objects_callback, object_store_full_callback) {
/*oom_grace_period_ns=*/RayConfig::instance().oom_grace_period_ns(),
spill_objects_callback, object_store_full_callback,
/*timer_func=*/[]() { return absl::GetCurrentTimeNanos(); }) {
store_info_.directory = directory;
store_info_.hugepages_enabled = hugepages_enabled;
}
Expand Down