Skip to content

Commit c5dfa9d

Browse files
committed
Completely sort out order of initial Keras and/or TensorFlow init msgs
Don't forget about stderr!
1 parent 5f0cb09 commit c5dfa9d

File tree

3 files changed

+49
-5
lines changed

3 files changed

+49
-5
lines changed

plasma/models/builder.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
from __future__ import division
1+
from __future__ import division, print_function
2+
import plasma.global_vars as g
3+
# KGF: the first time Keras is ever imported via mpi_learn.py -> mpi_runner.py
4+
import keras.backend as K
5+
# KGF: see the synchronization below --- the init output is emitted here
26
from keras.models import Sequential, Model
37
from keras.layers import Input
48
from keras.layers.core import (
@@ -14,8 +18,6 @@
1418
from keras.callbacks import Callback
1519
from keras.regularizers import l2 # l1, l1_l2
1620

17-
import keras.backend as K
18-
1921
import re
2022
import os
2123
import sys
@@ -24,6 +26,23 @@
2426
from plasma.utils.downloading import makedirs_process_safe
2527
from plasma.utils.hashing import general_object_hash
2628

29+
# Synchronize the two stderr messages from TensorFlow initialization via the
30+
# Keras backend: "Successfully opened dynamic library... libcudart" and
# "Using TensorFlow backend."
31+
if g.comm is not None:
32+
g.flush_all_inorder()
33+
# if g.comm is not None:
34+
# g.comm.Barrier()
35+
# if g.task_index == 0:
36+
# sys.stdout.flush()
37+
# sys.stderr.flush()
38+
# if g.comm is not None:
39+
# g.comm.Barrier()
40+
# TODO(KGF): need to create wrapper .py file (or place in some __init__.py)
41+
# that detects, for an arbitrary import, if tensorflow has been initialized
42+
# either directly from "import tensorflow ..." and/or via backend of
43+
# "from keras.layers ..."
44+
# OR if this is the first time. See below "first_time" variable.
45+
2746

2847
class LossHistory(Callback):
2948
def on_train_begin(self, logs=None):
@@ -262,6 +281,8 @@ def slicer_output_shape(input_shape, indices):
262281
x_out = Dense(1, activation=output_activation)(x_in)
263282
model = Model(inputs=x_input, outputs=x_out)
264283
# bug with tensorflow/Keras
284+
# TODO(KGF): what is this bug? this is the only direct "tensorflow"
285+
# import outside of mpi_runner.py and runner.py
265286
if (conf['model']['backend'] == 'tf'
266287
or conf['model']['backend'] == 'tensorflow'):
267288
first_time = "tensorflow" not in sys.modules

plasma/models/mpi_runner.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44
from plasma.utils.performance import PerformanceAnalyzer
55
from plasma.utils.processing import concatenate_sublists
66
from plasma.utils.evaluation import get_loss_from_list
7+
# KGF: this is the first module that imports Keras:
78
from plasma.models import builder
89
from plasma.models.loader import ProcessGenerator
910
from plasma.utils.state_reset import reset_states
11+
# KGF: plasma.conf calls print_unique() for "Selected signals". Ensure that
12+
# Keras "Using TensorFlow backend" stderr messages are not interleaved with stdout
1013
from plasma.conf import conf
1114
from mpi4py import MPI
1215
from pkg_resources import parse_version, get_distribution
@@ -39,6 +42,7 @@
3942
import socket
4043
sys.setrecursionlimit(10000)
4144

45+
# TODO(KGF): remove the next 3 lines?
4246
# import keras sequentially because it otherwise reads from ~/.keras/keras.json
4347
# with too many threads:
4448
# from mpi_launch_tensorflow import get_mpi_task_index
@@ -49,6 +53,8 @@
4953
# g.init_MPI()
5054
# TODO(KGF): set "mpi_initialized" global bool flag?
5155

56+
g.flush_all_inorder() # see above about conf_parser.py stdout writes
57+
5258
# initialization code for mpi_runner.py module:
5359
if g.backend == 'tf' or g.backend == 'tensorflow':
5460
if g.NUM_GPUS > 1:
@@ -67,11 +73,20 @@
6773
import tensorflow.compat.v1 as tf
6874
else:
6975
import tensorflow as tf
76+
# TODO(KGF): above, builder.py (bug workaround), mpi_launch_tensorflow.py,
77+
# and runner.py are the only files that import tensorflow directly
78+
7079
from keras.backend.tensorflow_backend import set_session
80+
# KGF: next 3 lines dump many TensorFlow diagnostics to stderr.
81+
# All MPI ranks first "Successfully opened dynamic library libcuda"
82+
# then, one by one: ID GPU, libcudart, libcublas, libcufft, ...
83+
# Finally, "Device interconnect StreamExecutor with strength 1 edge matrix"
7184
gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.95,
7285
allow_growth=True)
7386
config = tf.ConfigProto(gpu_options=gpu_options)
7487
set_session(tf.Session(config=config))
88+
g.flush_all_inorder()
89+
g.comm.Barrier()
7590
else:
7691
os.environ['KERAS_BACKEND'] = 'theano'
7792
base_compile_dir = '{}/tmp/{}-{}'.format(
@@ -90,8 +105,9 @@
90105
from keras.utils.generic_utils import Progbar
91106
import keras.callbacks as cbks
92107

93-
94108
g.pprint_unique(conf)
109+
g.flush_all_inorder()
110+
g.comm.Barrier()
95111

96112

97113
class MPIOptimizer(object):

plasma/models/targets.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import print_function
2+
import plasma.global_vars as g
13
import numpy as np
24
import abc
35

@@ -7,9 +9,14 @@
79
)
810
import keras.backend as K
911
from keras.losses import hinge
10-
# Requirement: larger value must mean disruption more likely.
12+
13+
# synchronize output from TensorFlow initialization via Keras backend
14+
if g.comm is not None:
15+
g.flush_all_inorder()
16+
g.comm.Barrier()
1117

1218

19+
# Requirement: larger value must mean disruption more likely.
1320
class Target(object):
1421
activation = 'linear'
1522
loss = 'mse'

0 commit comments

Comments
 (0)