-
Notifications
You must be signed in to change notification settings - Fork 1
/
ds_stream.py
110 lines (83 loc) · 2.95 KB
/
ds_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import os
import sys
class TasbotStreamerBase(object):
    """Stream input frames to a replay board over a serial interface.

    Subclasses provide the frame source by overriding ``getNextLine``. This
    class turns each line into a serial command and paces the stream on the
    board's ready signal.
    """

    # Number of frames pushed before waiting for the board's first ready
    # signal. The historical loop was ``range(1, 20)``, i.e. 19 frames, even
    # though its comment said 20; the effective count is kept unchanged.
    # NOTE(review): confirm against the board's buffer depth before changing.
    PREFILL_FRAMES = 19

    def __init__(self, serialInterface):
        """Store the serial interface and reset the frame counter.

        serialInterface must provide ``inWaiting()``, ``read()`` and
        ``write(buf)`` (these are the only methods this class calls).
        """
        self.ser = serialInterface
        self.framecount = 0

    def runStream(self):
        """Play the stream.

        Sends ``PREFILL_FRAMES`` frames up front, then sends one more frame
        each time the board reports it is ready. The loop has no exit of its
        own: it ends only when ``getNextLine`` raises or exits the process.
        """
        for _ in range(self.PREFILL_FRAMES):
            self.sendNextFrame()
        print("")
        while True:
            # Wait until the board has something to say.
            # NOTE(review): this is a busy-poll and spins the CPU while idle.
            while self.ser.inWaiting() == 0:
                pass
            # The board sends an f when it is ready for the next frame.
            c = self.ser.read()
            # Real serial ports return bytes on Python 3; normalise so the
            # comparison below works for both bytes and text.
            if isinstance(c, bytes):
                c = c.decode("ascii", "replace")
            if c == 'f':
                self.framecount += 1
                self.sendNextFrame()

    def sendNextFrame(self):
        """Fetch the next input line, echo its command and write it to serial.

        Returns nothing. Raises ValueError if the fixed-position fields of the
        line are not decimal digits.
        """
        command = self._formatFrame(self.getNextLine())
        print(command)
        # Encode once at the I/O boundary; a no-op on Python 2 str.
        self.ser.write(command.encode("ascii"))

    def _formatFrame(self, fbuf):
        """Build the serial command for one input line.

        Character 24 is the flag; when it is 1 the two 3-digit decimal fields
        at [16:19] and [20:23] are scaled by 16 (the second is also offset by
        512) and sent as hex. Otherwise the "no" command is sent.
        NOTE(review): field layout presumably matches a DeSmuME-style movie
        line with touch X/Y/pressed columns -- confirm against the input files.
        """
        if int(fbuf[24:25]) == 1:
            first = int(fbuf[16:19]) * 16
            second = int(fbuf[20:23]) * 16 + 512
            # "%0.3X": upper-case hex, zero-padded to at least three digits.
            return "c Y %0.3X %0.3X f" % (first, second)
        return "c N f"

    def getNextLine(self):
        """Virtual: return the next input line; subclasses must override."""
        raise NotImplementedError("getNextLine is virtual in base class")
class TasbotFileStreamer(TasbotStreamerBase):
    """Frame source that reads input lines from one or more files, in order."""

    def __init__(self, serialInterface, inputFileOrList):
        """Remember the input file(s); nothing is opened until first use.

        inputFileOrList is either a single path or an iterable of paths.
        """
        super(TasbotFileStreamer, self).__init__(serialInterface)
        # A lone path must be wrapped, not iterated. Testing only for
        # "__iter__" is not enough: text strings define it on Python 3, which
        # would make a single path be indexed character by character.
        if isinstance(inputFileOrList, (str, bytes)) or not hasattr(inputFileOrList, "__iter__"):
            self.inputFileList = [inputFileOrList]
        elif isinstance(inputFileOrList, list):
            self.inputFileList = inputFileOrList
        else:
            # Files are fetched by index below, so materialise other iterables.
            self.inputFileList = list(inputFileOrList)
        self.nextFileIndex = 0
        self.inputFh = None

    def getNextLine(self):
        """Return the next line that does not start with "#".

        Files are opened lazily and closed when exhausted. When every file
        has been consumed the process exits with status 0 via ``sys.exit``.
        """
        while True:
            if self.inputFh is None:
                if self.nextFileIndex >= len(self.inputFileList):
                    # End of the last file: the stream is finished.
                    sys.exit(0)
                self.inputFh = open(self.inputFileList[self.nextFileIndex], 'r')
                self.nextFileIndex += 1
            fbuf = self.inputFh.readline()
            if not fbuf:
                # EOF on the current file; loop round to open the next one.
                self.inputFh.close()
                self.inputFh = None
            elif not fbuf.startswith("#"):
                return fbuf
            # Lines starting with "#" are skipped.
class FakeSerialInterface(object):
    """Stand-in serial port for running the streamer without hardware.

    It always reports one byte waiting, always reads back 'f' (the character
    the board sends when it is ready for the next frame), and discards
    everything written to it.
    """

    def __init__(self):
        """Nothing to set up; the fake port carries no state."""

    def inWaiting(self):
        """Report a single pending byte so callers never wait for input."""
        return 1

    def read(self):
        """Return the constant character 'f'."""
        return 'f'

    def write(self, buf):
        """Accept *buf* and drop it."""
        return None
def main():
    """Load serial interface and files from command line args.

    Usage: <script> <interface> <replayfile> [replayfile] ...
    The interface name "fake" selects FakeSerialInterface; any other value
    is opened as a serial port at 9600 baud with a non-blocking timeout of 0.
    Exits with status 2 when too few arguments are given.
    """
    if len(sys.argv) < 3:
        sys.stderr.write('Usage: ' + sys.argv[0] + ' <interface> <replayfile> [replayfile] ...\n\n')
        # A usage error is a failure; do not report success to the caller.
        sys.exit(2)
    serialName = sys.argv[1]
    if serialName == "fake":
        ser = FakeSerialInterface()
    else:
        # Imported lazily so the "fake" interface works without pyserial.
        import serial
        baud = 9600
        ser = serial.Serial(serialName, baud, timeout=0)
    # Every argument after the interface name is a replay file.
    fileList = sys.argv[2:]
    streamer = TasbotFileStreamer(ser, fileList)
    streamer.runStream()


if __name__ == "__main__":
    main()