-
Notifications
You must be signed in to change notification settings - Fork 0
/
terminal.py
115 lines (89 loc) · 3.24 KB
/
terminal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from threading import Thread
import time
import cmd, serial, sys, threading, readline, time, ConfigParser
import wx, os, glob
from wxPython.wx import *
from test_parser import Tester
files_directory = "./logs/"
tester = Tester()
class Terminal():
def __init__(self, display, name):
self.display = display
self.name = name
def loadFromFile(self, filename):
self.file_name = filename
self.f = open(files_directory + self.file_name, 'r')
for line in self.f.readlines():
self.process_line(line)
def connect(self, port):
self.file_name = port.replace("/dev/tty", "")
self.port= port
self.open_file()
try:
self.ser = serial.Serial(port=self.port, baudrate=115200, dsrdtr=0, rtscts=0)
except serial.serialutil.SerialException:
self.out("Could not open serial port " + self.port + ". Device not available")
return
self.ser.setDTR(0)
self.ser.setRTS(0)
receiver_thread = TerminalReader(self.ser, self) # threading.Thread(target=self.reader, args=(self.ser, self, ))
receiver_thread.setDaemon(1)
receiver_thread.start()
def open_file(self):
try:
self.f.close()
except:
pass
self.f = open(files_directory + self.file_name, 'w')
self.f_read = open(files_directory + self.file_name, 'r')
def send(self, text):
# self.out(">>> " + text + "\n")
# tester.trigger_onchanged(self.port + ":LINE", text)
# try:
# self.f.flush()
self.newline_received(text)
self.ser.write(text)
# except:
# print "Exception"
def read(self, file):
pass
def out(self, c):
try:
if self.f:
self.f.write(c)
self.f.flush()
except:
print "No file handle"
def newline_received(self, theline):
# theline = self.f_read.readline()
self.process_line(theline)
def process_line(self, theline):
theline = theline.strip("\n")
theline = theline.strip("\r")
console_line = theline
if (console_line.find('@') >= 0):
console_line = console_line[0:console_line.find('@')]
console_line = console_line.strip()
print self.name + ": " + console_line
self.display.WriteLine(theline)
tester.trigger_onchanged(self.name + ":LINE", theline)
class TerminalReader(Thread):
def __init__ (self, ser, callback):
Thread.__init__(self)
self.ser = ser
self.callback = callback
self.cur_line = ""
def run(self):
while (1):
c = self.ser.read(1)
if not c == '\0':
# self.callback.out(c)
self.cur_line = self.cur_line + c
# print c
if c == '\n':
# Write to file
self.callback.out(self.cur_line)
# self.cur_line = ""
# os.fsync(self.callback.f.fileno())
wx.CallAfter(self.callback.newline_received, self.cur_line)
self.cur_line = ""