-
Notifications
You must be signed in to change notification settings - Fork 2
/
pdlib.py
308 lines (241 loc) · 9.01 KB
/
pdlib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# This file is part of pre.di.c
# pre.di.c, a preamp and digital crossover
# Copyright (C) Roberto Ripio
"""
miscellanea of utility functions for use in predic scripts
"""
import socket
import sys
import time
import subprocess as sp
import math as m
import jack
import yaml
import numpy as np
import baseconfig as base
import init
# used on startaudio.py and stopaudio.py
def read_clients(phase):
"""
reads list of programs to start/stop from config/clients.yml file
phase: <'start'|'stop'> phase of client activation or deactivation
"""
clients_list_path = init.clients_path
with open(clients_list_path) as clients_file:
clients_dict = yaml.safe_load(clients_file)
# init a list of client actions
clients = [
clients_dict[i][phase]
for i in clients_dict if phase in clients_dict[i]
]
return clients
def get_yaml(filepath):
"""
returns dictionary from yaml config file
"""
with open(filepath) as configfile:
config_dict = yaml.safe_load(configfile)
return config_dict
def client_socket(data, port, quiet=True):
"""
makes a socket for talking to the server
"""
# avoid void command to reach server and get processed due to encoding
if data == '':
return b'ACK\n'
server = 'localhost'
# port = init.config['control_port']
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
if not quiet:
print(f'\n(lib) Connecting to {server}, port {str(port)}...')
s.connect((server, port))
except socket.gaierror as e:
print(f'\n(lib) Address-related error connecting to server: {e}')
sys.exit(-1)
except socket.error as e:
print(f'\n(lib) Connection error: {e}')
sys.exit(-1)
if not quiet:
print('\n(lib) Connected')
try:
# if a parameter is passed it is send to server
s.send(data.encode())
# return raw bytes server answer
return s.recv(2048)
except Exception:
print(f'\n(lib) unexpected error: {sys.exc_info()[0]}')
def read_state():
"""
retrieve state dictionary from server to be used by clients
"""
string = client_socket('status', init.config['control_port'])
return yaml.safe_load(string.decode().replace('\nOK', ''))
def wait4result(command, answer, tmax=5, interval=0.1):
"""
looks for chain "answer" in "command" output
"""
time_start = time.time()
def elapsed():
return time.time() - time_start
while elapsed() < tmax:
try:
if init.config['verbose'] in {0, 1}:
output = {"stderr": sp.DEVNULL}
else:
output = {}
if answer in sp.check_output(
command, shell=True, universal_newlines=True, **output):
if init.config['verbose'] in {1, 2}:
print(
f'\n(lib) found string "{answer}" in output of '
f'command: {command}'
)
return True
except Exception as e:
if init.config['verbose'] in {2}:
print(e)
time.sleep(interval)
else:
if init.config['verbose'] in {1, 2}:
print(
f'\n(lib) time out >{tmax}s waiting for string "{answer}"'
f' in output of command: {command}'
)
return False
def wait4source(source, tmax=5, interval=0.1):
"""
wait for source jack ports to be up
"""
source_ports = init.sources[source]['source_ports']
if wait4ports(source_ports, tmax, interval):
return True
else:
return False
def wait4ports(ports, tmax=5, interval=0.1):
"""
wait for jack ports to be up
"""
time_start = time.time()
jc = jack.Client('wait_client')
ports_name = ports[1].split(':', 1)[0]
while (time.time() - time_start) < tmax:
# names of up input ports at this very moment as a generator
up_ports = (
port.name for port in
jc.get_ports(ports_name, is_output=False)
)
# compare sets and, if wanted ports are among up input ports, \
# then wanted ports are up and ready :-)
if set(ports).issubset(set(up_ports)):
# go on
jc.close()
return True
else:
time.sleep(interval)
# time is exhausted and source ports are down :-(
# leave function without any connection made
jc.close()
return False
def gain_dB(x):
return 20 * m.log10(x)
"""
calculates gain in dB from gain multiplier
"""
def calc_gain(level):
"""
calculates gain from level and reference gain
"""
gain = level + init.speaker['ref_level_gain']
return gain
def calc_level(gain):
"""
calculates level from gain and reference gain
"""
level = gain - init.speaker['ref_level_gain']
return level
def calc_headroom(gain):
"""
calculates headroom from gain and equalizer
"""
tones = {'off': 0, 'on': 1}[init.state['tones']]
headroom = (base.gain_max
- gain
- np.clip(init.state['bass'], 0, None) * tones
- np.clip(init.state['treble'], 0, None) * tones
- abs(init.state['balance'])
)
return headroom
def calc_source_gain(source):
"""
retrieves source gain shift
"""
return init.sources[source]['gain']
def show():
"""
compose a status report string
"""
source_gain = calc_source_gain(init.state['source'])
gain = calc_gain(init.state['level']) + source_gain
headroom = calc_headroom(gain)
source_headroom = headroom - source_gain
show_str = f'''
Loudspeaker: {init.config['loudspeaker']: >10s}
fs {init.speaker['devices']['samplerate']: 10d}
Reference level {init.speaker['ref_level_gain']: 10.1f}
Mute {init.state['mute']: >10s}
Level {init.state['level']: 10.1f}
Balance {init.state['balance']: 10.1f}
Channels {init.state['channels']: >10s}
Channels flip {init.state['channels_flip']: >10s}
Polarity {init.state['polarity']: >10s}
Polarity flip {init.state['polarity_flip']: >10s}
Stereo {init.state['stereo']: >10s}
Solo {init.state['solo']: >10s}
Tones {init.state['tones']: >10s}
Bass {init.state['bass']: 10.1f}
Treble {init.state['treble']: 10.1f}
Loudness {init.state['loudness']: >10s}
Loudness reference {init.state['loudness_ref']: 10.1f}
DRC {init.state['drc']: >10s}
DRC set {init.state['drc_set']: >10s}
Phase equalizer {init.state['phase_eq']: >10s}
EQ {init.state['eq']: >10s}
EQ filter {init.state['eq_filter']: >10s}
Sources {init.state['sources']: >10s}
Source {init.state['source']: >10s}
Source gain {source_gain: 10.1f}
Source headroom {source_headroom: 10.1f}
Gain {gain: 10.1f}
Headroom {headroom: 10.1f}
Clamp gain {init.state['clamp']: >10s}
'''
return show_str
help_str = '''
status Display status
save Save status
ping Request an answer from server
show Show a human readable status
clamp <off|on|toggle> Set or toggle level clamp
sources <off|on|toggle> Set or toggle sources
source <input> Select source
drc <off|on|toggle> Set or toggle DRC filters
drc_set <drc_set> Select DRC filters set
phase_eq <off|on|toggle> Set or toggle phase equalizer filter
channels <lr|l|r> Set input channels selector mode
channels_flip <off|on|toggle> Set or toggle channels flip
polarity <off|on|toggle> Set or toggle polarity inversion
polarity_flip <off|on|toggle> Set or toggle polarity flip
stereo <normal|mid|side> Set input channels mixer mode
solo <lr|l|r> Set output channels selector mode
mute <off|on|toggle> Set or toggle mute
loudness <off|on|toggle> Set or toggle loudness control
loudness_ref <loudness_ref> [add] Set loudness reference level
tones <off|on|toggle> Set or toggle tone control
treble <treble> [add] Set treble level
bass <bass> [add] Set bass level
balance <balance> [add] Set balance (- left , + for right)
level <level> [add] Set volume level
gain <gain> Set digital gain
'add' option makes previous number be an increment
'''