-
Notifications
You must be signed in to change notification settings - Fork 1
/
example.py
91 lines (72 loc) · 2.45 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import sys
import pathlib
sys.path.append(str(pathlib.Path(__file__).parent))
import pyrtma
# Import the users compiled message defintions
import msg_defs.example_messages as md
from typing import cast
def publisher(server="127.0.0.1:7111", timecode=False):
# Setup Client
mod = pyrtma.Client(timecode=timecode)
mod.connect(server_name=server)
# Build a packet to send
msg = md.MDF_PERSON_MESSAGE()
msg.name = "Alice"
msg.age = 37
while True:
c = input("Hit enter to publish a message. (Q)uit.")
if c not in ["Q", "q"]:
msg.age += 1
mod.send_message(msg)
print("Sent:")
print(msg.to_json())
else:
mod.send_signal(md.MT_EXIT)
print("Goodbye")
break
def subscriber(server="127.0.0.1:7111", timecode=False):
# Setup Client
mod = pyrtma.Client(timecode=timecode)
mod.connect(server_name=server)
# Select the messages to receive
mod.subscribe([md.MT_PERSON_MESSAGE, md.MT_EXIT])
print("Waiting for packets...")
while True:
try:
msg = mod.read_message(timeout=0.200)
if msg is not None:
if msg.type_id == md.MT_PERSON_MESSAGE:
# optional cast for better type hinting
msg.data = cast(md.MDF_PERSON_MESSAGE, msg.data)
print("Recv:")
print(msg.data.to_json())
elif msg.type_id == md.MT_EXIT:
print("Goodbye.")
break
except KeyboardInterrupt:
break
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"--server", default="127.0.0.1:7111", help="RTMA Message Manager ip address."
)
parser.add_argument(
"-t", "--timecode", action="store_true", help="Use timecode in message header"
)
group = parser.add_mutually_exclusive_group()
group.add_argument(
"--pub", default=False, action="store_true", help="Run as publisher."
)
group.add_argument(
"--sub", default=False, action="store_true", help="Run as subscriber."
)
args = parser.parse_args()
if args.pub:
print("pyrtma Publisher")
publisher(args.server, timecode=args.timecode)
elif args.sub:
print("pyrtma Subscriber")
subscriber(args.server, timecode=args.timecode)
else:
print("Unknown input")