
support infer pipeline #11


Merged: 3 commits, merged on Mar 3, 2023
650 changes: 0 additions & 650 deletions deploy/infer_pipeline/README.md

This file was deleted.

deploy/infer_pipeline/data_type/__init__.py

@@ -8,5 +8,5 @@
 History: NA
 """

-from src.data_type.message_data import StopSign, ProfilingData
-from src.data_type.process_data import ProcessData, StopData
+from .message_data import StopSign, ProfilingData
+from .process_data import ProcessData, StopData
14 changes: 14 additions & 0 deletions deploy/infer_pipeline/framework/__init__.py
@@ -0,0 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved.
Description: framework package manager
Author: MindX SDK
Create: 2022
History: NA
"""

from .module_base import ModuleBase
from .module_data_type import ModuleInitArgs, ModuleOutputInfo, ModulesInfo, ModuleDesc, \
    ModuleConnectDesc, ConnectType, SupportedTaskOrder, InferModelComb
from .module_manager import ModuleManager
deploy/infer_pipeline/framework/module_base.py

@@ -12,14 +12,15 @@
 from ctypes import c_longdouble
 from multiprocessing import Manager

-from src.data_type import ProfilingData
-from src.framework.module_data_type import ModuleInitArgs
-from src.utils import log
+from .module_data_type import ModuleInitArgs
+
+from deploy.infer_pipeline.data_type import ProfilingData
+from deploy.infer_pipeline.utils import log


 class ModuleBase(object):
-    def __init__(self, config_path, msg_queue):
-        self.config_path = config_path
+    def __init__(self, args, msg_queue):
+        self.args = args
         self.pipeline_name = ''
         self.module_name = ''
         self.without_input_queue = False
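To make the signature change concrete: ModuleBase subclasses now receive the parsed-args namespace directly instead of a YAML config path. A minimal hypothetical subclass under the new contract (MyProcess is illustrative only, not part of this PR):

```python
from deploy.infer_pipeline.framework import ModuleBase


class MyProcess(ModuleBase):
    """Hypothetical processor showing the new (args, msg_queue) constructor."""

    def __init__(self, args, msg_queue):
        super().__init__(args, msg_queue)

    def init_self_args(self):
        # Configuration is read straight off args (e.g. self.args.device_id)
        # instead of parsing self.config_path with safe_load_yaml.
        self.device_id = self.args.device_id
        super().init_self_args()
```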
deploy/infer_pipeline/framework/module_data_type.py

@@ -12,6 +12,21 @@
 from enum import Enum


+class InferModelComb(Enum):
+    DET = 0  # Detection model
+    REC = 1  # Recognition model
+    CLS = 2  # Classification model
+    DET_REC = 3  # Detection + recognition
+    DET_CLS_REC = 4  # Detection + classification + recognition
+
+
+SupportedTaskOrder = {
+    InferModelComb.DET: [InferModelComb.DET],
+    InferModelComb.REC: [InferModelComb.REC],
+    InferModelComb.DET_REC: [InferModelComb.DET, InferModelComb.REC],
+    InferModelComb.DET_CLS_REC: [InferModelComb.DET, InferModelComb.CLS, InferModelComb.REC]
+}
+
 class ConnectType(Enum):
     MODULE_CONNECT_ONE = 0
     MODULE_CONNECT_CHANNEL = 1
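SupportedTaskOrder maps the user-selected task combination to the ordered list of model stages the manager chains together. A standalone sketch of the lookup (the enum stub is repeated here only so the snippet runs on its own):

```python
from enum import Enum


class InferModelComb(Enum):  # stub mirroring the enum above
    DET = 0
    REC = 1
    CLS = 2
    DET_REC = 3
    DET_CLS_REC = 4


SupportedTaskOrder = {
    InferModelComb.DET_CLS_REC: [InferModelComb.DET, InferModelComb.CLS, InferModelComb.REC],
}

# An end-to-end task expands into its per-stage order; pipeline.py then maps
# each stage through MODEL_DICT to concrete processor modules.
order = SupportedTaskOrder[InferModelComb.DET_CLS_REC]
print([m.name for m in order])  # ['DET', 'CLS', 'REC']
```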
deploy/infer_pipeline/framework/module_manager.py

@@ -11,29 +11,29 @@
 from collections import defaultdict, namedtuple
 from multiprocessing import Queue, Process

-from src.framework.module_data_type import ModulesInfo, ModuleInitArgs
-from src.processors import processor_initiator
-from src.utils import log
+from .module_data_type import ModulesInfo, ModuleInitArgs
+from deploy.infer_pipeline.processors import processor_initiator
+from deploy.infer_pipeline.utils import log

 OutputRegisterInfo = namedtuple('OutputRegisterInfo', ['pipeline_name', 'module_send', 'module_recv'])


 class ModuleManager:
     MODULE_QUEUE_MAX_SIZE = 16

-    def __init__(self, msg_queue: Queue, task_queue: Queue, config_path: str = None, infer_res_save_path: str = None):
+    def __init__(self, msg_queue: Queue, task_queue: Queue, args):
         self.device_id = 0
         self.pipeline_map = defaultdict(lambda: defaultdict(ModulesInfo))
         self.msg_queue = msg_queue
         self.stop_manager = Queue(1)
         self.stop_manager.put('-')
-        self.config_path = config_path
+        self.args = args
         self.pipeline_name = ''
         self.process_list = []
         self.queue_list = []
         self.pipeline_queue_map = defaultdict(lambda: defaultdict(list))
         self.task_queue = task_queue
-        self.infer_res_save_path = infer_res_save_path
+        self.infer_res_save_path = args.res_save_dir

     @staticmethod
     def stop_module(module):
@@ -60,7 +60,7 @@ def register_modules(self, pipeline_name: str, module_desc_list: list,
                 module_count = default_count if module_desc.module_count == -1 else module_desc.module_count
                 module_info = ModulesInfo()
                 for instance_id in range(module_count):
-                    module_instance = processor_initiator(module_desc.module_name)(self.config_path, self.msg_queue)
+                    module_instance = processor_initiator(module_desc.module_name)(self.args, self.msg_queue)
                     self.init_module_instance(module_instance, instance_id,
                                               pipeline_name,
                                               module_desc.module_name)
90 changes: 90 additions & 0 deletions deploy/infer_pipeline/pipeline.py
@@ -0,0 +1,90 @@
import os
import time
from collections import defaultdict
from multiprocessing import Process, Queue

from deploy.infer_pipeline.data_type import StopSign
from deploy.infer_pipeline.framework import ModuleDesc, ModuleConnectDesc, ModuleManager, SupportedTaskOrder
from deploy.infer_pipeline.processors import MODEL_DICT
from deploy.infer_pipeline.utils import log, profiling, safe_div, TASK_QUEUE_SIZE


def image_sender(images_path, send_queue):
    if os.path.isdir(images_path):
        input_image_list = [os.path.join(images_path, path) for path in os.listdir(images_path)]
        for image_path in input_image_list:
            send_queue.put(image_path, block=True)
    else:
        send_queue.put(images_path, block=True)


def build_pipeline_kernel(args, input_queue):
    task_type = args.task_type
    parallel_num = args.parallel_num
    module_desc_list = [ModuleDesc('HandoutProcess', 1), ModuleDesc('DecodeProcess', parallel_num)]

    module_order = SupportedTaskOrder[task_type]

    for model_name in module_order:
        for name, count in MODEL_DICT.get(model_name, []):
            module_desc_list.append(ModuleDesc(name, count * parallel_num))

    module_desc_list.append(ModuleDesc('CollectProcess', 1))
    module_connect_desc_list = []
    for i in range(len(module_desc_list) - 1):
        module_connect_desc_list.append(ModuleConnectDesc(module_desc_list[i].module_name,
                                                          module_desc_list[i + 1].module_name))

    module_size = sum(desc.module_count for desc in module_desc_list)
    log.info(f'module_size: {module_size}')
    msg_queue = Queue(module_size)

    manager = ModuleManager(msg_queue, input_queue, args)
    manager.register_modules(str(os.getpid()), module_desc_list, 1)
    manager.register_module_connects(str(os.getpid()), module_connect_desc_list)

    # start the pipeline, init start
    manager.run_pipeline()

    # wait until every module instance has put its init sign into msg_queue
    while not msg_queue.full():
        continue

    start_time = time.time()
    # release all init signs
    for _ in range(module_size):
        msg_queue.get()

    # release the stop sign, infer start
    manager.stop_manager.get(block=False)

    manager.deinit_pipeline_module()

    cost_time = time.time() - start_time

    # collect the profiling data
    profiling_data = defaultdict(lambda: [0, 0])
    image_total = 0
    for _ in range(module_size):
        msg_info = msg_queue.get()
        profiling_data[msg_info.module_name][0] += msg_info.process_cost_time
        profiling_data[msg_info.module_name][1] += msg_info.send_cost_time
        if msg_info.module_name != -1:
            image_total = msg_info.image_total

    profiling(profiling_data, image_total)

    log.info(f'total cost {cost_time:.2f}s, FPS: {safe_div(image_total, cost_time):.2f}')
    msg_queue.close()
    msg_queue.join_thread()


def build_pipeline(args):
    task_queue = Queue(TASK_QUEUE_SIZE)
    process = Process(target=build_pipeline_kernel, args=(args, task_queue))
    process.start()
    image_sender(images_path=args.input_images_dir, send_queue=task_queue)
    task_queue.put(StopSign(), block=True)
    process.join()
    process.close()
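A minimal sketch of driving this entry point. The field names on args are assumptions inferred from what the PR reads off it (args.task_type, args.parallel_num, args.input_images_dir, args.res_save_dir, args.device_id, args.cls_model_path); this diff documents no CLI, and the det/rec model-path fields are hypothetical:

```python
from types import SimpleNamespace

from deploy.infer_pipeline.framework import InferModelComb
from deploy.infer_pipeline.pipeline import build_pipeline

# Hypothetical argument namespace; in practice this would come from argparse.
args = SimpleNamespace(
    task_type=InferModelComb.DET_REC,  # stage combination, see SupportedTaskOrder
    parallel_num=2,                    # instances per parallelizable module
    input_images_dir='images/',        # file or directory fed to image_sender
    res_save_dir='results/',           # becomes manager.infer_res_save_path
    device_id=0,
    det_model_path='det.om',           # assumed field, mirroring cls_model_path
    rec_model_path='rec.om',           # assumed field, mirroring cls_model_path
    cls_model_path='cls.om',
)

if __name__ == '__main__':
    build_pipeline(args)
```

Note the init handshake in build_pipeline_kernel: msg_queue is sized to the total module count, the busy-wait on msg_queue.full() appears to act as a startup barrier until every module instance has reported in, and the same queue is reused afterwards to collect per-module profiling data.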
41 changes: 41 additions & 0 deletions deploy/infer_pipeline/processors/__init__.py
@@ -0,0 +1,41 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved.
Description:
Author: MindX SDK
Create: 2022
History: NA
"""

from sys import modules

from deploy.infer_pipeline.utils import log
from deploy.infer_pipeline.framework import InferModelComb

from .classification import CLSPreProcess, CLSInferProcess
from .common import HandoutProcess, CollectProcess, DecodeProcess
from .detection import DetPreProcess, DetInferProcess, DetPostProcess
from .recognition import RecPreProcess, RecInferProcess, RecPostProcess


DET_DESC = [('DetPreProcess', 1), ('DetInferProcess', 1), ('DetPostProcess', 1)]
REC_DESC = [('RecPreProcess', 1), ('RecInferProcess', 1), ('RecPostProcess', 1)]
CLS_DESC = [('CLSPreProcess', 1), ('CLSInferProcess', 1)]

MODEL_DICT = {
    InferModelComb.DET: DET_DESC,
    InferModelComb.REC: REC_DESC,
    InferModelComb.CLS: CLS_DESC
}


def processor_initiator(classname):
    try:
        processor = getattr(modules.get(__name__), classname)
    except AttributeError as error:
        log.error("%s doesn't exist.", classname)
        raise error
    if isinstance(processor, type):
        return processor
    raise TypeError(f"{classname} is not a class.")
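For illustration, this mirrors how ModuleManager.register_modules consumes the initiator: resolve a processor class by name, then instantiate it with (args, msg_queue). The queue size and args value below are placeholders, not values from this PR:

```python
from multiprocessing import Queue

from deploy.infer_pipeline.processors import processor_initiator

processor_cls = processor_initiator('DecodeProcess')  # resolve class by name

msg_queue = Queue(16)  # placeholder size; the manager sizes it to module count
args = None            # placeholder; the real pipeline passes its args namespace
instance = processor_cls(args, msg_queue)
```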
deploy/infer_pipeline/processors/classification/cls/__init__.py

@@ -8,5 +8,5 @@
 History: NA
 """

-from src.processors.classification.cls.cls_infer_process import CLSInferProcess
-from src.processors.classification.cls.cls_pre_process import CLSPreProcess
+from .cls_infer_process import CLSInferProcess
+from .cls_pre_process import CLSPreProcess
deploy/infer_pipeline/processors/classification/cls/cls_infer_process.py

@@ -13,26 +13,22 @@
 import numpy as np
 from mindx.sdk import base, Tensor

-from src.framework import ModuleBase
-from src.utils import safe_load_yaml, get_device_id, check_valid_file
+from deploy.infer_pipeline.framework import ModuleBase
+from deploy.infer_pipeline.utils import check_valid_file


 class CLSInferProcess(ModuleBase):
-    def __init__(self, config_path, msg_queue):
-        super(CLSInferProcess, self).__init__(config_path, msg_queue)
+    def __init__(self, args, msg_queue):
+        super(CLSInferProcess, self).__init__(args, msg_queue)
         self.model = None
         self.static_method = True
         self.thresh = 0.9

     def init_self_args(self):
+        device_id = self.args.device_id
+        model_path = self.args.cls_model_path
+
         base.mx_init()
-        config = safe_load_yaml(self.config_path)
-        cls_config = config.get('cls', {})
-        if not cls_config:
-            raise ValueError(f'cannot find the cls related config in config file')
-        device_id = get_device_id(config, 'cls')
-        device_id = device_id if isinstance(device_id, int) else device_id[self.instance_id % len(device_id)]
-        model_path = cls_config.get('model_path', '')
         if model_path and os.path.isfile(model_path):
             check_valid_file(model_path)
             self.model = base.model(model_path, device_id)
deploy/infer_pipeline/processors/classification/cls/cls_pre_process.py

@@ -15,16 +15,16 @@
 import numpy as np
 from mindx.sdk import base

-from src.data_type.process_data import ProcessData
-from src.framework import ModuleBase
-from src.utils import get_batch_list_greedy, get_hw_of_img, safe_div, padding_with_cv, normalize, \
-    to_chw_image, expand, padding_batch, bgr_to_gray, safe_load_yaml, get_batch_from_gear, get_device_id, \
+from deploy.infer_pipeline.data_type.process_data import ProcessData
+from deploy.infer_pipeline.framework import ModuleBase
+from deploy.infer_pipeline.utils import get_batch_list_greedy, get_hw_of_img, safe_div, padding_with_cv, normalize, \
+    to_chw_image, expand, padding_batch, bgr_to_gray, get_shape_info, \
     check_valid_file, NORMALIZE_MEAN, NORMALIZE_STD, NORMALIZE_SCALE


 class CLSPreProcess(ModuleBase):
-    def __init__(self, config_path, msg_queue):
-        super(CLSPreProcess, self).__init__(config_path, msg_queue)
+    def __init__(self, args, msg_queue):
+        super(CLSPreProcess, self).__init__(args, msg_queue)
         self.without_input_queue = False
         self.gear_list = []
         self.batchsize_list = []
@@ -36,25 +36,24 @@ def __init__(self, config_path, msg_queue):
         self.mean = np.array(NORMALIZE_MEAN).astype(np.float32)

     def init_self_args(self):
+        device_id = self.args.device_id
+        model_path = self.args.cls_model_path
+
         base.mx_init()
-        config = safe_load_yaml(self.config_path)
-        cls_config = config.get('cls', {})
-        if not cls_config:
-            raise ValueError(f'cannot find the cls related config in config file')
-        device_id = get_device_id(config, 'cls')
-        device_id = device_id if isinstance(device_id, int) else device_id[0]
-        model_path = cls_config.get('model_path', '')
         if model_path and os.path.isfile(model_path):
             check_valid_file(model_path)
             model = base.model(model_path, device_id)
         else:
             raise FileNotFoundError('cls model path must be a file')
-        batchsize_list, channel, height, width = get_batch_from_gear(model.model_gear())
-        self.batchsize_list = batchsize_list
-        self.model_channel = channel
-        self.model_height = height
-        self.model_width = width
+
+        desc, shape_info = get_shape_info(model.input_shape(0), model.model_gear())
+        del model
+
+        if desc != "dynamic_batch_size":
+            raise ValueError("model input shape must be dynamic batch_size with gear")
+
+        self.batchsize_list = list(shape_info[0])
+        _, self.model_channel, self.model_height, self.model_width = shape_info
         self.batchsize_list.sort()
         super().init_self_args()
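For intuition about the gear-based batching this preprocessor sets up: batchsize_list ends up holding the batch sizes ("gears") the dynamic-batch model accepts, and incoming images have to be grouped into those sizes. A standalone sketch of one plausible greedy grouping follows; it is a hypothetical stand-in, not the actual get_batch_list_greedy from deploy/infer_pipeline/utils:

```python
def greedy_batches(total_images: int, gears: list) -> list:
    """Split total_images into batch sizes drawn from gears (hypothetical).

    Repeatedly take the largest gear that still fits; once nothing fits,
    fall back to the smallest gear (the real padding_batch would pad the
    remainder up to that gear).
    """
    gears = sorted(gears)
    batches = []
    remaining = total_images
    while remaining > 0:
        fitting = [g for g in gears if g <= remaining]
        batch = fitting[-1] if fitting else gears[0]
        batches.append(batch)
        remaining -= batch
    return batches


print(greedy_batches(70, [1, 4, 8, 16, 32]))  # [32, 32, 4, 1, 1]
```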
13 changes: 13 additions & 0 deletions deploy/infer_pipeline/processors/common/__init__.py
@@ -0,0 +1,13 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved.
Description:
Author: MindX SDK
Create: 2022
History: NA
"""

from .collect_process import CollectProcess
from .decode_process import DecodeProcess
from .handout_process import HandoutProcess