Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Prometheus metrics exporter #260

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions mtprotoproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import re
import runpy
import signal
import http.server

try:
import uvloop
Expand Down Expand Up @@ -153,6 +154,11 @@ def debug_signal(signum, frame):
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30)
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
PROMETHEUS_HOST = config.get("PROMETHEUS_HOST")
PROMETHEUS_PORT = config.get("PROMETHEUS_PORT")
# PROMETHEUS_SCRAPERS is a safety net in case of missing firewall,
# set it to false value to disable.
PROMETHEUS_SCRAPERS = config.get("PROMETHEUS_SCRAPERS", {'127.0.0.1', '::1'})

TG_DATACENTER_PORT = 443

Expand Down Expand Up @@ -895,6 +901,68 @@ async def connect_reader_to_writer(rd, wr, user, rd_buf_size):
task_clt_to_tg.cancel()

writer_tg.transport.abort()


async def http_reply(writer, line, body=b"", eof=False):
BaseHTTPRequestHandler = http.server.BaseHTTPRequestHandler
msg = (
"HTTP/1.1 {}\r\n"
"Server: mtprotoproxy\r\n"
"Date: {}\r\n"
"Content-Type: text/plain\r\n"
"Content-Length: {:d}\r\n"
).format(
line,
BaseHTTPRequestHandler.date_time_string(BaseHTTPRequestHandler),
len(body)
).encode("ascii")
if eof:
msg += b"Connection: close\r\n"
msg += b"\r\n" + body
writer.write(msg)
await writer.drain()
if eof:
writer.write_eof()
writer.close()

async def handle_promstats(reader, writer):
set_keepalive(writer.get_extra_info("socket"), 75) # prometheus should never go away for a long time
if PROMETHEUS_SCRAPERS and writer.get_extra_info('peername')[0] not in PROMETHEUS_SCRAPERS:
return
while True: # Keep-Alive
request = await reader.readuntil(b"\r\n\r\n")
if request.startswith(b"GET /metrics HTTP/1."):
promstat = (
"# HELP mtproxy_pump_bytes Number of post-handshake bytes pumped in both directions.\n"
"# TYPE mtproxy_pump_bytes counter\n"
) + "".join(
"mtproxy_pump_bytes{{user=\"{}\"}} {:d}\n".format(u, stats[u]["octets"])
for u in stats
) + (
"# HELP mtproxy_connections Current number of post-handshake client connections.\n"
"# TYPE mtproxy_connections gauge\n"
) + "".join(
"mtproxy_connections{{user=\"{}\"}} {:d}\n".format(u, stats[u]["curr_connects"])
for u in stats
) + (
"# HELP mtproxy_connections_total Total number of post-handshake client connections served.\n"
"# TYPE mtproxy_connections_total counter\n"
) + "".join(
"mtproxy_connections_total{{user=\"{}\"}} {:d}\n".format(u, stats[u]["connects"])
for u in stats
)
await http_reply(writer, "200 OK", promstat.encode("ascii"))
else:
await http_reply(writer, "400 Bad Request", b"Bad Request.\n", eof=True)
return

async def handle_promstats_wrapper(reader, writer):
try:
await handle_promstats(reader, writer)
except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError):
pass
finally:
writer.transport.abort()


async def handle_client_wrapper(reader, writer):
Expand Down Expand Up @@ -1084,6 +1152,17 @@ def main():
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
asyncio.ensure_future(middle_proxy_updater_task)

if PROMETHEUS_PORT:
task_promstats = asyncio.start_server(handle_promstats_wrapper, PROMETHEUS_HOST, PROMETHEUS_PORT,
limit=4096, # http request is quite small
backlog=8, # there are few prometheus collectors
reuse_address=True, # that's still server, TIME_WAIT should not block restart
reuse_port=False, # if you reuse statistics port for several instances, you're doing it wrong!
loop=loop)
server_promstats = loop.run_until_complete(task_promstats)
else:
server_promstats = None

reuse_port = hasattr(socket, "SO_REUSEPORT")

task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT,
Expand All @@ -1109,6 +1188,10 @@ def main():
server_v6.close()
loop.run_until_complete(server_v6.wait_closed())

if server_promstats is not None:
server_promstats.close()
loop.run_until_complete(server_promstats.wait_closed())

loop.close()


Expand Down