Commit 4425fad

remove special case for compress_level=0 and always use GzipFile (#351)
Also, move the queue size logic into a lock-protected block. This avoids some awkward special cases while keeping essentially the same behaviour, since GzipFile(compresslevel=0) also disables gzip compression.
1 parent: 464c960
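
As a standalone illustration of the property the commit message relies on (a sketch, not part of the commit itself): GzipFile with compresslevel=0 still emits a valid gzip stream, it just stores the payload without compressing it, so a single GzipFile-based code path can cover both the compressed and uncompressed cases.

    # Sketch: GzipFile(compresslevel=0) frames data as gzip without compressing it.
    import gzip
    from io import BytesIO

    payload = b'{"metadata": {}}\n' * 100

    for level in (0, 9):
        buf = BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w", compresslevel=level) as f:
            f.write(payload)
        blob = buf.getvalue()
        assert gzip.decompress(blob) == payload  # round-trips at every level
        print(level, len(blob))  # level 0 is roughly len(payload) plus gzip framing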

3 files changed: +24 −26 lines changed

docs/configuration.asciidoc

Lines changed: 2 additions & 1 deletion
@@ -427,7 +427,8 @@ A higher value also impacts the time until data is indexed and searchable in Ela
 This setting is useful to limit memory consumption if you experience a sudden spike of traffic.
 It has to be provided in *<<config-format-size, size format>>*.
 
-NOTE: by default, the APM Server limits request payload size to 1 MByte.
+NOTE: Due to internal buffering of gzip, the actual request size can be a few kilobytes larger than the given limit.
+By default, the APM Server limits request payload size to 1 MByte.
 
 [float]
 [[config-api-request-time]]
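
The NOTE added above can be demonstrated with a small standalone sketch (independent of the agent's code): while the GzipFile is open, zlib keeps not-yet-emitted deflate output in an internal buffer, so a size check against the underlying buffer underestimates the final request size until the stream is closed.

    # Sketch: the wrapped buffer's tell() lags behind the final gzip size.
    import gzip
    import os
    from io import BytesIO

    raw = BytesIO()
    gz = gzip.GzipFile(fileobj=raw, mode="w", compresslevel=5)
    gz.write(os.urandom(50000))  # incompressible data
    seen = raw.tell()            # what a size check observes while queueing
    gz.close()                   # flushes zlib's internal buffer
    final = len(raw.getvalue())  # what is actually sent
    print(seen, final)           # final exceeds seen by the buffered bytes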

elasticapm/transport/base.py

Lines changed: 11 additions & 22 deletions
@@ -48,7 +48,7 @@ def __init__(
         """
         self.state = TransportState()
         self._metadata = metadata if metadata is not None else {}
-        self._compress_level = compress_level
+        self._compress_level = min(9, max(0, compress_level if compress_level is not None else 0))
         self._json_serializer = json_serializer
         self._max_flush_time = max_flush_time
         self._max_buffer_size = max_buffer_size
@@ -59,9 +59,10 @@ def __init__(
 
     def queue(self, event_type, data, flush=False):
         with self._queue_lock:
-            self.queued_data.write((self._json_serializer({event_type: data}) + "\n").encode("utf-8"))
-            since_last_flush = timeit.default_timer() - self._last_flush
-            queue_size = self.queued_data_size
+            queued_data = self.queued_data
+            queued_data.write((self._json_serializer({event_type: data}) + "\n").encode("utf-8"))
+            since_last_flush = timeit.default_timer() - self._last_flush
+            queue_size = 0 if queued_data.fileobj is None else queued_data.fileobj.tell()
         if flush:
             logger.debug("forced flush")
             self.flush()
@@ -84,21 +85,11 @@ def queue(self, event_type, data, flush=False):
     @property
     def queued_data(self):
         if self._queued_data is None:
-            if self._compress_level:
-                self._queued_data = gzip.GzipFile(fileobj=BytesIO(), mode="w", compresslevel=self._compress_level)
-            else:
-                self._queued_data = BytesIO()
-            self._queued_data.write((self._json_serializer({"metadata": self._metadata}) + "\n").encode("utf-8"))
+            self._queued_data = gzip.GzipFile(fileobj=BytesIO(), mode="w", compresslevel=self._compress_level)
+            data = (self._json_serializer({"metadata": self._metadata}) + "\n").encode("utf-8")
+            self._queued_data.write(data)
         return self._queued_data
 
-    @property
-    def queued_data_size(self):
-        f = self._queued_data
-        if f:
-            # return size of the underlying BytesIO object if it is compressed
-            return f.fileobj.tell() if hasattr(f, "fileobj") else f.tell()
-        return 0
-
     def flush(self, sync=False, start_flush_timer=True):
         """
         Flush the queue
@@ -112,11 +103,9 @@ def flush(self, sync=False, start_flush_timer=True):
         if queued_data and not self.state.should_try():
             logger.error("dropping flushed data due to transport failure back-off")
         elif queued_data:
-            if self._compress_level:
-                fileobj = queued_data.fileobj  # get a reference to the fileobj before closing the gzip file
-                queued_data.close()
-            else:
-                fileobj = queued_data
+            fileobj = queued_data.fileobj  # get a reference to the fileobj before closing the gzip file
+            queued_data.close()
+
             # StringIO on Python 2 does not have getbuffer, so we need to fall back to getvalue
             data = fileobj.getbuffer() if hasattr(fileobj, "getbuffer") else fileobj.getvalue()
             if hasattr(self, "send_async") and not sync:
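
Condensed into a hypothetical standalone class (MiniQueue is illustrative, not the agent's API), the pattern this diff introduces is: writes and the size measurement share one lock, and the size is read from the GzipFile's wrapped buffer. The `fileobj is None` guard matters because GzipFile.close() sets that attribute to None.

    import gzip
    import threading
    from io import BytesIO

    # Hypothetical condensed version of the queue pattern above.
    class MiniQueue:
        def __init__(self, compress_level=5):
            self._lock = threading.Lock()
            self._gz = gzip.GzipFile(fileobj=BytesIO(), mode="w", compresslevel=compress_level)

        def queue(self, line):
            with self._lock:
                self._gz.write(line)
                # GzipFile.close() sets .fileobj to None, hence the guard.
                return 0 if self._gz.fileobj is None else self._gz.fileobj.tell()

    q = MiniQueue()
    print(q.queue(b'{"error": {}}\n'))  # bytes emitted to the buffer so far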

tests/transports/test_base.py

Lines changed: 11 additions & 3 deletions
@@ -1,3 +1,4 @@
+import gzip
 import random
 import string
 import timeit
@@ -62,9 +63,10 @@ def test_metadata_prepended(mock_send):
     transport.queue("error", {}, flush=True)
     assert mock_send.call_count == 1
     args, kwargs = mock_send.call_args
-    data = args[0]
-    if compat.PY3:
-        data = data.tobytes()
+    if compat.PY2:
+        data = gzip.GzipFile(fileobj=compat.StringIO(args[0])).read()
+    else:
+        data = gzip.decompress(args[0])
     data = data.decode("utf-8").split("\n")
     assert "metadata" in data[0]
 
@@ -157,3 +159,9 @@ def test_send_timer(sending_elasticapm_client, caplog):
     assert "Starting flush timer" in caplog.records[0].message
     assert "Cancelling flush timer" in caplog.records[1].message
     assert "Sent request" in caplog.records[2].message
+
+
+def test_compress_level_sanitization():
+    assert Transport(compress_level=None)._compress_level == 0
+    assert Transport(compress_level=-1)._compress_level == 0
+    assert Transport(compress_level=10)._compress_level == 9
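
The round trip the updated test exercises can also be reproduced standalone (a sketch with illustrative names, not the agent's API): NDJSON events written through a GzipFile decompress back with the metadata line first.

    import gzip
    import json
    from io import BytesIO

    # Standalone round trip mirroring test_metadata_prepended on Python 3.
    buf = BytesIO()
    gz = gzip.GzipFile(fileobj=buf, mode="w", compresslevel=0)
    gz.write((json.dumps({"metadata": {}}) + "\n").encode("utf-8"))
    gz.write((json.dumps({"error": {}}) + "\n").encode("utf-8"))
    gz.close()

    lines = gzip.decompress(buf.getvalue()).decode("utf-8").split("\n")
    assert "metadata" in lines[0]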
