Skip to content

Commit

Permalink
update parrot login and cli.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexliang committed Aug 8, 2022
1 parent 8c72a39 commit 878979f
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ def train(self, train_data, device, args):
loss.backward()
optimizer.step()

logging.info(
"Update Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
epoch,
(batch_idx + 1) * args.batch_size,
len(train_data) * args.batch_size,
100.0 * (batch_idx + 1) / len(train_data),
loss.item(),
)
)
# logging.info(
# "Update Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
# epoch,
# (batch_idx + 1) * args.batch_size,
# len(train_data) * args.batch_size,
# 100.0 * (batch_idx + 1) / len(train_data),
# loss.item(),
# )
# )
batch_loss.append(loss.item())
epoch_loss.append(sum(batch_loss) / len(batch_loss))
logging.info(
Expand Down
9 changes: 2 additions & 7 deletions python/fedml/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,8 @@ def display_server_logs():
@click.option(
"--docker-rank", "-dr", default="1", help="docker client rank index (from 1 to n).",
)
@click.option(
"--run_id", "-ri", type=int, default=0, help="run id.",
)
def mlops_login(
userid, version, client, server, local_server, role, runner_cmd, device_id, os_name, docker, docker_rank, run_id
userid, version, client, server, local_server, role, runner_cmd, device_id, os_name, docker, docker_rank
):
account_id = userid[0]
platform_url = "open.fedml.ai"
Expand Down Expand Up @@ -219,9 +216,7 @@ def mlops_login(
"-id",
device_id,
"-os",
os_name,
"--run_id",
str(run_id)
os_name
]
).pid
sys_utils.save_login_process(ClientConstants.LOCAL_HOME_RUNNER_DIR_NAME,
Expand Down
5 changes: 1 addition & 4 deletions python/fedml/cli/edge_deployment/client_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
parser.add_argument("--role", "-r", type=str, default="client")
parser.add_argument("--device_id", "-id", type=str, default="0")
parser.add_argument("--os_name", "-os", type=str, default="")
parser.add_argument("--run_id", "-ri", type=str, default="0")
args = parser.parse_args()
args.user = args.user

Expand All @@ -40,9 +39,7 @@
"-id",
args.device_id,
"-os",
args.os_name,
"--run_id",
str(args.run_id)
args.os_name
]
)
ret_code, exec_out, exec_err = ClientConstants.get_console_sys_out_pipe_err_results(login_pid)
Expand Down
3 changes: 1 addition & 2 deletions python/fedml/cli/edge_deployment/client_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def __login_as_simulator(args, userid, version, mqtt_connection=True):
runner.unique_device_id = unique_device_id

if mqtt_connection:
ClientConstants.save_runner_infos(args.device_id + "." + args.os_name, edge_id, run_id=args.run_id)
ClientConstants.save_runner_infos(args.device_id + "." + args.os_name, edge_id, run_id=0)

# Setup MQTT connection for communication with the FedML server.
runner.setup_agent_mqtt_connection(service_config)
Expand Down Expand Up @@ -274,7 +274,6 @@ def logout():
parser.add_argument("--role", "-r", type=str, default="client")
parser.add_argument("--device_id", "-id", type=str, default="0")
parser.add_argument("--os_name", "-os", type=str, default="")
parser.add_argument("--run_id", "-ri", type=str, default="0")
args = parser.parse_args()
args.user = args.user
if args.type == 'login':
Expand Down
12 changes: 8 additions & 4 deletions python/fedml/core/distributed/communication/mqtt/mqtt_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,17 @@ def loop_stop(self):
def loop_forever(self):
self._client.loop_forever(retry_first_connection=True)

def send_message(self, topic, message):
def send_message(self, topic, message, wait_for_publish=True):
mqtt_send_start_time = time.time()
self._client.publish(topic, payload=message, qos=2)
ret_info = self._client.publish(topic, payload=message, qos=2)
if wait_for_publish:
ret_info.wait_for_publish(1)
MLOpsProfilerEvent.log_to_wandb({"Comm/send_delay_mqtt": time.time() - mqtt_send_start_time})

def send_message_json(self, topic, message):
self._client.publish(topic, payload=message, qos=2)
def send_message_json(self, topic, message, wait_for_publish=True):
ret_info = self._client.publish(topic, payload=message, qos=2)
if wait_for_publish:
ret_info.wait_for_publish(1)

def on_connect(self, client, userdata, flags, rc):
# Callback connected listeners
Expand Down
105 changes: 14 additions & 91 deletions python/fedml/core/mlops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ def log_aggregation_status(status, run_id=None):
ClientConstants.LOCAL_RUNNER_INFO_DIR_NAME, os.getpid(),
str(MLOpsStore.mlops_run_id),
run_status=status)

# Stop the log processor for the current run once it has finished or failed
if status == ServerConstants.MSG_MLOPS_SERVER_STATUS_FINISHED or \
status == ServerConstants.MSG_MLOPS_SERVER_STATUS_FAILED:
MLOpsRuntimeLogDaemon.get_instance(MLOpsStore.mlops_args).stop_log_processor(MLOpsStore.mlops_run_id,
MLOpsStore.mlops_edge_id)
else:
MLOpsStore.mlops_metrics.report_server_training_status(MLOpsStore.mlops_run_id, status, role=device_role)
release_log_mqtt_mgr()
Expand Down Expand Up @@ -411,6 +417,7 @@ def log_round_info(total_rounds, round_index):

if round_index == -1:
MLOpsStore.mlops_log_round_start_time = time.time()
return

setup_log_mqtt_mgr()
wait_log_mqtt_connected()
Expand Down Expand Up @@ -622,6 +629,10 @@ def init_logs(args, edge_id):
client_ids.append(edge_id)
args.client_id_list = json.dumps(client_ids)
MLOpsRuntimeLog.get_instance(args).init_logs()

# Start log processor for current run
MLOpsRuntimeLogDaemon.get_instance(args).start_log_processor(MLOpsStore.mlops_run_id, MLOpsStore.mlops_edge_id)

logging.info("client ids:{}".format(args.client_id_list))


Expand Down Expand Up @@ -658,8 +669,8 @@ def bind_simulation_device(args, userid, version="release"):
service_config["docker_config"] = docker_config
runner.agent_config = service_config
MLOpsStore.mlops_log_agent_config = service_config
setattr(args, "mqtt_config_path", mqtt_config)
setattr(args, "s3_config_path", s3_config)
# setattr(args, "mqtt_config_path", mqtt_config)
# setattr(args, "s3_config_path", s3_config)
setattr(args, "log_server_url", mlops_config["LOG_SERVER_URL"])
break
except Exception as e:
Expand Down Expand Up @@ -710,93 +721,6 @@ def bind_simulation_device(args, userid, version="release"):
return True


def bind_real_device(args, userid, version="release"):
    """Bind this device to an MLOps account and record the resulting edge id.

    The function decorates ``args`` with the account id, running directory,
    OS name, version, log directory and device id, builds a server runner when
    ``args.rank == 0`` (a client runner otherwise), fetches the service
    configs through that runner and finally registers the device.

    Args:
        args: Mutable argument namespace; ``args.rank`` is read and many
            attributes are set on it as a side effect.
        userid: Account id, stored on ``args`` as ``account_id``.
        version: Stored on ``args`` as both ``version`` and
            ``config_version``.

    Returns:
        ``True`` when the device was bound; ``MLOpsStore.mlops_edge_id`` and
        ``MLOpsStore.mlops_args`` are set in that case. ``False`` when the
        configs could not be fetched, the device id is empty, or no positive
        edge id was obtained within the retry budget.
    """
    max_attempts = 5

    setattr(args, "account_id", userid)
    setattr(args, "current_running_dir", ClientConstants.get_fedml_home_dir())

    sys_name = platform.system()
    if sys_name == "Darwin":
        sys_name = "MacOS"
    setattr(args, "os_name", sys_name)
    setattr(args, "version", version)
    if args.rank == 0:
        setattr(args, "log_file_dir", ServerConstants.get_log_file_dir())
        setattr(args, "device_id", FedMLServerRunner.get_device_id())
        runner = FedMLServerRunner(args)
    else:
        setattr(args, "log_file_dir", ClientConstants.get_log_file_dir())
        setattr(args, "device_id", FedMLClientRunner.get_device_id())
        runner = FedMLClientRunner(args)
    setattr(args, "config_version", version)
    setattr(args, "cloud_region", "")

    # Fetch configs from the MLOps config server.
    service_config = dict()
    for _ in range(max_attempts):
        try:
            mqtt_config, s3_config, mlops_config, docker_config = runner.fetch_configs()
            service_config["mqtt_config"] = mqtt_config
            service_config["s3_config"] = s3_config
            service_config["ml_ops_config"] = mlops_config
            service_config["docker_config"] = docker_config
            runner.agent_config = service_config
            MLOpsStore.mlops_log_agent_config = service_config
            setattr(args, "mqtt_config_path", mqtt_config)
            setattr(args, "s3_config_path", s3_config)
            setattr(args, "log_server_url", mlops_config["LOG_SERVER_URL"])
            break
        except Exception:
            # Best-effort retry: any failure while fetching or unpacking the
            # configs counts as one failed attempt.
            time.sleep(0.5)
    else:
        click.echo("\nNote: Internet is not connected. "
                   "Experimental tracking results will not be synchronized to the MLOps (open.fedml.ai).\n")
        return False

    # Build unique device id. Without a device id nothing can be registered,
    # so bail out instead of failing later on an unbound name.
    if args.device_id is None or len(str(args.device_id)) == 0:
        click.echo("Oops, you failed to login the FedML MLOps platform.")
        click.echo("The device id of this machine could not be determined.")
        return False
    # NOTE(review): rank 0 builds a FedMLServerRunner above yet is labelled
    # "Edge.Device" here (and non-zero ranks "Edge.Server"); this looks
    # inverted but is kept as-is because it is part of the id sent to the
    # platform -- confirm against the backend before changing.
    if args.rank == 0:
        device_role = "Edge.Device"
    else:
        device_role = "Edge.Server"
    unique_device_id = "{}@{}.{}".format(args.device_id, args.os_name, device_role)

    # Bind account id to the MLOps platform. Every unsuccessful attempt
    # (exception OR non-positive edge id) consumes one retry, so the loop
    # always terminates.
    edge_id = 0
    for attempt in range(max_attempts):
        try:
            edge_id = runner.bind_account_and_device_id(
                service_config["ml_ops_config"]["EDGE_BINDING_URL"],
                args.account_id, unique_device_id, args.os_name,
                role="simulator"
            )
            if edge_id > 0:
                runner.edge_id = edge_id
                break
        except Exception:
            # Treat any failure (including a non-numeric result) as "not bound".
            edge_id = 0
        if attempt + 1 < max_attempts:
            time.sleep(3)

    if edge_id <= 0:
        click.echo("Oops, you failed to login the FedML MLOps platform.")
        click.echo("Please check whether your network is normal!")
        return False
    MLOpsStore.mlops_edge_id = edge_id

    # Log arguments and binding results.
    runner.unique_device_id = unique_device_id

    MLOpsStore.mlops_args = args

    return True


def fetch_config(args, version="release"):
setattr(args, "current_running_dir", ClientConstants.get_fedml_home_dir())
sys_name = platform.system()
Expand Down Expand Up @@ -881,8 +805,7 @@ def mlops_simulator_login(userid, run_id):
subprocess.Popen(
["fedml", "login", str(userid),
"-v", MLOpsStore.mlops_args.version,
"-c", "-r", "edge_simulator",
"-ri", str(run_id)])
"-c", "-r", "edge_simulator"])

sys_utils.save_simulator_process(ClientConstants.get_data_dir(),
ClientConstants.LOCAL_RUNNER_INFO_DIR_NAME, os.getpid(), str(run_id))
Expand Down
18 changes: 9 additions & 9 deletions python/fedml/ml/trainer/my_model_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ def train(self, train_data, device, args):
# torch.nn.utils.clip_grad_norm_(self.model.parameters(), 0.5)

optimizer.step()
logging.info(
"Update Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
epoch,
(batch_idx + 1) * args.batch_size,
len(train_data) * args.batch_size,
100.0 * (batch_idx + 1) / len(train_data),
loss.item(),
)
)
# logging.info(
# "Update Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
# epoch,
# (batch_idx + 1) * args.batch_size,
# len(train_data) * args.batch_size,
# 100.0 * (batch_idx + 1) / len(train_data),
# loss.item(),
# )
# )
batch_loss.append(loss.item())
epoch_loss.append(sum(batch_loss) / len(batch_loss))
# logging.info('Client Index = {}\tEpoch: {}\tLoss: {:.6f}'.format(
Expand Down
18 changes: 9 additions & 9 deletions python/fedml/ml/trainer/my_model_trainer_classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ def train(self, train_data, device, args):
# torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)

optimizer.step()
logging.info(
"Update Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
epoch,
(batch_idx + 1) * args.batch_size,
len(train_data) * args.batch_size,
100.0 * (batch_idx + 1) / len(train_data),
loss.item(),
)
)
# logging.info(
# "Update Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
# epoch,
# (batch_idx + 1) * args.batch_size,
# len(train_data) * args.batch_size,
# 100.0 * (batch_idx + 1) / len(train_data),
# loss.item(),
# )
# )
batch_loss.append(loss.item())
epoch_loss.append(sum(batch_loss) / len(batch_loss))
logging.info(
Expand Down
20 changes: 10 additions & 10 deletions python/fedml/simulation/mpi/fedgkt/GKTClientTrainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,16 @@ def train(self):
loss.backward()
self.optimizer.step()

logging.info(
"client {} - Update Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
self.client_index,
epoch,
batch_idx * len(images),
len(self.local_training_data.dataset),
100.0 * batch_idx / len(self.local_training_data),
loss.item(),
)
)
# logging.info(
# "client {} - Update Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
# self.client_index,
# epoch,
# batch_idx * len(images),
# len(self.local_training_data.dataset),
# 100.0 * batch_idx / len(self.local_training_data),
# loss.item(),
# )
# )
batch_loss.append(loss.item())
epoch_loss.append(sum(batch_loss) / len(batch_loss))

Expand Down

0 comments on commit 878979f

Please sign in to comment.