@@ -144,85 +144,88 @@ def get_msg_df(log_path, op_df):
144144 op_info [opid ] = {'op_type' :op_type , 'topic' :topic }
145145
146146 rows = []
147+ def process_message (node_id , jsons ):
148+ msg = jsons ['msg' ]
149+ if msg .startswith ('>> ' ):
150+ direction = 'out'
151+ elif msg .startswith ('<< ' ):
152+ direction = 'in'
153+ else :
154+ return # it's not a message sent between peers
155+
156+ row = {
157+ 'node_id' : node_id ,
158+ 'in_out' : direction ,
159+ # get peer ID from the port number
160+ 'peer_id' : int (jsons ['addr' ].split (':' )[1 ]) - 30200 ,
161+ 'timestamp' : parse (jsons ['t' ]),
162+ 'msg_type' : jsons ['msg' ].split (' ' )[1 ].split (':' )[0 ],
163+ }
164+
165+ if ('req' in jsons ):
166+ row ['req_id' ] = jsons ['req' ]
167+ if ('total-wtime' in jsons ):
168+ row ['total_wtime' ] = jsons ['total-wtime' ]
169+ if ('wtime' in jsons ):
170+ row ['wtime' ] = jsons ['wtime' ]
171+ if ('ok' in jsons ):
172+ row ['ok' ] = jsons ['ok' ]
173+
174+ if ('opid' in jsons ):
175+ op = jsons ['opid' ]
176+ row ['opid' ] = op
177+ # add other attributes known about this operation
178+ row ['topic' ] = op_info [op ]['topic' ]
179+ row ['op_type' ] = op_info [op ]['op_type' ]
180+
181+ # we have a key to the message specified
182+ # currently it can only be the topic
183+ if ('topic' in jsons ):
184+ # replace topic digest by topic name
185+ topic = jsons ['topic' ]
186+ row ['key' ] = topic_mapping [topic ]
187+
188+ rows .append (row )
189+
147190 for log_file in os .listdir (log_path ):
148191 if (not log_file .startswith ("node-" )):
149192 continue
193+
150194 print ("Reading" , log_file )
151195 node_id = log_file .split ('-' )[1 ].split ('.' )[0 ] #node-10.log
152- for line in open (log_path + '/' + log_file , 'r' ).readlines ():
153- #not a json line
154- if (line [0 ] != '{' ):
155- continue
156- row = {}
157- row ['node_id' ] = node_id
158- #print("\t", line)
159- jsons = json .loads (line )
160- #it's not a message sent between peers
161- if ('addr' not in jsons ):
162- continue
163- #get peer ID from the port number
164- row ['peer_id' ] = int (jsons ['addr' ].split (':' )[1 ]) - 30200
165- in_out_s = jsons ['msg' ].split (' ' )[0 ]
166- if (in_out_s == '<<' ):
167- row ['in_out' ] = 'in'
168- elif (in_out_s == '>>' ):
169- row ['in_out' ] = 'out'
170- else :
171- #it's not a message sent between peers
172- continue
173- row ['timestamp' ] = parse (jsons ['t' ])
174- row ['msg_type' ] = jsons ['msg' ].split (' ' )[1 ].split (':' )[0 ]
175- if ('req' in jsons ):
176- row ['req_id' ] = jsons ['req' ]
177- #print(row)
178- if ('opid' in jsons ):
179- row ['opid' ] = jsons ['opid' ]
180-
181- if ("total-wtime" in jsons ):
182- row ['total_wtime' ] = jsons ['total-wtime' ]
183- if ("wtime" in jsons ):
184- row ['wtime' ] = jsons ['wtime' ]
185- if ("ok" in jsons ):
186- row ['ok' ] = jsons ['ok' ]
187-
188- #we have a key to the message specified
189- #currently it can only be the topic
190- if ("topic" in jsons ):
191- #replace topic digest by topic name
192- topic = jsons ['topic' ]
193- row ['key' ] = topic_mapping [topic ]
194- #print(row)
195- rows .append (row )
196-
197-
196+ with open (log_path + '/' + log_file , 'r' ) as f :
197+ for line in f :
198+ if line [0 ] == '{' :
199+ jsons = json .loads (line )
200+ if 'addr' in jsons :
201+ process_message (node_id , jsons )
202+
203+ print ('Constructing the dataframe' )
198204 msg_df = pd .DataFrame (rows )
199- #keep only the send messages (as they have the opid)
200- msg_df = msg_df [msg_df ['in_out' ] == 'out' ]
201205
202- mapping = {}
206+ # associate op_id, topic, op_type with all messages.
207+ print ('Propagating message op_ids' )
208+ mapping = {} # req_id -> opid
203209 def process (row ):
204- op_id = row ['opid' ]
205- req_id = row ['req_id' ]
206-
207- if (not numpy .isnan (op_id )):
208- mapping [req_id ] = op_id
210+ op_id = row .get ('opid' , numpy .NaN )
211+ if not numpy .isnan (op_id ):
212+ mapping [row ['req_id' ]] = op_id
213+ return # fields already set by process_message
209214
210- if (req_id in mapping ):
211- row ['tmp' ] = mapping [req_id ]
212- if (not numpy .isnan (row ['tmp' ])):
213- row ['topic' ] = op_info [row ['tmp' ]]['topic' ]
214- row ['op_type' ] = op_info [row ['tmp' ]]['op_type' ]
215+ req_id = row ['req_id' ]
216+ if req_id in mapping :
217+ op_id = mapping [req_id ]
218+ row ['opid' ] = op_id
219+ row ['topic' ] = op_info [op_id ]['topic' ]
220+ row ['op_type' ] = op_info [op_id ]['op_type' ]
215221 else :
216- row ['tmp ' ] = numpy .NaN
222+ row ['opid ' ] = numpy .NaN
217223 row ['topic' ] = numpy .NaN
218224 row ['op_type' ] = numpy .NaN
219225 return row
220226
221227 msg_df = msg_df .apply (lambda row : process (row ), axis = 1 )
222- msg_df ['opid' ] = msg_df ['tmp' ]
223- msg_df .drop ('tmp' , axis = 1 , inplace = True )
224228 msg_df = msg_df .dropna (subset = ['opid' ])
225-
226229 return msg_df
227230
228231def get_op_df (log_path ):
0 commit comments