move queue size logic into a lock-protected block #351

Closed · wants to merge 5 commits
3 changes: 2 additions & 1 deletion docs/configuration.asciidoc
@@ -427,7 +427,8 @@ A higher value also impacts the time until data is indexed and searchable in Ela
 This setting is useful to limit memory consumption if you experience a sudden spike of traffic.
 It has to be provided in *<<config-format-size, size format>>*.
 
-NOTE: by default, the APM Server limits request payload size to 1 MByte.
+NOTE: Due to internal buffering of gzip, the actual request size can be a few kilobytes larger than the given limit.
+By default, the APM Server limits request payload size to 1 MByte.
 
 [float]
 [[config-api-request-time]]
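The added note reflects a real property of the gzip module: GzipFile feeds data through zlib, which holds part of the input in an internal buffer until the stream is flushed or closed, so the size visible on the underlying file object (which is what the agent's limit check reads) slightly undercounts the final payload. A standalone sketch of the effect, not part of the patch:

import gzip
from io import BytesIO

buf = BytesIO()
gz = gzip.GzipFile(fileobj=buf, mode="w", compresslevel=6)
gz.write(b"x" * 100000)
# zlib is still holding part of the input internally here, so the
# underlying buffer undercounts what will eventually be written.
size_before_close = buf.tell()
gz.close()  # flushes the remaining buffered data and writes the gzip trailer
size_after_close = buf.tell()
assert size_before_close < size_after_close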
33 changes: 11 additions & 22 deletions elasticapm/transport/base.py
@@ -48,7 +48,7 @@ def __init__(
         """
         self.state = TransportState()
         self._metadata = metadata if metadata is not None else {}
-        self._compress_level = compress_level
+        self._compress_level = min(9, max(0, compress_level if compress_level is not None else 0))
         self._json_serializer = json_serializer
         self._max_flush_time = max_flush_time
         self._max_buffer_size = max_buffer_size
@@ -59,9 +59,10 @@
 
     def queue(self, event_type, data, flush=False):
         with self._queue_lock:
-            self.queued_data.write((self._json_serializer({event_type: data}) + "\n").encode("utf-8"))
-        since_last_flush = timeit.default_timer() - self._last_flush
-        queue_size = self.queued_data_size
+            queued_data = self.queued_data
+            queued_data.write((self._json_serializer({event_type: data}) + "\n").encode("utf-8"))
+            since_last_flush = timeit.default_timer() - self._last_flush
+            queue_size = 0 if queued_data.fileobj is None else queued_data.fileobj.tell()
         if flush:
             logger.debug("forced flush")
             self.flush()
@@ -84,21 +85,11 @@ def queue(self, event_type, data, flush=False):
     @property
     def queued_data(self):
         if self._queued_data is None:
-            if self._compress_level:
-                self._queued_data = gzip.GzipFile(fileobj=BytesIO(), mode="w", compresslevel=self._compress_level)
-            else:
-                self._queued_data = BytesIO()
-            self._queued_data.write((self._json_serializer({"metadata": self._metadata}) + "\n").encode("utf-8"))
+            self._queued_data = gzip.GzipFile(fileobj=BytesIO(), mode="w", compresslevel=self._compress_level)
+            data = (self._json_serializer({"metadata": self._metadata}) + "\n").encode("utf-8")
+            self._queued_data.write(data)
         return self._queued_data
 
-    @property
-    def queued_data_size(self):
-        f = self._queued_data
-        if f:
-            # return size of the underlying BytesIO object if it is compressed
-            return f.fileobj.tell() if hasattr(f, "fileobj") else f.tell()
-        return 0
-
     def flush(self, sync=False, start_flush_timer=True):
         """
         Flush the queue
@@ -112,11 +103,9 @@
         if queued_data and not self.state.should_try():
             logger.error("dropping flushed data due to transport failure back-off")
         elif queued_data:
-            if self._compress_level:
-                fileobj = queued_data.fileobj  # get a reference to the fileobj before closing the gzip file
-                queued_data.close()
-            else:
-                fileobj = queued_data
+            fileobj = queued_data.fileobj  # get a reference to the fileobj before closing the gzip file
+            queued_data.close()
+
             # StringIO on Python 2 does not have getbuffer, so we need to fall back to getvalue
             data = fileobj.getbuffer() if hasattr(fileobj, "getbuffer") else fileobj.getvalue()
             if hasattr(self, "send_async") and not sync:
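Taken together, the base.py changes do two things. First, compress_level is sanitized up front and gzip is always used, so the plain-BytesIO branches disappear. Second, and the reason for the PR title, queue() now reads the underlying buffer's position inside the same lock that guards the write; previously another thread could flush and close the buffer between the write and the size read. A minimal sketch of the resulting pattern, with illustrative names and threshold rather than the agent's real API:

import gzip
import threading
from io import BytesIO

class CompressedQueue:
    # Sketch of a locked write-then-measure queue; names, the threshold,
    # and the flush placeholder are made up for illustration.
    def __init__(self, compress_level=5, max_size=10 * 1024):
        self._lock = threading.Lock()
        self._max_size = max_size
        self._gz = gzip.GzipFile(fileobj=BytesIO(), mode="w", compresslevel=compress_level)

    def queue(self, line):
        with self._lock:
            self._gz.write(line)
            # GzipFile.close() sets .fileobj to None, so an already-closed
            # buffer reports a size of 0 instead of raising; this mirrors
            # the defensive check in the patch.
            size = 0 if self._gz.fileobj is None else self._gz.fileobj.tell()
        if size > self._max_size:
            pass  # a real implementation would flush here

Note that the measured size is the compressed byte count flushed to the BytesIO so far, which is also why the documentation note about gzip's internal buffering was added in the same PR.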
14 changes: 11 additions & 3 deletions tests/transports/test_base.py
@@ -1,3 +1,4 @@
+import gzip
 import random
 import string
 import timeit
@@ -62,9 +63,10 @@ def test_metadata_prepended(mock_send):
     transport.queue("error", {}, flush=True)
     assert mock_send.call_count == 1
     args, kwargs = mock_send.call_args
-    data = args[0]
-    if compat.PY3:
-        data = data.tobytes()
+    if compat.PY2:
+        data = gzip.GzipFile(fileobj=compat.StringIO(args[0])).read()
+    else:
+        data = gzip.decompress(args[0])
     data = data.decode("utf-8").split("\n")
     assert "metadata" in data[0]
 
@@ -157,3 +159,9 @@ def test_send_timer(sending_elasticapm_client, caplog):
     assert "Starting flush timer" in caplog.records[0].message
     assert "Cancelling flush timer" in caplog.records[1].message
     assert "Sent request" in caplog.records[2].message
+
+
+def test_compress_level_sanitization():
+    assert Transport(compress_level=None)._compress_level == 0
+    assert Transport(compress_level=-1)._compress_level == 0
+    assert Transport(compress_level=10)._compress_level == 9
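The new test pins down the clamping added in __init__: min(9, max(0, level)) forces the value into zlib's valid 0-9 range and maps None to 0. Level 0 matters because it still yields a well-formed gzip container with the data stored essentially uncompressed, which is what allows the transport to drop the uncompressed BytesIO path entirely. A quick standalone check of that assumption, separate from the test suite:

import gzip
from io import BytesIO

buf = BytesIO()
with gzip.GzipFile(fileobj=buf, mode="w", compresslevel=0) as gz:
    gz.write(b"hello")

# Level 0 skips real compression, but the stream is still valid gzip.
assert gzip.decompress(buf.getvalue()) == b"hello"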