Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/content/exporting/pushgateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ g.set_to_current_time()
push_to_gateway('localhost:9091', job='batchA', registry=registry, handler=my_auth_handler)
```

# Compressing data before sending to pushgateway
Pushgateway supports gzip compression (version >= 1.5.0) and snappy compression (version > 1.6.0) of push requests. This can help in network-constrained environments.
To compress a push request, set the `compression` argument to `'gzip'` or `'snappy'`:
```python
push_to_gateway(
'localhost:9091',
job='batchA',
registry=registry,
handler=my_auth_handler,
compression='gzip',
)
```
Snappy compression requires the optional [`python-snappy`](https://github.com/andrix/python-snappy) package.

TLS Auth is also supported when using the push gateway with a special handler.

```python
Expand Down
56 changes: 46 additions & 10 deletions prometheus_client/exposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import ssl
import sys
import threading
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union, Literal
from urllib.error import HTTPError
from urllib.parse import parse_qs, quote_plus, urlparse
from urllib.request import (
Expand Down Expand Up @@ -46,6 +46,7 @@
"""Content type of the latest format"""

CONTENT_TYPE_LATEST = CONTENT_TYPE_PLAIN_1_0_0
CompressionType = Optional[Literal['gzip', 'snappy']]


class _PrometheusRedirectHandler(HTTPRedirectHandler):
Expand Down Expand Up @@ -596,6 +597,7 @@ def push_to_gateway(
grouping_key: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = 30,
handler: Callable = default_handler,
compression: CompressionType = None,
) -> None:
"""Push metrics to the given pushgateway.

Expand Down Expand Up @@ -632,10 +634,12 @@ def push_to_gateway(
failure.
'content' is the data which should be used to form the HTTP
Message Body.
`compression` selects the payload compression. Supported values are 'gzip'
and 'snappy'. Defaults to None (no compression).

This overwrites all metrics with the same job and grouping_key.
This uses the PUT HTTP method."""
_use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler)
_use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler, compression)


def pushadd_to_gateway(
Expand All @@ -645,6 +649,7 @@ def pushadd_to_gateway(
grouping_key: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = 30,
handler: Callable = default_handler,
compression: CompressionType = None,
) -> None:
"""PushAdd metrics to the given pushgateway.

Expand All @@ -663,10 +668,12 @@ def pushadd_to_gateway(
will be carried out by a default handler.
See the 'prometheus_client.push_to_gateway' documentation
for implementation requirements.
`compression` selects the payload compression. Supported values are 'gzip'
and 'snappy'. Defaults to None (no compression).

This replaces metrics with the same name, job and grouping_key.
This uses the POST HTTP method."""
_use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler)
_use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler, compression)


def delete_from_gateway(
Expand Down Expand Up @@ -706,6 +713,7 @@ def _use_gateway(
grouping_key: Optional[Dict[str, Any]],
timeout: Optional[float],
handler: Callable,
compression: CompressionType = None,
) -> None:
gateway_url = urlparse(gateway)
# See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6.
Expand All @@ -715,24 +723,52 @@ def _use_gateway(
gateway = gateway.rstrip('/')
url = '{}/metrics/{}/{}'.format(gateway, *_escape_grouping_key("job", job))

data = b''
if method != 'DELETE':
if registry is None:
registry = REGISTRY
data = generate_latest(registry)

if grouping_key is None:
grouping_key = {}
url += ''.join(
'/{}/{}'.format(*_escape_grouping_key(str(k), str(v)))
for k, v in sorted(grouping_key.items()))

data = b''
headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love defining this default header in two places; perhaps append here? Or does delete even need this header, since there is no content type?

if method != 'DELETE':
if registry is None:
registry = REGISTRY
data = generate_latest(registry)
data, headers = _compress_payload(data, compression)
elif compression is not None:
raise ValueError('Compression is not supported for DELETE requests.')

handler(
url=url, method=method, timeout=timeout,
headers=[('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)], data=data,
headers=headers, data=data,
)()


def _compress_payload(data: bytes, compression: CompressionType) -> Tuple[bytes, List[Tuple[str, str]]]:
headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)]
if compression is None:
return data, headers

encoding = compression.lower()
if encoding == 'gzip':
headers.append(('Content-Encoding', 'gzip'))
return gzip.compress(data), headers
if encoding == 'snappy':
try:
import snappy
except ImportError as exc:
raise RuntimeError('Snappy compression requires the python-snappy package to be installed.') from exc
Comment on lines +758 to +761
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying an import on each request feels a bit slow. Could we do this once and then keep a variable recording whether snappy is enabled or not?

headers.append(('Content-Encoding', 'snappy'))
compressor = snappy.StreamCompressor()
compressed = compressor.compress(data)
flush = getattr(compressor, 'flush', None)
if callable(flush):
compressed += flush()
return compressed, headers
raise ValueError(f"Unsupported compression type: {compression}")


def _escape_grouping_key(k, v):
if v == "":
# Per https://github.com/prometheus/pushgateway/pull/346.
Expand Down
25 changes: 25 additions & 0 deletions tests/test_exposition.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import gzip
from http.server import BaseHTTPRequestHandler, HTTPServer
import os
import threading
Expand Down Expand Up @@ -404,6 +405,30 @@ def test_push_with_trailing_slash(self):

self.assertNotIn('//', self.requests[0][0].path)

def test_push_with_gzip_compression(self):
    # Push once with gzip compression requested.
    push_to_gateway(self.address, "my_job", self.registry, compression='gzip')

    # The first entry of self.requests unpacks into the request and its body.
    received, payload = self.requests[0]

    # The request must advertise gzip and the body must inflate back to the
    # plain-text exposition of the registry.
    content_encoding = received.headers.get('content-encoding')
    self.assertEqual(content_encoding, 'gzip')
    self.assertEqual(
        gzip.decompress(payload),
        b'# HELP g help\n# TYPE g gauge\ng 0.0\n',
    )

def test_push_with_snappy_compression(self):
snappy = pytest.importorskip('snappy')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add snappy to one of the test envs such as

{py3.9,pypy3.9}: twisted
so that CI runs this test in at least one environment.

push_to_gateway(self.address, "my_job", self.registry, compression='snappy')
request, body = self.requests[0]
self.assertEqual(request.headers.get('content-encoding'), 'snappy')
decompressor = snappy.StreamDecompressor()
decompressed = decompressor.decompress(body)
flush = getattr(decompressor, 'flush', None)
if callable(flush):
decompressed += flush()
self.assertEqual(decompressed, b'# HELP g help\n# TYPE g gauge\ng 0.0\n')

def test_push_with_invalid_compression(self):
    # An unknown compression name must raise ValueError with this message.
    expected_message = 'Unsupported compression type'
    with self.assertRaisesRegex(ValueError, expected_message):
        push_to_gateway(
            self.address,
            "my_job",
            self.registry,
            compression='brotli',
        )

    # Checked after the context manager exits: no request was recorded.
    self.assertEqual(self.requests, [])

def test_instance_ip_grouping_key(self):
self.assertTrue('' != instance_ip_grouping_key()['instance'])

Expand Down