-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathncfparser.py
241 lines (212 loc) · 8.87 KB
/
ncfparser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
"""This file is intended to parse an NCF into its components"""
import os
def trim(data):
"""removes spaces from a list"""
ret_data = []
for item in data:
ret_data += item.replace(' ', '')
return ret_data
class NCFParser:
"""parses NCF files into usable data"""
# pylint: disable=too-many-instance-attributes
# These are all reasonable instance attributes to initialize
def __init__(self, starting_file=None):
"""Set up all needed parameters"""
self.loaded = False
self.parsed = False
self.frames = {}
self.nodes = {}
self.signals = {}
self.attributes = {}
self.all_text = None
self.current_file = starting_file
if starting_file:
self.set_file(starting_file)
#utility
def _reset_data(self):
"""When setting a new file, clear all data"""
self.loaded = False
self.parsed = False
self.frames = {}
self.nodes = {}
self.signals = {}
def set_file(self, file_name):
"""takes a path to an NCF file, then reads and parses it"""
if str(file_name[-3:]).lower() == 'ncf':
if os.path.exists(file_name):
self.current_file = file_name
self._reset_data()
self._read_file()
else:
raise FileNotFoundError(file_name+" doesn't exist")
else:
raise ValueError('Incorrect file type')
def _read_file(self):
"""reads the text from the NCF file"""
self.loaded = False
with open(self.current_file) as file:
self.all_text = file.read()
self.loaded = True
self._parse_file()
def _find_ends(self, term, text=None):
"""utility function to find the brackets for a term in text"""
if not text:
text = self.all_text
#add len of term since we know what we asked for and add 1 for the space
start = text.find(term+' {')+len(term)+1
if start == len(term):
start = text.find(term+'{')+len(term)
if start == len(term)-1:
raise Exception('Term not found')
end = temp = start
while text[start:end].count('{') > text[start:end].count('}') or start == end:
temp = end+1
end = text.find('}', temp)+1
if(end == 0) or (end == len(text)):
break
return (start+1, end-1)
def _find_single_line_value(self, search_term, text=None):
"""find the data on a known signal line term"""
if not text:
text = self.all_text[::]
return text[text.find(search_term):].split(';')[0].split('=')[-1].replace(' ', '')
#parsing/data collection
def _parse_file(self):
"""parses the text from the NCF into nodes, signals, frames, and attributes"""
text = self.all_text[::]
while 'node ' in text:
#find the node name
node_name_start = text.find('node ')
node_name_end = text.find(' {', node_name_start)
search_term = text[node_name_start:node_name_end]
name = search_term.split(' ')[1]
#find the node text
node_start, node_end = self._find_ends(search_term, text)
node_text = text[node_start:node_end]
#update the search text to look after the current node
text = text[node_end+1:]
#find the frame text and parse it
frame_text_pos = self._find_ends('frames', node_text)
self.nodes[name] = {}
self.nodes[name]['frames'] = self._parse_all_frames(*frame_text_pos, node_text)
#search node for specific values
terms = ['NAD', 'LIN_protocol', 'bitrate']
for term in terms:
self.nodes[name][term] = self._find_single_line_value(term, node_text)
self.parsed = True
del self.all_text
def _parse_all_frames(self, start, end, text):
"""initiates the parsing of all frames"""
frame_text = text[start:end]
pub_end = -1
sub_end = -1
self.frames['publish'] = {}
self.frames['subscribe'] = {}
#we run forever until we have processed all published and subscribed frames
while True:
pub_start = frame_text.find('publish ', pub_end+1)
sub_start = frame_text.find('subscribe ', sub_end+1)
#check if there is a publish frame to process
if pub_start != -1:
#process the publish frame
pub_name = frame_text[pub_start:].split(' ')[1]
pub_start, pub_end = self._find_ends('publish '+pub_name, frame_text[pub_end+1:])
data = frame_text[pub_start:pub_end]
self.frames['publish'][pub_name] = self._parse_frame(data)
#check if there is a subscribe frame left to process
if sub_start != -1:
#process it if there are any
sub_name = frame_text[sub_start:].split(' ')[1]
sub_start, sub_end = self._find_ends('subscribe '+sub_name, frame_text[sub_end+1:])
data = frame_text[sub_start:sub_end]
self.frames['subscribe'][sub_name] = self._parse_frame(data)
#if there are no publish or subscribe frames, we are don parsing and can break
if (pub_start == -1) and (sub_start == -1):
break
return self.frames
def _parse_frame(self, frame):
"""parses frames into name, id, publisher, length, and composing signals with offsets"""
raw = {}
raw['ID'] = frame[frame.find('message_ID'):].split(';')[0].split('=')[1].replace(' ', '')
raw['len'] = frame[frame.find('length'):].split(';')[0].split('=')[1].replace(' ', '')
raw['signals'] = {}
signal_start, signal_end = self._find_ends('signals', frame)
#add 1 to start to ignore the opening {
signals = frame[signal_start:signal_end]
while '{' in signals:
signal_name = signals[:signals.find('{', 1)]
signal_name = signal_name.replace(' ', '').replace('\n', '').replace('\t', '')
raw['signals'][signal_name] = {}
signal_data = signals[slice(*self._find_ends(signal_name, signals))]
raw['signals'][signal_name]['encoding'] = self._parse_encoding(signal_data)
signal_data = signal_data[:signal_data.find('encoding ')]
signal_data = signal_data.replace('\n', '').replace('\t', '').replace(' ', '')
for data in signal_data.split(';')[:3]:
name, value = data.split('=')[:2]
raw['signals'][signal_name][name] = value
signals = signals[signals.find('}', signals.find('}')+1)+1:]
self.signals[signal_name] = raw['signals'][signal_name]
return raw
# pylint: disable=no-self-use
# This is still part of the overall class
def _parse_encoding(self, encoding):
"""
parses the encoded values for the signal. Shows the message if a logical value or the
min, max, and init if a physical value
"""
raw = {}
encoding = encoding[encoding.find('{'):]
encoding = encoding.replace('\t', '').replace('\n', '').replace('}', '').replace('{', '')
_type = encoding.split(',')[0].split('_')[0]
raw['type'] = _type
encodings = encoding.split(';')
if raw['type'].lower() == 'logical':
for data in encodings:
if data:
value, msg = data.split(',')[1:3]
raw[int(value)] = msg.replace('"', '')
elif raw['type'].lower() == 'physical':
data = encodings[0].split(',')
raw['min'] = data[1].replace(' ', '')
raw['max'] = data[2].replace(' ', '')
raw['init'] = data[4].replace(' ', '')
else:
return None
return raw
#data retrieval
def get_nodes(self):
"""return all nodes"""
return self.nodes
def get_signals(self):
"""return all signals"""
return self.signals
def get_signals_by_publish_node(self, node):
"""return all signals for a given node"""
data = {}
for key, val in self.signals:
if val['publisher'] == node:
data[key] = val
return data
def get_frames(self):
"""return all frames"""
return self.frames
def get_frames_by_publish_node(self, node):
"""return all frames for a given publisher node"""
data = {}
for key, val in self.frames:
if val['publisher'] == node:
data[key] = val
return data
def get_all(self):
"""return all parsed data"""
data = {
'nodes' : self.nodes,
'frames' : self.frames,
'signals' : self.signals}
return data
"""
Example use:
path = path_to_ncf_file
parser = NCFParser(path)
parsed_data = parser.get_all()
"""