Skip to content

Commit

Permalink
Suppress irrelevant Redis connection errors. (ray-project#434)
Browse files Browse the repository at this point in the history
* Suppress error messages in worker import thread when Redis terminates.

* Suppress some warnings from one of the tests.
  • Loading branch information
robertnishihara authored and pcmoritz committed Apr 8, 2017
1 parent 0eac3cc commit 7cd0074
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 28 deletions.
59 changes: 32 additions & 27 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1295,33 +1295,38 @@ def import_thread(worker):
raise Exception("This code should be unreachable.")
num_imported += 1

for msg in worker.import_pubsub_client.listen():
with worker.lock:
if msg["type"] == "psubscribe":
continue
assert msg["data"] == b"rpush"
num_imports = worker.redis_client.llen("Exports")
assert num_imports >= num_imported
for i in range(num_imported, num_imports):
key = worker.redis_client.lindex("Exports", i)
if key.startswith(b"RemoteFunction"):
with log_span("ray:import_remote_function", worker=worker):
fetch_and_register_remote_function(key, worker=worker)
elif key.startswith(b"EnvironmentVariables"):
with log_span("ray:import_environment_variable", worker=worker):
fetch_and_register_environment_variable(key, worker=worker)
elif key.startswith(b"FunctionsToRun"):
with log_span("ray:import_function_to_run", worker=worker):
fetch_and_execute_function_to_run(key, worker=worker)
elif key.startswith(b"Actor"):
# Only get the actor if the actor ID matches the actor ID of this
# worker.
actor_id, = worker.redis_client.hmget(key, "actor_id")
if worker.actor_id == actor_id:
worker.fetch_and_register["Actor"](key, worker)
else:
raise Exception("This code should be unreachable.")
num_imported += 1
try:
for msg in worker.import_pubsub_client.listen():
with worker.lock:
if msg["type"] == "psubscribe":
continue
assert msg["data"] == b"rpush"
num_imports = worker.redis_client.llen("Exports")
assert num_imports >= num_imported
for i in range(num_imported, num_imports):
key = worker.redis_client.lindex("Exports", i)
if key.startswith(b"RemoteFunction"):
with log_span("ray:import_remote_function", worker=worker):
fetch_and_register_remote_function(key, worker=worker)
elif key.startswith(b"EnvironmentVariables"):
with log_span("ray:import_environment_variable", worker=worker):
fetch_and_register_environment_variable(key, worker=worker)
elif key.startswith(b"FunctionsToRun"):
with log_span("ray:import_function_to_run", worker=worker):
fetch_and_execute_function_to_run(key, worker=worker)
elif key.startswith(b"Actor"):
# Only get the actor if the actor ID matches the actor ID of this
# worker.
actor_id, = worker.redis_client.hmget(key, "actor_id")
if worker.actor_id == actor_id:
worker.fetch_and_register["Actor"](key, worker)
else:
raise Exception("This code should be unreachable.")
num_imported += 1
except redis.ConnectionError:
# When Redis terminates the listen call will throw a ConnectionError, which
# we catch here.
pass


def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
Expand Down
2 changes: 1 addition & 1 deletion test/actor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ def testActorMultipleGPUsFromMultipleTasks(self):
num_gpus_per_scheduler = 10
ray.worker._init(
start_ray_local=True, num_workers=0,
num_local_schedulers=num_local_schedulers,
num_local_schedulers=num_local_schedulers, redirect_output=True,
num_gpus=(num_local_schedulers * [num_gpus_per_scheduler]))

@ray.remote
Expand Down

0 comments on commit 7cd0074

Please sign in to comment.