Skip to content

Commit

Permalink
Remove timeout for Redis commands. (ray-project#649)
Browse files Browse the repository at this point in the history
* update

* Remove interaction between callback data identifier and event loop.

* Remove tests that no longer apply.
  • Loading branch information
ericl authored and robertnishihara committed Jun 9, 2017
1 parent ee1d4e5 commit d4d2c03
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 25 deletions.
15 changes: 7 additions & 8 deletions src/common/state/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -732,8 +732,7 @@ void object_table_redis_subscribe_to_notifications_callback(
}
/* If the initial SUBSCRIBE was successful, clean up the timer, but don't
* destroy the callback data. */
event_loop_remove_timer(callback_data->db_handle->loop,
callback_data->timer_id);
remove_timer_callback(callback_data->db_handle->loop, callback_data);
} else {
LOG_FATAL(
"Unexpected reply type from object table subscribe to notifications.");
Expand Down Expand Up @@ -1106,7 +1105,7 @@ void redis_task_table_subscribe_callback(redisAsyncContext *c,
}
/* Note that we do not destroy the callback data yet because the
* subscription callback needs this data. */
event_loop_remove_timer(db->loop, callback_data->timer_id);
remove_timer_callback(db->loop, callback_data);
} else {
LOG_FATAL(
"Unexpected reply type from task table subscribe. Message type is %s.",
Expand Down Expand Up @@ -1247,7 +1246,7 @@ void redis_db_client_table_subscribe_callback(redisAsyncContext *c,
}
/* Note that we do not destroy the callback data yet because the
* subscription callback needs this data. */
event_loop_remove_timer(db->loop, callback_data->timer_id);
remove_timer_callback(db->loop, callback_data);

/* Get the current db client table entries, in case we missed notifications
* before the initial subscription. This must be done before we process any
Expand Down Expand Up @@ -1343,7 +1342,7 @@ void redis_local_scheduler_table_subscribe_callback(redisAsyncContext *c,
CHECK(callback_data->done_callback == NULL);
/* If the initial SUBSCRIBE was successful, clean up the timer, but don't
* destroy the callback data. */
event_loop_remove_timer(db->loop, callback_data->timer_id);
remove_timer_callback(db->loop, callback_data);

} else {
LOG_FATAL("Unexpected reply type from local scheduler subscribe.");
Expand Down Expand Up @@ -1431,7 +1430,7 @@ void redis_driver_table_subscribe_callback(redisAsyncContext *c,
CHECK(callback_data->done_callback == NULL);
/* If the initial SUBSCRIBE was successful, clean up the timer, but don't
* destroy the callback data. */
event_loop_remove_timer(db->loop, callback_data->timer_id);
remove_timer_callback(db->loop, callback_data);

} else {
LOG_FATAL("Unexpected reply type from driver subscribe.");
Expand Down Expand Up @@ -1541,7 +1540,7 @@ void redis_actor_notification_table_subscribe_callback(redisAsyncContext *c,
CHECK(callback_data->done_callback == NULL);
/* If the initial SUBSCRIBE was successful, clean up the timer, but don't
* destroy the callback data. */
event_loop_remove_timer(db->loop, callback_data->timer_id);
remove_timer_callback(db->loop, callback_data);

} else {
LOG_FATAL("Unexpected reply type from actor notification subscribe.");
Expand Down Expand Up @@ -1582,7 +1581,7 @@ void redis_object_info_subscribe_callback(redisAsyncContext *c,
}
/* Note that we do not destroy the callback data yet because the
* subscription callback needs this data. */
event_loop_remove_timer(db->loop, callback_data->timer_id);
remove_timer_callback(db->loop, callback_data);
return;
}
/* Otherwise, parse the payload and call the callback. */
Expand Down
20 changes: 15 additions & 5 deletions src/common/state/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ static const RetryInfo default_retry = {.num_retries = -1,
.timeout = 10000,
.fail_callback = NULL};

static int64_t callback_data_id = 0;

TableCallbackData *init_table_callback(DBHandle *db_handle,
UniqueID id,
const char *label,
Expand Down Expand Up @@ -36,10 +38,10 @@ TableCallbackData *init_table_callback(DBHandle *db_handle,
callback_data->requests_info = NULL;
callback_data->user_context = user_context;
callback_data->db_handle = db_handle;
/* Add timer and initialize it. */
callback_data->timer_id = event_loop_add_timer(
db_handle->loop, retry->timeout,
(event_loop_timer_handler) table_timeout_handler, callback_data);
/* TODO(ekl) set a retry timer once we've figured out the retry conditions
* and have a solution to the O(n^2) ae timers issue. For now, use a dummy
* timer id to uniquely id this callback. */
callback_data->timer_id = callback_data_id++;
outstanding_callbacks_add(callback_data);

LOG_DEBUG("Initializing table command %s with timer ID %" PRId64,
Expand All @@ -51,10 +53,18 @@ TableCallbackData *init_table_callback(DBHandle *db_handle,

void destroy_timer_callback(event_loop *loop,
TableCallbackData *callback_data) {
event_loop_remove_timer(loop, callback_data->timer_id);
/* This is commented out because we no longer add timers to the event loop for
* each Redis command. */
// event_loop_remove_timer(loop, callback_data->timer_id);
destroy_table_callback(callback_data);
}

void remove_timer_callback(event_loop *loop, TableCallbackData *callback_data) {
/* This is commented out because we no longer add timers to the event loop for
* each Redis command. */
// event_loop_remove_timer(loop, callback_data->timer_id);
}

void destroy_table_callback(TableCallbackData *callback_data) {
CHECK(callback_data != NULL);

Expand Down
10 changes: 10 additions & 0 deletions src/common/state/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,22 @@ void destroy_table_callback(TableCallbackData *callback_data);
* Destroy all state events associated with the callback data, including memory
* and timer events.
*
* @param loop The event loop.
* @param callback_data The pointer to the data structure of the callback we
* want to remove.
* @return Void.
*/
void destroy_timer_callback(event_loop *loop, TableCallbackData *callback_data);

/**
* Remove the callback timer without destroying the callback data.
*
* @param loop The event loop.
* @param callback_data The pointer to the data structure of the callback.
* @return Void.
*/
void remove_timer_callback(event_loop *loop, TableCallbackData *callback_data);

/**
* Add an outstanding callback entry.
*
Expand Down
12 changes: 6 additions & 6 deletions src/common/test/object_table_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -890,14 +890,14 @@ TEST subscribe_object_available_subscribe_all(void) {
SUITE(object_table_tests) {
RUN_REDIS_TEST(new_object_test);
RUN_REDIS_TEST(new_object_no_task_test);
RUN_REDIS_TEST(lookup_timeout_test);
RUN_REDIS_TEST(add_timeout_test);
RUN_REDIS_TEST(subscribe_timeout_test);
// RUN_REDIS_TEST(lookup_timeout_test);
// RUN_REDIS_TEST(add_timeout_test);
// RUN_REDIS_TEST(subscribe_timeout_test);
RUN_REDIS_TEST(add_lookup_test);
RUN_REDIS_TEST(add_remove_lookup_test);
RUN_REDIS_TEST(lookup_late_test);
RUN_REDIS_TEST(add_late_test);
RUN_REDIS_TEST(subscribe_late_test);
// RUN_REDIS_TEST(lookup_late_test);
// RUN_REDIS_TEST(add_late_test);
// RUN_REDIS_TEST(subscribe_late_test);
RUN_REDIS_TEST(subscribe_success_test);
RUN_REDIS_TEST(subscribe_object_not_present_test);
RUN_REDIS_TEST(subscribe_object_available_later_test);
Expand Down
12 changes: 6 additions & 6 deletions src/common/test/task_table_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -434,12 +434,12 @@ TEST publish_late_test(void) {
SUITE(task_table_tests) {
RUN_REDIS_TEST(lookup_nil_test);
RUN_REDIS_TEST(add_lookup_test);
RUN_REDIS_TEST(subscribe_timeout_test);
RUN_REDIS_TEST(publish_timeout_test);
RUN_REDIS_TEST(subscribe_retry_test);
RUN_REDIS_TEST(publish_retry_test);
RUN_REDIS_TEST(subscribe_late_test);
RUN_REDIS_TEST(publish_late_test);
// RUN_REDIS_TEST(subscribe_timeout_test);
// RUN_REDIS_TEST(publish_timeout_test);
// RUN_REDIS_TEST(subscribe_retry_test);
// RUN_REDIS_TEST(publish_retry_test);
// RUN_REDIS_TEST(subscribe_late_test);
// RUN_REDIS_TEST(publish_late_test);
}

GREATEST_MAIN_DEFS();
Expand Down
17 changes: 17 additions & 0 deletions test/stress_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,23 @@ def g(*xs):
self.assertTrue(ray.services.all_processes_alive())
ray.worker.cleanup()

def testSubmittingManyTasks(self):
ray.init()

@ray.remote
def f(x):
return 1

def g(n):
x = 1
for i in range(n):
x = f.remote(x)
return x

ray.get([g(1000) for _ in range(100)])
self.assertTrue(ray.services.all_processes_alive())
ray.worker.cleanup()

def testGettingAndPutting(self):
ray.init(num_workers=1)

Expand Down

0 comments on commit d4d2c03

Please sign in to comment.