[rllib] Initial work on integrating hyperparameter search tool #1107

Merged Oct 13, 2017 · 17 commits

Commit 4ba055a71be7c072483577b61f7af1fc3d4ef809 ("update")
ericl committed Oct 10, 2017

5 changes: 2 additions & 3 deletions python/ray/rllib/a3c/a3c.py
@@ -11,7 +11,7 @@
import ray
from ray.rllib.a3c.runner import RunnerThread, process_rollout
from ray.rllib.a3c.envs import create_and_wrap
from ray.rllib.common import Agent, TrainingResult, get_tensorflow_log_dir
Contributor: is this intended?

Contributor: Oh, it seems Eric split local_log_dir and upload_dir; that's a better way to do it than what we had :)

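For context, a minimal sketch of the split discussed above, mirroring the constructor change to common.py further down in this diff (the make_log_dirs helper name is only illustrative and is not part of the PR):

    import os
    import tempfile

    def make_log_dirs(local_dir, upload_dir, logdir_prefix):
        # Event files and results always go to a real local directory, so
        # tf.summary.FileWriter never receives an S3 URI.
        if not os.path.exists(local_dir):
            os.makedirs(local_dir)
        logdir = tempfile.mkdtemp(prefix=logdir_prefix, dir=local_dir)
        # The remote destination (e.g. s3://bucket/...) is optional and kept separate.
        upload_uri = os.path.join(upload_dir, logdir_prefix) if upload_dir else None
        return logdir, upload_uri
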
from ray.rllib.common import Agent, TrainingResult
from ray.rllib.a3c.shared_model import SharedModel
from ray.rllib.a3c.shared_model_lstm import SharedModelLSTM

@@ -73,9 +73,8 @@ def get_completed_rollout_metrics(self):
return completed

def start(self):
logdir = get_tensorflow_log_dir(self.logdir)
summary_writer = tf.summary.FileWriter(
os.path.join(logdir, "agent_%d" % self.id))
os.path.join(self.logdir, "agent_%d" % self.id))
self.summary_writer = summary_writer
self.runner.start_runner(self.policy.sess, summary_writer)

146 changes: 72 additions & 74 deletions python/ray/rllib/common.py
@@ -24,64 +24,6 @@
logger.setLevel(logging.INFO)


def get_tensorflow_log_dir(logdir):
if logdir.startswith("s3"):
print("WARNING: TensorFlow logging to S3 not supported by"
"TensorFlow, logging to /tmp/ray/ instead")
logdir = "/tmp/ray/"
if not os.path.exists(logdir):
os.makedirs(logdir)
return logdir


class RLLibEncoder(json.JSONEncoder):

def __init__(self, nan_str="null", **kwargs):
super(RLLibEncoder, self).__init__(**kwargs)
self.nan_str = nan_str

def iterencode(self, o, _one_shot=False):
if self.ensure_ascii:
_encoder = json.encoder.encode_basestring_ascii
else:
_encoder = json.encoder.encode_basestring

def floatstr(o, allow_nan=self.allow_nan, nan_str=self.nan_str):
return repr(o) if not np.isnan(o) else nan_str

_iterencode = json.encoder._make_iterencode(
None, self.default, _encoder, self.indent, floatstr,
self.key_separator, self.item_separator, self.sort_keys,
self.skipkeys, _one_shot)
return _iterencode(o, 0)

def default(self, value):
if np.isnan(value):
return None
if np.issubdtype(value, float):
return float(value)
if np.issubdtype(value, int):
return int(value)


class RLLibLogger(object):
"""Writing small amounts of data to S3 with real-time updates.
"""

def __init__(self, uri):
self.result_buffer = StringIO.StringIO()
self.uri = uri

def write(self, b):
# TODO(pcm): At the moment we are writing the whole results output from
# the beginning in each iteration. This will write O(n^2) bytes where n
# is the number of bytes printed so far. Fix this! This should at least
# only write the last 5MBs (S3 chunksize).
with smart_open.smart_open(self.uri, "w") as f:
self.result_buffer.write(b)
f.write(self.result_buffer.getvalue())


TrainingResult = namedtuple("TrainingResult", [
# Unique string identifier for this experiment. This id is preserved
# across checkpoint / restore calls.
@@ -128,16 +70,18 @@ class Agent(object):
"""

def __init__(
self, env_creator, config, upload_dir='/tmp/ray', agent_id=''):
self, env_creator, config, local_dir='/tmp/ray',
upload_dir=None, agent_id=''):
"""Initialize an RLLib agent.

Args:
env_creator (str|func): Name of the OpenAI gym environment to train
against, or a function that creates such an env.
config (obj): Algorithm-specific configuration data.
upload_dir (str): Root directory into which the output directory
should be placed. Can be local like /tmp/ray/ or on S3
like s3://bucketname/.
local_dir (str): Directory where results and temporary files will
be placed.
upload_dir (str): Optional remote URI like s3://bucketname/ where
results will be uploaded.
agent_id (str): Optional unique identifier for this agent, used
to determine where to store results.
"""
@@ -154,32 +98,34 @@ def __init__(
self.config.update({"env_name": env_name})
self.config.update({"alg": self._agent_name})
self.config.update({"agent_id": agent_id})

logdir_prefix = "{}_{}_{}".format(
env_name,
self.__class__.__name__,
agent_id or datetime.today().strftime("%Y-%m-%d_%H-%M-%S"))

if upload_dir.startswith("/"):
if not os.path.exists(upload_dir):
os.makedirs(upload_dir)
self.logdir = tempfile.mkdtemp(
prefix=logdir_prefix, dir=local_dir)
else:
self.logdir = os.path.join(upload_dir, logdir_prefix)
if not os.path.exists(local_dir):
os.makedirs(local_dir)

self.logdir = tempfile.mkdtemp(
prefix=logdir_prefix, dir=local_dir)

if not os.path.exists(self.logdir):
os.makedirs(self.logdir)
if upload_dir:
log_upload_uri = os.path.join(upload_dir, logdir_prefix)
else:
log_upload_uri = None

# TODO(ekl) consider inlining config into the result jsons
config_out = os.path.join(self.logdir, "config.json")
with open(config_out, "w") as f:
json.dump(self.config, f, sort_keys=True, cls=RLLibEncoder)
logger.info(
"%s algorithm created with logdir '%s'",
self.__class__.__name__, self.logdir)
"%s algorithm created with logdir '%s' and upload uri '%s'",
self.__class__.__name__, self.logdir, log_upload_uri)

self._result_logger = RLLibLogger(
os.path.join(self.logdir, "result.json"))
os.path.join(self.logdir, "result.json"),
log_upload_uri and os.path.join(log_upload_uri, "result.json"))
self._file_writer = tf.summary.FileWriter(self.logdir)

self._iteration = 0
@@ -305,3 +251,55 @@ def _restore(self):
"""Subclasses should override this to implement restore()."""

raise NotImplementedError


class RLLibEncoder(json.JSONEncoder):

def __init__(self, nan_str="null", **kwargs):
super(RLLibEncoder, self).__init__(**kwargs)
self.nan_str = nan_str

def iterencode(self, o, _one_shot=False):
if self.ensure_ascii:
_encoder = json.encoder.encode_basestring_ascii
else:
_encoder = json.encoder.encode_basestring

def floatstr(o, allow_nan=self.allow_nan, nan_str=self.nan_str):
return repr(o) if not np.isnan(o) else nan_str

_iterencode = json.encoder._make_iterencode(
None, self.default, _encoder, self.indent, floatstr,
self.key_separator, self.item_separator, self.sort_keys,
self.skipkeys, _one_shot)
return _iterencode(o, 0)

def default(self, value):
if np.isnan(value):
return None
if np.issubdtype(value, float):
return float(value)
if np.issubdtype(value, int):
return int(value)


class RLLibLogger(object):
"""Writing small amounts of data to S3 with real-time updates.
"""

def __init__(self, local_file, uri=None):
self.local_out = open(local_file, "w")
self.result_buffer = StringIO.StringIO()
self.uri = uri

def write(self, b):
self.local_out.write(b)
self.local_out.flush()
# TODO(pcm): At the moment we are writing the whole results output from
# the beginning in each iteration. This will write O(n^2) bytes where n
# is the number of bytes printed so far. Fix this! This should at least
# only write the last 5MBs (S3 chunksize).
if self.uri:
with smart_open.smart_open(self.uri, "w") as f:
self.result_buffer.write(b)
f.write(self.result_buffer.getvalue())
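A hedged usage sketch of the reworked RLLibLogger above; the paths, bucket, and payload are hypothetical, but this is the shape in which the Agent constructor wires it up for result.json:

    result_logger = RLLibLogger(
        "/tmp/ray/my_experiment/result.json",            # always written locally
        uri="s3://my-bucket/my_experiment/result.json")  # optional remote mirror
    result_logger.write(
        json.dumps({"training_iteration": 1}, cls=RLLibEncoder) + "\n")

Each write lands in the local file immediately; if a URI was given, the entire buffer accumulated so far is re-uploaded on every call, which is the O(n^2) behavior flagged in the TODO above.
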
5 changes: 2 additions & 3 deletions python/ray/rllib/ppo/ppo.py
@@ -11,7 +11,7 @@
from tensorflow.python import debug as tf_debug

import ray
from ray.rllib.common import Agent, TrainingResult, get_tensorflow_log_dir
from ray.rllib.common import Agent, TrainingResult
from ray.rllib.ppo.runner import Runner, RemoteRunner
from ray.rllib.ppo.rollout import collect_samples
from ray.rllib.ppo.utils import shuffle
@@ -94,9 +94,8 @@ def _init(self):
for _ in range(self.config["num_workers"])]
self.start_time = time.time()
if self.config["write_logs"]:
logdir = get_tensorflow_log_dir(self.logdir)
self.file_writer = tf.summary.FileWriter(
logdir, self.model.sess.graph)
self.logdir, self.model.sess.graph)
else:
self.file_writer = None
self.saver = tf.train.Saver(max_to_keep=None)
4 changes: 2 additions & 2 deletions python/ray/rllib/train.py
@@ -28,8 +28,8 @@
help="The number of training iterations to run.")
parser.add_argument("--config", default="{}", type=str,
help="The configuration options of the algorithm.")
parser.add_argument("--upload-dir", default="file:///tmp/ray", type=str,
help="Where the traces are stored.")
parser.add_argument("--upload-dir", default=None, type=str,
help="Where the traces are uploaded (optional).")
parser.add_argument("--checkpoint-freq", default=sys.maxsize, type=int,
help="How many iterations between checkpoints.")
parser.add_argument("--restore", default="", type=str,
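With --upload-dir now optional, a possible invocation looks like this; the --env and --alg flag names are assumed from the rest of the script (they match the env_name/alg config keys set in common.py) and the bucket is hypothetical:

    python python/ray/rllib/train.py --env=PongDeterministic-v4 --alg=A3C \
        --upload-dir=s3://my-bucket/rllib-results

Leaving --upload-dir unset keeps all results under the local directory only.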