Skip to content

Commit

Permalink
Make sure handler.flush() doesn't deadlock. (#1112)
Browse files Browse the repository at this point in the history
  • Loading branch information
gukoff authored Mar 29, 2022
1 parent afca369 commit 9ffa48a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ def log_record_to_envelope(self, record):
raise NotImplementedError # pragma: NO COVER

def flush(self, timeout=None):
if self._queue.is_empty():
return

# We must check the worker thread is alive, because otherwise flush
# is useless. Also, it would deadlock if no timeout is given, and the
# queue isn't empty.
# This is a very possible scenario during process termination, when
# atexit first calls handler.close() and then logging.shutdown(),
# that in turn calls handler.flush() without arguments.
if not self._worker.is_alive():
logger.warning("Can't flush %s, worker thread is dead. "
"Any pending messages will be lost.", self)
return

self._queue.flush(timeout=timeout)


Expand Down
5 changes: 4 additions & 1 deletion opencensus/common/schedule/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ def _gets(self, count, timeout):
def gets(self, count, timeout):
return tuple(self._gets(count, timeout))

def is_empty(self):
return not self._queue.qsize()

def flush(self, timeout=None):
if self._queue.qsize() == 0:
return 0
Expand All @@ -124,7 +127,7 @@ def flush(self, timeout=None):
return
elapsed_time = time.time() - start_time
wait_time = timeout and max(timeout - elapsed_time, 0)
if event.wait(timeout):
if event.wait(wait_time):
return time.time() - start_time # time taken to flush

def put(self, item, block=True, timeout=None):
Expand Down

0 comments on commit 9ffa48a

Please sign in to comment.