Skip to content

don't use ChilledQueue if queue.Queue has been monkeypatched #435

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

try:
import eventlet

eventlet.monkey_patch()
except ImportError:
pass

import sys
from os.path import abspath, dirname, join

Expand Down
40 changes: 30 additions & 10 deletions elasticapm/transport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ def __init__(
self._max_flush_time = max_flush_time
self._max_buffer_size = max_buffer_size
self._queued_data = None
self._event_queue = ChilledQueue(maxsize=10000, chill_until=queue_chill_count, max_chill_time=queue_chill_time)
self._event_queue = self._init_event_queue(chill_until=queue_chill_count, max_chill_time=queue_chill_time)
self._is_chilled_queue = isinstance(self._event_queue, ChilledQueue)
self._event_process_thread = threading.Thread(target=self._process_queue, name="eapm event processor thread")
self._event_process_thread.daemon = True
self._last_flush = timeit.default_timer()
Expand All @@ -105,18 +106,14 @@ def __init__(
def queue(self, event_type, data, flush=False):
try:
self._flushed.clear()
self._event_queue.put((event_type, data, flush), block=False, chill=not (event_type == "close" or flush))
kwargs = {"chill": not (event_type == "close" or flush)} if self._is_chilled_queue else {}
self._event_queue.put((event_type, data, flush), block=False, **kwargs)

except compat.queue.Full:
logger.warning("Event of type %s dropped due to full event queue", event_type)

def _process_queue(self):
def init_buffer():
buffer = gzip.GzipFile(fileobj=compat.BytesIO(), mode="w", compresslevel=self._compress_level)
data = (self._json_serializer({"metadata": self._metadata}) + "\n").encode("utf-8")
buffer.write(data)
return buffer

buffer = init_buffer()
buffer = self._init_buffer()
buffer_written = False
# add some randomness to timeout to avoid stampedes of several workers that are booted at the same time
max_flush_time = self._max_flush_time * random.uniform(0.9, 1.1) if self._max_flush_time else None
Expand Down Expand Up @@ -166,11 +163,34 @@ def init_buffer():
if buffer_written:
self._flush(buffer)
self._last_flush = timeit.default_timer()
buffer = init_buffer()
buffer = self._init_buffer()
buffer_written = False
max_flush_time = self._max_flush_time * random.uniform(0.9, 1.1) if self._max_flush_time else None
self._flushed.set()

def _init_buffer(self):
buffer = gzip.GzipFile(fileobj=compat.BytesIO(), mode="w", compresslevel=self._compress_level)
data = (self._json_serializer({"metadata": self._metadata}) + "\n").encode("utf-8")
buffer.write(data)
return buffer

def _init_event_queue(self, chill_until, max_chill_time):
# some libraries like eventlet monkeypatch queue.Queue and switch out the implementation.
# In those cases we can't rely on internals of queue.Queue to be there, so we simply use
# their queue and forgo the optimizations of ChilledQueue. In the case of eventlet, this
# isn't really a loss, because the main reason for ChilledQueue (avoiding context switches
# due to the event processor thread being woken up all the time) is not an issue.
if all(
(
hasattr(compat.queue.Queue, "not_full"),
hasattr(compat.queue.Queue, "not_empty"),
hasattr(compat.queue.Queue, "unfinished_tasks"),
)
):
return ChilledQueue(maxsize=10000, chill_until=chill_until, max_chill_time=max_chill_time)
else:
return compat.queue.Queue(maxsize=10000)

def _flush(self, buffer):
"""
Flush the queue. This method should only be called from the event processing queue
Expand Down
1 change: 1 addition & 0 deletions tests/.jenkins_framework.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ FRAMEWORK:
- elasticsearch-6
- cassandra-newest
- psutil-newest
- eventlet-newest

2 changes: 1 addition & 1 deletion tests/.jenkins_framework_full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,5 @@ FRAMEWORK:
- psutil-newest
- psutil-5.0
- psutil-4.0

- eventlet-newest

53 changes: 53 additions & 0 deletions tests/contrib/test_eventlet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# BSD 3-Clause License
#
# Copyright (c) 2019, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import pytest # isort:skip

eventlet = pytest.importorskip("eventlet") # isort:skip

import os

import elasticapm
from elasticapm.conf import constants
from eventlet.patcher import is_monkey_patched

pytestmark = pytest.mark.eventlet


def test_transaction_with_eventlet(sending_elasticapm_client):
assert is_monkey_patched(os)
transaction = sending_elasticapm_client.begin_transaction("test")
with elasticapm.capture_span("bla"):
pass
sending_elasticapm_client.end_transaction("test", "OK")
sending_elasticapm_client.close()
assert len(sending_elasticapm_client.httpserver.requests) == 1
assert sending_elasticapm_client.httpserver.payloads[0][1][constants.SPAN]
assert sending_elasticapm_client.httpserver.payloads[0][2][constants.TRANSACTION]
2 changes: 2 additions & 0 deletions tests/requirements/requirements-eventlet-newest.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
eventlet
-r requirements-base.txt
1 change: 1 addition & 0 deletions tests/scripts/envs/eventlet.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export PYTEST_MARKER="-m eventlet"