|
30 | 30 |
|
31 | 31 |
|
32 | 32 | import os
|
| 33 | +import time |
33 | 34 |
|
34 | 35 | import certifi
|
35 | 36 | import mock
|
@@ -517,3 +518,90 @@ def test_fetch_server_info_flat_string(waiting_httpserver, caplog, elasticapm_cl
|
517 | 518 | transport.fetch_server_info()
|
518 | 519 | assert elasticapm_client.server_version is None
|
519 | 520 | assert_any_record_contains(caplog.records, "No version key found in server response")
|
| 521 | + |
| 522 | + |
| 523 | +def test_close(waiting_httpserver, elasticapm_client): |
| 524 | + elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request |
| 525 | + waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"}) |
| 526 | + transport = Transport( |
| 527 | + waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers |
| 528 | + ) |
| 529 | + transport.start_thread() |
| 530 | + |
| 531 | + transport.close() |
| 532 | + |
| 533 | + assert transport._closed is True |
| 534 | + assert transport._flushed.is_set() is True |
| 535 | + |
| 536 | + |
| 537 | +def test_close_does_nothing_if_called_from_another_pid(waiting_httpserver, caplog, elasticapm_client): |
| 538 | + elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request |
| 539 | + waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"}) |
| 540 | + transport = Transport( |
| 541 | + waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers |
| 542 | + ) |
| 543 | + transport.start_thread() |
| 544 | + |
| 545 | + with mock.patch("os.getpid") as getpid_mock: |
| 546 | + getpid_mock.return_value = 0 |
| 547 | + transport.close() |
| 548 | + |
| 549 | + assert transport._closed is False |
| 550 | + |
| 551 | + transport.close() |
| 552 | + |
| 553 | + |
| 554 | +def test_close_can_be_called_multiple_times(waiting_httpserver, caplog, elasticapm_client): |
| 555 | + elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request |
| 556 | + waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"}) |
| 557 | + transport = Transport( |
| 558 | + waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers |
| 559 | + ) |
| 560 | + transport.start_thread() |
| 561 | + |
| 562 | + with caplog.at_level("INFO", logger="elasticapm.transport.http"): |
| 563 | + transport.close() |
| 564 | + |
| 565 | + assert transport._closed is True |
| 566 | + |
| 567 | + transport.close() |
| 568 | + |
| 569 | + |
| 570 | +def test_close_timeout_error_without_flushing(waiting_httpserver, caplog, elasticapm_client): |
| 571 | + elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request |
| 572 | + waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"}) |
| 573 | + |
| 574 | + with caplog.at_level("INFO", logger="elasticapm.transport.http"): |
| 575 | + with mock.patch.object(Transport, "_max_flush_time_seconds", 0): |
| 576 | + with mock.patch.object(Transport, "_flush") as flush_mock: |
| 577 | + # sleep more that the timeout |
| 578 | + flush_mock.side_effect = lambda x: time.sleep(0.1) |
| 579 | + transport = Transport( |
| 580 | + waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers |
| 581 | + ) |
| 582 | + transport.start_thread() |
| 583 | + # need to write something to the buffer to have _flush() called |
| 584 | + transport.queue("error", {"an": "error"}) |
| 585 | + transport.close() |
| 586 | + |
| 587 | + assert transport._flushed.is_set() is False |
| 588 | + assert transport._closed is True |
| 589 | + record = caplog.records[-1] |
| 590 | + assert "Closing the transport connection timed out." in record.msg |
| 591 | + |
| 592 | + |
| 593 | +def test_http_pool_manager_is_recycled_at_stop_thread(waiting_httpserver, caplog, elasticapm_client): |
| 594 | + elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request |
| 595 | + waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"}) |
| 596 | + transport = Transport( |
| 597 | + waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers |
| 598 | + ) |
| 599 | + transport.start_thread() |
| 600 | + pool_manager = transport.http |
| 601 | + |
| 602 | + with caplog.at_level("INFO", logger="elasticapm.transport.http"): |
| 603 | + transport.stop_thread() |
| 604 | + |
| 605 | + assert transport._flushed.is_set() is True |
| 606 | + assert pool_manager != transport._http |
| 607 | + assert not caplog.records |
0 commit comments