From d4d2c03ac55cbd1a2674bdb33f0426c5bc5c4c48 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 9 Jun 2017 15:55:36 -0700 Subject: [PATCH] Remove timeout for Redis commands. (#649) * update * Remove interaction between callback data identifier and event loop. * Remove tests that no longer apply. --- src/common/state/redis.cc | 15 +++++++-------- src/common/state/table.cc | 20 +++++++++++++++----- src/common/state/table.h | 10 ++++++++++ src/common/test/object_table_tests.cc | 12 ++++++------ src/common/test/task_table_tests.cc | 12 ++++++------ test/stress_tests.py | 17 +++++++++++++++++ 6 files changed, 61 insertions(+), 25 deletions(-) diff --git a/src/common/state/redis.cc b/src/common/state/redis.cc index 0cdfa09d3d98..e0fed8430e47 100644 --- a/src/common/state/redis.cc +++ b/src/common/state/redis.cc @@ -732,8 +732,7 @@ void object_table_redis_subscribe_to_notifications_callback( } /* If the initial SUBSCRIBE was successful, clean up the timer, but don't * destroy the callback data. */ - event_loop_remove_timer(callback_data->db_handle->loop, - callback_data->timer_id); + remove_timer_callback(callback_data->db_handle->loop, callback_data); } else { LOG_FATAL( "Unexpected reply type from object table subscribe to notifications."); @@ -1106,7 +1105,7 @@ void redis_task_table_subscribe_callback(redisAsyncContext *c, } /* Note that we do not destroy the callback data yet because the * subscription callback needs this data. */ - event_loop_remove_timer(db->loop, callback_data->timer_id); + remove_timer_callback(db->loop, callback_data); } else { LOG_FATAL( "Unexpected reply type from task table subscribe. Message type is %s.", @@ -1247,7 +1246,7 @@ void redis_db_client_table_subscribe_callback(redisAsyncContext *c, } /* Note that we do not destroy the callback data yet because the * subscription callback needs this data. */ - event_loop_remove_timer(db->loop, callback_data->timer_id); + remove_timer_callback(db->loop, callback_data); /* Get the current db client table entries, in case we missed notifications * before the initial subscription. This must be done before we process any @@ -1343,7 +1342,7 @@ void redis_local_scheduler_table_subscribe_callback(redisAsyncContext *c, CHECK(callback_data->done_callback == NULL); /* If the initial SUBSCRIBE was successful, clean up the timer, but don't * destroy the callback data. */ - event_loop_remove_timer(db->loop, callback_data->timer_id); + remove_timer_callback(db->loop, callback_data); } else { LOG_FATAL("Unexpected reply type from local scheduler subscribe."); @@ -1431,7 +1430,7 @@ void redis_driver_table_subscribe_callback(redisAsyncContext *c, CHECK(callback_data->done_callback == NULL); /* If the initial SUBSCRIBE was successful, clean up the timer, but don't * destroy the callback data. */ - event_loop_remove_timer(db->loop, callback_data->timer_id); + remove_timer_callback(db->loop, callback_data); } else { LOG_FATAL("Unexpected reply type from driver subscribe."); @@ -1541,7 +1540,7 @@ void redis_actor_notification_table_subscribe_callback(redisAsyncContext *c, CHECK(callback_data->done_callback == NULL); /* If the initial SUBSCRIBE was successful, clean up the timer, but don't * destroy the callback data. */ - event_loop_remove_timer(db->loop, callback_data->timer_id); + remove_timer_callback(db->loop, callback_data); } else { LOG_FATAL("Unexpected reply type from actor notification subscribe."); @@ -1582,7 +1581,7 @@ void redis_object_info_subscribe_callback(redisAsyncContext *c, } /* Note that we do not destroy the callback data yet because the * subscription callback needs this data. */ - event_loop_remove_timer(db->loop, callback_data->timer_id); + remove_timer_callback(db->loop, callback_data); return; } /* Otherwise, parse the payload and call the callback. */ diff --git a/src/common/state/table.cc b/src/common/state/table.cc index e092ecaa0f23..3db3d2e45408 100644 --- a/src/common/state/table.cc +++ b/src/common/state/table.cc @@ -8,6 +8,8 @@ static const RetryInfo default_retry = {.num_retries = -1, .timeout = 10000, .fail_callback = NULL}; +static int64_t callback_data_id = 0; + TableCallbackData *init_table_callback(DBHandle *db_handle, UniqueID id, const char *label, @@ -36,10 +38,10 @@ TableCallbackData *init_table_callback(DBHandle *db_handle, callback_data->requests_info = NULL; callback_data->user_context = user_context; callback_data->db_handle = db_handle; - /* Add timer and initialize it. */ - callback_data->timer_id = event_loop_add_timer( - db_handle->loop, retry->timeout, - (event_loop_timer_handler) table_timeout_handler, callback_data); + /* TODO(ekl) set a retry timer once we've figured out the retry conditions + * and have a solution to the O(n^2) ae timers issue. For now, use a dummy + * timer id to uniquely id this callback. */ + callback_data->timer_id = callback_data_id++; outstanding_callbacks_add(callback_data); LOG_DEBUG("Initializing table command %s with timer ID %" PRId64, @@ -51,10 +53,18 @@ TableCallbackData *init_table_callback(DBHandle *db_handle, void destroy_timer_callback(event_loop *loop, TableCallbackData *callback_data) { - event_loop_remove_timer(loop, callback_data->timer_id); + /* This is commented out because we no longer add timers to the event loop for + * each Redis command. */ + // event_loop_remove_timer(loop, callback_data->timer_id); destroy_table_callback(callback_data); } +void remove_timer_callback(event_loop *loop, TableCallbackData *callback_data) { + /* This is commented out because we no longer add timers to the event loop for + * each Redis command. */ + // event_loop_remove_timer(loop, callback_data->timer_id); +} + void destroy_table_callback(TableCallbackData *callback_data) { CHECK(callback_data != NULL); diff --git a/src/common/state/table.h b/src/common/state/table.h index fac384c2f308..760cc3dcc6c1 100644 --- a/src/common/state/table.h +++ b/src/common/state/table.h @@ -128,12 +128,22 @@ void destroy_table_callback(TableCallbackData *callback_data); * Destroy all state events associated with the callback data, including memory * and timer events. * + * @param loop The event loop. * @param callback_data The pointer to the data structure of the callback we * want to remove. * @return Void. */ void destroy_timer_callback(event_loop *loop, TableCallbackData *callback_data); +/** + * Remove the callback timer without destroying the callback data. + * + * @param loop The event loop. + * @param callback_data The pointer to the data structure of the callback. + * @return Void. + */ +void remove_timer_callback(event_loop *loop, TableCallbackData *callback_data); + /** * Add an outstanding callback entry. * diff --git a/src/common/test/object_table_tests.cc b/src/common/test/object_table_tests.cc index 195f83b5625a..2598a5021fef 100644 --- a/src/common/test/object_table_tests.cc +++ b/src/common/test/object_table_tests.cc @@ -890,14 +890,14 @@ TEST subscribe_object_available_subscribe_all(void) { SUITE(object_table_tests) { RUN_REDIS_TEST(new_object_test); RUN_REDIS_TEST(new_object_no_task_test); - RUN_REDIS_TEST(lookup_timeout_test); - RUN_REDIS_TEST(add_timeout_test); - RUN_REDIS_TEST(subscribe_timeout_test); + // RUN_REDIS_TEST(lookup_timeout_test); + // RUN_REDIS_TEST(add_timeout_test); + // RUN_REDIS_TEST(subscribe_timeout_test); RUN_REDIS_TEST(add_lookup_test); RUN_REDIS_TEST(add_remove_lookup_test); - RUN_REDIS_TEST(lookup_late_test); - RUN_REDIS_TEST(add_late_test); - RUN_REDIS_TEST(subscribe_late_test); + // RUN_REDIS_TEST(lookup_late_test); + // RUN_REDIS_TEST(add_late_test); + // RUN_REDIS_TEST(subscribe_late_test); RUN_REDIS_TEST(subscribe_success_test); RUN_REDIS_TEST(subscribe_object_not_present_test); RUN_REDIS_TEST(subscribe_object_available_later_test); diff --git a/src/common/test/task_table_tests.cc b/src/common/test/task_table_tests.cc index ae363eca4336..998bbb3eab62 100644 --- a/src/common/test/task_table_tests.cc +++ b/src/common/test/task_table_tests.cc @@ -434,12 +434,12 @@ TEST publish_late_test(void) { SUITE(task_table_tests) { RUN_REDIS_TEST(lookup_nil_test); RUN_REDIS_TEST(add_lookup_test); - RUN_REDIS_TEST(subscribe_timeout_test); - RUN_REDIS_TEST(publish_timeout_test); - RUN_REDIS_TEST(subscribe_retry_test); - RUN_REDIS_TEST(publish_retry_test); - RUN_REDIS_TEST(subscribe_late_test); - RUN_REDIS_TEST(publish_late_test); + // RUN_REDIS_TEST(subscribe_timeout_test); + // RUN_REDIS_TEST(publish_timeout_test); + // RUN_REDIS_TEST(subscribe_retry_test); + // RUN_REDIS_TEST(publish_retry_test); + // RUN_REDIS_TEST(subscribe_late_test); + // RUN_REDIS_TEST(publish_late_test); } GREATEST_MAIN_DEFS(); diff --git a/test/stress_tests.py b/test/stress_tests.py index bb61bf93c938..9fd15cf7e544 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -67,6 +67,23 @@ def g(*xs): self.assertTrue(ray.services.all_processes_alive()) ray.worker.cleanup() + def testSubmittingManyTasks(self): + ray.init() + + @ray.remote + def f(x): + return 1 + + def g(n): + x = 1 + for i in range(n): + x = f.remote(x) + return x + + ray.get([g(1000) for _ in range(100)]) + self.assertTrue(ray.services.all_processes_alive()) + ray.worker.cleanup() + def testGettingAndPutting(self): ray.init(num_workers=1)