forked from viamrobotics/viam-python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbase.py
123 lines (89 loc) · 3.63 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from abc import abstractmethod
from typing import TYPE_CHECKING, Any, ClassVar, Mapping, Optional, Protocol, runtime_checkable
from logging import Logger
from typing_extensions import Self
from viam.operations import Operation
from viam.proto.common import ResourceName
from .types import Subtype
if TYPE_CHECKING:
from viam.robot.client import RobotClient
from viam.utils import ValueTypes
@runtime_checkable
class ResourceBase(Protocol):
"""
The base requirements for a Resource.
"""
SUBTYPE: ClassVar["Subtype"]
"""The Subtype of the Resource"""
name: str
"""The name of the Resource"""
logger: Logger
"""A logger allowing for setting log levels on a per-resource basis"""
@classmethod
def get_resource_name(cls, name: str) -> ResourceName:
"""
Get the ResourceName for this Resource with the given name
::
# Can be used with any resource, using an arm as an example
my_arm_name = Arm.get_resource_name("my_arm")
Args:
name (str): The name of the Resource
Returns:
ResourceName: The ResourceName of this Resource
"""
return ResourceName(
namespace=cls.SUBTYPE.namespace,
type=cls.SUBTYPE.resource_type,
subtype=cls.SUBTYPE.resource_subtype,
name=name,
)
@classmethod
@abstractmethod
def from_robot(cls, robot: "RobotClient", name: str) -> Self:
"""Get the Resource named ``name`` from the provided robot.
::
# Can be used with any resource, using an arm as an example
my_arm = Arm.from_robot(robot, "my_arm")
Args:
robot (RobotClient): The robot
name (str): The name of the Resource
Returns:
Self: The Resource, if it exists on the robot
"""
...
@abstractmethod
async def do_command(
self, command: Mapping[str, "ValueTypes"], *, timeout: Optional[float] = None, **kwargs
) -> Mapping[str, "ValueTypes"]:
"""Send/Receive arbitrary commands to the Resource
::
command = {"cmd": "test", "data1": 500}
result = await component.do_command(command)
Args:
command (Mapping[str, ValueTypes]): The command to execute
Raises:
NotImplementedError: Raised if the Resource does not support arbitrary commands
Returns:
Mapping[str, ValueTypes]: Result of the executed command
"""
...
def get_operation(self, kwargs: Mapping[str, Any]) -> Operation:
"""Get the ``Operation`` associated with the currently running function.
When writing custom resources, you should get the ``Operation`` by calling this function and check to see if it's cancelled.
If the ``Operation`` is cancelled, then you can perform any necessary (terminating long running tasks, cleaning up connections, etc.
).
Args:
kwargs (Mapping[str, Any]): The kwargs object containing the operation
Returns:
viam.operations.Operation: The operation associated with this function
"""
return kwargs.get(Operation.ARG_NAME, Operation._noop())
async def close(self):
"""Safely shut down the resource and prevent further use.
Close must be idempotent. Later configuration may allow a resource to be "open" again.
If a resource does not want or need a close function, it is assumed that the resource does not need to return errors when future
non-Close methods are called.
::
await component.close()
"""
return