Skip to content

Commit

Permalink
use Ray specific logger so logging does not interfere with other python modules that use the logging module (ray-project#321)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertnishihara authored and pcmoritz committed Jul 29, 2016
1 parent 46f88c2 commit 352e5e1
Showing 1 changed file with 31 additions and 10 deletions.
41 changes: 31 additions & 10 deletions lib/python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ def register_function(self, function):
Args:
function (Callable): The remote function that this worker can execute.
"""
logging.info("Registering function {}.".format(function.func_name))
_logger().info("Registering function {}.".format(function.func_name))
ray.lib.register_function(self.handle, function.func_name, len(function.return_types))
self.functions[function.func_name] = function

Expand Down Expand Up @@ -437,6 +437,9 @@ def submit_task(self, func_name, args):
made by one task do not affect other tasks.
"""

# Named logger ("ray") used by the Python worker code in place of the root
# logger, so that Ray's logging setup does not interfere with other modules
# that use the logging module.
logger = logging.getLogger("ray")
"""Logger: The logging object for the Python worker code."""

def check_connected(worker=global_worker):
"""Check if the worker is connected.
Expand Down Expand Up @@ -542,11 +545,11 @@ def register_module(module, worker=global_worker):
module (module): The module of functions to register.
"""
check_connected(worker)
logging.info("registering functions in module {}.".format(module.__name__))
_logger().info("registering functions in module {}.".format(module.__name__))
for name in dir(module):
val = getattr(module, name)
if hasattr(val, "is_remote") and val.is_remote:
logging.info("registering {}.".format(val.func_name))
_logger().info("registering {}.".format(val.func_name))
worker.register_function(val)

def connect(scheduler_address, objstore_address, worker_address, is_driver=False, worker=global_worker, mode=ray.WORKER_MODE):
Expand All @@ -569,7 +572,16 @@ def connect(scheduler_address, objstore_address, worker_address, is_driver=False
worker.handle = ray.lib.create_worker(worker.scheduler_address, worker.objstore_address, worker.worker_address, is_driver)
worker.set_mode(mode)
FORMAT = "%(asctime)-15s %(message)s"
logging.basicConfig(level=logging.DEBUG, format=FORMAT, filename=ray.config.get_log_file_path("-".join(["worker", worker_address]) + ".log"))
# Configure the Python logging module. Note that if we do not provide our own
# logger, then our logging will interfere with other Python modules that also
# use the logging module.
log_handler = logging.FileHandler(ray.config.get_log_file_path("-".join(["worker", worker_address]) + ".log"))
log_handler.setLevel(logging.DEBUG)
log_handler.setFormatter(logging.Formatter(FORMAT))
_logger().addHandler(log_handler)
_logger().setLevel(logging.DEBUG)
_logger().propagate = False
# Configure the logging from the worker C++ code.
ray.lib.set_log_config(ray.config.get_log_file_path("-".join(["worker", worker_address, "c++"]) + ".log"))
if mode in [ray.SHELL_MODE, ray.SCRIPT_MODE, ray.SILENT_MODE]:
for function_to_export in worker.cached_remote_functions:
Expand Down Expand Up @@ -721,7 +733,7 @@ def process_task(task): # wrapping these lines in a function should cause the lo
failure_objects = [RayFailedObject(exception_message) for _ in range(len(return_objrefs))]
store_outputs_in_objstore(return_objrefs, failure_objects, worker)
ray.lib.notify_task_completed(worker.handle, False, exception_message) # notify the scheduler that the task threw an exception
logging.info("Worker threw exception with message: \n\n{}\n, while running function {}.".format(exception_message, func_name))
_logger().info("Worker threw exception with message: \n\n{}\n, while running function {}.".format(exception_message, func_name))
else:
store_outputs_in_objstore(return_objrefs, outputs, worker) # store output in local object store
ray.lib.notify_task_completed(worker.handle, True, "") # notify the scheduler that the task completed successfully
Expand Down Expand Up @@ -773,6 +785,15 @@ def _mode(worker=global_worker):
"""
return worker.mode

def _logger():
    """Give access to the module-level Ray logger.

    Functions that log go through this accessor rather than referring to the
    logger object directly, so that those functions can still be pickled. A
    logger object is normally specific to a machine (it opens a local file),
    and so cannot be pickled itself.

    Returns:
        The module-level ``logger`` object.
    """
    return logger

def _export_reusable_variable(name, reusable, worker=global_worker):
"""Export a reusable variable to the workers. This is only called by a driver.
Expand Down Expand Up @@ -811,12 +832,12 @@ def func_call(*args, **kwargs):
return objrefs
def func_executor(arguments):
"""This gets run when the remote function is executed."""
logging.info("Calling function {}".format(func.__name__))
_logger().info("Calling function {}".format(func.__name__))
start_time = time.time()
result = func(*arguments)
end_time = time.time()
check_return_values(func_call, result) # throws an exception if result is invalid
logging.info("Finished executing function {}, it took {} seconds".format(func.__name__, end_time - start_time))
_logger().info("Finished executing function {}, it took {} seconds".format(func.__name__, end_time - start_time))
return result
func_call.executor = func_executor
func_call.arg_types = arg_types
Expand Down Expand Up @@ -1009,9 +1030,9 @@ def get_arguments_for_execution(function, args, worker=global_worker):

if isinstance(arg, ray.lib.ObjRef):
# get the object from the local object store
logging.info("Getting argument {} for function {}.".format(i, function.__name__))
_logger().info("Getting argument {} for function {}.".format(i, function.__name__))
argument = worker.get_object(arg)
logging.info("Successfully retrieved argument {} for function {}.".format(i, function.__name__))
_logger().info("Successfully retrieved argument {} for function {}.".format(i, function.__name__))
else:
# pass the argument by value
argument = arg
Expand Down Expand Up @@ -1043,7 +1064,7 @@ def store_outputs_in_objstore(objrefs, outputs, worker=global_worker):
for i in range(len(objrefs)):
if isinstance(outputs[i], ray.lib.ObjRef):
# An ObjRef is being returned, so we must alias objrefs[i] so that it refers to the same object that outputs[i] refers to
logging.info("Aliasing objrefs {} and {}".format(objrefs[i].val, outputs[i].val))
_logger().info("Aliasing objrefs {} and {}".format(objrefs[i].val, outputs[i].val))
worker.alias_objrefs(objrefs[i], outputs[i])
pass
else:
Expand Down

0 comments on commit 352e5e1

Please sign in to comment.