Skip to content

Commit 73abc55

Browse files
authored
Merge pull request #1 from gistart/robust-udp
robust udp + udp dns round robin support?
2 parents 1e292c6 + 317097d commit 73abc55

File tree

5 files changed

+85
-16
lines changed

5 files changed

+85
-16
lines changed

.gitignore

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ __pycache__
1515
.coverage
1616
coverage.xml
1717
test/poke.py
18-
build/*
19-
dist/*
20-
compare/*
18+
build*
19+
dist*
20+
compare*
2121

2222
!.gitkeep
2323
!/.gitignore

README.md

+10-3
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ Metrics with no labels are initialized at creation time. This can have unpleasan
5959

6060
To avoid that we'll have to properly isolate each task's metrics, which can be impossible or rather tricky, or we can create metrics with default, non-changing labels (like `hostname`). Such metrics will be initialized on first use (inc), and we'll be pushing only those we actually utilized.
6161

62-
## Batch clients
62+
## Clients
63+
64+
### Batch clients
6365

6466
Batch clients spawn synchronization jobs "in background" (meaning in a thread or asyncio task) to periodically send all metrics from `ppc.PUSH_REGISTRY` to the destination.
6567

@@ -107,7 +109,7 @@ async def main(urls):
107109
```
108110

109111

110-
## Streaming clients
112+
### Streaming clients
111113

112114
If for some reason every metric change needs to be synced, UDP streaming clients are implemented in this library.
113115

@@ -120,4 +122,9 @@ def statsd_udp_stream(host, port):
120122

121123
Usage is completely identical to batch clients' decorators / context managers.
122124

123-
:warning: Histogram and Summary `.time() decorator` doesn't work in this mode atm, because it can't be monkey-patched easily.
125+
:warning: Histogram and Summary `.time() decorator` doesn't work in this mode atm, because it can't be monkey-patched easily.
126+
127+
128+
## Transports
129+
130+
Main goal is not to interrupt measured jobs with errors from monitoring code. Therefor all transports will attempt to catch all network errors, logging error info and corresponding tracebacks to stdout.

prometheus_push_client/clients/batch.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ def __init__(self, format, transport, period=15.0, *args, **kwargs):
1717
self.stop_event = None
1818

1919
self._period_step = 0.25 # check event every 0.25 seconds
20+
self._min_period = 0.1 # sleep no less than
2021

2122
super().__init__(*args, **kwargs)
2223

@@ -56,7 +57,7 @@ def run(self):
5657
self.transport.push_all(data_gen)
5758
except Exception:
5859
log.error("push crashed", exc_info=True)
59-
period = self.period - (time.time() - ts_start)
60+
period = max(self._min_period, self.period - (time.time() - ts_start))
6061

6162

6263
class AsyncBatchClient(BaseBatchClient):
@@ -90,4 +91,4 @@ async def run(self):
9091
await self.transport.push_all(data_gen)
9192
except Exception:
9293
log.error("push crashed", exc_info=True)
93-
period = self.period - (time.time() - ts_start)
94+
period = max(self._min_period, self.period - (time.time() - ts_start))

prometheus_push_client/transports/udp.py

+33-8
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import asyncio
22
import socket
3+
import logging
34

45
from prometheus_push_client import compat
56

67

7-
class BaseUdpTransport:
8+
log = logging.getLogger("prometheus.udp")
9+
810

11+
class BaseUdpTransport:
912
def __init__(self, host, port, mtu=508, datagram_lines=25):
1013
self.host = host
1114
self.port = int(port)
@@ -40,28 +43,42 @@ def push_all_sync(self, iterable):
4043
self.push_one(data)
4144

4245
def push_one(self, data):
43-
raise NotImplementedError()
46+
try:
47+
return self.transport.sendto(data, (self.host, self.port))
48+
except socket.gaierror: # name resolution error
49+
pass
4450

4551

46-
# TODO: crashes on creation time DNS errors -- retry?
52+
# TODO: ipv6 support?
4753

4854

4955
class SyncUdpTransport(BaseUdpTransport):
5056
def start(self):
57+
self._getaddrinfo()
5158
self.transport = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
5259

5360
push_all = BaseUdpTransport.push_all_sync
5461

55-
def push_one(self, data):
56-
self.transport.sendto(data, (self.host, self.port))
62+
def _getaddrinfo(self):
63+
try:
64+
return socket.getaddrinfo(
65+
self.host,
66+
self.port,
67+
family=socket.AF_INET,
68+
type=socket.SOCK_DGRAM,
69+
)
70+
except socket.gaierror as e:
71+
log.error("%s -- %s:%s", e, self.host, self.port)
5772

5873

5974
class AioUdpTransport(BaseUdpTransport):
6075
async def start(self, loop=None):
6176
loop = loop or compat.get_running_loop()
77+
await self._getaddrinfo(loop)
6278
self.transport, _ = await loop.create_datagram_endpoint(
6379
lambda: asyncio.DatagramProtocol(),
64-
remote_addr=(self.host, self.port)
80+
family=socket.AF_INET,
81+
allow_broadcast=False,
6582
)
6683

6784
async def stop(self):
@@ -70,5 +87,13 @@ async def stop(self):
7087
async def push_all(self, iterable):
7188
self.push_all_sync(iterable)
7289

73-
def push_one(self, data):
74-
self.transport.sendto(data)
90+
async def _getaddrinfo(self, loop):
91+
try:
92+
return await loop.getaddrinfo(
93+
self.host,
94+
self.port,
95+
family=socket.AF_INET,
96+
type=socket.SOCK_DGRAM,
97+
)
98+
except socket.gaierror as e:
99+
log.error("%s -- %s:%s", e, self.host, self.port)

test/test_offline/test_udp.py

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import pytest
2+
import logging
3+
import socket
4+
5+
import prometheus_push_client as ppc
6+
7+
8+
def test_sync_udp_gaierror(caplog):
9+
transport = ppc.SyncUdpTransport("X-does-not-exist-X-123", 1)
10+
with caplog.at_level(logging.INFO):
11+
transport.start()
12+
13+
assert any(
14+
lr.name == "prometheus.udp" and
15+
any(isinstance(a, socket.gaierror) for a in lr.args)
16+
for lr in caplog.records
17+
)
18+
19+
# does not raise
20+
transport.push_all([b"1", b"2"])
21+
22+
23+
@pytest.mark.asyncio
24+
async def test_async_udp_gaierror(caplog):
25+
transport = ppc.AioUdpTransport("X-does-not-exist-X-123", 1)
26+
with caplog.at_level(logging.INFO):
27+
await transport.start()
28+
29+
assert any(
30+
lr.name == "prometheus.udp" and
31+
any(isinstance(a, socket.gaierror) for a in lr.args)
32+
for lr in caplog.records
33+
)
34+
35+
# does not raise
36+
await transport.push_all([b"1", b"2"])

0 commit comments

Comments
 (0)