Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Rename photon -> local scheduler. #320

Closed
wants to merge 45 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
9f2d43f
Rename object_id -> ObjectID.
pcmoritz Feb 25, 2017
01acce4
Rename ray_logger -> RayLogger.
robertnishihara Feb 25, 2017
e335b84
rename task_id -> TaskID, actor_id -> ActorID, function_id -> FunctionID
pcmoritz Feb 25, 2017
9159b6d
Rename plasma_store_info -> PlasmaStoreInfo.
robertnishihara Feb 25, 2017
80dd1b1
Rename plasma_store_state -> PlasmaStoreState.
robertnishihara Feb 25, 2017
c0c4970
Rename plasma_object -> PlasmaObject.
robertnishihara Feb 25, 2017
b91cde3
Rename object_request -> ObjectRequests.
robertnishihara Feb 26, 2017
1758cd3
Rename eviction_state -> EvictionState.
robertnishihara Feb 26, 2017
e86ae05
Bug fix.
robertnishihara Feb 26, 2017
2b88286
rename db_handle -> DBHandle
pcmoritz Feb 26, 2017
22b68a4
Rename local_scheduler_state -> LocalSchedulerState.
robertnishihara Feb 26, 2017
fc42a82
rename db_client_id -> DBClientID
pcmoritz Feb 26, 2017
24dfa76
rename task -> Task
pcmoritz Feb 26, 2017
da801e6
make redis.c C++ compatible
pcmoritz Feb 26, 2017
6c522e2
Rename scheduling_algorithm_state -> SchedulingAlgorithmState.
robertnishihara Feb 26, 2017
3f4a077
Rename plasma_connection -> PlasmaConnection.
robertnishihara Feb 26, 2017
7dfe8b9
Rename client_connection -> ClientConnection.
robertnishihara Feb 26, 2017
a21c418
Fixes from rebase.
robertnishihara Feb 26, 2017
cb600e8
Rename local_scheduler_client -> LocalSchedulerClient.
robertnishihara Feb 26, 2017
6230cb4
Rename object_buffer -> ObjectBuffer.
robertnishihara Feb 26, 2017
8beef9f
Rename client -> Client.
robertnishihara Feb 26, 2017
7537d84
Rename notification_queue -> NotificationQueue.
robertnishihara Feb 26, 2017
ba4d3f1
Rename object_get_requests -> ObjectGetRequests.
robertnishihara Feb 26, 2017
0731c1c
Rename get_request -> GetRequest.
robertnishihara Feb 26, 2017
18d4704
Rename object_info -> ObjectInfo.
robertnishihara Feb 26, 2017
d969bfa
Rename scheduler_object_info -> SchedulerObjectInfo.
robertnishihara Feb 26, 2017
8b08392
Rename local_scheduler -> LocalScheduler and some fixes.
robertnishihara Feb 26, 2017
e47bca6
Rename local_scheduler_info -> LocalSchedulerInfo.
robertnishihara Feb 26, 2017
c688462
Rename global_scheduler_state -> GlobalSchedulerState.
robertnishihara Feb 26, 2017
7e5fcec
Rename global_scheduler_policy_state -> GlobalSchedulerPolicyState.
robertnishihara Feb 26, 2017
6036869
Rename object_size_entry -> ObjectSizeEntry.
robertnishihara Feb 26, 2017
5cfc49d
Rename aux_address_entry -> AuxAddressEntry.
robertnishihara Feb 26, 2017
e990b79
Rename various ID helper methods.
robertnishihara Feb 26, 2017
211f7a3
Rename Task helper methods.
robertnishihara Feb 26, 2017
929ea7c
Rename db_client_cache_entry -> DBClientCacheEntry.
robertnishihara Feb 26, 2017
84c01ca
Rename local_actor_info -> LocalActorInfo.
robertnishihara Feb 26, 2017
7e8a77d
Rename actor_info -> ActorInfo.
robertnishihara Feb 26, 2017
4f9cb72
Rename retry_info -> RetryInfo.
robertnishihara Feb 26, 2017
61dd4cb
Rename actor_notification_table_subscribe_data -> ActorNotificationTa…
robertnishihara Feb 26, 2017
0a89edc
Rename local_scheduler_table_send_info_data -> LocalSchedulerTableSen…
robertnishihara Feb 26, 2017
cc569e6
Rename table_callback_data -> TableCallbackData.
robertnishihara Feb 26, 2017
01c35de
Rename object_info_subscribe_data -> ObjectInfoSubscribeData.
robertnishihara Feb 26, 2017
3266714
Rename local_scheduler_table_subscribe_data -> LocalSchedulerTableSub…
robertnishihara Feb 26, 2017
7bfa7dd
Rename more redis call data structures.
robertnishihara Feb 26, 2017
37e5f12
Rename photon -> local scheduler.
robertnishihara Feb 26, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ matrix:
- cd python/core
- bash ../../src/common/test/run_valgrind.sh
- bash ../../src/plasma/test/run_valgrind.sh
- bash ../../src/photon/test/run_valgrind.sh
- bash ../../src/local_scheduler/test/run_valgrind.sh
- cd ../..

script:
- python ./python/plasma/test/test.py valgrind
- python ./python/photon/test/test.py valgrind
- python ./python/local_scheduler/test/test.py valgrind
- python ./python/global_scheduler/test/test.py valgrind

install:
Expand All @@ -58,7 +58,7 @@ install:
- cd python/core
- bash ../../src/common/test/run_tests.sh
- bash ../../src/plasma/test/run_tests.sh
- bash ../../src/photon/test/run_tests.sh
- bash ../../src/local_scheduler/test/run_tests.sh
- cd ../..

script:
Expand All @@ -69,7 +69,7 @@ script:
- python python/common/test/test.py
- python python/common/redis_module/runtest.py
- python python/plasma/test/test.py
- python python/photon/test/test.py
- python python/local_scheduler/test/test.py
- python python/global_scheduler/test/test.py

- python test/runtest.py
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ project(ray)

add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/common/)
add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/plasma/)
add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/photon/)
add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/local_scheduler/)
add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/global_scheduler/)
add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/numbuf/)
34 changes: 17 additions & 17 deletions python/common/test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@
import sys
import unittest

import photon
import local_scheduler

ID_SIZE = 20

def random_object_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))

def random_function_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))

def random_driver_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))

def random_task_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))

BASE_SIMPLE_OBJECTS = [
0, 1, 100000, 0.0, 0.5, 0.9, 100000.1, (), [], {},
Expand Down Expand Up @@ -65,9 +65,9 @@ class TestSerialization(unittest.TestCase):
def test_serialize_by_value(self):

for val in SIMPLE_OBJECTS:
self.assertTrue(photon.check_simple_value(val))
self.assertTrue(local_scheduler.check_simple_value(val))
for val in COMPLEX_OBJECTS:
self.assertFalse(photon.check_simple_value(val))
self.assertFalse(local_scheduler.check_simple_value(val))

class TestObjectID(unittest.TestCase):

Expand All @@ -92,17 +92,17 @@ def h():
self.assertRaises(Exception, lambda : pickling.dumps(h))

def test_equality_comparisons(self):
x1 = photon.ObjectID(ID_SIZE * b"a")
x2 = photon.ObjectID(ID_SIZE * b"a")
y1 = photon.ObjectID(ID_SIZE * b"b")
y2 = photon.ObjectID(ID_SIZE * b"b")
x1 = local_scheduler.ObjectID(ID_SIZE * b"a")
x2 = local_scheduler.ObjectID(ID_SIZE * b"a")
y1 = local_scheduler.ObjectID(ID_SIZE * b"b")
y2 = local_scheduler.ObjectID(ID_SIZE * b"b")
self.assertEqual(x1, x2)
self.assertEqual(y1, y2)
self.assertNotEqual(x1, y1)

random_strings = [np.random.bytes(ID_SIZE) for _ in range(256)]
object_ids1 = [photon.ObjectID(random_strings[i]) for i in range(256)]
object_ids2 = [photon.ObjectID(random_strings[i]) for i in range(256)]
object_ids1 = [local_scheduler.ObjectID(random_strings[i]) for i in range(256)]
object_ids2 = [local_scheduler.ObjectID(random_strings[i]) for i in range(256)]
self.assertEqual(len(set(object_ids1)), 256)
self.assertEqual(len(set(object_ids1 + object_ids2)), 256)
self.assertEqual(set(object_ids1), set(object_ids2))
Expand All @@ -121,7 +121,7 @@ def check_task(self, task, function_id, num_return_vals, args):
self.assertEqual(num_return_vals, len(task.returns()))
self.assertEqual(len(args), len(retrieved_args))
for i in range(len(retrieved_args)):
if isinstance(retrieved_args[i], photon.ObjectID):
if isinstance(retrieved_args[i], local_scheduler.ObjectID):
self.assertEqual(retrieved_args[i].id(), args[i].id())
else:
self.assertEqual(retrieved_args[i], args[i])
Expand Down Expand Up @@ -160,10 +160,10 @@ def test_create_and_serialize_task(self):
]
for args in args_list:
for num_return_vals in [0, 1, 2, 3, 5, 10, 100]:
task = photon.Task(driver_id, function_id, args, num_return_vals, parent_id, 0)
task = local_scheduler.Task(driver_id, function_id, args, num_return_vals, parent_id, 0)
self.check_task(task, function_id, num_return_vals, args)
data = photon.task_to_string(task)
task2 = photon.task_from_string(data)
data = local_scheduler.task_to_string(task)
task2 = local_scheduler.task_from_string(data)
self.check_task(task2, function_id, num_return_vals, args)

if __name__ == "__main__":
Expand Down
40 changes: 20 additions & 20 deletions python/global_scheduler/test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import unittest

import global_scheduler
import photon
import local_scheduler
import plasma
from plasma.utils import random_object_id, generate_metadata, write_to_data_buffer, create_object_with_id, create_object

Expand All @@ -40,24 +40,24 @@
TASK_PREFIX = "TT:"

def random_driver_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))

def random_task_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))

def random_function_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))

def random_object_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))

def new_port():
return random.randint(10000, 65535)

class TestGlobalScheduler(unittest.TestCase):

def setUp(self):
# Start one Redis server and N pairs of (plasma, photon)
# Start one Redis server and N pairs of (plasma, local_scheduler)
node_ip_address = "127.0.0.1"
redis_port, self.redis_process = services.start_redis(cleanup=False)
redis_address = services.address(node_ip_address, redis_port)
Expand All @@ -69,7 +69,7 @@ def setUp(self):
self.plasma_manager_pids = []
self.local_scheduler_pids = []
self.plasma_clients = []
self.photon_clients = []
self.local_scheduler_clients = []

for i in range(NUM_CLUSTER_NODES):
# Start the Plasma store. Plasma store name is randomly generated.
Expand All @@ -83,15 +83,15 @@ def setUp(self):
plasma_client = plasma.PlasmaClient(plasma_store_name, plasma_manager_name)
self.plasma_clients.append(plasma_client)
# Start the local scheduler.
local_scheduler_name, p4 = photon.start_local_scheduler(
local_scheduler_name, p4 = local_scheduler.start_local_scheduler(
plasma_store_name,
plasma_manager_name=plasma_manager_name,
plasma_address=plasma_address,
redis_address=redis_address,
static_resource_list=[10, 0])
# Connect to the scheduler.
photon_client = photon.PhotonClient(local_scheduler_name, NIL_ACTOR_ID)
self.photon_clients.append(photon_client)
local_scheduler_client = local_scheduler.LocalSchedulerClient(local_scheduler_name, NIL_ACTOR_ID)
self.local_scheduler_clients.append(local_scheduler_client)
self.local_scheduler_pids.append(p4)

def tearDown(self):
Expand Down Expand Up @@ -148,11 +148,11 @@ def get_plasma_manager_id(self):
return db_client_id

def test_task_default_resources(self):
task1 = photon.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0)
task1 = local_scheduler.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0)
self.assertEqual(task1.required_resources(), [1.0, 0.0])
task2 = photon.Task(random_driver_id(), random_function_id(),
[random_object_id()], 0, random_task_id(), 0,
photon.ObjectID(NIL_ACTOR_ID), 0, [1.0, 2.0])
task2 = local_scheduler.Task(random_driver_id(), random_function_id(),
[random_object_id()], 0, random_task_id(), 0,
local_scheduler.ObjectID(NIL_ACTOR_ID), 0, [1.0, 2.0])
self.assertEqual(task2.required_resources(), [1.0, 2.0])

def test_redis_only_single_task(self):
Expand All @@ -161,7 +161,7 @@ def test_redis_only_single_task(self):
task state transitions in Redis only. TODO(atumanov): implement.
"""
# Check precondition for this test:
# There should be 2n+1 db clients: the global scheduler + one photon and one plasma per node.
# There should be 2n+1 db clients: the global scheduler + one local scheduler and one plasma per node.
self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))),
2 * NUM_CLUSTER_NODES + 1)
db_client_id = self.get_plasma_manager_id()
Expand All @@ -182,11 +182,11 @@ def test_integration_single_task(self):
plasma_client = self.plasma_clients[0]
object_dep, memory_buffer, metadata = create_object(plasma_client, data_size, metadata_size, seal=True)

# Sleep before submitting task to photon.
# Sleep before submitting task to local scheduler.
time.sleep(0.1)
# Submit a task to Redis.
task = photon.Task(random_driver_id(), random_function_id(), [photon.ObjectID(object_dep)], num_return_vals[0], random_task_id(), 0)
self.photon_clients[0].submit(task)
task = local_scheduler.Task(random_driver_id(), random_function_id(), [local_scheduler.ObjectID(object_dep)], num_return_vals[0], random_task_id(), 0)
self.local_scheduler_clients[0].submit(task)
time.sleep(0.1)
# There should now be a task in Redis, and it should get assigned to the
# local scheduler
Expand Down Expand Up @@ -231,8 +231,8 @@ def integration_many_tasks_helper(self, timesync=True):
if timesync:
# Give 10ms for object info handler to fire (long enough to yield CPU).
time.sleep(0.010)
task = photon.Task(random_driver_id(), random_function_id(), [photon.ObjectID(object_dep)], num_return_vals[0], random_task_id(), 0)
self.photon_clients[0].submit(task)
task = local_scheduler.Task(random_driver_id(), random_function_id(), [local_scheduler.ObjectID(object_dep)], num_return_vals[0], random_task_id(), 0)
self.local_scheduler_clients[0].submit(task)
# Check that there are the correct number of tasks in Redis and that they
# all get assigned to the local scheduler.
num_retries = 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
from __future__ import division
from __future__ import print_function

from core.src.photon.libphoton import *
from .photon_services import *
from core.src.local_scheduler.liblocal_scheduler import *
from .local_scheduler_services import *
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def start_local_scheduler(plasma_store_name,
raise Exception("If one of the plasma_manager_name and the redis_address is provided, then both must be provided.")
if use_valgrind and use_profiler:
raise Exception("Cannot use valgrind and profiler at the same time.")
local_scheduler_executable = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../core/src/photon/photon_scheduler")
local_scheduler_executable = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../core/src/local_scheduler/local_scheduler")
local_scheduler_name = "/tmp/scheduler{}".format(random_name())
command = [local_scheduler_executable,
"-s", local_scheduler_name,
Expand Down
42 changes: 21 additions & 21 deletions python/photon/test/test.py → python/local_scheduler/test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import time
import unittest

import photon
import local_scheduler
import plasma

USE_VALGRIND = False
Expand All @@ -21,27 +21,27 @@
NIL_ACTOR_ID = 20 * b"\xff"

def random_object_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))

def random_driver_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))

def random_task_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))

def random_function_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))

class TestPhotonClient(unittest.TestCase):
class TestLocalSchedulerClient(unittest.TestCase):

def setUp(self):
# Start Plasma store.
plasma_store_name, self.p1 = plasma.start_plasma_store()
self.plasma_client = plasma.PlasmaClient(plasma_store_name)
# Start a local scheduler.
scheduler_name, self.p2 = photon.start_local_scheduler(plasma_store_name, use_valgrind=USE_VALGRIND)
scheduler_name, self.p2 = local_scheduler.start_local_scheduler(plasma_store_name, use_valgrind=USE_VALGRIND)
# Connect to the scheduler.
self.photon_client = photon.PhotonClient(scheduler_name, NIL_ACTOR_ID)
self.local_scheduler_client = local_scheduler.LocalSchedulerClient(scheduler_name, NIL_ACTOR_ID)

def tearDown(self):
# Check that the processes are still alive.
Expand Down Expand Up @@ -99,40 +99,40 @@ def test_submit_and_get_task(self):

for args in args_list:
for num_return_vals in [0, 1, 2, 3, 5, 10, 100]:
task = photon.Task(random_driver_id(), function_id, args, num_return_vals, random_task_id(), 0)
task = local_scheduler.Task(random_driver_id(), function_id, args, num_return_vals, random_task_id(), 0)
# Submit a task.
self.photon_client.submit(task)
self.local_scheduler_client.submit(task)
# Get the task.
new_task = self.photon_client.get_task()
new_task = self.local_scheduler_client.get_task()
self.assertEqual(task.function_id().id(), new_task.function_id().id())
retrieved_args = new_task.arguments()
returns = new_task.returns()
self.assertEqual(len(args), len(retrieved_args))
self.assertEqual(num_return_vals, len(returns))
for i in range(len(retrieved_args)):
if isinstance(args[i], photon.ObjectID):
if isinstance(args[i], local_scheduler.ObjectID):
self.assertEqual(args[i].id(), retrieved_args[i].id())
else:
self.assertEqual(args[i], retrieved_args[i])

# Submit all of the tasks.
for args in args_list:
for num_return_vals in [0, 1, 2, 3, 5, 10, 100]:
task = photon.Task(random_driver_id(), function_id, args, num_return_vals, random_task_id(), 0)
self.photon_client.submit(task)
task = local_scheduler.Task(random_driver_id(), function_id, args, num_return_vals, random_task_id(), 0)
self.local_scheduler_client.submit(task)
# Get all of the tasks.
for args in args_list:
for num_return_vals in [0, 1, 2, 3, 5, 10, 100]:
new_task = self.photon_client.get_task()
new_task = self.local_scheduler_client.get_task()

def test_scheduling_when_objects_ready(self):
# Create a task and submit it.
object_id = random_object_id()
task = photon.Task(random_driver_id(), random_function_id(), [object_id], 0, random_task_id(), 0)
self.photon_client.submit(task)
task = local_scheduler.Task(random_driver_id(), random_function_id(), [object_id], 0, random_task_id(), 0)
self.local_scheduler_client.submit(task)
# Launch a thread to get the task.
def get_task():
self.photon_client.get_task()
self.local_scheduler_client.get_task()
t = threading.Thread(target=get_task)
t.start()
# Sleep to give the thread time to call get_task.
Expand All @@ -148,12 +148,12 @@ def test_scheduling_when_objects_evicted(self):
# Create a task with two dependencies and submit it.
object_id1 = random_object_id()
object_id2 = random_object_id()
task = photon.Task(random_driver_id(), random_function_id(), [object_id1, object_id2], 0, random_task_id(), 0)
self.photon_client.submit(task)
task = local_scheduler.Task(random_driver_id(), random_function_id(), [object_id1, object_id2], 0, random_task_id(), 0)
self.local_scheduler_client.submit(task)

# Launch a thread to get the task.
def get_task():
self.photon_client.get_task()
self.local_scheduler_client.get_task()
t = threading.Thread(target=get_task)
t.start()

Expand Down
Loading