Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 106 additions & 36 deletions tensorboard/backend/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
# ==============================================================================
"""TensorBoard WSGI Application Logic.

TensorBoardApplication constructs TensorBoard as a WSGI application.
It handles serving static assets, and implements TensorBoard data APIs.
Provides TensorBoardWSGIApp for building a TensorBoard WSGI app.
"""

from __future__ import absolute_import
Expand Down Expand Up @@ -107,64 +106,98 @@ def standard_tensorboard_wsgi(flags, plugin_loaders, assets_zip_provider):
:type plugin_loaders: list[base_plugin.TBLoader]
:rtype: TensorBoardWSGI
"""
event_file_active_filter = _get_event_file_active_filter(flags)
multiplexer = event_multiplexer.EventMultiplexer(
size_guidance=DEFAULT_SIZE_GUIDANCE,
tensor_size_guidance=tensor_size_guidance_from_flags(flags),
purge_orphaned_data=flags.purge_orphaned_data,
max_reload_threads=flags.max_reload_threads,
event_file_active_filter=event_file_active_filter)
if flags.generic_data == 'false':
data_provider = None
else:
data_provider = event_data_provider.MultiplexerDataProvider(multiplexer)
loading_multiplexer = multiplexer
data_provider = None
multiplexer = None
reload_interval = flags.reload_interval
db_uri = flags.db
db_connection_provider = None
# For DB import mode, create a DB file if we weren't given one.
if flags.db_import and not flags.db:
tmpdir = tempfile.mkdtemp(prefix='tbimport')
atexit.register(shutil.rmtree, tmpdir)
db_uri = 'sqlite:%s/tmp.sqlite' % tmpdir
if flags.db_import:
# DB import mode.
logger.info('Importing logdir into DB at %s', db_uri)
db_uri = flags.db
# Create a temporary DB file if we weren't given one.
if not db_uri:
tmpdir = tempfile.mkdtemp(prefix='tbimport')
atexit.register(shutil.rmtree, tmpdir)
db_uri = 'sqlite:%s/tmp.sqlite' % tmpdir
db_connection_provider = create_sqlite_connection_provider(db_uri)
loading_multiplexer = db_import_multiplexer.DbImportMultiplexer(
logger.info('Importing logdir into DB at %s', db_uri)
multiplexer = db_import_multiplexer.DbImportMultiplexer(
db_uri=db_uri,
db_connection_provider=db_connection_provider,
purge_orphaned_data=flags.purge_orphaned_data,
max_reload_threads=flags.max_reload_threads)
elif flags.db:
# DB read-only mode, never load event logs.
reload_interval = -1
db_connection_provider = create_sqlite_connection_provider(db_uri)
db_connection_provider = create_sqlite_connection_provider(flags.db)
multiplexer = _DbModeMultiplexer(flags.db, db_connection_provider)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s a little funny for a non-functional multiplexer to exist in the
TBContext: currently plugins case on DB mode by saying, “if I have a
DB connection provider, use DB mode, else use multiplexer mode”, but
they could just as well say, “if I have a multiplexer, use multiplexer
mode, else use DB mode”—but that will no longer work. Concern?

(See also comment on _DbModeMultiplexer.__init__ below.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is basically that I don't trust plugins to all defensively check whether multiplexer is not None. Some of them do this, but inconsistently. For example, the projector plugin checks that self.multiplexer is not None in only 2 out of 3 uses. The profile plugin doesn't check at all.

So rather than trying to go through all the existing plugins to ensure they can handle a non-existent multiplexer, I figured I'd ensure that there is always a dummy placeholder multiplexer that just returns empty data instead.

I realize this isn't that clean, but our near-term goal is to refactor away all the multiplexer-calling code anyway, so I was reluctant to spend much time improving it now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it; that this is all slated to be removed soon changes the equation.
This is fine with me, then.

else:
# Regular logdir loading mode.
multiplexer = event_multiplexer.EventMultiplexer(
size_guidance=DEFAULT_SIZE_GUIDANCE,
tensor_size_guidance=tensor_size_guidance_from_flags(flags),
purge_orphaned_data=flags.purge_orphaned_data,
max_reload_threads=flags.max_reload_threads,
event_file_active_filter=_get_event_file_active_filter(flags))
if flags.generic_data != 'false':
data_provider = event_data_provider.MultiplexerDataProvider(multiplexer)

if reload_interval >= 0:
# We either reload the multiplexer once when TensorBoard starts up, or we
# continuously reload the multiplexer.
path_to_run = parse_event_files_spec(flags.logdir)
start_reloading_multiplexer(
multiplexer, path_to_run, reload_interval, flags.reload_task)
return TensorBoardWSGIApp(
flags, plugin_loaders, data_provider, assets_zip_provider, multiplexer)


def TensorBoardWSGIApp(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we take this opportunity to give this function a better name? Its
casing suggests that it’s a type, but it’s a function; and it’s not
clear from the name how this differs from class TensorBoardWSGI.

Perhaps something like build_tensorboard_wsgi connotes its purpose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My plan (in #2573, one of the WSGI cleanup items) was to convert this into the actual plugin class and remove TensorBoardWSGI. This is blocked on updating the plugin tests so they don't need it as a shim, which I was going to do as a cleanup task to avoid blocking this PR.

So that's why I left it named in class style.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.

flags,
plugins,
data_provider=None,
assets_zip_provider=None,
deprecated_multiplexer=None):
"""Constructs a TensorBoard WSGI app from plugins and data providers.

Args:
flags: An argparse.Namespace containing TensorBoard CLI flags.
plugins: A list of TBLoader subclasses for the plugins to load.
assets_zip_provider: See TBContext documentation for more information.
data_provider: Instance of `tensorboard.data.provider.DataProvider`. May
be `None` if `flags.generic_data` is set to `"false"` in which case
`deprecated_multiplexer` must be passed instead.
deprecated_multiplexer: Optional `plugin_event_multiplexer.EventMultiplexer`
to use for any plugins not yet enabled for the DataProvider API.
Required if the data_provider argument is not passed.

Returns:
A WSGI application that implements the TensorBoard backend.
"""
db_uri = None
db_connection_provider = None
if isinstance(
deprecated_multiplexer,
(db_import_multiplexer.DbImportMultiplexer, _DbModeMultiplexer)):
db_uri = deprecated_multiplexer.db_uri
db_connection_provider = deprecated_multiplexer.db_connection_provider
plugin_name_to_instance = {}
context = base_plugin.TBContext(
data_provider=data_provider,
db_connection_provider=db_connection_provider,
db_uri=db_uri,
flags=flags,
logdir=flags.logdir,
multiplexer=multiplexer,
multiplexer=deprecated_multiplexer,
assets_zip_provider=assets_zip_provider,
plugin_name_to_instance=plugin_name_to_instance,
window_title=flags.window_title)
plugins = []
for loader in plugin_loaders:
tbplugins = []
for loader in plugins:
plugin = loader.load(context)
if plugin is None:
continue
plugins.append(plugin)
tbplugins.append(plugin)
plugin_name_to_instance[plugin.plugin_name] = plugin

if reload_interval >= 0:
# We either reload the multiplexer once when TensorBoard starts up, or we
# continuously reload the multiplexer.
path_to_run = parse_event_files_spec(flags.logdir)
start_reloading_multiplexer(
loading_multiplexer, path_to_run, reload_interval, flags.reload_task)
return TensorBoardWSGI(plugins, flags.path_prefix)
return TensorBoardWSGI(tbplugins, flags.path_prefix)


class TensorBoardWSGI(object):
Expand Down Expand Up @@ -531,3 +564,40 @@ def _get_event_file_active_filter(flags):
if inactive_secs < 0:
return lambda timestamp: True
return lambda timestamp: timestamp + inactive_secs >= time.time()


class _DbModeMultiplexer(event_multiplexer.EventMultiplexer):
"""Shim EventMultiplexer to use when in read-only DB mode.

In read-only DB mode, the EventMultiplexer is nonfunctional - there is no
logdir to reload, and the data is all exposed via SQL. This class represents
the do-nothing EventMultiplexer for that purpose, which serves only as a
conduit for DB-related parameters.

The load APIs raise exceptions if called, and the read APIs always
return empty results.
"""
def __init__(self, db_uri, db_connection_provider):
"""Constructor for `_DbModeMultiplexer`.

Args:
db_uri: A URI to the database file in use.
db_connection_provider: Provider function for creating a DB connection.
"""
logger.info('_DbModeMultiplexer initializing for %s', db_uri)
super(_DbModeMultiplexer, self).__init__()
self.db_uri = db_uri
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this serve as a conduit for DB-related parameters? IIUC nothing
uses this currently—the URI and connection provider are always pulled off
the context, not the multiplexer.

Oof, I see: there’s a getattr, which is why my greps turned up
nothing. What do you think of either explicitly mentioning that these
are read in TensorBoardWSGIApp, or changing that logic above to be
something like

db_uri = None
db_connection_provider = None
if isinstance(deprecated_multiplexer, _DbModeMultiplexer):
  db_uri = deprecated_multiplexer.db_uri
  db_connection_provider = deprecated_multiplexer.db_connection_provider
  deprecated_multiplexer = None  # don't pass no-op muxer into `TBContext`

to also resolve my gripe about including a non-functional multiplexer in
the context at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above response about the non-functional multiplexer.

I can change the getattr to two different isinstance checks (one per db multiplexer type) if you'd prefer that. I was not trying to be terribly principled since this code is all going away soon, but I'm happy to make that change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced getattrs with isinstance.

self.db_connection_provider = db_connection_provider
logger.info('_DbModeMultiplexer done initializing')

def AddRun(self, path, name=None):
  """Unsupported; this multiplexer never loads event data.

  In read-only DB mode all data is served via SQL from the database, so
  there is no logdir to load runs from.

  Raises:
    NotImplementedError: always.
  """
  raise NotImplementedError()

def AddRunsFromDirectory(self, path, name=None):
  """Unsupported; this multiplexer never loads event data.

  In read-only DB mode all data is served via SQL from the database, so
  there is no directory of event files to walk.

  Raises:
    NotImplementedError: always.
  """
  raise NotImplementedError()

def Reload(self):
  """Unsupported; this multiplexer has nothing to reload.

  Raises:
    NotImplementedError: always.
  """
  raise NotImplementedError()
1 change: 1 addition & 0 deletions tensorboard/backend/event_processing/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ py_library(
deps = [
":directory_watcher",
":event_file_loader",
":event_multiplexer",
":io_wrapper",
":sqlite_writer",
"//tensorboard:data_compat",
Expand Down
24 changes: 18 additions & 6 deletions tensorboard/backend/event_processing/db_import_multiplexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from tensorboard.backend.event_processing import directory_watcher
from tensorboard.backend.event_processing import event_file_loader
from tensorboard.backend.event_processing import io_wrapper
from tensorboard.backend.event_processing import plugin_event_multiplexer
from tensorboard.backend.event_processing import sqlite_writer
from tensorboard.compat import tf
from tensorboard.compat.proto import event_pb2
Expand All @@ -39,28 +40,35 @@

logger = tb_logging.get_logger()

class DbImportMultiplexer(object):

class DbImportMultiplexer(plugin_event_multiplexer.EventMultiplexer):
"""A loading-only `EventMultiplexer` that populates a SQLite DB.

This EventMultiplexer only loads data; it provides no read APIs.
This EventMultiplexer only loads data; the read APIs always return empty
results, since all data is accessed instead via SQL against the
db_connection_provider wrapped by this multiplexer.
"""

def __init__(self,
db_uri,
db_connection_provider,
purge_orphaned_data,
max_reload_threads):
"""Constructor for `DbImportMultiplexer`.

Args:
db_uri: A URI to the database file in use.
db_connection_provider: Provider function for creating a DB connection.
purge_orphaned_data: Whether to discard any events that were "orphaned" by
a TensorFlow restart.
max_reload_threads: The max number of threads that TensorBoard can use
to reload runs. Each thread reloads one run at a time. If not provided,
reloads runs serially (one after another).
"""
logger.info('DbImportMultiplexer initializing')
self._db_connection_provider = db_connection_provider
logger.info('DbImportMultiplexer initializing for %s', db_uri)
super(DbImportMultiplexer, self).__init__()
self.db_uri = db_uri
self.db_connection_provider = db_connection_provider
self._purge_orphaned_data = purge_orphaned_data
self._max_reload_threads = max_reload_threads
self._event_sink = None
Expand All @@ -70,13 +78,17 @@ def __init__(self,
logger.warn(
'--db_import does not yet support purging orphaned data')

conn = self._db_connection_provider()
conn = self.db_connection_provider()
# Set the DB in WAL mode so reads don't block writes.
conn.execute('PRAGMA journal_mode=wal')
conn.execute('PRAGMA synchronous=normal') # Recommended for WAL mode
sqlite_writer.initialize_schema(conn)
logger.info('DbImportMultiplexer done initializing')

def AddRun(self, path, name=None):
  """Unsupported; instead use AddRunsFromDirectory.

  Raises:
    NotImplementedError: always; this multiplexer only supports bulk
      loading of runs via AddRunsFromDirectory().
  """
  raise NotImplementedError("Unsupported; use AddRunsFromDirectory()")

def AddRunsFromDirectory(self, path, name=None):
"""Load runs from a directory; recursively walks subdirectories.

Expand Down Expand Up @@ -111,7 +123,7 @@ def Reload(self):
# Defer event sink creation until needed; this ensures it will only exist in
# the thread that calls Reload(), since DB connections must be thread-local.
if not self._event_sink:
self._event_sink = _SqliteWriterEventSink(self._db_connection_provider)
self._event_sink = _SqliteWriterEventSink(self.db_connection_provider)
# Use collections.deque() for speed when we don't need blocking since it
# also has thread-safe appends/pops.
loader_queue = collections.deque(six.itervalues(self._run_loaders))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def setUp(self):
db_file_name = os.path.join(self.get_temp_dir(), 'db')
self.db_connection_provider = lambda: sqlite3.connect(db_file_name)
self.multiplexer = db_import_multiplexer.DbImportMultiplexer(
db_uri='sqlite:' + db_file_name,
db_connection_provider=self.db_connection_provider,
purge_orphaned_data=False,
max_reload_threads=1)
Expand Down Expand Up @@ -150,6 +151,14 @@ def test_manual_name(self):
self.assertEqual(self._get_runs(), [os.path.join('some', 'nested', 'name'),
os.path.join('some', 'nested', 'name')])

def test_empty_read_apis(self):
  # DbImportMultiplexer is load-only: its read APIs must return empty
  # results both before and after events have been imported, since all
  # imported data is exposed via SQL rather than the multiplexer.
  path = self.get_temp_dir()
  add_event(path)
  self.assertEmpty(self.multiplexer.Runs())
  self.multiplexer.AddRunsFromDirectory(path)
  self.multiplexer.Reload()
  self.assertEmpty(self.multiplexer.Runs())


if __name__ == '__main__':
tf.test.main()