Skip to content

Commit 4e71a82

Browse files
committed
feat(): Added compression support in pushgateway
1 parent 1783ca8 commit 4e71a82

File tree

3 files changed

+85
-10
lines changed

3 files changed

+85
-10
lines changed

docs/content/exporting/pushgateway.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,20 @@ g.set_to_current_time()
5454
push_to_gateway('localhost:9091', job='batchA', registry=registry, handler=my_auth_handler)
5555
```
5656

57+
# Compressing data before sending to pushgateway
58+
Pushgateway (version >= 1.5.0) supports gzip and snappy compression (v > 1.6.0). This can help in network constrained environments.
59+
To compress a push request, set the `compression` argument to `'gzip'` or `'snappy'`:
60+
```python
61+
push_to_gateway(
62+
'localhost:9091',
63+
job='batchA',
64+
registry=registry,
65+
handler=my_auth_handler,
66+
compression='gzip',
67+
)
68+
```
69+
Snappy compression requires the optional [`python-snappy`](https://github.com/andrix/python-snappy) package.
70+
5771
TLS Auth is also supported when using the push gateway with a special handler.
5872

5973
```python

prometheus_client/exposition.py

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import ssl
1010
import sys
1111
import threading
12-
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
12+
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union, Literal
1313
from urllib.error import HTTPError
1414
from urllib.parse import parse_qs, quote_plus, urlparse
1515
from urllib.request import (
@@ -46,6 +46,7 @@
4646
"""Content type of the latest format"""
4747

4848
CONTENT_TYPE_LATEST = CONTENT_TYPE_PLAIN_1_0_0
49+
CompressionType = Optional[Literal['gzip', 'snappy']]
4950

5051

5152
class _PrometheusRedirectHandler(HTTPRedirectHandler):
@@ -596,6 +597,7 @@ def push_to_gateway(
596597
grouping_key: Optional[Dict[str, Any]] = None,
597598
timeout: Optional[float] = 30,
598599
handler: Callable = default_handler,
600+
compression: CompressionType = None,
599601
) -> None:
600602
"""Push metrics to the given pushgateway.
601603
@@ -632,10 +634,12 @@ def push_to_gateway(
632634
failure.
633635
'content' is the data which should be used to form the HTTP
634636
Message Body.
637+
`compression` selects the payload compression. Supported values are 'gzip'
638+
and 'snappy'. Defaults to None (no compression).
635639
636640
This overwrites all metrics with the same job and grouping_key.
637641
This uses the PUT HTTP method."""
638-
_use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler)
642+
_use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler, compression)
639643

640644

641645
def pushadd_to_gateway(
@@ -645,6 +649,7 @@ def pushadd_to_gateway(
645649
grouping_key: Optional[Dict[str, Any]] = None,
646650
timeout: Optional[float] = 30,
647651
handler: Callable = default_handler,
652+
compression: CompressionType = None,
648653
) -> None:
649654
"""PushAdd metrics to the given pushgateway.
650655
@@ -663,10 +668,12 @@ def pushadd_to_gateway(
663668
will be carried out by a default handler.
664669
See the 'prometheus_client.push_to_gateway' documentation
665670
for implementation requirements.
671+
`compression` selects the payload compression. Supported values are 'gzip'
672+
and 'snappy'. Defaults to None (no compression).
666673
667674
This replaces metrics with the same name, job and grouping_key.
668675
This uses the POST HTTP method."""
669-
_use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler)
676+
_use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler, compression)
670677

671678

672679
def delete_from_gateway(
@@ -706,6 +713,7 @@ def _use_gateway(
706713
grouping_key: Optional[Dict[str, Any]],
707714
timeout: Optional[float],
708715
handler: Callable,
716+
compression: CompressionType = None,
709717
) -> None:
710718
gateway_url = urlparse(gateway)
711719
# See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6.
@@ -715,24 +723,52 @@ def _use_gateway(
715723
gateway = gateway.rstrip('/')
716724
url = '{}/metrics/{}/{}'.format(gateway, *_escape_grouping_key("job", job))
717725

718-
data = b''
719-
if method != 'DELETE':
720-
if registry is None:
721-
registry = REGISTRY
722-
data = generate_latest(registry)
723-
724726
if grouping_key is None:
725727
grouping_key = {}
726728
url += ''.join(
727729
'/{}/{}'.format(*_escape_grouping_key(str(k), str(v)))
728730
for k, v in sorted(grouping_key.items()))
729731

732+
data = b''
733+
headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)]
734+
if method != 'DELETE':
735+
if registry is None:
736+
registry = REGISTRY
737+
data = generate_latest(registry)
738+
data, headers = _compress_payload(data, compression)
739+
elif compression is not None:
740+
raise ValueError('Compression is not supported for DELETE requests.')
741+
730742
handler(
731743
url=url, method=method, timeout=timeout,
732-
headers=[('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)], data=data,
744+
headers=headers, data=data,
733745
)()
734746

735747

748+
def _compress_payload(data: bytes, compression: CompressionType) -> Tuple[bytes, List[Tuple[str, str]]]:
749+
headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)]
750+
if compression is None:
751+
return data, headers
752+
753+
encoding = compression.lower()
754+
if encoding == 'gzip':
755+
headers.append(('Content-Encoding', 'gzip'))
756+
return gzip.compress(data), headers
757+
if encoding == 'snappy':
758+
try:
759+
import snappy
760+
except ImportError as exc:
761+
raise RuntimeError('Snappy compression requires the python-snappy package to be installed.') from exc
762+
headers.append(('Content-Encoding', 'snappy'))
763+
compressor = snappy.StreamCompressor()
764+
compressed = compressor.compress(data)
765+
flush = getattr(compressor, 'flush', None)
766+
if callable(flush):
767+
compressed += flush()
768+
return compressed, headers
769+
raise ValueError(f"Unsupported compression type: {compression}")
770+
771+
736772
def _escape_grouping_key(k, v):
737773
if v == "":
738774
# Per https://github.com/prometheus/pushgateway/pull/346.

tests/test_exposition.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import gzip
12
from http.server import BaseHTTPRequestHandler, HTTPServer
23
import os
34
import threading
@@ -404,6 +405,30 @@ def test_push_with_trailing_slash(self):
404405

405406
self.assertNotIn('//', self.requests[0][0].path)
406407

408+
def test_push_with_gzip_compression(self):
409+
push_to_gateway(self.address, "my_job", self.registry, compression='gzip')
410+
request, body = self.requests[0]
411+
self.assertEqual(request.headers.get('content-encoding'), 'gzip')
412+
decompressed = gzip.decompress(body)
413+
self.assertEqual(decompressed, b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
414+
415+
def test_push_with_snappy_compression(self):
416+
snappy = pytest.importorskip('snappy')
417+
push_to_gateway(self.address, "my_job", self.registry, compression='snappy')
418+
request, body = self.requests[0]
419+
self.assertEqual(request.headers.get('content-encoding'), 'snappy')
420+
decompressor = snappy.StreamDecompressor()
421+
decompressed = decompressor.decompress(body)
422+
flush = getattr(decompressor, 'flush', None)
423+
if callable(flush):
424+
decompressed += flush()
425+
self.assertEqual(decompressed, b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
426+
427+
def test_push_with_invalid_compression(self):
428+
with self.assertRaisesRegex(ValueError, 'Unsupported compression type'):
429+
push_to_gateway(self.address, "my_job", self.registry, compression='brotli')
430+
self.assertEqual(self.requests, [])
431+
407432
def test_instance_ip_grouping_key(self):
408433
self.assertTrue('' != instance_ip_grouping_key()['instance'])
409434

0 commit comments

Comments
 (0)