Fix actor garbage collection by breaking cyclic references #1064

25 changes: 13 additions & 12 deletions python/ray/actor.py
@@ -251,6 +251,8 @@ def __ray_restore_from_checkpoint__(cls, pickled_checkpoint):
# constructor.
exported = []

# Create objects to wrap method invocations. This is done so that we can
# invoke methods with actor.method.remote() instead of actor.method().
class ActorMethod(object):
def __init__(self, actor, method_name, method_signature):
self.actor = actor
@@ -307,14 +309,6 @@ def _manual_init(self, *args, **kwargs):
self._ray_method_signatures[k] = signature.extract_signature(
v, ignore_first=True)

# Create objects to wrap method invocations. This is done so that
# we can invoke methods with actor.method.remote() instead of
# actor.method().
self._actor_method_invokers = dict()
for k, v in self._ray_actor_methods.items():
self._actor_method_invokers[k] = ActorMethod(
self, k, self._ray_method_signatures[k])

# Do not export the actor class or the actor if run in PYTHON_MODE
# Instead, instantiate the actor locally and add it to
# global_worker's dictionary
@@ -390,10 +384,17 @@ def __getattribute__(self, attr):
"_actor_method_call"]:
return object.__getattribute__(self, attr)
if attr in self._ray_actor_methods.keys():
return self._actor_method_invokers[attr]
# There is no method with this name, so raise an exception.
raise AttributeError("'{}' Actor object has no attribute '{}'"
.format(Class, attr))
# We create the ActorMethod on the fly here so that the
# ActorHandle doesn't need a reference to the ActorMethod. The
# ActorMethod has a reference to the ActorHandle and this was
# causing cyclic references which were preventing object
# deallocation from behaving in a predictable manner.
return ActorMethod(self, attr,
self._ray_method_signatures[attr])
else:
# There is no method with this name, so raise an exception.
raise AttributeError("'{}' Actor object has no attribute '{}'"
.format(Class, attr))

def __repr__(self):
return "Actor(" + self._ray_actor_id.hex() + ")"
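For intuition, here is a minimal sketch of the reference pattern this diff removes. These are toy classes for illustration only, not Ray's actual ActorHandle/ActorMethod: when a handle caches wrapper objects that point back at the handle, the two form a reference cycle, so dropping the last external reference does not bring the refcounts to zero and deallocation has to wait for Python's cycle collector (and under Python 2, a cycle whose objects define __del__ is never collected at all). Creating the wrapper on demand keeps the references one-directional, so the handle is freed, and the actor can be cleaned up, as soon as it goes out of scope.

import gc


class Method(object):
    def __init__(self, handle, name):
        # The wrapper holds a reference back to the handle.
        self.handle = handle
        self.name = name


class CyclicHandle(object):
    # Old pattern: the handle caches wrappers that point back at it.
    def __init__(self):
        self.invokers = {"m": Method(self, "m")}

    def __del__(self):
        print("CyclicHandle deleted")


class AcyclicHandle(object):
    # New pattern: wrappers are created on demand, so no cycle exists.
    def __getattr__(self, name):
        return Method(self, name)

    def __del__(self):
        print("AcyclicHandle deleted")


h = CyclicHandle()
h = None      # Nothing is printed yet; the cycle keeps the object alive.
gc.collect()  # Only now (on Python 3) does "CyclicHandle deleted" appear.

h = AcyclicHandle()
h = None      # "AcyclicHandle deleted" is printed immediately.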
1 change: 1 addition & 0 deletions python/ray/test/test_utils.py
@@ -118,6 +118,7 @@ def _pid_alive(pid):
"""
try:
os.kill(pid, 0)
return True
@robertnishihara (Collaborator) commented on Oct 3, 2017:
The lack of this return statement was a bug meaning that any calls to wait_for_pid_to_exit always succeeded. I'm fixing it here so it can be used in the actor test.

except OSError:
return False

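For context, here is a sketch of the helper pair this one-line fix affects. os.kill(pid, 0) sends no signal; it only raises OSError if no process with that pid exists, so without the return True the function fell through and returned None, which is falsy, and wait_for_pid_to_exit concluded immediately that the process was gone. The wait_for_pid_to_exit body below is an assumption about its shape (polling loop, timeout, interval), not a copy of the real implementation.

import os
import time


def _pid_alive(pid):
    # Signal 0 performs error checking only: OSError means the pid is gone
    # (or we are not allowed to signal it).
    try:
        os.kill(pid, 0)
        return True
    except OSError:
        return False


def wait_for_pid_to_exit(pid, timeout=20):
    # Poll until the process disappears or the timeout expires.
    start_time = time.time()
    while time.time() - start_time < timeout:
        if not _pid_alive(pid):
            return
        time.sleep(0.1)
    raise Exception("Timed out waiting for process {} to exit.".format(pid))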
18 changes: 12 additions & 6 deletions src/local_scheduler/local_scheduler_algorithm.cc
@@ -356,12 +356,6 @@ void handle_actor_worker_connect(LocalSchedulerState *state,
dispatch_actor_task(state, algorithm_state, actor_id);
}

void handle_actor_worker_disconnect(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
ActorID actor_id) {
remove_actor(algorithm_state, actor_id);
}

/**
* This will add a task to the task queue for an actor. If this is the first
* task being processed for this actor, it is possible that the LocalActorInfo
@@ -1156,6 +1150,18 @@ void handle_worker_removed(LocalSchedulerState *state,

/* Make sure we removed the worker at most once. */
CHECK(num_times_removed <= 1);

/* Attempt to dispatch some tasks because some resources may have freed up. */
dispatch_all_tasks(state, algorithm_state);
}

void handle_actor_worker_disconnect(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
ActorID actor_id) {
remove_actor(algorithm_state, actor_id);

/* Attempt to dispatch some tasks because some resources may have freed up. */
dispatch_all_tasks(state, algorithm_state);
}

void handle_actor_worker_available(LocalSchedulerState *state,
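The scheduler-side change applies one invariant: any event that can free resources (a worker being removed, an actor's worker disconnecting) should be followed by a dispatch pass, otherwise queued tasks sit idle until some unrelated event happens to trigger the next dispatch. A toy sketch of that pattern in Python, standing in for the real C++ data structures:

class ToyScheduler(object):
    def __init__(self, available_cpus):
        self.available_cpus = available_cpus
        self.queued_tasks = []  # list of (name, cpus_required)

    def dispatch_all_tasks(self):
        # Launch every queued task whose resource demand can now be met.
        still_waiting = []
        for name, cpus in self.queued_tasks:
            if cpus <= self.available_cpus:
                self.available_cpus -= cpus
                print("dispatching", name)
            else:
                still_waiting.append((name, cpus))
        self.queued_tasks = still_waiting

    def handle_worker_removed(self, cpus_freed):
        self.available_cpus += cpus_freed
        # Resources were just freed, so attempt dispatch right away.
        self.dispatch_all_tasks()


scheduler = ToyScheduler(available_cpus=0)
scheduler.queued_tasks.append(("queued_task", 1))
scheduler.handle_worker_removed(cpus_freed=1)  # dispatches queued_task immediately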
36 changes: 36 additions & 0 deletions test/actor_test.py
@@ -5,10 +5,12 @@
import collections
import random
import numpy as np
import os
import time
import unittest

import ray
import ray.test.test_utils


class ActorAPI(unittest.TestCase):
@@ -279,6 +281,40 @@ def f(self, y):

ray.worker.cleanup()

def testActorDeletion(self):
ray.init(num_workers=0)

# Make sure that when an actor handle goes out of scope, the actor
# destructor is called.

@ray.remote
class Actor(object):
def getpid(self):
return os.getpid()

a = Actor.remote()
pid = ray.get(a.getpid.remote())
a = None
ray.test.test_utils.wait_for_pid_to_exit(pid)

actors = [Actor.remote() for _ in range(10)]
pids = ray.get([a.getpid.remote() for a in actors])
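# In Python 2, the list comprehension above rebinds a to the last actor
# handle, so clear that leaked reference as well.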
a = None
actors = None
[ray.test.test_utils.wait_for_pid_to_exit(pid) for pid in pids]

@ray.remote
class Actor(object):
def method(self):
return 1

# Make sure that if we create an actor and call a method on it
# immediately, the actor doesn't get killed before the method is
# called.
self.assertEqual(ray.get(Actor.remote().method.remote()), 1)

ray.worker.cleanup()

def testActorState(self):
ray.init()

Expand Down
8 changes: 4 additions & 4 deletions test/jenkins_tests/multi_node_tests/many_drivers_test.py
@@ -30,10 +30,10 @@ def check_ids(self):


def driver(redis_address, driver_index):
"""The script for driver 0.
"""The script for all drivers.

This driver should create five actors that each use one GPU and some actors
that use no GPUs. After a while, it should exit.
This driver should create five actors that each use one GPU. After a while,
it should exit.
"""
ray.init(redis_address=redis_address)

@@ -44,7 +44,7 @@ def driver(redis_address, driver_index):
for i in range(driver_index - max_concurrent_drivers + 1):
_wait_for_event("DRIVER_{}_DONE".format(i), redis_address)

def try_to_create_actor(actor_class, timeout=100):
def try_to_create_actor(actor_class, timeout=500):
Collaborator commented:
For some reason, this test seems to fail unless we increase the timeout here. I still don't see why this PR affects this test.

In a follow-up PR, I'll try releasing GPU resources as soon as an actor exits and see if that speeds up this test.

Another possibility is that the monitor is the bottleneck because it has to process so many drivers that are exiting.

# Try to create an actor, but allow failures while we wait for the
# monitor to release the resources for the removed drivers.
start_time = time.time()
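The diff is truncated here, but the comment above describes a retry-with-timeout pattern: attempt the operation, check whether it succeeded, sleep, and give up once the deadline passes. A generic sketch of that pattern follows; the helper and its arguments are placeholders for illustration, not code from this test.

import time


def retry_until(attempt, succeeded, timeout=500, interval=1):
    # Repeatedly call attempt() until succeeded(result) is true or the
    # timeout (in seconds) expires.
    start_time = time.time()
    while time.time() - start_time < timeout:
        result = attempt()
        if succeeded(result):
            return result
        time.sleep(interval)
    raise Exception("Timed out after {} seconds.".format(timeout))

Raising the default timeout from 100 to 500 simply gives the monitor more time to reclaim the exited drivers' resources before an attempt is abandoned.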