Commit afea234: add tensorboard summary
phuocphn committed Oct 4, 2019 · 1 parent 4ddf264
Showing 3 changed files with 71 additions and 43 deletions.
a3c.py (94 changes: 59 additions & 35 deletions)
@@ -10,6 +10,7 @@ def __init__(self, env, worker_task_index, sess=None):
self.env = env
self.sess = sess
self.is_chief = (worker_task_index==0)
self.worker_task_index = worker_task_index

# we will define the network and all necessary operations here.

@@ -18,7 +19,7 @@ def __init__(self, env, worker_task_index, sess=None):
ps_tasks=1, ps_device="/job:ps",
worker_device="/job:worker/task:{}/cpu:0".format(worker_task_index))):
with tf.variable_scope("global", reuse=None):
self.global_network = CNNLSTMPolicy(state_shape = [84, 84, 3], num_action=5) #NOTE: get state_shape from env.observation_space later.
self.global_network = CNNLSTMPolicy(state_shape = [160, 120, 3], num_action=env.action_space.n) #NOTE: get state_shape from env.observation_space later.
self.global_step = tf.get_variable(name="global_step",
shape=[],
dtype=tf.int32,
@@ -31,46 +32,62 @@
# define local network in local worker (`local network weights` and `local step`)
with tf.device(device_name_or_function="/job:worker/task:{}/cpu:0".format(worker_task_index)):
with tf.variable_scope("local", reuse=None):
self.local_network = CNNLSTMPolicy(state_shape = [84, 84, 3], num_action=5)
self.local_step = self.global_step
self.local_network = CNNLSTMPolicy(state_shape = [160, 120, 3], num_action=env.action_space.n)
self.local_network.global_step = self.global_step
#NOTE: #ICM, we will implement this later
#self.local_prediction_network = StateActionPredictor(state_shape = env.observation_space.shape, num_action=env.action_space.n)


self.actions = tf.placeholder(dtype=tf.float32, shape=[None, 5], name="actions") #NOTE: get shape from env.action_space.n later.
self.advantages = tf.placeholder(dtype=tf.float32, shape=[None], name="advantages")
self.rewards = tf.placeholder(dtype=tf.float32, shape=[None], name="rewards")

# https://discuss.pytorch.org/t/what-is-the-difference-between-log-softmax-and-softmax/11801
probs = tf.nn.softmax(self.local_network.logits)

policy_loss = -tf.reduce_mean(input_tensor= tf.reduce_sum(tf.log(probs) * self.actions, axis=1) * self.advantages) #scalar value
value_function_loss = 0.5 * tf.reduce_mean(tf.square(self.local_network.value_function - self.rewards))
entropy_loss = -tf.reduce_mean(tf.reduce_sum(probs * tf.log(probs), axis=1)) #element-wise multiplication
self.loss = policy_loss + 0.5 * value_function_loss - entropy_loss * 0.01
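# Taken together these form the standard A3C objective: policy_loss is the policy-gradient term
# (log-probability of the chosen one-hot action weighted by the advantage), value_function_loss
# regresses the critic's value toward the returns fed in through self.rewards, and entropy_loss
# is the mean policy entropy, so subtracting 0.01 * entropy_loss rewards exploratory policies.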

gradients = tf.gradients(self.loss, tf.get_collection(key=tf.GraphKeys.TRAINABLE_VARIABLES, scope="local"))
# NOTE: #ICM, we will implement this later
#self.prediction_network_gradients = 0.01 * self.local_prediction_network.

gradients, gradient_norms = tf.clip_by_global_norm(t_list=gradients,clip_norm=40.0)
grads_and_vars = list(zip(gradients, tf.get_collection(key=tf.GraphKeys.TRAINABLE_VARIABLES, scope="global")))
tf.summary.scalar("model/policy_loss", policy_loss )
tf.summary.scalar("model/value_loss", value_function_loss )
tf.summary.scalar("model/entropy", entropy_loss)
tf.summary.scalar("model/reward_mean", tf.math.reduce_mean(self.rewards))
tf.summary.scalar("model/grad_global_norm", tf.global_norm(gradients))
tf.summary.scalar("model/variable_global_norm", tf.global_norm(tf.get_collection(key=tf.GraphKeys.TRAINABLE_VARIABLES, scope="local")))

optimizer = tf.train.AdamOptimizer(learning_rate=float(1e-4))
self.train_op = tf.group(optimizer.apply_gradients(grads_and_vars=grads_and_vars, global_step=self.global_step.assign_add(1))) #NOTE: 1 or shape[0]
self.summary_op = tf.summary.merge_all()
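# tf.summary.merge_all() bundles every summary registered in this worker's graph (the six
# model/* scalars above) into a single op; it is evaluated separately from train_op, and the
# fetched protobuf is written out by the per-worker FileWriter inside train() below.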

# copy weights from the parameter server to the local model.
sync_assigns = [local_var.assign(global_var) for local_var, global_var in zip(
tf.get_collection(key=tf.GraphKeys.TRAINABLE_VARIABLES, scope="local"),
tf.get_collection(key=tf.GraphKeys.TRAINABLE_VARIABLES, scope="global")
)]
self.sync_weights_op = tf.group(*sync_assigns)

self.summary_writer = None
self.local_step = 0

def preprocess(self, img, resolution=(84, 84)):
def preprocess(self, img, resolution=(160, 120)):
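# With the default preserve_range=False, skimage.transform.resize converts a uint8 frame to
# floats in [0, 1] before resizing, so the astype(np.float32) below only narrows the dtype;
# no separate /255 normalization is needed.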
return np.asarray(skimage.transform.resize(img, resolution).astype(np.float32))

def train(self, sess):
def provide_context(self, sess, summary_writer):
self.sess = sess
self.summary_writer = summary_writer

def train(self, sess, summary_writer):

# sync weights from global target network
self.sess.run(self.sync_weights_op)
@@ -116,23 +133,30 @@ def train(self, sess):
[batch_states, batch_actions, batch_advantages, batch_rewards, terminal] = episode_rollout.get_training_batch()

if not terminal:
print ("ingore ....." * 100)
print ("*" * 100)
print ("ignore.")
return

if self.is_chief:
pass
should_write_summary = (self.is_chief and self.local_step % 10 == 0)
if should_write_summary:
fetches = [self.train_op, self.global_step]
else:
fetches = [self.train_op, self.global_step]

fetches = [self.train_op, self.global_step]
feed_dict = {
self.local_network.inputs: batch_states,
self.advantages: batch_advantages,
self.actions: batch_actions,
self.rewards: batch_rewards,
}
fetched = self.sess.run(fetches, feed_dict)
if terminal:
print (f"Global step counter: {fetched[-1]}")
print ("Train ...")
fetched = self.sess.run(fetches, feed_dict=feed_dict)
self.local_step += 1

if should_write_summary:
summary = sess.run(self.summary_op, feed_dict=feed_dict)
self.summary_writer.add_summary(summary, fetched[-1])
self.summary_writer.flush()
print (f"*** Worker {self.worker_task_index} at local step: {self.local_step}, reward_mean: {np.mean(batch_rewards)}")

class EpisodeRollout(object):
def __init__(self):
…
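For readers unfamiliar with the TF1 summary API that this commit wires into a3c.py, the pattern reduces to three steps: register scalar summaries, merge them into a single op, and periodically write the fetched result with a FileWriter. A minimal, self-contained sketch under TensorFlow 1.x (the names x, loss and the log directory are illustrative, not taken from the repo):

import tensorflow as tf

x = tf.placeholder(tf.float32, shape=[None], name="x")
loss = tf.reduce_mean(tf.square(x))
tf.summary.scalar("model/loss", loss)        # same role as the model/* scalars in a3c.py
summary_op = tf.summary.merge_all()          # same role as self.summary_op

writer = tf.summary.FileWriter("/tmp/summary_demo")
with tf.Session() as sess:
    for step in range(100):
        summary, _ = sess.run([summary_op, loss], feed_dict={x: [1.0, 2.0, 3.0]})
        if step % 10 == 0:                   # mirrors the local_step % 10 == 0 gate in train()
            writer.add_summary(summary, global_step=step)
            writer.flush()

Gating add_summary on the step counter, as the new train() does, keeps the event files small while still showing the trend in TensorBoard.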
extractors.py (2 changes: 1 addition & 1 deletion)
@@ -13,7 +13,7 @@ def __init__(self, state_shape, num_action):
"""

# https://github.com/mwydmuch/ViZDoom/blob/b50fcd26ffeebb07d9527c8b951976907ef2acfe/examples/python/learning_tensorflow.py
self.inputs = tf.placeholder(dtype=tf.float32, shape=[None] + state_shape, name="input")
self.inputs = tf.placeholder(dtype=tf.float32, shape=[None] + state_shape, name="inputs")

conv_1 = tf.contrib.layers.convolution2d(self.inputs, num_outputs=32, kernel_size=[3, 3], stride=[2, 2],
activation_fn=tf.nn.elu,
…
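The extractors.py change only renames the placeholder from "input" to "inputs", but in TF1 the name argument is the tensor's identity in the graph, so the rename matters to any code that looks the tensor up by name instead of through the self.inputs attribute. A small sketch of that behaviour (the shape below simply mirrors the new [160, 120, 3] state shape and is an assumption about how the class is instantiated):

import tensorflow as tf

inputs = tf.placeholder(tf.float32, shape=[None, 160, 120, 3], name="inputs")
# after the rename, the same tensor can be recovered from the graph as "inputs:0"
same_tensor = tf.get_default_graph().get_tensor_by_name("inputs:0")
assert same_tensor is inputs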
train.py (18 changes: 11 additions & 7 deletions)
@@ -19,7 +19,7 @@
LOG_DIR = '/tmp/mspacman-v0'
ENV_ID = 'MsPacman-v0'
NUM_WORKERS = 20
TOTAL_TRAINING_STEP = 10000 # this is total step and is used for all workers.
TOTAL_TRAINING_STEP = 1000000 # this is total step and is used for all workers.

env = gym.make('MsPacman-v0')
print ("*" * 50)
@@ -30,17 +30,17 @@
print ("*" * 50)
time.sleep(5)

cluster = tf.train.ClusterSpec({"ps": ["localhost:12200"], "worker": ["localhost:12300", "localhost:12301"]})
cluster = tf.train.ClusterSpec({"ps": ["localhost:12200"], "worker": ["localhost:12300", "localhost:12301", "localhost:12302", "localhost:12303"]})
if JOB_NAME == 'ps':
os.system("kill -9 $( lsof -i:12200 -t ) > /dev/null 2>&1")
#os.system("kill -9 $( lsof -i:12200 -t ) > /dev/null 2>&1")
server = tf.train.Server(server_or_cluster_def=cluster, job_name=JOB_NAME, task_index=0,
config=tf.ConfigProto(device_filters=["/job:ps"]))
print ("Parameter server is starting...")
server.join()

if JOB_NAME == 'worker':
# Create server obj to get managed_session, and then train agent.
os.system("kill -9 $( lsof -i:12300-12301 -t ) > /dev/null 2>&1")
#os.system("kill -9 $( lsof -i:12300-12301 -t ) > /dev/null 2>&1")
server = tf.train.Server(server_or_cluster_def=cluster, job_name=JOB_NAME, task_index=TASK_INDEX,
config=tf.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=2))

@@ -53,27 +53,31 @@
selected_variables_init_op = tf.variables_initializer(selected_variables)

saver = tf.train.Saver(var_list=selected_variables)
summary_writer = tf.summary.FileWriter(LOG_DIR + "__%d" % TASK_INDEX)

supervisor = tf.train.Supervisor(is_chief=(JOB_NAME=='worker' and TASK_INDEX==0),
logdir=LOG_DIR,
saver=saver,
init_op=selected_variables_init_op,
summary_writer=summary_writer,
summary_op=None,
ready_op=tf.report_uninitialized_variables(selected_variables),
global_step=trainer.global_step,
save_model_secs=30 # Number of seconds between the creation of model checkpoints. Defaults to 600 seconds. Pass 0 to disable checkpoints.
)
with supervisor.managed_session(master=server.target,
config=tf.ConfigProto(device_filters=["/job:ps", f"/job:worker/task:{TASK_INDEX}/cpu:0"])) as sess:
config=tf.ConfigProto(device_filters=["/job:ps", f"/job:worker/task:{TASK_INDEX}/cpu:0"])) as sess, sess.as_default():

if PRETRAIN_MODEL_PATH:
saver.restore(sess=sess, save_path=tf.train.latest_checkpoint(PRETRAIN_MODEL_PATH))

#sess.run(tf.global_variables_initializer())
sess.run(trainer.sync_weights_op)
trainer.provide_context(sess=sess, summary_writer=summary_writer)
global_step = sess.run(trainer.global_step) # training_step is put in parameter server.
print (f"Worker: {JOB_NAME + ':'+ str(TASK_INDEX) } start training at global step: {str(global_step)}")

while not supervisor.should_stop() and global_step < TOTAL_TRAINING_STEP:
trainer.train(sess=sess)
trainer.train(sess=sess, summary_writer=summary_writer)
global_step = sess.run(trainer.global_step)

# Ask for all the services to stop.
…
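The train.py side follows the usual recipe for manual summaries under tf.train.Supervisor: give each worker its own FileWriter (suffixed with its task index), pass summary_op=None so the Supervisor writes nothing automatically, and hand the writer to the trainer. A stripped-down, single-process sketch of that wiring, with LOG_DIR and the dummy loss as illustrative stand-ins rather than the repo's actual graph:

import tensorflow as tf

LOG_DIR = "/tmp/a3c-demo"
TASK_INDEX = 0

global_step = tf.train.get_or_create_global_step()
loss = tf.reduce_mean(tf.square(tf.random_normal([8])))
tf.summary.scalar("model/loss", loss)
summary_op = tf.summary.merge_all()

# one event-file directory per worker, as in LOG_DIR + "__%d" % TASK_INDEX
summary_writer = tf.summary.FileWriter(LOG_DIR + "__%d" % TASK_INDEX)

supervisor = tf.train.Supervisor(is_chief=(TASK_INDEX == 0),
                                 logdir=LOG_DIR,
                                 summary_writer=summary_writer,
                                 summary_op=None,   # summaries are written manually below
                                 global_step=global_step,
                                 save_model_secs=30)

with supervisor.managed_session() as sess:
    for _ in range(20):
        summary, step = sess.run([summary_op, global_step])
        summary_writer.add_summary(summary, step)
    summary_writer.flush()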
