an asyncio-based client for send metric to StatsD
, Graphite.carbon
, TelegrafStatsD
and DogStatsD
.
pip install aio_statsd
Create connection and send gauge metric. aiostatsd client will automatically send messages in the background when the loop is running
import asyncio
from aio_statsd import StatsdClient
loop = asyncio.get_event_loop()
client = StatsdClient()
loop.run_until_complete(client.connect())
client.gauge('test.key', 1)
loop.run_forever()
Use context manager
import asyncio
from aio_statsd import StatsdClient
async def main():
async with StatsdClient() as client:
client.gauge('test.key', 1)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
- host: default value 'localhost', Statsd Server ip
- port: default value 8125, Statsd Server port
- protocol: default value ProtocolFlag.udp, Transport Layer Prrotocol, Select Tcp:
ProtocolFlag.udp
or Udp:ProtocolFlag.tcp
- timeout: default value 0, send msg timeout, if timeout==0, not enable timeout
- debug: default value False, enable debug
- close_timeout: default value 9, Within a few seconds after the client is closed, continue to send messages which in the queue
- create_timeout: default value 9, Create connection timeout
- max_len: default value 10000, deque length
- sample_rate(Use in StatsD Client, DogStatsD Client): default value 1, use sample rate in Statsd or DogStatsD
import asyncio
from aio_statsd import StatsdClient
async def main():
async with StatsdClient() as client:
client.gauge('test.key', 1)
client.counter('test.key', 1)
client.sets('test.key', 1)
client.timer('test.key', 1)
with client.timeit('test'):
pass # run your code
# all metric support sample rate
client.gauge('test1.key', 1, sample_rate=0.5)
# mutli metric support(not support sample rate, the sample rate will always be set to 1)
from aio_statsd import StatsdProtocol
metric = StatsdProtocol()
metric.gauge('test2.key', 1)
metric.sets('test2.key', 1)
client.send_statsd(metric)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import asyncio
from aio_statsd import GraphiteClient
loop = asyncio.get_event_loop()
client = GraphiteClient()
loop.run_until_complete(client.connect())
client.send_graphite('test.key', 1) # Multiple clients timestamp interval synchronization
loop.run_forever()
Note: Not tested in production
import asyncio
from aio_statsd import DogStatsdClient
async def main():
async with DogStatsdClient() as client:
client.gauge('test.key', 1)
client.distribution('test.key', 1)
client.increment('test.key',1)
client.histogram('test.key', 1)
client.timer('test.key', 1)
with client.timeit('test'):
pass # run your code
# all metric support sample rate and DogStatsD tag
client.gauge('test1.key', 1, sample_rate=0.5, tag_dict={'tag': 'tag1'})
# mutli metric support(
# DogStatsdProtocol will store the message in its own queue and
# DogStatsDClient traverses to read DogStatsdProtocol's message and send it
# )
from aio_statsd import DogStatsdProtocol
metric = DogStatsdProtocol()
metric.gauge('test2.key', 1, tag_dict={'tag': 'tag1'})
metric.histogram('test2.key', 1)
client.send_dog_statsd(metric, sample_rate=0.5)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Note: Not tested in production
import asyncio
from aio_statsd import TelegrafStatsdClient
async def main():
async with TelegrafStatsdClient() as client:
client.gauge('test.key', 1)
client.distribution('test.key', 1)
client.increment('test.key',1)
client.histogram('test.key', 1)
client.timer('test.key', 1)
with client.timeit('test'):
pass # run your code
# all metric support sample rate and TelegrafStatsd tag
client.gauge('test1.key', 1, sample_rate=0.5, tag_dict={'tag': 'tag1'})
# mutli metric support(
# TelegrafStatsdProtocol will store the message in its own queue and
# TelegrafStatsDClient traverses to read TelegrafStatsdProtocol's message and send it
# )
from aio_statsd import TelegrafStatsdProtocol
metric = TelegrafStatsdProtocol()
metric.gauge('test2.key', 1, tag_dict={'tag': 'tag1'})
metric.histogram('test2.key', 1)
client.send_telegraf_statsd(metric, sample_rate=0.5)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Note: Not tested in production
import asyncio
from aio_statsd import TelegrafClient
async def main():
async with TelegrafClient() as client:
client.send_telegraf('test.key', {"field1": 100}, user_server_time=True)