@@ -253,14 +253,18 @@ def build_dynamic_args(self, run_config, package_conf_object, base_dir):
253
253
if os .path .exists (bootstrap_script_path ):
254
254
bootstrap_stat = os .stat (bootstrap_script_path )
255
255
if platform .system () == 'Windows' :
256
- os .chmod (bootstrap_script_path , bootstrap_stat .st_mode | stat .S_IXUSR | stat .S_IXGRP | stat .S_IXOTH )
256
+ os .chmod (bootstrap_script_path ,
257
+ bootstrap_stat .st_mode | stat .S_IXUSR | stat .S_IXGRP | stat .S_IXOTH )
257
258
bootstrap_scripts = "{}" .format (bootstrap_script_path )
258
259
else :
259
- os .chmod (bootstrap_script_path , bootstrap_stat .st_mode | stat .S_IXUSR | stat .S_IXGRP | stat .S_IXOTH )
260
- bootstrap_scripts = "cd {}; ./{}" .format (bootstrap_script_dir , os .path .basename (bootstrap_script_file ))
260
+ os .chmod (bootstrap_script_path ,
261
+ bootstrap_stat .st_mode | stat .S_IXUSR | stat .S_IXGRP | stat .S_IXOTH )
262
+ bootstrap_scripts = "cd {}; ./{}" .format (bootstrap_script_dir ,
263
+ os .path .basename (bootstrap_script_file ))
261
264
bootstrap_scripts = str (bootstrap_scripts ).replace ('\\ ' , os .sep ).replace ('/' , os .sep )
262
265
logging .info ("Bootstrap scripts are being executed..." )
263
- process = ClientConstants .exec_console_with_script (bootstrap_scripts , should_capture_stdout_err = True )
266
+ process = ClientConstants .exec_console_with_script (bootstrap_scripts , should_capture_stdout = True ,
267
+ should_capture_stderr = True )
264
268
ret_code , out , err = ClientConstants .get_console_pipe_out_err_results (process )
265
269
if out is not None :
266
270
out_str = out .decode (encoding = "utf-8" )
@@ -340,12 +344,15 @@ def run(self):
340
344
str (dynamic_args_config ["rank" ]),
341
345
"--role" ,
342
346
"client" ,
343
- ]
347
+ ],
348
+ should_capture_stdout = False ,
349
+ should_capture_stderr = True
344
350
)
345
351
ClientConstants .save_learning_process (process .pid )
346
352
self .release_client_mqtt_mgr ()
347
- ret_code , out , err = ClientConstants .get_console_sys_out_pipe_err_results (process )
348
- if ret_code != 0 and err is not None and str (err .decode (encoding = "utf-8" )).find ('__finish ' ) == - 1 :
353
+ ret_code , out , err = ClientConstants .get_console_pipe_out_err_results (process )
354
+ if ret_code != 0 and err is not None and str (err .decode (encoding = "utf-8" )).find ('__finish ' ) == - 1 and \
355
+ (out is not None and str (out .decode (encoding = "utf-8" )).find ('__finish ' ) == - 1 ):
349
356
logging .error ("Exception when executing client program: {}" .format (err .decode (encoding = "utf-8" )))
350
357
self .setup_client_mqtt_mgr ()
351
358
self .wait_client_mqtt_connected ()
@@ -365,11 +372,6 @@ def stop_run(self):
365
372
366
373
logging .info ("Stop run successfully." )
367
374
368
- # Stop log processor for current run
369
- MLOpsRuntimeLogDaemon .get_instance (self .args ).stop_log_processor (self .run_id , self .edge_id )
370
-
371
- time .sleep (2 )
372
-
373
375
# Notify MLOps with the stopping message
374
376
self .mlops_metrics .report_client_training_status (self .edge_id , ClientConstants .MSG_MLOPS_CLIENT_STATUS_STOPPING )
375
377
@@ -391,11 +393,6 @@ def stop_run_with_killed_status(self):
391
393
392
394
logging .info ("Stop run successfully." )
393
395
394
- # Stop log processor for current run
395
- MLOpsRuntimeLogDaemon .get_instance (self .args ).stop_log_processor (self .run_id , self .edge_id )
396
-
397
- time .sleep (2 )
398
-
399
396
# Notify MLOps with the stopping message
400
397
self .mlops_metrics .report_client_training_status (self .edge_id , ClientConstants .MSG_MLOPS_CLIENT_STATUS_STOPPING )
401
398
@@ -435,11 +432,6 @@ def cleanup_run_when_starting_failed(self):
435
432
436
433
logging .info ("Cleanup run successfully when starting failed." )
437
434
438
- # Stop log processor for current run
439
- MLOpsRuntimeLogDaemon .get_instance (self .args ).stop_log_processor (self .run_id , self .edge_id )
440
-
441
- time .sleep (2 )
442
-
443
435
self .reset_devices_status (self .edge_id , ClientConstants .MSG_MLOPS_CLIENT_STATUS_FAILED )
444
436
445
437
time .sleep (2 )
@@ -465,11 +457,6 @@ def cleanup_run_when_finished(self):
465
457
466
458
logging .info ("Cleanup run successfully when finished." )
467
459
468
- # Stop log processor for current run
469
- MLOpsRuntimeLogDaemon .get_instance (self .args ).stop_log_processor (self .run_id , self .edge_id )
470
-
471
- time .sleep (2 )
472
-
473
460
self .reset_devices_status (self .edge_id , ClientConstants .MSG_MLOPS_CLIENT_STATUS_FINISHED )
474
461
475
462
time .sleep (2 )
@@ -616,6 +603,9 @@ def callback_stop_train(self, topic, payload):
616
603
except Exception as e :
617
604
pass
618
605
606
+ # Stop log processor for current run
607
+ MLOpsRuntimeLogDaemon .get_instance (self .args ).stop_log_processor (run_id , self .edge_id )
608
+
619
609
def callback_exit_train_with_exception (self , topic , payload ):
620
610
logging .info ("callback_exit_train_with_exception: topic = %s, payload = %s" % (topic , payload ))
621
611
@@ -666,7 +656,12 @@ def callback_runner_id_status(self, topic, payload):
666
656
run_id = run_id ,
667
657
)
668
658
client_runner .device_status = status
669
- Process (target = client_runner .cleanup_client_with_status ).start ()
659
+ status_process = Process (target = client_runner .cleanup_client_with_status )
660
+ status_process .start ()
661
+ status_process .join (15 )
662
+
663
+ # Stop log processor for current run
664
+ MLOpsRuntimeLogDaemon .get_instance (self .args ).stop_log_processor (run_id , edge_id )
670
665
671
666
def report_client_status (self ):
672
667
self .send_agent_active_msg ()
@@ -768,7 +763,7 @@ def GetUUID():
768
763
def bind_account_and_device_id (self , url , account_id , device_id , os_name , role = "client" ):
769
764
ip = requests .get ('https://checkip.amazonaws.com' ).text .strip ()
770
765
fedml_ver , exec_path , os_ver , cpu_info , python_ver , torch_ver , mpi_installed , \
771
- cpu_usage , available_mem , total_mem , gpu_info , gpu_available_mem , gpu_total_mem = get_sys_runner_info ()
766
+ cpu_usage , available_mem , total_mem , gpu_info , gpu_available_mem , gpu_total_mem = get_sys_runner_info ()
772
767
json_params = {
773
768
"accountid" : account_id ,
774
769
"deviceid" : device_id ,
@@ -794,7 +789,7 @@ def bind_account_and_device_id(self, url, account_id, device_id, os_name, role="
794
789
if gpu_available_mem is not None :
795
790
json_params ["extra_infos" ]["gpu_available_mem" ] = gpu_available_mem
796
791
if gpu_total_mem is not None :
797
- json_params ["extra_infos" ]["gpu_total_mem" ] = gpu_total_mem
792
+ json_params ["extra_infos" ]["gpu_total_mem" ] = gpu_total_mem
798
793
else :
799
794
json_params ["gpu" ] = "None"
800
795
@@ -803,12 +798,14 @@ def bind_account_and_device_id(self, url, account_id, device_id, os_name, role="
803
798
try :
804
799
requests .session ().verify = cert_path
805
800
response = requests .post (
806
- url , json = json_params , verify = True , headers = {"content-type" : "application/json" , "Connection" : "close" }
801
+ url , json = json_params , verify = True ,
802
+ headers = {"content-type" : "application/json" , "Connection" : "close" }
807
803
)
808
804
except requests .exceptions .SSLError as err :
809
805
MLOpsConfigs .install_root_ca_file ()
810
806
response = requests .post (
811
- url , json = json_params , verify = True , headers = {"content-type" : "application/json" , "Connection" : "close" }
807
+ url , json = json_params , verify = True ,
808
+ headers = {"content-type" : "application/json" , "Connection" : "close" }
812
809
)
813
810
else :
814
811
response = requests .post (url , json = json_params , headers = {"Connection" : "close" })
@@ -826,9 +823,9 @@ def send_agent_active_msg(self):
826
823
active_topic = "/flclient_agent/active"
827
824
status = MLOpsStatus .get_instance ().get_client_agent_status (self .edge_id )
828
825
if (
829
- status is not None
830
- and status != ClientConstants .MSG_MLOPS_CLIENT_STATUS_OFFLINE
831
- and status != ClientConstants .MSG_MLOPS_CLIENT_STATUS_IDLE
826
+ status is not None
827
+ and status != ClientConstants .MSG_MLOPS_CLIENT_STATUS_OFFLINE
828
+ and status != ClientConstants .MSG_MLOPS_CLIENT_STATUS_IDLE
832
829
):
833
830
return
834
831
status = ClientConstants .MSG_MLOPS_CLIENT_STATUS_IDLE
@@ -925,12 +922,11 @@ def setup_agent_mqtt_connection(self, service_config):
925
922
MLOpsStatus .get_instance ().set_client_agent_status (self .edge_id , ClientConstants .MSG_MLOPS_CLIENT_STATUS_IDLE )
926
923
self .release_client_mqtt_mgr ()
927
924
925
+ MLOpsRuntimeLogDaemon .get_instance (self .args ).stop_all_log_processor ()
926
+
928
927
def start_agent_mqtt_loop(self):
    """Start the agent's MQTT message loop.

    Calls ``self.mqtt_mgr.loop_forever()``. Any ``Exception`` raised by
    that call is logged together with its traceback and then suppressed,
    so this method itself does not propagate it.

    Returns:
        None.
    """
    # Start MQTT message loop
    try:
        self.mqtt_mgr.loop_forever()
    except Exception:
        # Keep the original best-effort behaviour (do not re-raise), but
        # leave a trace in the log instead of silently discarding the
        # failure, so an unexpected loop exit can be diagnosed.
        logging.exception("MQTT message loop exited with an exception.")
934
-
935
-
936
-
0 commit comments