-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcpu_throttle.py
237 lines (214 loc) · 9.03 KB
/
cpu_throttle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2021 - Technout
# URL: https://github.com/technout/cpu_throttle
"""
Python3 Linux script for throttling system CPU frequency based on a desired maximum temperature (celsius).
CPU Throttle is released under the terms of the GNU GPLv3 License.
This script requires package: cpufrequtils
Install command for Ubuntu based distro's: sudo apt install cpufrequtils
Stress testing cpu with command: stress -c 4 -t 120s
With c the number of threads and t time in seconds to stress test.
You can install Gnome extensions on https://extensions.gnome.org/
Very helpful tools are: cpufreq and Freon for temperature and cpu speed measurements
Instructions for running the script as a systemd service daemon:
Step 1.
sudo nano /etc/systemd/system/cpu_throttle.service
[Unit]
Description=Throttles cpu speed at defined temperature
After=multi-user.target
[Service]
Type=simple
Restart=always
ExecStart=/usr/bin/python3 /home/<username>/cpu_throttle.py
[Install]
WantedBy=multi-user.target
Step 2.
sudo systemctl daemon-reload
Step 3.
sudo systemctl enable cpu_throttle.service
sudo systemctl start cpu_throttle.service
"""
import os, sys, time, argparse
import subprocess, logging, signal
# logging.basicConfig(filename='cpu_throttle.log', level=logging.DEBUG)
# logFormatter = logging.Formatter("%(asctime)s %(filename)s: " + fmt.format("%(levelname)s") + " %(message)s", "%Y/%m/%d %H:%M:%S")
if os.geteuid() != 0:
exit('You need to run this with root privileges. Please try again with sudo.')
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)8s] %(message)s",
handlers=[
logging.FileHandler("/var/log/cpu_throttle.log"),
logging.StreamHandler(sys.stdout)
]
)
version = "1.2-2021.09.25"
def getArguments():
parser = argparse.ArgumentParser()
parser.add_argument('--time', type=int, help='Seconds to cooldown cpu before next check, default is 30 seconds.')
parser.add_argument('--crit_temp', type=int, help='Temp for cpu to throttle down (temperature in celcius degrees)')
parser.add_argument('--debug', action='store_true', help='Output more information when set to True.')
args = parser.parse_args()
if args.time is None:
relaxtime = 30 # time in seconds
else:
relaxtime = int(args.time)
if args.crit_temp is None:
crit_temp = 64000 # temp in mili celcius degree
else:
crit_temp = int(args.crit_temp)*1000
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.INFO)
return relaxtime, crit_temp, args.debug
# determine hardware and kernel types
def hardwareCheck():
# does this work: $ echo "performance" | sudo tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor
if os.path.exists("/sys/devices/LNXSYSTM:00/LNXTHERM:00/LNXTHERM:01/thermal_zone/temp") == True:
return 4
elif os.path.exists("/sys/bus/acpi/devices/LNXTHERM:00/thermal_zone/temp") == True:
return 5 # intel
elif os.path.exists("/sys/class/hwmon/hwmon0") == True:
return 6 # amd
elif os.path.exists("/sys/class/thermal/thermal_zone3/") == True:
return 7 # intel
elif os.path.exists("/proc/acpi/thermal_zone/THM0/temperature") == True:
return 1
elif os.path.exists("/proc/acpi/thermal_zone/THRM/temperature") == True:
return 2
elif os.path.exists("/proc/acpi/thermal_zone/THR1/temperature") == True:
return 3
else:
return 0
# depending on the kernel and hardware config, read the temperature
def getTemp(hardware):
temp = 0
if hardware == 6:
# logging.debug('reading temp..')
with open("/sys/class/hwmon/hwmon0/temp1_input", 'r') as mem1:
temp = mem1.read().strip()
elif hardware == 1:
temp = open("/proc/acpi/thermal_zone/THM0/temperature").read().strip().lstrip('temperature :').rstrip(' C')
elif hardware == 2:
temp = open("/proc/acpi/thermal_zone/THRM/temperature").read().strip().lstrip('temperature :').rstrip(' C')
elif hardware == 3:
temp = open("/proc/acpi/thermal_zone/THR1/temperature").read().strip().lstrip('temperature :').rstrip(' C')
elif hardware == 4:
temp = open("/sys/devices/LNXSYSTM:00/LNXTHERM:00/LNXTHERM:01/thermal_zone/temp").read().strip().rstrip('000')
elif hardware == 5:
with open("/sys/class/thermal/thermal_zone0/temp") as mem1:
temp = mem1.read().strip()
elif hardware == 7:
with open("/sys/class/thermal/thermal_zone3/temp") as mem1:
temp = mem1.read().strip()
else:
return 0
# logging.debug(f"Temp is {temp}")
# logging.debug(f"Temp is an integer: {isinstance(temp, int)}")
temp = float(temp)
if temp < 1000:
temp = temp * 1000
return int(temp)
def getMinMaxFrequencies(hardware):
if hardware == 0:
# with open("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_min_freq", 'r') as mem1:
# min_freq = mem1.read().strip()
# with open("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq", 'r') as mem1:
# max_freq = mem1.read().strip()
# return (min_freq, max_freq, '')
pass
else:
freq = subprocess.run('cpufreq-info -p', shell=True, stdout=subprocess.PIPE)
if freq.returncode != 0:
logging.warning('cpufreq-info gives error, cpufrequtils package installed?')
return (0, 0, 0)
else:
return tuple(freq.stdout.decode('utf-8').strip().lower().split(' '))
def setMaxFreq(frequency, hardware, cores):
if hardware != 0 :
logging.info(f"Set max frequency to {int(frequency/1000)} MHz")
for x in range(cores):
logging.debug(f'Setting core {x} to {frequency} KHz')
if subprocess.run(f'cpufreq-set -c {x} --max {frequency}', shell=True).returncode != 0:
logging.warning('cpufreq-set gives error, cpufrequtils package installed?')
break
def setGovernor(hardware, governor):
if subprocess.run(f'cpufreq-set -g {governor}', shell=True).returncode != 0:
logging.warning('cpufreq-set gives error, cpufrequtils package installed?')
def getCovernors(hardware):
govs = subprocess.run('cpufreq-info -g', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if govs.returncode != 0:
logging.warning('cpufreq-info gives error, cpufrequtils package installed?')
return ()
else:
logging.debug(f'cpufreq-info governors: {govs.stdout.decode().strip()}')
if govs.stdout is None:
logging.warning('No covernors found!?')
logging.debug(f'Govs: {govs.stdout.decode()}')
return ()
else:
return tuple(govs.stdout.decode('utf-8').strip().lower().split(' '))
# if proces receives a kill signal or sigterm,
# raise an error and handle it in the finally statement for a proper exit
def signal_term_handler(self, args):
raise KeyboardInterrupt()
def main():
global version
hardware = 0
cur_temp = 0
governor_high = 'ondemand'
governor_low = 'powersave'
cur_governor = 'performance'
govs = ()
relax_time, crit_temp, debug = getArguments()
logging.debug(f'critic_temp: {crit_temp}, relaxtime: {relax_time}, debug: {debug}')
cores = os.cpu_count()
if cores is None:
cores = 16
hardware = hardwareCheck()
logging.debug(f'Detected hardware/kernel type is {hardware}')
if hardware == 0:
logging.warning("Sorry, this hardware is not supported")
sys.exit()
freq = getMinMaxFrequencies(hardware)
logging.debug(f'min max gov: {freq}')
min_freq = int(freq[0])
max_freq = int(freq[1])
if freq[2] is not None:
cur_governor = freq[2]
govs = getCovernors(hardware)
if governor_high not in govs:
governor_high = 'performance'
if governor_low not in govs:
logging.warning('Wait, powersave mode not in governors list?')
governor_low = 'userspace'
# logging.debug(f'govs received: {govs}')
signal.signal(signal.SIGINT, signal_term_handler)
signal.signal(signal.SIGTERM, signal_term_handler)
try:
while True:
cur_temp = getTemp(hardware)
logging.info(f'Current temp is {int(cur_temp/1000)}')
if cur_temp is None:
logging.warning('Error: Current temp is None?!')
break
if cur_temp > crit_temp:
logging.warning("CPU temp too high")
logging.info(f"Slowing down for {relax_time} seconds")
setGovernor(hardware, governor_low)
setMaxFreq(min_freq, hardware, cores)
time.sleep(relax_time)
else:
setGovernor(hardware, governor_high)
setMaxFreq(max_freq, hardware, cores)
time.sleep(3)
except KeyboardInterrupt:
logging.warning('Terminating')
finally:
logging.warning('Setting max cpu and governor back to normal.')
setGovernor(hardware, cur_governor)
setMaxFreq(max_freq, hardware, cores)
if __name__ == '__main__':
main()