-
Notifications
You must be signed in to change notification settings - Fork 837
Added compression support in pushgateway #1144
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,7 +9,7 @@ | |
| import ssl | ||
| import sys | ||
| import threading | ||
| from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union | ||
| from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union, Literal | ||
| from urllib.error import HTTPError | ||
| from urllib.parse import parse_qs, quote_plus, urlparse | ||
| from urllib.request import ( | ||
|
|
@@ -46,6 +46,7 @@ | |
| """Content type of the latest format""" | ||
|
|
||
| CONTENT_TYPE_LATEST = CONTENT_TYPE_PLAIN_1_0_0 | ||
| CompressionType = Optional[Literal['gzip', 'snappy']] | ||
|
|
||
|
|
||
| class _PrometheusRedirectHandler(HTTPRedirectHandler): | ||
|
|
@@ -596,6 +597,7 @@ def push_to_gateway( | |
| grouping_key: Optional[Dict[str, Any]] = None, | ||
| timeout: Optional[float] = 30, | ||
| handler: Callable = default_handler, | ||
| compression: CompressionType = None, | ||
| ) -> None: | ||
| """Push metrics to the given pushgateway. | ||
|
|
||
|
|
@@ -632,10 +634,12 @@ def push_to_gateway( | |
| failure. | ||
| 'content' is the data which should be used to form the HTTP | ||
| Message Body. | ||
| `compression` selects the payload compression. Supported values are 'gzip' | ||
| and 'snappy'. Defaults to None (no compression). | ||
|
|
||
| This overwrites all metrics with the same job and grouping_key. | ||
| This uses the PUT HTTP method.""" | ||
| _use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler) | ||
| _use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler, compression) | ||
|
|
||
|
|
||
| def pushadd_to_gateway( | ||
|
|
@@ -645,6 +649,7 @@ def pushadd_to_gateway( | |
| grouping_key: Optional[Dict[str, Any]] = None, | ||
| timeout: Optional[float] = 30, | ||
| handler: Callable = default_handler, | ||
| compression: CompressionType = None, | ||
| ) -> None: | ||
| """PushAdd metrics to the given pushgateway. | ||
|
|
||
|
|
@@ -663,10 +668,12 @@ def pushadd_to_gateway( | |
| will be carried out by a default handler. | ||
| See the 'prometheus_client.push_to_gateway' documentation | ||
| for implementation requirements. | ||
| `compression` selects the payload compression. Supported values are 'gzip' | ||
| and 'snappy'. Defaults to None (no compression). | ||
|
|
||
| This replaces metrics with the same name, job and grouping_key. | ||
| This uses the POST HTTP method.""" | ||
| _use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler) | ||
| _use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler, compression) | ||
|
|
||
|
|
||
| def delete_from_gateway( | ||
|
|
@@ -706,6 +713,7 @@ def _use_gateway( | |
| grouping_key: Optional[Dict[str, Any]], | ||
| timeout: Optional[float], | ||
| handler: Callable, | ||
| compression: CompressionType = None, | ||
| ) -> None: | ||
| gateway_url = urlparse(gateway) | ||
| # See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6. | ||
|
|
@@ -715,24 +723,52 @@ def _use_gateway( | |
| gateway = gateway.rstrip('/') | ||
| url = '{}/metrics/{}/{}'.format(gateway, *_escape_grouping_key("job", job)) | ||
|
|
||
| data = b'' | ||
| if method != 'DELETE': | ||
| if registry is None: | ||
| registry = REGISTRY | ||
| data = generate_latest(registry) | ||
|
|
||
| if grouping_key is None: | ||
| grouping_key = {} | ||
| url += ''.join( | ||
| '/{}/{}'.format(*_escape_grouping_key(str(k), str(v))) | ||
| for k, v in sorted(grouping_key.items())) | ||
|
|
||
| data = b'' | ||
| headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)] | ||
| if method != 'DELETE': | ||
| if registry is None: | ||
| registry = REGISTRY | ||
| data = generate_latest(registry) | ||
| data, headers = _compress_payload(data, compression) | ||
| elif compression is not None: | ||
| raise ValueError('Compression is not supported for DELETE requests.') | ||
|
|
||
| handler( | ||
| url=url, method=method, timeout=timeout, | ||
| headers=[('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)], data=data, | ||
| headers=headers, data=data, | ||
| )() | ||
|
|
||
|
|
||
| def _compress_payload(data: bytes, compression: CompressionType) -> Tuple[bytes, List[Tuple[str, str]]]: | ||
| headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)] | ||
| if compression is None: | ||
| return data, headers | ||
|
|
||
| encoding = compression.lower() | ||
| if encoding == 'gzip': | ||
| headers.append(('Content-Encoding', 'gzip')) | ||
| return gzip.compress(data), headers | ||
| if encoding == 'snappy': | ||
| try: | ||
| import snappy | ||
| except ImportError as exc: | ||
| raise RuntimeError('Snappy compression requires the python-snappy package to be installed.') from exc | ||
|
Comment on lines
+758
to
+761
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Trying an import on each request feels a bit slow. Could we do this once and then keep a variable indicating whether snappy is enabled? |
||
| headers.append(('Content-Encoding', 'snappy')) | ||
| compressor = snappy.StreamCompressor() | ||
| compressed = compressor.compress(data) | ||
| flush = getattr(compressor, 'flush', None) | ||
| if callable(flush): | ||
| compressed += flush() | ||
| return compressed, headers | ||
| raise ValueError(f"Unsupported compression type: {compression}") | ||
|
|
||
|
|
||
| def _escape_grouping_key(k, v): | ||
| if v == "": | ||
| # Per https://github.com/prometheus/pushgateway/pull/346. | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
| @@ -1,3 +1,4 @@ | ||||
| import gzip | ||||
| from http.server import BaseHTTPRequestHandler, HTTPServer | ||||
| import os | ||||
| import threading | ||||
|
|
@@ -404,6 +405,30 @@ def test_push_with_trailing_slash(self): | |||
|
|
||||
| self.assertNotIn('//', self.requests[0][0].path) | ||||
|
|
||||
| def test_push_with_gzip_compression(self): | ||||
| push_to_gateway(self.address, "my_job", self.registry, compression='gzip') | ||||
| request, body = self.requests[0] | ||||
| self.assertEqual(request.headers.get('content-encoding'), 'gzip') | ||||
| decompressed = gzip.decompress(body) | ||||
| self.assertEqual(decompressed, b'# HELP g help\n# TYPE g gauge\ng 0.0\n') | ||||
|
|
||||
| def test_push_with_snappy_compression(self): | ||||
| snappy = pytest.importorskip('snappy') | ||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It would be good to add snappy to one of the test envs, such as the one referenced at line 11 in commit 1783ca8.
|
||||
| push_to_gateway(self.address, "my_job", self.registry, compression='snappy') | ||||
| request, body = self.requests[0] | ||||
| self.assertEqual(request.headers.get('content-encoding'), 'snappy') | ||||
| decompressor = snappy.StreamDecompressor() | ||||
| decompressed = decompressor.decompress(body) | ||||
| flush = getattr(decompressor, 'flush', None) | ||||
| if callable(flush): | ||||
| decompressed += flush() | ||||
| self.assertEqual(decompressed, b'# HELP g help\n# TYPE g gauge\ng 0.0\n') | ||||
|
|
||||
| def test_push_with_invalid_compression(self): | ||||
| with self.assertRaisesRegex(ValueError, 'Unsupported compression type'): | ||||
| push_to_gateway(self.address, "my_job", self.registry, compression='brotli') | ||||
| self.assertEqual(self.requests, []) | ||||
|
|
||||
| def test_instance_ip_grouping_key(self): | ||||
| self.assertTrue('' != instance_ip_grouping_key()['instance']) | ||||
|
|
||||
|
|
||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't love defining this default header in two places; perhaps append to it here instead? Or does DELETE even need this header, given that a DELETE request sends no content?