-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathserver.py
109 lines (103 loc) · 2.99 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import asyncio
import logging
import sys
from viam.proto.common import GeoPoint, Orientation, Vector3
from viam.rpc.server import Server
from .components import (
ExampleAnalog,
ExampleArm,
ExampleAudioInput,
ExampleBase,
ExampleBoard,
ExampleCamera,
ExampleDigitalInterrupt,
ExampleEncoder,
ExampleGantry,
ExampleGPIOPin,
ExampleGripper,
ExampleMotor,
ExampleMovementSensor,
ExamplePoseTracker,
ExampleSensor,
ExampleServo,
MovementSensor,
)
from .services import ExampleMLModel, ExampleSLAM
async def run(host: str, port: int, log_level: int) -> None:
    """Build one instance of each example resource and serve them over RPC.

    Every example component and service imported by this module is
    instantiated with fixed demo arguments, handed to a ``Server``, and the
    coroutine then awaits ``Server.serve``.

    Args:
        host: Forwarded to ``Server.serve`` as ``host``.
        port: Forwarded to ``Server.serve`` as ``port``.
        log_level: Forwarded to ``Server.serve`` as ``log_level``.
    """
    # The resources are created in the same order in which they are registered.
    resources = [
        ExampleArm("arm0"),
        ExampleAudioInput("audio_input0"),
        ExampleBase("base0"),
        ExampleBoard(
            name="board",
            analogs={"reader1": ExampleAnalog("reader1", 3)},
            digital_interrupts={
                "interrupt1": ExampleDigitalInterrupt("interrupt1"),
            },
            gpio_pins={"pin1": ExampleGPIOPin("pin1")},
        ),
        ExampleCamera("camera0"),
        ExampleEncoder("encoder0"),
        ExampleGantry("gantry0", [1, 2, 3], [4, 5, 6]),
        ExampleGripper("gripper0"),
        ExampleMLModel("mlmodel0"),
        ExampleMotor("motor0"),
        # Positional arguments; their meaning is defined by
        # ExampleMovementSensor's constructor, which is not visible here.
        ExampleMovementSensor(
            "movement_sensor0",
            GeoPoint(latitude=40.664679865782624, longitude=-73.97668056188789),
            15,
            Vector3(x=1, y=2, z=3),
            Vector3(x=1, y=2, z=3),
            Vector3(x=1, y=2, z=3),
            182,
            Orientation(o_x=1, o_y=2, o_z=3, theta=5),
            MovementSensor.Properties(
                linear_acceleration_supported=False,
                linear_velocity_supported=False,
                angular_velocity_supported=True,
                orientation_supported=False,
                position_supported=True,
                compass_heading_supported=False,
            ),
            {"foo": 0.1, "bar": 2.0, "baz": 3.14},
        ),
        ExamplePoseTracker("pose_tracker0"),
        ExampleSensor("sensor0"),
        ExampleServo("servo0"),
        ExampleSLAM("slam0"),
    ]

    await Server(resources=resources).serve(
        host=host,
        port=port,
        log_level=log_level,
    )
if __name__ == "__main__":
    # Usage: server.py [host [port [log_level]]]
    # Each positional argument is optional and falls back to the default
    # below when absent. A non-integer port also stops parsing, so the port
    # and the log level keep their defaults in that case.
    host, port, log_level = "localhost", 9090, logging.DEBUG
    try:
        host = sys.argv[1]
        port = int(sys.argv[2])
        level = sys.argv[3].lower()
        # "q" / "quiet" (any letter case) raises the threshold to FATAL.
        if level in ("q", "quiet"):
            log_level = logging.FATAL
    except (IndexError, ValueError):
        # Best-effort parsing: keep whatever was set before the failure.
        pass

    asyncio.run(run(host, port, log_level))