forked from viamrobotics/viam-python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathboard.py
414 lines (292 loc) · 14 KB
/
board.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
import abc
import sys
from datetime import timedelta
from typing import Any, Dict, Final, List, Optional
from viam.proto.component.board import PowerMode, ReadAnalogReaderResponse, StreamTicksResponse
from viam.resource.types import API, RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_COMPONENT
from viam.streams import Stream
from ..component_base import ComponentBase
if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias
# ``Tick`` is an alias for the ``StreamTicksResponse`` message type imported from
# ``viam.proto.component.board``.
Tick = StreamTicksResponse
# ``TickStream`` is a ``Stream`` parameterized with ``Tick``; it is the declared return
# type of ``Board.stream_ticks``.
TickStream = Stream[Tick]
class Board(ComponentBase):
    """
    Board represents a physical general purpose compute board that contains various
    components such as analog readers/writers, and digital interrupts.

    This acts as an abstract base class for any drivers representing specific
    board implementations. This cannot be used on its own. If the ``__init__()`` function is
    overridden, it must call the ``super().__init__()`` function.

    ::

        from viam.components.board import Board

    For more information, see `Board component <https://docs.viam.com/dev/reference/apis/components/board/>`_.
    """

    API: Final = API(  # pyright: ignore [reportIncompatibleVariableOverride]
        RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_COMPONENT, "board"
    )

    # NOTE(review): the nested ``Analog``, ``DigitalInterrupt`` and ``GPIOPin`` classes below
    # declare no base class, so their ``@abc.abstractmethod`` markers describe the required
    # interface but are not enforced by ``ABCMeta`` when a subclass is instantiated.
    #
    # NOTE(review): the keyword-only ``extra`` / ``timeout`` parameters and ``**kwargs`` accepted
    # by the methods below are not described anywhere in this file; presumably they are passed
    # through to the concrete implementation -- confirm against an implementing class.

    class Analog:
        """
        Analog represents an analog pin reader or writer that resides on a Board.
        """

        name: str
        """The name of the analog pin"""

        Value: "TypeAlias" = ReadAnalogReaderResponse
        """
        Value contains the result of reading an analog reader. It contains the raw data read,
        the reader's minimum and maximum possible values, and its step size (the minimum possible
        change between values it can read).

        For more information, see `analogs <https://docs.viam.com/dev/reference/apis/components/board/#analogs>`_.
        """

        def __init__(self, name: str):
            self.name = name

        @abc.abstractmethod
        async def read(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> Value:
            """
            Read the current value from the reader.

            ::

                my_board = Board.from_robot(robot=machine, name="my_board")

                # Get the Analog "my_example_analog_reader".
                reader = await my_board.analog_by_name(
                    name="my_example_analog_reader")

                # Get the value that "my_example_analog_reader" has most
                # recently measured.
                reading = await reader.read()

            Returns:
                Value: The current value, including the min, max, and step_size of the reader.

            For more information, see `Board component Analog API <https://docs.viam.com/dev/reference/apis/components/board/#analog-api>`_.
            """
            ...

        @abc.abstractmethod
        async def write(self, value: int, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs):
            """
            Write a value to the Analog writer.

            ::

                my_board = Board.from_robot(robot=machine, name="my_board")

                # Get the Analog "my_example_analog_writer".
                writer = await my_board.analog_by_name(
                    name="my_example_analog_writer")

                await writer.write(42)

            Args:
                value (int): Value to write to the analog writer.

            For more information, see `Board component Analog API <https://docs.viam.com/dev/reference/apis/components/board/#analog-api>`_.
            """
            ...

    class DigitalInterrupt:
        """
        DigitalInterrupt represents a configured interrupt on the Board that
        when interrupted, calls the added callbacks. Post processors can
        be added to modify what Value it ultimately returns.

        For more information, see `digital_interrupts <https://docs.viam.com/dev/reference/apis/components/board/#digital_interrupts>`_.
        """

        name: str
        """The name of the digital interrupt."""

        def __init__(self, name: str):
            self.name = name

        @abc.abstractmethod
        async def value(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> int:
            """
            Get the current value of the interrupt,
            which is based on the type of interrupt.

            ::

                my_board = Board.from_robot(robot=machine, name="my_board")

                # Get the DigitalInterrupt "my_example_digital_interrupt".
                interrupt = await my_board.digital_interrupt_by_name(
                    name="my_example_digital_interrupt")

                # Get the amount of times this DigitalInterrupt has been interrupted with a
                # tick.
                count = await interrupt.value()

            Returns:
                int: The current value.

            For more information, see
            `Board component DigitalInterrupt API <https://docs.viam.com/dev/reference/apis/components/board/#digitalinterrupt-api>`_.
            """
            ...

    class GPIOPin:
        """
        Abstract representation of an individual GPIO pin on a board.
        """

        name: str
        """The name of the GPIO pin."""

        def __init__(self, name: str):
            self.name = name

        @abc.abstractmethod
        async def set(self, high: bool, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs):
            """
            Set the pin to either low or high.

            ::

                my_board = Board.from_robot(robot=machine, name="my_board")

                # Get the GPIOPin with pin number 15.
                pin = await my_board.gpio_pin_by_name(name="15")

                # Set the pin to high.
                await pin.set(high=True)

            Args:
                high (bool): When true, sets the pin to high. When false, sets the pin to low.

            For more information, see `GPIOPin API <https://docs.viam.com/dev/reference/apis/components/board/#gpiopin-api>`_.
            """
            ...

        @abc.abstractmethod
        async def get(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> bool:
            """
            Get the high/low state of the pin.

            ::

                my_board = Board.from_robot(robot=machine, name="my_board")

                # Get the GPIOPin with pin number 15.
                pin = await my_board.gpio_pin_by_name(name="15")

                # Get if it is true or false that the state of the pin is high.
                high = await pin.get()

            Returns:
                bool: Indicates if the state of the pin is high.

            For more information, see `GPIOPin API <https://docs.viam.com/dev/reference/apis/components/board/#gpiopin-api>`_.
            """
            ...

        @abc.abstractmethod
        async def get_pwm(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> float:
            """
            Get the pin's given duty cycle.

            ::

                my_board = Board.from_robot(robot=machine, name="my_board")

                # Get the GPIOPin with pin number 15.
                pin = await my_board.gpio_pin_by_name(name="15")

                # Get the duty cycle of this pin.
                duty_cycle = await pin.get_pwm()

            Returns:
                float: The duty cycle.

            For more information, see `GPIOPin API <https://docs.viam.com/dev/reference/apis/components/board/#gpiopin-api>`_.
            """
            ...

        @abc.abstractmethod
        async def set_pwm(self, duty_cycle: float, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs):
            """
            Set the pin to the given ``duty_cycle``.

            ::

                my_board = Board.from_robot(robot=machine, name="my_board")

                # Get the GPIOPin with pin number 15.
                pin = await my_board.gpio_pin_by_name(name="15")

                # Set the duty cycle to .6, meaning that this pin will be in the high state for
                # 60% of the duration of the PWM interval period.
                await pin.set_pwm(duty_cycle=.6)

            Args:
                duty_cycle (float): The duty cycle.

            For more information, see `GPIOPin API <https://docs.viam.com/dev/reference/apis/components/board/#gpiopin-api>`_.
            """
            ...

        @abc.abstractmethod
        async def get_pwm_frequency(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> int:
            """
            Get the PWM frequency of the pin.

            ::

                my_board = Board.from_robot(robot=machine, name="my_board")

                # Get the GPIOPin with pin number 15.
                pin = await my_board.gpio_pin_by_name(name="15")

                # Get the PWM frequency of this pin.
                freq = await pin.get_pwm_frequency()

            Returns:
                int: The PWM frequency.

            For more information, see `GPIOPin API <https://docs.viam.com/dev/reference/apis/components/board/#gpiopin-api>`_.
            """
            ...

        @abc.abstractmethod
        async def set_pwm_frequency(
            self,
            frequency: int,
            *,
            extra: Optional[Dict[str, Any]] = None,
            timeout: Optional[float] = None,
            **kwargs,
        ):
            """
            Set the pin to the given PWM ``frequency`` (in Hz).
            When ``frequency`` is 0, it will use the board's default PWM frequency.

            ::

                my_board = Board.from_robot(robot=machine, name="my_board")

                # Get the GPIOPin with pin number 15.
                pin = await my_board.gpio_pin_by_name(name="15")

                # Set the PWM frequency of this pin to 1600 Hz.
                await pin.set_pwm_frequency(frequency=1600)

            Args:
                frequency (int): The frequency, in Hz.

            For more information, see `GPIOPin API <https://docs.viam.com/dev/reference/apis/components/board/#gpiopin-api>`_.
            """
            ...

    @abc.abstractmethod
    async def analog_by_name(self, name: str) -> Analog:
        """
        Get an Analog (reader or writer) by ``name``.

        ::

            my_board = Board.from_robot(robot=machine, name="my_board")

            # Get the Analog "my_example_analog_reader".
            reader = await my_board.analog_by_name(name="my_example_analog_reader")

        Args:
            name (str): Name of the analog reader or writer to be retrieved.

        Returns:
            Analog: The analog reader or writer.

        For more information, see `Board component <https://docs.viam.com/dev/reference/apis/components/board/#analogbyname>`_.
        """
        ...

    @abc.abstractmethod
    async def digital_interrupt_by_name(self, name: str) -> DigitalInterrupt:
        """
        Get a DigitalInterrupt by ``name``.

        ::

            my_board = Board.from_robot(robot=machine, name="my_board")

            # Get the DigitalInterrupt "my_example_digital_interrupt".
            interrupt = await my_board.digital_interrupt_by_name(
                name="my_example_digital_interrupt")

        Args:
            name (str): Name of the digital interrupt.

        Returns:
            DigitalInterrupt: The digital interrupt.

        For more information, see `Board component <https://docs.viam.com/dev/reference/apis/components/board/#digitalinterruptbyname>`_.
        """
        ...

    @abc.abstractmethod
    async def gpio_pin_by_name(self, name: str) -> GPIOPin:
        """
        Get a GPIO Pin by ``name``.

        ::

            my_board = Board.from_robot(robot=machine, name="my_board")

            # Get the GPIOPin with pin number 15.
            pin = await my_board.gpio_pin_by_name(name="15")

        Args:
            name (str): Name of the GPIO pin.

        Returns:
            GPIOPin: The pin.

        For more information, see `Board component <https://docs.viam.com/dev/reference/apis/components/board/#gpiopinbyname>`_.
        """
        ...

    @abc.abstractmethod
    async def set_power_mode(
        self, mode: PowerMode.ValueType, duration: Optional[timedelta] = None, *, timeout: Optional[float] = None, **kwargs
    ):
        """
        Set the board to the indicated power mode.

        ::

            my_board = Board.from_robot(robot=machine, name="my_board")

            # Set the power mode of the board to OFFLINE_DEEP.
            await my_board.set_power_mode(mode=PowerMode.POWER_MODE_OFFLINE_DEEP)

        Args:
            mode (PowerMode): The desired power mode.
            duration (Optional[timedelta]): Requested duration to stay in power mode.

        For more information, see `Board component <https://docs.viam.com/dev/reference/apis/components/board/#setpowermode>`_.
        """
        ...

    @abc.abstractmethod
    async def stream_ticks(self, interrupts: List[DigitalInterrupt], *, timeout: Optional[float] = None, **kwargs) -> TickStream:
        """
        Stream digital interrupt ticks.

        ::

            my_board = Board.from_robot(robot=machine, name="my_board")
            di8 = await my_board.digital_interrupt_by_name(name="8")
            di11 = await my_board.digital_interrupt_by_name(name="11")

            # Iterate over stream of ticks from pins 8 and 11.
            async for tick in await my_board.stream_ticks([di8, di11]):
                print(f"Pin {tick.pin_name} changed to {'high' if tick.high else 'low'} at {tick.time}")

        Args:
            interrupts (List[DigitalInterrupt]): list of digital interrupts to receive ticks from.

        Returns:
            TickStream: stream of ticks.

        For more information, see `Board component <https://docs.viam.com/dev/reference/apis/components/board/#streamticks>`_.
        """
        ...