Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Object Spilling] Remove retries and use a timer instead. #13175

Merged
merged 10 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Addressed code review.
  • Loading branch information
rkooo567 committed Jan 18, 2021
commit cfe5c588df8141f99808558b89924be6610bebd7
21 changes: 15 additions & 6 deletions src/ray/object_manager/plasma/create_request_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,16 @@ std::pair<PlasmaObject, PlasmaError> CreateRequestQueue::TryRequestImmediately(
}

// Runs the create callback of a single queued request and records the outcome
// in the request itself.
//
// \param request The request to process. Its `error` field is overwritten with
// the callback's return value, and `result` is passed to the callback as the
// output location.
// \return false if the callback reported PlasmaError::OutOfMemory, true
// otherwise.
bool CreateRequestQueue::ProcessRequest(std::unique_ptr<CreateRequest> &request) {
  // TODO(sang): Delete this logic when lru evict is removed.
  // oom_start_time_ns_ != -1 means an OOM timer is running, i.e. an earlier
  // attempt already failed. From then on eviction is always allowed, even if
  // evict_if_full_ is false (per the original note, it is false when the
  // lru_evict flag is on, so that the first attempt does not evict objects).
  const bool evict_if_full = evict_if_full_ || oom_start_time_ns_ != -1;

  // Invoke the callback exactly once, with the effective flag, so that the
  // stored error reflects the attempt that was actually made.
  request->error =
      request->create_callback(/*evict_if_full=*/evict_if_full, &request->result);
  return request->error != PlasmaError::OutOfMemory;
}

Expand All @@ -95,16 +101,19 @@ Status CreateRequestQueue::ProcessRequests() {
auto now = get_time_();
if (create_ok) {
FinishRequest(request_it);
last_success_ns_ = now;
// Reset the oom start time since the creation succeeds.
oom_start_time_ns_ = -1;
} else {
if (trigger_global_gc_) {
trigger_global_gc_();
}

if (oom_start_time_ns_ == -1) {
oom_start_time_ns_ = now;
}
if (spill_objects_callback_()) {
last_success_ns_ = now;
return Status::TransientObjectStoreFull("Waiting for spilling.");
} else if (now - last_success_ns_ < oom_grace_period_ns_) {
} else if (now - oom_start_time_ns_ < oom_grace_period_ns_) {
// We need a grace period since (1) global GC takes a bit of time to
// kick in, and (2) there is a race between spilling finishing and space
// actually freeing up in the object store.
Expand Down
4 changes: 2 additions & 2 deletions src/ray/object_manager/plasma/create_request_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ class CreateRequestQueue {
/// Last time global gc was invoked in ms.
uint64_t last_global_gc_ms_;

/// Last successful object creation or spill invocation.
int64_t last_success_ns_ = 0;
/// Time at which the current OOM timer started, or -1 if no timer is running.
/// It is reset to -1 every time an object creation succeeds.
int64_t oom_start_time_ns_ = -1;

friend class CreateRequestQueueTest;
};
Expand Down
30 changes: 26 additions & 4 deletions src/ray/object_manager/test/create_request_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class CreateRequestQueueTest : public ::testing::Test {
ASSERT_TRUE(queue_.fulfilled_requests_.empty());
}

// Resets the fixture's mocked time (current_time_ns_) back to zero when the
// test is torn down.
void TearDown() { current_time_ns_ = 0; }

int64_t oom_grace_period_s_;
int64_t current_time_ns_;
CreateRequestQueue queue_;
Expand All @@ -71,6 +73,8 @@ TEST_F(CreateRequestQueueTest, TestSimple) {
result->data_size = 1234;
return PlasmaError::OK;
};
// Advance the clock without processing objects. This shouldn't have an impact.
current_time_ns_ += 10e9;
auto client = std::make_shared<MockClient>();
auto req_id = queue_.AddRequest(ObjectID::Nil(), client, request);
ASSERT_REQUEST_UNFINISHED(queue_, req_id);
Expand Down Expand Up @@ -135,13 +139,14 @@ TEST_F(CreateRequestQueueTest, TestOom) {

TEST(CreateRequestQueueParameterTest, TestOomInfiniteRetry) {
int num_global_gc_ = 0;
int64_t current_time_ns;
CreateRequestQueue queue(
/*evict_if_full=*/true,
/*oom_grace_period_s=*/100,
// Spilling is failing.
/*spill_object_callback=*/[&]() { return false; },
/*on_global_gc=*/[&]() { num_global_gc_++; },
/*get_time=*/[&]() { return 0; });
/*get_time=*/[&]() { return current_time_ns; });

auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we advance the clock during the test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I now advance the clock by 1 second on each iteration, for 10 iterations.

return PlasmaError::OutOfMemory;
Expand All @@ -155,7 +160,9 @@ TEST(CreateRequestQueueParameterTest, TestOomInfiniteRetry) {
auto req_id1 = queue.AddRequest(ObjectID::Nil(), client, oom_request);
auto req_id2 = queue.AddRequest(ObjectID::Nil(), client, blocked_request);

for (int i = 0; i < 3; i++) {
for (int i = 0; i < 10; i++) {
// Advance 1 second.
current_time_ns += 1e9;
ASSERT_TRUE(queue.ProcessRequests().IsObjectStoreFull());
ASSERT_EQ(num_global_gc_, i + 1);
}
Expand Down Expand Up @@ -190,7 +197,10 @@ TEST_F(CreateRequestQueueTest, TestTransientOom) {
auto req_id2 = queue.AddRequest(ObjectID::Nil(), client, blocked_request);

// Transient OOM should happen until the grace period.
for (int i = 0; i < 3; i++) {
for (int i = 0; i < 9; i++) {
// Advance 0.1 seconds. OOM grace period is 1 second, so it should return transient
// error.
current_time_ns_ += 1e8;
ASSERT_TRUE(queue.ProcessRequests().IsTransientObjectStoreFull());
ASSERT_REQUEST_UNFINISHED(queue, req_id1);
ASSERT_REQUEST_UNFINISHED(queue, req_id2);
Expand Down Expand Up @@ -234,6 +244,8 @@ TEST_F(CreateRequestQueueTest, TestTransientOomThenOom) {

// Only a transient OOM should be reported until the grace period is done.
for (int i = 0; i < 3; i++) {
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
// Advance 0.1 seconds. OOM grace period is 1 second.
current_time_ns_ += 1e8;
ASSERT_TRUE(queue.ProcessRequests().IsTransientObjectStoreFull());
ASSERT_REQUEST_UNFINISHED(queue, req_id1);
ASSERT_REQUEST_UNFINISHED(queue, req_id2);
Expand Down Expand Up @@ -272,20 +284,30 @@ TEST_F(CreateRequestQueueTest, TestEvictIfFull) {
}

// Verifies that a queue constructed with evict_if_full=false passes
// evict_if_full=false to the create callback on the first attempt and
// evict_if_full=true on every later attempt.
TEST(CreateRequestQueueParameterTest, TestNoEvictIfFull) {
  // Mocked clock read by the queue through the get_time callback.
  int64_t current_time_ns = 0;
  CreateRequestQueue queue(
      /*evict_if_full=*/false,
      /*oom_grace_period_s=*/1,
      // Spilling never makes progress in this test.
      /*spill_object_callback=*/[&]() { return false; },
      /*on_global_gc=*/[&]() {},
      /*get_time=*/[&]() { return current_time_ns; });

  bool first_try = true;

  // Always fails with OOM, checking the eviction flag it was called with.
  auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
    if (first_try) {
      RAY_CHECK(!evict_if_full);
      first_try = false;
    } else {
      RAY_CHECK(evict_if_full);
    }
    return PlasmaError::OutOfMemory;
  };

  auto client = std::make_shared<MockClient>();
  static_cast<void>(queue.AddRequest(ObjectID::Nil(), client, oom_request));
  ASSERT_TRUE(queue.ProcessRequests().IsObjectStoreFull());
  // Advance the mocked clock by 0.1 seconds before the second attempt.
  current_time_ns += 1e8;
  ASSERT_TRUE(queue.ProcessRequests().IsObjectStoreFull());
}

Expand Down