Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 79 additions & 137 deletions tensorboard/backend/event_processing/directory_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,62 @@
from __future__ import division
from __future__ import print_function

import bisect

from tensorboard.backend.event_processing import io_wrapper
from tensorboard.compat import tf
from tensorboard.util import tb_logging


logger = tb_logging.get_logger()


class _EventPathLoader(object):
"""Simple wrapper for loading events from a path."""

def __init__(self, path, loader_factory):
self._path = path
self._next_event = None
self._loader = loader_factory(path)

def _NextEventStep(self):
if not self._next_event:
return int(1e30)
return self._next_event.step

def _NextEventWallTime(self):
if not self._next_event:
return int(1e30)
return self._next_event.wall_time

def NextEventSortKey(self):
"""Sort by step, then secondarily by wall time."""
return (self._NextEventStep(), self._NextEventWallTime())

def PopNextEvent(self):
"""Pops the next event. Must call PrepareNextEvent() first."""
next_event = self._next_event
self._next_event = None
return next_event

def PrepareNextEvent(self):
"""Loads the next event. Returns True if it was successfully loaded."""
if not self._next_event:
self._next_event = next(self._loader.Load(), None)
return self._next_event is not None


# The number of paths before the current one to check for out of order writes.
_OOO_WRITE_CHECK_COUNT = 20


class DirectoryWatcher(object):
"""A DirectoryWatcher wraps a loader to load from a sequence of paths.

A loader reads a path and produces some kind of values as an iterator. A
DirectoryWatcher takes a directory, a factory for loaders, and optionally a
path filter and watches all the paths inside that directory.

This class is only valid under the assumption that only one path will be
written to by the data source at a time and that once the source stops writing
to a path, it will start writing to a new path that's lexicographically
greater and never come back. It uses some heuristics to check whether this is
true based on tracking changes to the files' sizes, but the check can have
false negatives. However, it should have no false positives.
This class returns events from multiple files, preferring to return events
with smaller step counts first.
"""

def __init__(self, directory, loader_factory, path_filter=lambda x: True):
Expand All @@ -60,23 +94,20 @@ def __init__(self, directory, loader_factory, path_filter=lambda x: True):
if loader_factory is None:
raise ValueError('A loader factory is required')
self._directory = directory
self._path = None
# Dict from path name to _EventPathLoader.
self._paths = {}
self._loader_factory = loader_factory
self._loader = None
self._path_filter = path_filter
self._ooo_writes_detected = False
# The file size for each file at the time it was finalized.
self._finalized_sizes = {}
# Step and walltime of previous writes, ordered from earlier events to
# later events.
self._previous_writes = []

def Load(self):
"""Loads new values.

The watcher will load from one path at a time; as soon as that path stops
yielding events, it will move on to the next path. We assume that old paths
are never modified after a newer path has been written. As a result, Load()
can be called multiple times in a row without losing events that have not
been yielded yet. In other words, we guarantee that every event will be
yielded exactly once.
The watcher will load from multiple paths, preferring to return events
with smaller step counts earlier.

Yields:
All values that have not been yielded yet.
Expand All @@ -103,146 +134,57 @@ def _LoadInternal(self):
Yields:
All values that have not been yielded yet.
"""

# If the loader exists, check it for a value.
if not self._loader:
self._InitializeLoader()
self._LoadNewPaths()
if not self._paths:
raise StopIteration

while True:
# Yield all the new events in the path we're currently loading from.
for event in self._loader.Load():
yield event

next_path = self._GetNextPath()
if not next_path:
logger.info('No path found after %s', self._path)
# Current path is empty and there are no new paths, so we're done.
# Yield all the new events in the paths we're currently loading from.
self._LoadNewPaths()
valid_paths = []
for path in self._paths.values():
if path.PrepareNextEvent():
valid_paths.append(path)
if not valid_paths:
logger.info('No new events available in any paths.')
return

# There's a new path, so check to make sure there weren't any events
# written between when we finished reading the current path and when we
# checked for the new one. The sequence of events might look something
# like this:
#
# 1. Event #1 written to path #1.
# 2. We check for events and yield event #1 from path #1
# 3. We check for events and see that there are no more events in path #1.
# 4. Event #2 is written to path #1.
# 5. Event #3 is written to path #2.
# 6. We check for a new path and see that path #2 exists.
#
# Without this loop, we would miss event #2. We're also guaranteed by the
# loader contract that no more events will be written to path #1 after
# events start being written to path #2, so we don't have to worry about
# that.
for event in self._loader.Load():
yield event

logger.info('Directory watcher advancing from %s to %s', self._path,
next_path)

# Advance to the next path and start over.
self._SetPath(next_path)

# The number of paths before the current one to check for out of order writes.
_OOO_WRITE_CHECK_COUNT = 20
next_path = min(
valid_paths,
key=lambda p: p.NextEventSortKey())
next_event_key = next_path.NextEventSortKey()
next_event = next_path.PopNextEvent()
for previous_write in self._previous_writes:
if previous_write > next_event_key:
self._ooo_writes_detected = True
self._previous_writes.append(next_event_key)
if len(self._previous_writes) > _OOO_WRITE_CHECK_COUNT:
self._previous_writes.pop(0)
yield next_event

def OutOfOrderWritesDetected(self):
    """Returns whether any out-of-order writes have been detected.

    Out-of-order writes are only checked as part of the Load() iterator; each
    yielded event's (step, wall_time) key is compared against recent previous
    writes, and a smaller key sets the flag. Once an out-of-order write is
    detected, this function will always return true — the flag is sticky and
    never reset.

    NOTE(review): an earlier revision skipped this check on GCS/cloud paths;
    the current key-comparison in Load() does not appear to special-case
    cloud paths — confirm before relying on that old caveat.

    Returns:
      Whether any out-of-order write has ever been detected by this watcher.
    """
    return self._ooo_writes_detected

def _InitializeLoader(self):
path = self._GetNextPath()
if path:
self._SetPath(path)
else:
raise StopIteration

def _SetPath(self, path):
"""Sets the current path to watch for new events.

This also records the size of the old path, if any. If the size can't be
found, an error is logged.
def _LoadNewPaths(self):
    """Checks the directory for any new paths that may have been created.

    Lists the watched directory, keeps only paths accepted by the path
    filter, and registers an _EventPathLoader for every path not already
    tracked. Existing entries in self._paths are never removed or replaced,
    so each path's loader (and its read position) is preserved across calls.
    Loads them into self._paths.
    """
    paths = sorted(path
                   for path in io_wrapper.ListDirectoryAbsolute(self._directory)
                   if self._path_filter(path))
    for path in paths:
        if path not in self._paths:
            # Lazy %-style logging args: formatting is skipped entirely
            # when INFO logging is disabled.
            logger.info('New path detected: %s.', path)
            self._paths[path] = _EventPathLoader(path, self._loader_factory)


class DirectoryDeletedError(Exception):
Expand Down
Loading