Skip to content
This repository was archived by the owner on Feb 26, 2026. It is now read-only.

Commit 1b037b6

Browse files
committed
Refactored _close
1 parent b46779e commit 1b037b6

File tree

2 files changed

+62
-35
lines changed

2 files changed

+62
-35
lines changed

google/cloud/logging_v2/handlers/transports/background_thread.py

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ def start(self):
171171
)
172172
self._thread.daemon = True
173173
self._thread.start()
174-
atexit.register(self._close)
174+
atexit.register(self._handle_exit)
175175

176176
def stop(self, *, grace_period=None):
177177
"""Signals the background thread to stop.
@@ -211,33 +211,20 @@ def stop(self, *, grace_period=None):
211211

212212
return success
213213

214-
def _close(self):
215-
"""Callback that attempts to send pending logs before termination."""
214+
def _close(self, close_msg):
215+
"""Callback that attempts to send pending logs before termination if the main thread is alive."""
216216
if not self.is_alive:
217217
return
218218

219-
# Print different messages to the user depending on whether or not the
220-
# program is shutting down. This is because this function now handles both
221-
# the atexit handler and the regular close.
222219
if not self._queue.empty():
223-
if threading.main_thread().is_alive():
224-
print(
225-
"Background thread shutting down, attempting to send %d queued log "
226-
"entries to Cloud Logging..." % (self._queue.qsize(),),
227-
file=sys.stderr,
228-
)
229-
else:
230-
print(
231-
_CLOSE_THREAD_SHUTDOWN_ERROR_MSG,
232-
file=sys.stderr,
233-
)
220+
print(close_msg, file=sys.stderr)
234221

235222
if (
223+
threading.main_thread().is_alive() and
236224
self.stop(grace_period=self._grace_period)
237-
and threading.main_thread().is_alive()
238225
):
239226
print("Sent all pending logs.", file=sys.stderr)
240-
else:
227+
elif not self._queue.empty():
241228
print(
242229
"Failed to send %d pending logs." % (self._queue.qsize(),),
243230
file=sys.stderr,
@@ -277,10 +264,22 @@ def flush(self):
277264
def close(self):
278265
"""Signals the worker thread to stop, then closes the transport thread.
279266
280-
This call should be followed up by disowning the transport object.
267+
This call will attempt to send pending logs before termination, and
268+
should be followed up by disowning the transport object.
269+
"""
270+
atexit.unregister(self._handle_exit)
271+
self._close(
272+
"Background thread shutting down, attempting to send %d queued log "
273+
"entries to Cloud Logging..." % (self._queue.qsize(),)
274+
)
275+
276+
def _handle_exit(self):
277+
"""Handle system exit.
278+
279+
Since we cannot send pending logs during system shutdown due to thread errors,
280+
log an error message to stderr to notify the user.
281281
"""
282-
atexit.unregister(self._close)
283-
self._close()
282+
self._close(_CLOSE_THREAD_SHUTDOWN_ERROR_MSG)
284283

285284

286285
class BackgroundThreadTransport(Transport):
@@ -342,4 +341,4 @@ def flush(self):
342341

343342
def close(self):
344343
"""Closes the worker thread."""
345-
self.worker.stop(grace_period=self.grace_period)
344+
self.worker.close()

tests/unit/handlers/transports/test_background_thread.py

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ def test_start(self):
250250
self.assertTrue(worker._thread.daemon)
251251
self.assertEqual(worker._thread._target, worker._thread_main)
252252
self.assertEqual(worker._thread._name, background_thread._WORKER_THREAD_NAME)
253-
self.assertIn(worker._close, atexit_mock.registered_funcs)
253+
self.assertIn(worker._handle_exit, atexit_mock.registered_funcs)
254254

255255
# Calling start again should not start a new thread.
256256
current_thread = worker._thread
@@ -291,21 +291,25 @@ def test__close(self):
291291
worker = self._make_one(_Logger(self.NAME))
292292

293293
self._start_with_thread_patch(worker)
294-
worker._close()
294+
worker._close("")
295295

296296
self.assertFalse(worker.is_alive)
297297

298298
# Calling twice should not be an error
299-
worker._close()
299+
worker._close("")
300300

301301
def test__close_non_empty_queue(self):
302302
worker = self._make_one(_Logger(self.NAME))
303+
msg = "My Message"
303304

304305
self._start_with_thread_patch(worker)
305306
record = mock.Mock()
306307
record.created = time.time()
307308
worker.enqueue(record, "")
308-
worker._close()
309+
310+
with mock.patch("sys.stderr", new_callable=StringIO) as stderr_mock:
311+
worker._close(msg)
312+
self.assertIn(msg, stderr_mock.getvalue())
309313

310314
self.assertFalse(worker.is_alive)
311315

@@ -317,11 +321,11 @@ def test__close_did_not_join(self):
317321
record = mock.Mock()
318322
record.created = time.time()
319323
worker.enqueue(record, "")
320-
worker._close()
324+
worker._close("")
321325

322326
self.assertFalse(worker.is_alive)
323327

324-
def test__close_main_thread_not_alive(self):
328+
def test__handle_exit(self):
325329
from google.cloud.logging_v2.handlers.transports.background_thread import (
326330
_CLOSE_THREAD_SHUTDOWN_ERROR_MSG,
327331
)
@@ -333,21 +337,45 @@ def test__close_main_thread_not_alive(self):
333337
with self._init_atexit_mock():
334338
self._start_with_thread_patch(worker)
335339
self._enqueue_record(worker, "test")
336-
worker._close()
340+
worker._handle_exit()
337341

338342
self.assertRegex(
339343
stderr_mock.getvalue(),
340344
re.compile("^%s$" % _CLOSE_THREAD_SHUTDOWN_ERROR_MSG, re.MULTILINE),
341345
)
342346

347+
self.assertRegex(
348+
stderr_mock.getvalue(),
349+
re.compile(r"^Failed to send %d pending logs\.$" % worker._queue.qsize(), re.MULTILINE),
350+
)
351+
352+
def test__handle_exit_no_items(self):
353+
worker = self._make_one(_Logger(self.NAME))
354+
355+
with mock.patch("sys.stderr", new_callable=StringIO) as stderr_mock:
356+
with self._init_main_thread_is_alive_mock(False):
357+
with self._init_atexit_mock():
358+
self._start_with_thread_patch(worker)
359+
worker._handle_exit()
360+
361+
self.assertEqual(stderr_mock.getvalue(), "")
362+
343363
def test_close_unregister_atexit(self):
344364
worker = self._make_one(_Logger(self.NAME))
345365

346-
with self._init_atexit_mock() as atexit_mock:
347-
self._start_with_thread_patch(worker)
348-
self.assertIn(worker._close, atexit_mock.registered_funcs)
349-
worker.close()
350-
self.assertNotIn(worker._close, atexit_mock.registered_funcs)
366+
with mock.patch("sys.stderr", new_callable=StringIO) as stderr_mock:
367+
with self._init_atexit_mock() as atexit_mock:
368+
self._start_with_thread_patch(worker)
369+
self.assertIn(worker._handle_exit, atexit_mock.registered_funcs)
370+
worker.close()
371+
self.assertNotIn(worker._handle_exit, atexit_mock.registered_funcs)
372+
373+
self.assertNotRegex(
374+
stderr_mock.getvalue(),
375+
re.compile(r"^Failed to send %d pending logs\.$" % worker._queue.qsize(), re.MULTILINE),
376+
)
377+
378+
self.assertFalse(worker.is_alive)
351379

352380
@staticmethod
353381
def _enqueue_record(worker, message, levelno=logging.INFO, **kw):

0 commit comments

Comments
 (0)