18
18
# the worker is currently allowed to use.
19
19
gpu_ids = []
20
20
21
+
21
22
def get_gpu_ids ():
22
23
"""Get the IDs of the GPU that are available to the worker.
23
24
@@ -26,12 +27,15 @@ def get_gpu_ids():
26
27
"""
27
28
return gpu_ids
28
29
30
+
29
31
def random_string ():
30
32
return np .random .bytes (20 )
31
33
34
+
32
35
def random_actor_id ():
33
36
return ray .local_scheduler .ObjectID (random_string ())
34
37
38
+
35
39
def get_actor_method_function_id (attr ):
36
40
"""Get the function ID corresponding to an actor method.
37
41
@@ -47,10 +51,14 @@ def get_actor_method_function_id(attr):
47
51
assert len (function_id ) == 20
48
52
return ray .local_scheduler .ObjectID (function_id )
49
53
54
+
50
55
def fetch_and_register_actor (key , worker ):
51
56
"""Import an actor."""
52
- driver_id , actor_id_str , actor_name , module , pickled_class , assigned_gpu_ids , actor_method_names = \
53
- worker .redis_client .hmget (key , ["driver_id" , "actor_id" , "name" , "module" , "class" , "gpu_ids" , "actor_method_names" ])
57
+ (driver_id , actor_id_str , actor_name ,
58
+ module , pickled_class , assigned_gpu_ids ,
59
+ actor_method_names ) = worker .redis_client .hmget (
60
+ key , ["driver_id" , "actor_id" , "name" , "module" , "class" , "gpu_ids" ,
61
+ "actor_method_names" ])
54
62
actor_id = ray .local_scheduler .ObjectID (actor_id_str )
55
63
actor_name = actor_name .decode ("ascii" )
56
64
module = module .decode ("ascii" )
@@ -64,12 +72,14 @@ def fetch_and_register_actor(key, worker):
64
72
class TemporaryActor (object ):
65
73
pass
66
74
worker .actors [actor_id_str ] = TemporaryActor ()
75
+
67
76
def temporary_actor_method (* xs ):
68
77
raise Exception ("The actor with name {} failed to be imported, and so "
69
78
"cannot execute this method" .format (actor_name ))
70
79
for actor_method_name in actor_method_names :
71
80
function_id = get_actor_method_function_id (actor_method_name ).id ()
72
- worker .functions [driver_id ][function_id ] = (actor_method_name , temporary_actor_method )
81
+ worker .functions [driver_id ][function_id ] = (actor_method_name ,
82
+ temporary_actor_method )
73
83
74
84
try :
75
85
unpickled_class = pickling .loads (pickled_class )
@@ -84,11 +94,15 @@ def temporary_actor_method(*xs):
84
94
# TODO(pcm): Why is the below line necessary?
85
95
unpickled_class .__module__ = module
86
96
worker .actors [actor_id_str ] = unpickled_class .__new__ (unpickled_class )
87
- for (k , v ) in inspect .getmembers (unpickled_class , predicate = (lambda x : inspect .isfunction (x ) or inspect .ismethod (x ))):
97
+ for (k , v ) in inspect .getmembers (
98
+ unpickled_class , predicate = (lambda x : (inspect .isfunction (x ) or
99
+ inspect .ismethod (x )))):
88
100
function_id = get_actor_method_function_id (k ).id ()
89
101
worker .functions [driver_id ][function_id ] = (k , v )
90
- # We do not set worker.function_properties[driver_id][function_id] because
91
- # we currently do need the actor worker to submit new tasks for the actor.
102
+ # We do not set worker.function_properties[driver_id][function_id]
103
+ # because we currently do need the actor worker to submit new tasks for
104
+ # the actor.
105
+
92
106
93
107
def select_local_scheduler (local_schedulers , num_gpus , worker ):
94
108
"""Select a local scheduler to assign this actor to.
@@ -119,26 +133,33 @@ def select_local_scheduler(local_schedulers, num_gpus, worker):
119
133
# Loop through all of the local schedulers.
120
134
for local_scheduler in local_schedulers :
121
135
# See if there are enough available GPUs on this local scheduler.
122
- local_scheduler_total_gpus = int (float (local_scheduler [b"num_gpus" ].decode ("ascii" )))
123
- gpus_in_use = worker .redis_client .hget (local_scheduler [b"ray_client_id" ], b"gpus_in_use" )
136
+ local_scheduler_total_gpus = int (float (
137
+ local_scheduler [b"num_gpus" ].decode ("ascii" )))
138
+ gpus_in_use = worker .redis_client .hget (local_scheduler [b"ray_client_id" ],
139
+ b"gpus_in_use" )
124
140
gpus_in_use = 0 if gpus_in_use is None else int (gpus_in_use )
125
141
if gpus_in_use + num_gpus <= local_scheduler_total_gpus :
126
142
# Attempt to reserve some GPUs for this actor.
127
- new_gpus_in_use = worker .redis_client .hincrby (local_scheduler [b"ray_client_id" ], b"gpus_in_use" , num_gpus )
143
+ new_gpus_in_use = worker .redis_client .hincrby (
144
+ local_scheduler [b"ray_client_id" ], b"gpus_in_use" , num_gpus )
128
145
if new_gpus_in_use > local_scheduler_total_gpus :
129
146
# If we failed to reserve the GPUs, undo the increment.
130
- worker .redis_client .hincrby (local_scheduler [b"ray_client_id" ], b"gpus_in_use" , num_gpus )
147
+ worker .redis_client .hincrby (local_scheduler [b"ray_client_id" ],
148
+ b"gpus_in_use" , num_gpus )
131
149
else :
132
150
# We succeeded at reserving the GPUs, so we are done.
133
151
local_scheduler_id = local_scheduler [b"ray_client_id" ]
134
152
gpu_ids = list (range (new_gpus_in_use - num_gpus , new_gpus_in_use ))
135
153
break
136
154
if local_scheduler_id is None :
137
155
raise Exception ("Could not find a node with enough GPUs to create this "
138
- "actor. The local scheduler information is {}." .format (local_schedulers ))
156
+ "actor. The local scheduler information is {}."
157
+ .format (local_schedulers ))
139
158
return local_scheduler_id , gpu_ids
140
159
141
- def export_actor (actor_id , Class , actor_method_names , num_cpus , num_gpus , worker ):
160
+
161
+ def export_actor (actor_id , Class , actor_method_names , num_cpus , num_gpus ,
162
+ worker ):
142
163
"""Export an actor to redis.
143
164
144
165
Args:
@@ -158,13 +179,16 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus, worker
158
179
driver_id = worker .task_driver_id .id ()
159
180
for actor_method_name in actor_method_names :
160
181
function_id = get_actor_method_function_id (actor_method_name ).id ()
161
- worker .function_properties [driver_id ][function_id ] = (1 , num_cpus , num_gpus )
182
+ worker .function_properties [driver_id ][function_id ] = (1 , num_cpus ,
183
+ num_gpus )
162
184
163
185
# Select a local scheduler for the actor.
164
186
local_schedulers = state .get_local_schedulers (worker )
165
- local_scheduler_id , gpu_ids = select_local_scheduler (local_schedulers , num_gpus , worker )
187
+ local_scheduler_id , gpu_ids = select_local_scheduler (local_schedulers ,
188
+ num_gpus , worker )
166
189
167
- worker .redis_client .publish ("actor_notifications" , actor_id .id () + local_scheduler_id )
190
+ worker .redis_client .publish ("actor_notifications" ,
191
+ actor_id .id () + local_scheduler_id )
168
192
169
193
d = {"driver_id" : driver_id ,
170
194
"actor_id" : actor_id .id (),
@@ -176,6 +200,7 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus, worker
176
200
worker .redis_client .hmset (key , d )
177
201
worker .redis_client .rpush ("Exports" , key )
178
202
203
+
179
204
def actor (* args , ** kwargs ):
180
205
def make_actor_decorator (num_cpus = 1 , num_gpus = 0 ):
181
206
def make_actor (Class ):
@@ -189,7 +214,8 @@ def actor_method_call(actor_id, attr, *args, **kwargs):
189
214
raise Exception ("Actors currently do not support **kwargs." )
190
215
function_id = get_actor_method_function_id (attr )
191
216
# TODO(pcm): Extend args with keyword args.
192
- object_ids = ray .worker .global_worker .submit_task (function_id , "" , args ,
217
+ object_ids = ray .worker .global_worker .submit_task (function_id , "" ,
218
+ args ,
193
219
actor_id = actor_id )
194
220
if len (object_ids ) == 1 :
195
221
return object_ids [0 ]
@@ -199,24 +225,34 @@ def actor_method_call(actor_id, attr, *args, **kwargs):
199
225
class NewClass (object ):
200
226
def __init__ (self , * args , ** kwargs ):
201
227
self ._ray_actor_id = random_actor_id ()
202
- self ._ray_actor_methods = {k : v for (k , v ) in inspect .getmembers (Class , predicate = (lambda x : inspect .isfunction (x ) or inspect .ismethod (x )))}
203
- export_actor (self ._ray_actor_id , Class , self ._ray_actor_methods .keys (), num_cpus , num_gpus , ray .worker .global_worker )
228
+ self ._ray_actor_methods = {
229
+ k : v for (k , v ) in inspect .getmembers (
230
+ Class , predicate = (lambda x : (inspect .isfunction (x ) or
231
+ inspect .ismethod (x ))))}
232
+ export_actor (self ._ray_actor_id , Class ,
233
+ self ._ray_actor_methods .keys (), num_cpus , num_gpus ,
234
+ ray .worker .global_worker )
204
235
# Call __init__ as a remote function.
205
236
if "__init__" in self ._ray_actor_methods .keys ():
206
237
actor_method_call (self ._ray_actor_id , "__init__" , * args , ** kwargs )
207
238
else :
208
239
print ("WARNING: this object has no __init__ method." )
240
+
209
241
# Make tab completion work.
210
242
def __dir__ (self ):
211
243
return self ._ray_actor_methods
244
+
212
245
def __getattribute__ (self , attr ):
213
246
# The following is needed so we can still access self.actor_methods.
214
247
if attr in ["_ray_actor_id" , "_ray_actor_methods" ]:
215
248
return super (NewClass , self ).__getattribute__ (attr )
216
249
if attr in self ._ray_actor_methods .keys ():
217
- return lambda * args , ** kwargs : actor_method_call (self ._ray_actor_id , attr , * args , ** kwargs )
250
+ return lambda * args , ** kwargs : actor_method_call (
251
+ self ._ray_actor_id , attr , * args , ** kwargs )
218
252
# There is no method with this name, so raise an exception.
219
- raise AttributeError ("'{}' Actor object has no attribute '{}'" .format (Class , attr ))
253
+ raise AttributeError ("'{}' Actor object has no attribute '{}'"
254
+ .format (Class , attr ))
255
+
220
256
def __repr__ (self ):
221
257
return "Actor(" + self ._ray_actor_id .hex () + ")"
222
258
@@ -230,7 +266,9 @@ def __repr__(self):
230
266
return make_actor_decorator (num_cpus = 1 , num_gpus = 0 )(Class )
231
267
232
268
# In this case, the actor decorator is something like @ray.actor(num_gpus=1).
233
- if len (args ) == 0 and len (kwargs ) > 0 and all ([key in ["num_cpus" , "num_gpus" ] for key in kwargs .keys ()]):
269
+ if len (args ) == 0 and len (kwargs ) > 0 and all ([key
270
+ in ["num_cpus" , "num_gpus" ]
271
+ for key in kwargs .keys ()]):
234
272
num_cpus = kwargs ["num_cpus" ] if "num_cpus" in kwargs .keys () else 1
235
273
num_gpus = kwargs ["num_gpus" ] if "num_gpus" in kwargs .keys () else 0
236
274
return make_actor_decorator (num_cpus = num_cpus , num_gpus = num_gpus )
@@ -240,4 +278,5 @@ def __repr__(self):
240
278
"some of the arguments 'num_cpus' or 'num_gpus' as in "
241
279
"'ray.actor(num_gpus=1)'." )
242
280
281
+
243
282
ray .worker .global_worker .fetch_and_register ["Actor" ] = fetch_and_register_actor
0 commit comments