generated from anoadragon453/nio-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bot_commands.py
146 lines (131 loc) · 5.02 KB
/
bot_commands.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from chat_functions import send_text_to_room
import requests
import json
class Command(object):
def __init__(self, client, store, config, command, room, event):
"""A command made by a user
Args:
client (nio.AsyncClient): The client to communicate to matrix with
store (Storage): Bot storage
config (Config): Bot configuration parameters
command (str): The command and arguments
room (nio.rooms.MatrixRoom): The room the command was sent in
event (nio.events.room_events.RoomMessageText): The event describing the command
"""
self.client = client
self.store = store
self.command = command
self.config = config
self.room = room
self.event = event
self.args = self.command.split()[1:]
async def process(self):
"""Process the command"""
if self.command.startswith("echo"):
await self._echo()
elif self.command.startswith("help"):
await self._show_help()
elif self.command.startswith("ads"):
await self._pihole_stats()
elif self.command.startswith("uptime"):
await self._utrobot_stats()
else:
await self._unknown_command()
async def _echo(self):
"""Echo back the command's arguments"""
response = " ".join(self.args)
await send_text_to_room(self.client, self.room.room_id, response)
async def _show_help(self):
"""Show the help text"""
if not self.args:
text = (
"Hello, I am a bot made with matrix-nio! Use `help commands` to view "
"available commands."
)
await send_text_to_room(self.client, self.room.room_id, text)
return
topic = self.args[0]
if topic == "rules":
text = "These are the rules!"
elif topic == "commands":
text = "Available commands"
else:
text = "Unknown help topic!"
await send_text_to_room(self.client, self.room.room_id, text)
async def _pihole_stats(self):
"""Echo back some PiHole stats"""
url = self.config.pihole_url + "/admin/api.php"
json_data = json.loads(requests.request("GET", url, data="", headers="").text)
response = (
"**PiHole Stats** 🖥️<br>Today's DNS queries: **"
+ str(json_data["dns_queries_today"])
+ "**<br>Blocked: **"
+ str(json_data["ads_blocked_today"])
+ "** / **"
+ str(round(json_data["ads_percentage_today"], 2))
+ "**%<br>Client count: **"
+ str(json_data["unique_clients"])
+ "**"
)
await send_text_to_room(self.client, self.room.room_id, response)
async def _utrobot_stats(self):
"""Echo back some uptimerobot stats"""
payload = (
f"api_key={self.config.utrobot_apikey}&format=json&logs=1&response_times=1"
)
headers = {
"content-type": "application/x-www-form-urlencoded",
"cache-control": "no-cache",
}
url = "https://api.uptimerobot.com/v2/getMonitors"
json_data = json.loads(
requests.request("POST", url, data=payload, headers=headers).text
)
response = (
"**Uptime Stats for "
+ str(json_data["pagination"]["total"])
+ " services:** <br><br>**Response Time**:"
)
for monitor in json_data["monitors"]:
response = (
response
+ "<br>- "
+ monitor["friendly_name"]
+ ": "
+ str(monitor["response_times"][0]["value"])
+ "ms"
)
response = response + "<br><br>**Current Status**: "
for monitor in json_data["monitors"]:
if monitor["logs"][0]["type"] == 2:
response = (
response
+ "<br>- "
+ monitor["friendly_name"]
+ ": ✅ for "
+ str(round((monitor["logs"][0]["duration"]) / 60 / 60 / 24, 2))
+ " days"
)
elif monitor["logs"][0]["type"] == 1:
response = (
response
+ "<br>- "
+ monitor["friendly_name"]
+ ": ❌ for "
+ str(round((monitor["logs"][0]["duration"]) / 60 / 60 / 24, 2))
+ " days"
)
else:
response = (
response
+ "<br>- "
+ monitor["friendly_name"]
+ ": ❓️ paused/rebooting"
)
await send_text_to_room(self.client, self.room.room_id, response)
async def _unknown_command(self):
await send_text_to_room(
self.client,
self.room.room_id,
f"I don't know '{self.command}'. Please try again.",
)