-
Notifications
You must be signed in to change notification settings - Fork 85
/
lutra.py
346 lines (287 loc) · 13 KB
/
lutra.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
#!/usr/bin/env python3
"""Lutra -- A EuroPi reimagining of Expert Sleepers' Otterley
This script is a free-running, syncable LFO with variable shapes. Each output channel's clock speed is slightly
different, with the spread being controlled by K2 (and optionally AIN). The overall speed is controlled by
K1 (and optionally AIN). B1 & DIN both act as synchronization inputs. B2 cycles through the different output
waveforms.
Lutra: a genus of otters, including L. lutra, the eurasian otter and L. sumatrana, the hairy-nosed otter.
@author Chris Iverach-Brereton
@year 2024
@see https://expert-sleepers.co.uk/otterley.html
"""
from europi import *
from europi_script import EuroPiScript
import configuration
import math
from random import random
import time
import _thread
from experimental.math_extras import rescale
from experimental.thread import DigitalInputHelper
class WaveGenerator:
"""Generates the output wave forms and sets the voltage going to one of the output jacks
5 wave shapes are supported, with the cycle time expressed in "ticks." These ticks have no 1:1 relationship
with any real-world time unit, and are simply defined by each iteration through the script's main loop.
"""
## Supported wave shapes
WAVE_SHAPE_SINE = 0
WAVE_SHAPE_SQUARE = 1
WAVE_SHAPE_TRIANGLE = 2
WAVE_SHAPE_SAW = 3
WAVE_SHAPE_RAMP = 4
WAVE_SHAPE_STEP_RANDOM = 5
WAVE_SHAPE_SMOOTH_RANDOM = 6
NUM_WAVE_SHAPES = 7
WAVE_SHAPES_NAMES = [
"Sine",
"Square",
"Triangle",
"Saw",
"Ramp",
"Step_Random",
"Smooth_Random",
]
WAVE_SHAPES_NAMES_TO_SHAPES = {
"sine": WAVE_SHAPE_SINE,
"square": WAVE_SHAPE_SQUARE,
"triangle": WAVE_SHAPE_TRIANGLE,
"saw": WAVE_SHAPE_SAW,
"ramp": WAVE_SHAPE_RAMP,
"step_random": WAVE_SHAPE_STEP_RANDOM,
"smooth_random": WAVE_SHAPE_SMOOTH_RANDOM,
}
## 12x12 pixel images of the wave shapes
WAVE_SHAPE_IMAGES = [
FrameBuffer(bytearray(b'\x10\x00(\x00D\x00D\x00\x82\x00\x82\x00\x82\x10\x82\x10\x01\x10\x01\x10\x00\xa0\x00@'), 12, 12, MONO_HLSB), # SINE
FrameBuffer(bytearray(b'\xfe\x00\x82\x00\x82\x00\x82\x00\x82\x00\x82\x00\x82\x00\x82\x00\x82\x00\x82\x00\x82\x00\x83\xf0'), 12, 12, MONO_HLSB), # SQUARE
FrameBuffer(bytearray(b'\x06\x00\x06\x00\t\x00\t\x00\x10\x80\x10\x80 @ @@ @ \x80\x10\x80\x10'), 12, 12, MONO_HLSB), # TRIANGLE
FrameBuffer(bytearray(b'\x80\x00\xc0\x00\xa0\x00\x90\x00\x88\x00\x84\x00\x82\x00\x81\x00\x80\x80\x80@\x80 \x80\x10'), 12, 12, MONO_HLSB), # SAW
FrameBuffer(bytearray(b'\x00\x10\x000\x00P\x00\x90\x01\x10\x02\x10\x04\x10\x08\x10\x10\x10 \x10@\x10\x80\x10'), 12, 12, MONO_HLSB), # RAMP
FrameBuffer(bytearray(b'\x00\xe0\x00\xa0\x00\xa0\x00\xa0<\xa0$\xa0$\xa0\xe4\xb0\x04\x80\x04\x80\x04\x80\x07\x80'), 12, 12, MONO_HLSB), # STEP_RANDOM
FrameBuffer(bytearray(b'\x00`\x00P\x00P\x08P\x14P$PD\x90\x82\x80\x02\x80\x02\x80\x02\x80\x01\x80'), 12, 12, MONO_HLSB), # SMOOTH_RANDOM
]
IMAGE_SIZE = (12, 12)
def __init__(self, cv_output):
"""Constructor
@param cv_output The CV output jack that the generated wave gets output on
"""
self.cv_output = cv_output
self.shape = 0
self.cycle_ticks = 1000
self.current_tick = 0
self.prev_random_goal = random() * MAX_OUTPUT_VOLTAGE
self.random_goal = random() * MAX_OUTPUT_VOLTAGE
def reset(self):
"""Reset the wave to the beginning
"""
self.current_tick = 0
def change_cycle_length(self, new_length):
"""Change the number of steps in the pattern
We need to preserve our relative progress to avoid skipping when changing the cycle length
"""
if new_length != self.cycle_ticks:
progress = self.current_tick / self.cycle_ticks
self.cycle_ticks = new_length
self.current_tick = int(new_length * progress)
def tick(self):
"""Calculate the appropriate voltage for the output, given the current clock time
@return The desired voltage
"""
if self.shape == self.WAVE_SHAPE_SINE:
# we want to start at -1 and go up, so we actually use a negative cos wave, but the shape is the same
theta = (self.current_tick / self.cycle_ticks) * 2 * math.pi
volts = (-math.cos(theta) + 1) / 2 * MAX_OUTPUT_VOLTAGE
elif self.shape == self.WAVE_SHAPE_SQUARE:
if self.current_tick < (self.cycle_ticks >> 1):
volts = MAX_OUTPUT_VOLTAGE
else:
volts = 0
elif self.shape == self.WAVE_SHAPE_TRIANGLE:
half_cycle_ticks = (self.cycle_ticks >> 1)
if self.current_tick < half_cycle_ticks:
volts = self.current_tick / half_cycle_ticks * MAX_OUTPUT_VOLTAGE
else:
volts = MAX_OUTPUT_VOLTAGE - (self.current_tick - half_cycle_ticks) / half_cycle_ticks * MAX_OUTPUT_VOLTAGE
elif self.shape == self.WAVE_SHAPE_SAW:
volts = MAX_OUTPUT_VOLTAGE - self.current_tick / self.cycle_ticks * MAX_OUTPUT_VOLTAGE
elif self.shape == self.WAVE_SHAPE_RAMP:
volts = self.current_tick / self.cycle_ticks * MAX_OUTPUT_VOLTAGE
elif self.shape == self.WAVE_SHAPE_STEP_RANDOM:
volts = self.random_goal
elif self.shape == self.WAVE_SHAPE_SMOOTH_RANDOM:
slope = (self.random_goal - self.prev_random_goal) / self.cycle_ticks
volts = slope * self.current_tick + self.prev_random_goal
else:
volts = 0
self.current_tick = self.current_tick + 1
if self.current_tick >= self.cycle_ticks:
self.current_tick = 0
self.prev_random_goal = self.random_goal
self.random_goal = random() * MAX_OUTPUT_VOLTAGE
self.cv_output.voltage(volts)
return volts
class Lutra(EuroPiScript):
"""The main class for this script; handles running the main loop, configuring I/O, loading, and saving state.
"""
# We support CV control over either LFO speed OR LFO spread via AIN. This option is not exposed through a menu
# and must be configured via the config file. See lutra.md for details
AIN_MODE_SPREAD = 0
AIN_MODE_SPEED = 1
AIN_MODE_NAMES = [
"Spread",
"Speed"
]
# The maximum and minimum cycle time for the LFOs
MIN_CYCLE_TICKS = 250
MAX_CYCLE_TICKS = 10000
# Maximum wave speed multipliers relative to cv1
MAX_SPEED_MULTIPLIERS = [
1/1,
6/5,
5/4,
4/3,
3/2,
2/1
]
def __init__(self):
"""Constructor
This creates all of the necessary objects, but does not create the separate thread for handling the GUI;
see @main for that.
"""
super().__init__()
self.waves = [
WaveGenerator(cv) for cv in cvs
]
self.config_dirty = False
self.hold_low = False
self.last_wave_change_at = time.ticks_ms()
# Connect the B2 handler to our digital input helper
# B1 and DIN are handled differently since they interact with each other
# See @wave_generator_thread
self.digital_input_state = DigitalInputHelper(
on_b2_rising = self.on_b2_rising
)
# Save the last screen width's worth of output voltages converted to pixel heights
# This speeds up rendering
self.display_pixels = [
[] for cv in cvs
]
# To coordinate access to self.display_pixels between threads we need a mutex to make sure
# we don't read the array while it's being modified
self.pixel_lock = _thread.allocate_lock()
self.load()
@classmethod
def config_points(cls):
"""Return the static configuration options for this class
"""
return [
configuration.choice(name="AIN_MODE", choices=["spread", "speed"], default="spread")
]
def load(self):
"""Load and apply the saved state
@exception ValueError if the configuration contains invalid values
"""
cfg = self.load_state_json()
shape = cfg.get("wave", "sine").lower()
if shape in WaveGenerator.WAVE_SHAPES_NAMES_TO_SHAPES.keys():
for wave in self.waves:
wave.shape = WaveGenerator.WAVE_SHAPES_NAMES_TO_SHAPES[shape]
else:
raise ValueError(f"Unknown wave shape: {shape}")
def save(self):
"""Write the saved-state file & set config_dirty to False
"""
cfg = {
"wave": WaveGenerator.WAVE_SHAPES_NAMES[self.waves[0].shape].lower()
}
self.save_state_json(cfg)
self.config_dirty = False
def on_digital_in_rising(self):
"""Called when either B1 or DIN goes high
Signals the wave generator thread that all outputs should be forced low
"""
self.hold_low = True
def on_digital_in_falling(self):
"""Called when both B1 and DIN are low
Signals the wave generator thread that all outputs should reset
"""
self.hold_low = False
for wave in self.waves:
wave.reset()
def on_b2_rising(self):
"""Called when either B2 goes high
Cycles through the active wave shape
"""
shape = (self.waves[0].shape + 1) % WaveGenerator.NUM_WAVE_SHAPES
for wave in self.waves:
wave.shape = shape
self.last_wave_change_at = time.ticks_ms()
self.config_dirty = True
def gui_render_thread(self):
"""A thread function that handles drawing the GUI
"""
SHOW_WAVE_TIMEOUT = 3000
while True:
now = time.ticks_ms()
oled.fill(0)
with self.pixel_lock:
for channel in self.display_pixels:
for px in range(len(channel)):
oled.pixel(px, channel[px], 1)
if time.ticks_diff(now, self.last_wave_change_at) < SHOW_WAVE_TIMEOUT:
oled.blit(WaveGenerator.WAVE_SHAPE_IMAGES[self.waves[0].shape], 0, 0)
oled.show()
def wave_generation_thread(self):
"""A thread function that handles the underlying math of generating the waveforms
"""
usb_connected_at_start = usb_connected.value()
# To prevent the module locking up when we connect the USB for e.g. debugging, kill this thread
# if the USB state changes. Otherwise the second core will continue being busy, which makes connecting
# to the Python terminal impossible
while usb_connected.value() == usb_connected_at_start:
# Read the digital inputs
self.digital_input_state.update()
# Manually handle B1 and DIN rising & falling
# If either goes high, signal that we want to old the outputs low
# If both become low, signal that all outputs should reset & output normally
if self.digital_input_state.b1_rising or self.digital_input_state.din_rising:
self.on_digital_in_rising()
elif (
(self.digital_input_state.b1_falling and not self.digital_input_state.din_high) or
(self.digital_input_state.din_falling and not self.digital_input_state.b1_pressed)
):
self.on_digital_in_falling()
# Read the CV inputs and apply them
# Round to 2 decimal places to reduce noise
ain_percent = round(ain.percent(), 2)
k1_percent = round(k1.percent(), 2)
k2_percent = round(k2.percent(), 2)
if self.config.AIN_MODE == self.AIN_MODE_SPREAD:
k_speed = k1_percent
k_spread = clamp(k2_percent + ain_percent, 0, 1)
else:
k_speed = clamp(k1_percent + ain_percent, 0, 1)
k_spread = k2_percent
base_ticks = int((1.0 - k_speed) * (self.MAX_CYCLE_TICKS - self.MIN_CYCLE_TICKS) + self.MIN_CYCLE_TICKS)
for i in range(len(cvs)):
base_tick_multiplier = rescale(k_spread, 0, 1, 1, self.MAX_SPEED_MULTIPLIERS[i])
spread_ticks = int(base_ticks / base_tick_multiplier)
self.waves[i].change_cycle_length(spread_ticks)
for i in range(len(cvs)):
pixel_height = OLED_HEIGHT - 1
if self.hold_low:
cvs[i].off()
else:
volts = self.waves[i].tick()
pixel_height = int(OLED_HEIGHT - 1 - volts / MAX_OUTPUT_VOLTAGE * (OLED_HEIGHT-1))
with self.pixel_lock:
self.display_pixels[i].append(pixel_height)
if len(self.display_pixels[i]) >= OLED_WIDTH:
self.display_pixels[i].pop(0)
if self.config_dirty:
self.save()
def main(self):
gui_thread = _thread.start_new_thread(self.gui_render_thread, ())
self.wave_generation_thread()
if __name__ == "__main__":
Lutra().main()