Commit 23d88fc

[Deploy] Remove unnecessary logic.
1 parent 34fdba0 commit 23d88fc

File tree

3 files changed (+10, -239 lines)

python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py

Lines changed: 7 additions & 225 deletions
@@ -1,12 +1,13 @@
+import fedml
+
 import logging
 import os
-import pickle
-import platform
 import shutil
 import time
 import traceback
 import yaml
 import datetime
+import docker
 
 import requests
 import torch
@@ -15,27 +16,18 @@
 
 import collections.abc
 
-import fedml
 from fedml.computing.scheduler.comm_utils import sys_utils, security_utils
-from fedml.computing.scheduler.comm_utils.container_utils import ContainerUtils
 from fedml.computing.scheduler.comm_utils.hardware_utils import HardwareUtil
 from fedml.computing.scheduler.comm_utils.job_utils import JobRunnerUtils
-
-for type_name in collections.abc.__all__:
-    setattr(collections, type_name, getattr(collections.abc, type_name))
-
 from fedml.computing.scheduler.comm_utils.constants import SchedulerConstants
 from fedml.computing.scheduler.model_scheduler.device_client_constants import ClientConstants
-import io
-
-import docker
-from ..scheduler_core.compute_cache_manager import ComputeCacheManager
+from fedml.computing.scheduler.model_scheduler.device_model_cache import FedMLModelCache
 from ..scheduler_core.compute_utils import ComputeUtils
 from ..comm_utils.container_utils import ContainerUtils
-
 from .device_http_inference_protocol import FedMLHttpInference
 
-from fedml.computing.scheduler.model_scheduler.device_model_cache import FedMLModelCache
+for type_name in collections.abc.__all__:
+    setattr(collections, type_name, getattr(collections.abc, type_name))
 
 no_real_gpu_allocation = None
 
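Note on the import hunk above: the for-loop over collections.abc.__all__ is moved below the imports, not removed. It is a compatibility shim: Python 3.10 dropped the ABC aliases (Mapping, Sequence, and so on) from the collections namespace, so legacy code that still references collections.Mapping only works if the names are re-exported from collections.abc. A minimal standalone sketch of what the shim does (illustrative, not part of this diff):

    import collections
    import collections.abc

    # Re-export every ABC from collections.abc onto collections so that
    # pre-3.10 style lookups such as collections.Mapping keep resolving.
    for type_name in collections.abc.__all__:
        setattr(collections, type_name, getattr(collections.abc, type_name))

    assert isinstance({}, collections.Mapping)  # resolves again after the shim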
@@ -432,8 +424,6 @@ def should_exit_logs(end_point_id, model_id, cmd_type, model_name, inference_eng
     if cmd_type == ClientConstants.CMD_TYPE_RUN_DEFAULT_SERVER:
         # TODO: Exited Quickly if the container is Exited or Removed
         # If the container has exited, return True, means we should exit the logs
-        # container_name = "{}".format(ClientConstants.FEDML_DEFAULT_SERVER_CONTAINER_NAME_PREFIX) + "__" + \
-        #                  security_utils.get_content_hash(model_name)
         try:
             inference_output_url, model_version, model_metadata, model_config = \
                 get_model_info(model_name, inference_engine, inference_port, infer_host,
@@ -554,8 +544,6 @@ def log_deployment_result(end_point_id, model_id, cmd_container_name, cmd_type,
 
 def is_client_inference_container_ready(infer_url_host, inference_http_port, inference_model_name, local_infer_url,
                                         inference_type="default", model_version="", request_input_example=None):
-    # logging.info(f"Inference type: {inference_type}, infer_url_host {infer_url_host}, \
-    #     inference_http_port: {inference_http_port}, local_infer_url {local_infer_url}")
 
     if inference_type == "default":
         default_client_container_ready_url = "http://{}:{}/ready".format("0.0.0.0", inference_http_port)
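The context above shows the readiness probe for the default inference container polling an HTTP /ready endpoint. For orientation only, a minimal sketch of such a poll; the helper name, retry count, and timeouts below are illustrative assumptions, not the actual is_client_inference_container_ready implementation:

    import time
    import requests

    def wait_for_container_ready(inference_http_port, retries=30, interval=2):
        # Poll the container's /ready endpoint until it answers 200 or we give up.
        ready_url = "http://{}:{}/ready".format("0.0.0.0", inference_http_port)
        for _ in range(retries):
            try:
                if requests.get(ready_url, timeout=5).status_code == 200:
                    return True
            except requests.exceptions.RequestException:
                pass  # container not accepting connections yet
            time.sleep(interval)
        return False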
@@ -631,211 +619,5 @@ def run_http_inference_with_curl_request(inference_url, inference_input_list, in
                                               inference_type=inference_type, engine_type=engine_type, timeout=timeout)
 
 
-def convert_model_to_onnx(
-        torch_model, output_path: str, dummy_input_list, input_size: int, input_is_tensor=True
-) -> None:
-    from collections import OrderedDict
-    import torch
-    from torch.onnx import TrainingMode
-
-    torch.onnx.export(torch_model,  # model being run
-                      dummy_input_list if input_is_tensor else tuple(dummy_input_list),
-                      # model input (or a tuple for multiple inputs)
-                      f=output_path,  # where to save the model (can be a file or file-like object)
-                      export_params=True,  # store the trained parameter weights inside the model file
-                      opset_version=11,  # the ONNX version to export the model to
-                      do_constant_folding=False,  # whether to execute constant folding for optimization
-                      input_names=["input1", "input2"],
-                      # the model's input names
-                      output_names=['output'],  # the model's output names
-                      training=TrainingMode.EVAL,
-                      verbose=True,
-                      dynamic_axes={"input1": {0: "batch_size"},
-                                    "input2": {0: "batch_size"},
-                                    "output": {0: "batch_size"}}
-                      )
-
-
-def test_start_triton_server(model_serving_dir):
-    sudo_prefix = "sudo "
-    sys_name = platform.system()
-    if sys_name == "Darwin":
-        sudo_prefix = ""
-        gpu_attach_cmd = ""
-
-    triton_server_container_name = "{}".format(ClientConstants.FEDML_TRITON_SERVER_CONTAINER_NAME_PREFIX)
-    triton_server_cmd = "{}docker stop {}; {}docker rm {}; {}docker run --name {} {} -p{}:8000 " \
-                        "-p{}:8001 -p{}:8002 " \
-                        "--shm-size {} " \
-                        "-v {}:/models {} " \
-                        "bash -c \"pip install transformers && tritonserver --strict-model-config=false " \
-                        "--model-control-mode=poll --repository-poll-secs={} " \
-                        "--model-repository=/models\" ".format(sudo_prefix, triton_server_container_name,
-                                                               sudo_prefix, triton_server_container_name,
-                                                               sudo_prefix, triton_server_container_name,
-                                                               gpu_attach_cmd,
-                                                               ClientConstants.INFERENCE_HTTP_PORT,
-                                                               ClientConstants.INFERENCE_GRPC_PORT,
-                                                               8002,
-                                                               "4096m",
-                                                               model_serving_dir,
-                                                               ClientConstants.INFERENCE_SERVER_IMAGE,
-                                                               ClientConstants.FEDML_MODEL_SERVING_REPO_SCAN_INTERVAL)
-    logging.info("Run triton inference server: {}".format(triton_server_cmd))
-    triton_server_process = ClientConstants.exec_console_with_script(triton_server_cmd,
-                                                                     should_capture_stdout=False,
-                                                                     should_capture_stderr=False,
-                                                                     no_sys_out_err=True)
-
-
-def test_convert_pytorch_model_to_onnx(model_net_file, model_bin_file, model_name, model_in_params):
-    torch_model = torch.jit.load(model_net_file)
-    with open(model_bin_file, 'rb') as model_pkl_file:
-        model_state_dict = pickle.load(model_pkl_file)
-        torch_model.load_state_dict(model_state_dict)
-        torch_model.eval()
-
-    input_size = model_in_params["input_size"]
-    input_types = model_in_params["input_types"]
-
-    dummy_input_list = []
-    for index, input_i in enumerate(input_size):
-        if input_types[index] == "int":
-            this_input = torch.tensor(torch.randint(0, 1, input_i))
-        else:
-            this_input = torch.tensor(torch.zeros(input_i))
-        dummy_input_list.append(this_input)
-
-    onnx_model_dir = os.path.join(ClientConstants.get_model_cache_dir(),
-                                  ClientConstants.FEDML_CONVERTED_MODEL_DIR_NAME,
-                                  model_name, ClientConstants.INFERENCE_MODEL_VERSION)
-    if not os.path.exists(onnx_model_dir):
-        os.makedirs(onnx_model_dir, exist_ok=True)
-    onnx_model_path = os.path.join(onnx_model_dir, "model.onnx")
-
-    convert_model_to_onnx(torch_model, onnx_model_path, dummy_input_list, input_size,
-                          input_is_tensor=True)
-
-    model_serving_dir = os.path.join(ClientConstants.get_model_cache_dir(),
-                                     ClientConstants.FEDML_CONVERTED_MODEL_DIR_NAME)
-    return model_serving_dir
-
-
-def start_gpu_model_load_process():
-    from multiprocessing import Process
-    import time
-    process = Process(target=load_gpu_model_to_cpu_device)
-    process.start()
-    while True:
-        time.sleep(1)
-
-
-def load_gpu_model_to_cpu_device():
-    import pickle
-    import io
-    import torch
-
-    class CPU_Unpickler(pickle.Unpickler):
-        def find_class(self, module, name):
-            if module == 'torch.storage' and name == '_load_from_bytes':
-                return lambda b: torch.load(io.BytesIO(b), map_location='cpu')
-            else:
-                return super().find_class(module, name)
-
-    model_file = "/home/fedml/.fedml/fedml-client/fedml/models/theta_rec_auc_81_single_label/theta_rec_auc_81_single_label"
-    with open(model_file, "rb") as model_pkl_file:
-        if not torch.cuda.is_available():
-            model = CPU_Unpickler(model_pkl_file).load()
-            if model is None:
-                print("Failed to load gpu model to cpu device")
-            else:
-                print("Succeeded to load gpu model to cpu device")
-
-
 if __name__ == "__main__":
-    start_gpu_model_load_process()
-
-    model_serving_dir = test_convert_pytorch_model_to_onnx("./sample-open-training-model-net",
-                                                           "./sample-open-training-model",
-                                                           "rec-model",
-                                                           {"input_size": [[1, 24], [1, 2]],
-                                                            "input_types": ["int", "float"]})
-
-    test_start_triton_server(model_serving_dir)
-
-    # input_data = {"model_version": "v0-Sun Feb 05 12:17:16 GMT 2023",
-    #               "model_name": "model_414_45_open-model-test_v0-Sun-Feb-05-12-17-16-GMT-2023",
-    #               # "data": "file:///Users/alexliang/fedml_data/mnist-image.png",
-    #               "data": "https://raw.githubusercontent.com/niyazed/triton-mnist-example/master/images/sample_image.png",
-    #               "end_point_id": 414, "model_id": 45, "token": "a09a18a14c4c4d89a8d5f9515704c073"}
-    #
-    # data_list = list()
-    # data_list.append(input_data["data"])
-    # run_http_inference_with_lib_http_api_with_image_data(input_data["model_name"],
-    #                                                      5001, 1, data_list, "")
-    #
-    #
-    # class LogisticRegression(torch.nn.Module):
-    #     def __init__(self, input_dim, output_dim):
-    #         super(LogisticRegression, self).__init__()
-    #         self.linear = torch.nn.Linear(input_dim, output_dim)
-    #
-    #     def forward(self, x):
-    #         outputs = torch.sigmoid(self.linear(x))
-    #         return outputs
-    #
-    #
-    # model = LogisticRegression(28 * 28, 10)
-    # checkpoint = {'model': model}
-    # model_net_file = "/Users/alexliang/fedml-client/fedml/models/open-model-test/model-net.pt"
-    # torch.save(checkpoint, model_net_file)
-    #
-    # with open("/Users/alexliang/fedml-client/fedml/models/open-model-test/open-model-test", 'rb') as model_pkl_file:
-    #     model_params = pickle.load(model_pkl_file)
-    #     # torch.save(model_params, "/Users/alexliang/fedml-client/fedml/models/open-model-test/a.pt")
-    #     # model = torch.load("/Users/alexliang/fedml-client/fedml/models/open-model-test/a.pt")
-    #     loaded_checkpoint = torch.load(model_net_file)
-    #     loaded_model = loaded_checkpoint["model"]
-    #     loaded_model.load_state_dict(model_params)
-    #     for parameter in loaded_model.parameters():
-    #         parameter.requires_grad = False
-    #     loaded_model.eval()
-    #     input_names = {"x": 0}
-    #     convert_model_to_onnx(loaded_model, "/Users/alexliang/fedml-client/fedml/models/open-model-test/a.onnx",
-    #                           input_names, 28 * 28)
-
-    # parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-    # parser.add_argument("--cf", "-c", help="config file")
-    # parser.add_argument("--role", "-r", type=str, default="client", help="role")
-    # parser.add_argument("--model_storage_local_path", "-url", type=str, default="/home/ubuntu",
-    #                     help="model storage local path")
-    # parser.add_argument("--inference_model_name", "-n", type=str, default="fedml-model",
-    #                     help="inference model name")
-    # parser.add_argument("--inference_engine", "-engine", type=str, default="ONNX", help="inference engine")
-    # parser.add_argument("--inference_http_port", "-http", type=int, default=8000, help="inference http port")
-    # parser.add_argument("--inference_grpc_port", "-gprc", type=int, default=8001, help="inference grpc port")
-    # parser.add_argument("--inference_metric_port", "-metric", type=int, default=8002, help="inference metric port")
-    # parser.add_argument("--inference_use_gpu", "-gpu", type=str, default="gpu", help="inference use gpu")
-    # parser.add_argument("--inference_memory_size", "-mem", type=str, default="256m", help="inference memory size")
-    # parser.add_argument("--inference_convertor_image", "-convertor", type=str,
-    #                     default=ClientConstants.INFERENCE_CONVERTOR_IMAGE, help="inference convertor image")
-    # parser.add_argument("--inference_server_image", "-server", type=str,
-    #                     default=ClientConstants.INFERENCE_SERVER_IMAGE, help="inference server image")
-    # args = parser.parse_args()
-    # args.user = args.user
-    #
-    # pip_source_dir = os.path.dirname(__file__)
-    # __running_model_name, __inference_output_url, __model_version, __model_metadata, __model_config = \
-    #     start_deployment(
-    #         args.model_storage_local_path,
-    #         args.inference_model_name,
-    #         args.inference_engine,
-    #         args.inference_http_port,
-    #         args.inference_grpc_port,
-    #         args.inference_metric_port,
-    #         args.inference_use_gpu,
-    #         args.inference_memory_size,
-    #         args.inference_convertor_image,
-    #         args.inference_server_image)
-    # print("Model deployment results, running model name: {}, url: {}, model metadata: {}, model config: {}".format(
-    #     __running_model_name, __inference_output_url, __model_metadata, __model_config))
+    pass
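The functions deleted above were local test scaffolding (an ONNX conversion wrapper, a throwaway Triton server launcher, and a GPU-to-CPU unpickling probe) rather than deployment logic, which is why the __main__ block now reduces to pass. For reference, a minimal standalone sketch of the torch.onnx.export pattern that the removed convert_model_to_onnx helper wrapped; the model, shapes, and file name here are illustrative assumptions:

    import torch

    model = torch.nn.Linear(24, 2).eval()   # stand-in for the real torch model
    dummy_input = torch.zeros(1, 24)        # one example tensor per model input

    torch.onnx.export(
        model,
        dummy_input,
        "model.onnx",
        export_params=True,                 # embed the trained weights in the file
        opset_version=11,
        input_names=["input1"],
        output_names=["output"],
        dynamic_axes={"input1": {0: "batch_size"},   # allow variable batch size at serving time
                      "output": {0: "batch_size"}},
    )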

python/fedml/computing/scheduler/model_scheduler/master_job_runner.py

Lines changed: 0 additions & 1 deletion
@@ -453,7 +453,6 @@ def process_deployment_result_message(self, topic=None, payload=None):
                 time.sleep(3)
                 self.trigger_completed_event()
 
-
     def cleanup_runner_process(self, run_id):
         ServerConstants.cleanup_run_process(run_id, not_kill_subprocess=True)
 

python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py

Lines changed: 3 additions & 13 deletions
@@ -294,9 +294,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
                     json.dumps(result_payload), replica_no=rank + 1)
 
                 logging.info(f"Deploy replica {rank + 1} / {prev_rank + 1 + op_num} successfully.")
-                time.sleep(5)
 
-            time.sleep(1)
             self.status_reporter.run_id = self.run_id
             self.status_reporter.report_client_id_status(
                 self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FINISHED,
@@ -348,7 +346,8 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
 
             # TODO (Raphael) check if this will allow another job to seize the gpu during high concurrency:
             try:
-                JobRunnerUtils.get_instance().release_partial_job_gpu(run_id, self.edge_id, replica_occupied_gpu_ids)
+                JobRunnerUtils.get_instance().release_partial_job_gpu(
+                    run_id, self.edge_id, replica_occupied_gpu_ids)
             except Exception as e:
                 if op == "rollback":
                     pass
@@ -395,7 +394,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
                 JobRunnerUtils.get_instance().release_partial_job_gpu(
                     run_id, self.edge_id, replica_occupied_gpu_ids)
 
-            result_payload = self.send_deployment_results(
+            self.send_deployment_results(
                 end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED,
                 model_id, model_name, inference_output_url, inference_model_version, inference_port,
                 inference_engine, model_metadata, model_config)
@@ -496,15 +495,6 @@ def send_deployment_results(self, end_point_name, device_id, model_status,
         self.message_center.send_message_json(deployment_results_topic, json.dumps(deployment_results_payload))
         return deployment_results_payload
 
-    def send_deployment_status(self, end_point_name, device_id,
-                               model_id, model_name, model_version,
-                               model_inference_url, model_status,
-                               inference_port=ClientConstants.MODEL_INFERENCE_DEFAULT_PORT,
-                               replica_no=1,  # start from 1
-                               ):
-        # Deprecated
-        pass
-
     def reset_devices_status(self, edge_id, status):
         self.status_reporter.run_id = self.run_id
         self.status_reporter.edge_id = edge_id
