Skip to content

Commit 08136b4

Browse files
committed
elasticapm/transport: specific shutdown handling for http transport
We have a race condition at HTTP transport shutdown, where our atexit handler races against the urllib3 ConnectionPool weakref finalizer. If the urllib3 finalizer is called before our atexit handler, our thread hangs while trying to send any queued data through urllib3 pools that are already closed. Force the creation of a new PoolManager so that we are always able to flush.
1 parent 8163b0c commit 08136b4

File tree

3 files changed

+106
-1
lines changed

3 files changed

+106
-1
lines changed

elasticapm/transport/base.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,8 @@ def close(self) -> None:
292292
if not self._flushed.wait(timeout=self._max_flush_time_seconds):
293293
logger.error("Closing the transport connection timed out.")
294294

295-
stop_thread = close
295+
def stop_thread(self) -> None:
296+
self.close()
296297

297298
def flush(self):
298299
"""

elasticapm/transport/http.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import hashlib
3434
import json
35+
import os
3536
import re
3637
import ssl
3738
import urllib.parse
@@ -250,6 +251,23 @@ def ca_certs(self):
250251
return self._server_ca_cert_file
251252
return certifi.where() if (certifi and self.client.config.use_certifi) else None
252253

254+
def close(self):
255+
"""
256+
Take care of being able to shutdown cleanly
257+
:return:
258+
"""
259+
if self._closed or (not self._thread or self._thread.pid != os.getpid()):
260+
return
261+
262+
self._closed = True
263+
# we are racing against urllib3 ConnectionPool weakref finalizer that would lead to having them closed
264+
# and we hanging waiting for send any eventual queued data
265+
# Force the creation of a new PoolManager so that we are always able to flush
266+
self._http = None
267+
self.queue("close", None)
268+
if not self._flushed.wait(timeout=self._max_flush_time_seconds):
269+
logger.error("Closing the transport connection timed out.")
270+
253271

254272
def version_string_to_tuple(version):
255273
if version:

tests/transports/test_urllib3.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131

3232
import os
33+
import time
3334

3435
import certifi
3536
import mock
@@ -517,3 +518,88 @@ def test_fetch_server_info_flat_string(waiting_httpserver, caplog, elasticapm_cl
517518
transport.fetch_server_info()
518519
assert elasticapm_client.server_version is None
519520
assert_any_record_contains(caplog.records, "No version key found in server response")
521+
522+
523+
def test_close(waiting_httpserver, elasticapm_client):
524+
elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request
525+
waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})
526+
transport = Transport(
527+
waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers
528+
)
529+
transport.start_thread()
530+
531+
transport.close()
532+
533+
assert transport._closed is True
534+
assert transport._flushed.is_set() is True
535+
536+
537+
def test_close_does_nothing_if_called_from_another_pid(waiting_httpserver, caplog, elasticapm_client):
538+
elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request
539+
waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})
540+
transport = Transport(
541+
waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers
542+
)
543+
transport.start_thread()
544+
545+
with mock.patch("os.getpid") as getpid_mock:
546+
getpid_mock.return_value = 0
547+
transport.close()
548+
549+
assert transport._closed is False
550+
551+
transport.close()
552+
553+
554+
def test_close_can_be_called_multiple_times(waiting_httpserver, caplog, elasticapm_client):
555+
elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request
556+
waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})
557+
transport = Transport(
558+
waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers
559+
)
560+
transport.start_thread()
561+
562+
with caplog.at_level("INFO", logger="elasticapm.transport.http"):
563+
transport.close()
564+
565+
assert transport._closed is True
566+
567+
transport.close()
568+
569+
570+
def test_close_timeout_error_without_flushing(waiting_httpserver, caplog, elasticapm_client):
571+
elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request
572+
waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})
573+
574+
with caplog.at_level("INFO", logger="elasticapm.transport.http"):
575+
with mock.patch.object(Transport, "_max_flush_time_seconds", 0):
576+
with mock.patch.object(Transport, "_flush") as flush_mock:
577+
# enough to take more that the timeout
578+
flush_mock.side_effect = lambda: time.sleep(0.2)
579+
transport = Transport(
580+
waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers
581+
)
582+
transport.start_thread()
583+
transport.close()
584+
585+
assert transport._flushed.is_set() is False
586+
assert transport._closed is True
587+
record = caplog.records[-1]
588+
assert "Closing the transport connection timed out." in record.msg
589+
590+
591+
def test_http_pool_manager_is_recycled_at_stop_thread(waiting_httpserver, caplog, elasticapm_client):
592+
elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request
593+
waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"})
594+
transport = Transport(
595+
waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers
596+
)
597+
transport.start_thread()
598+
pool_manager = transport.http
599+
600+
with caplog.at_level("INFO", logger="elasticapm.transport.http"):
601+
transport.stop_thread()
602+
603+
assert transport._flushed.is_set() is True
604+
assert pool_manager != transport._http
605+
assert not caplog.records

0 commit comments

Comments
 (0)