@@ -162,16 +162,16 @@ def run(self):
162
162
self .wait_client_mqtt_connected ()
163
163
164
164
self .mlops_metrics .report_client_training_status (self .edge_id ,
165
- ClientConstants .MSG_MLOPS_CLIENT_STATUS_INITIALIZING )
165
+ ClientConstants .MSG_MLOPS_CLIENT_STATUS_RUNNING )
166
166
self .send_deployment_status (self .edge_id , model_name , model_id , "" ,
167
167
ClientConstants .MSG_MODELOPS_DEPLOYMENT_STATUS_INITIALIZING )
168
168
169
169
# update local config with real time parameters from server and dynamically replace variables value
170
170
logging .info ("Download and unzip model to local..." )
171
171
unzip_package_path , fedml_config_object = self .update_local_fedml_config (run_id , model_config )
172
- ClientConstants .cleanup_learning_process ()
173
172
174
173
inference_output_url , model_version , model_metadata , model_config = start_deployment (
174
+ inference_end_point_id , model_id ,
175
175
unzip_package_path , model_name , inference_engine ,
176
176
ClientConstants .INFERENCE_HTTP_PORT ,
177
177
ClientConstants .INFERENCE_GRPC_PORT ,
@@ -181,24 +181,28 @@ def run(self):
181
181
ClientConstants .INFERENCE_SERVER_IMAGE ,
182
182
self .infer_host )
183
183
if inference_output_url == "" :
184
- self .setup_client_mqtt_mgr ()
185
- self .wait_client_mqtt_connected ()
186
- self .mlops_metrics .report_client_id_status (run_id , self .edge_id ,
187
- ClientConstants .MSG_MLOPS_CLIENT_STATUS_FAILED )
188
184
self .send_deployment_status (self .edge_id , model_id , model_name , inference_output_url ,
189
185
ClientConstants .MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED )
190
186
self .send_deployment_results (self .edge_id , model_id , model_name , inference_output_url , model_version ,
191
187
ClientConstants .INFERENCE_HTTP_PORT , inference_engine ,
192
188
model_metadata , model_config )
189
+ self .setup_client_mqtt_mgr ()
190
+ self .wait_client_mqtt_connected ()
191
+ self .mlops_metrics .run_id = self .run_id
192
+ self .mlops_metrics .broadcast_client_training_status (self .edge_id ,
193
+ ClientConstants .MSG_MLOPS_CLIENT_STATUS_FAILED )
193
194
self .release_client_mqtt_mgr ()
194
195
else :
195
196
self .send_deployment_status (self .edge_id , model_id , model_name , inference_output_url ,
196
197
ClientConstants .MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED )
197
198
self .send_deployment_results (self .edge_id , model_id , model_name , inference_output_url , model_version ,
198
199
ClientConstants .INFERENCE_HTTP_PORT , inference_engine ,
199
200
model_metadata , model_config )
200
- self .mlops_metrics .report_client_id_status (run_id , self .edge_id ,
201
- ClientConstants .MSG_MLOPS_CLIENT_STATUS_RUNNING )
201
+ time .sleep (1 )
202
+ self .broadcast_client_training_status (self .edge_id , ClientConstants .MSG_MLOPS_CLIENT_STATUS_FINISHED )
203
+
204
+ while True :
205
+ time .sleep (1 )
202
206
203
207
def send_deployment_results (self , device_id , model_id , model_name , model_inference_url ,
204
208
model_version , inference_port , inference_engine ,
@@ -225,6 +229,19 @@ def send_deployment_status(self, device_id, model_id, model_name, model_inferenc
225
229
self .client_mqtt_mgr .send_message_json (deployment_status_topic , json .dumps (deployment_status_payload ))
226
230
self .release_client_mqtt_mgr ()
227
231
232
+ def broadcast_client_training_status (self , edge_id , status ):
233
+ run_id = 0
234
+ if self .run_id is not None :
235
+ run_id = self .run_id
236
+ topic_name = "fl_client/mlops/status"
237
+ msg = {"edge_id" : edge_id , "run_id" : run_id , "status" : status }
238
+ message_json = json .dumps (msg )
239
+ logging .info ("report_client_training_status. message_json = %s" % message_json )
240
+ self .setup_client_mqtt_mgr ()
241
+ self .wait_client_mqtt_connected ()
242
+ self .client_mqtt_mgr .send_message_json (topic_name , message_json )
243
+ self .release_client_mqtt_mgr ()
244
+
228
245
def reset_devices_status (self , edge_id , status ):
229
246
self .mlops_metrics .run_id = self .run_id
230
247
self .mlops_metrics .edge_id = edge_id
@@ -244,11 +261,6 @@ def stop_run(self):
244
261
245
262
time .sleep (1 )
246
263
247
- try :
248
- ClientConstants .cleanup_learning_process ()
249
- except Exception as e :
250
- pass
251
-
252
264
self .release_client_mqtt_mgr ()
253
265
254
266
def stop_run_with_killed_status (self ):
@@ -265,11 +277,6 @@ def stop_run_with_killed_status(self):
265
277
266
278
time .sleep (1 )
267
279
268
- try :
269
- ClientConstants .cleanup_learning_process ()
270
- except Exception as e :
271
- pass
272
-
273
280
self .release_client_mqtt_mgr ()
274
281
275
282
def exit_run_with_exception (self ):
@@ -279,7 +286,6 @@ def exit_run_with_exception(self):
279
286
280
287
logging .info ("Exit run successfully." )
281
288
282
- ClientConstants .cleanup_learning_process ()
283
289
ClientConstants .cleanup_run_process ()
284
290
285
291
# Notify MLOps with the stopping message
@@ -308,11 +314,6 @@ def cleanup_run_when_starting_failed(self):
308
314
309
315
time .sleep (1 )
310
316
311
- try :
312
- ClientConstants .cleanup_learning_process ()
313
- except Exception as e :
314
- pass
315
-
316
317
self .release_client_mqtt_mgr ()
317
318
318
319
def cleanup_run_when_finished (self ):
@@ -333,11 +334,6 @@ def cleanup_run_when_finished(self):
333
334
334
335
time .sleep (1 )
335
336
336
- try :
337
- ClientConstants .cleanup_learning_process ()
338
- except Exception as e :
339
- pass
340
-
341
337
self .release_client_mqtt_mgr ()
342
338
343
339
def callback_server_status_msg (self , topic = None , payload = None ):
@@ -475,7 +471,7 @@ def callback_start_deployment(self, topic, payload):
475
471
)
476
472
client_runner .infer_host = self .infer_host
477
473
self .process = Process (target = client_runner .run )
478
- #client_runner.run()
474
+ # client_runner.run()
479
475
self .process .start ()
480
476
ClientConstants .save_run_process (self .process .pid )
481
477
0 commit comments