Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StagedTimer + TimerManager Impl + Pipeline.__call__ integration #1062

Merged
merged 12 commits into from
Jun 23, 2023
1 change: 0 additions & 1 deletion src/deepsparse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
)
from .engine import *
from .tasks import *
from .timing import *
from .pipeline import *
from .loggers import *
from .version import __version__, is_release
Expand Down
75 changes: 40 additions & 35 deletions src/deepsparse/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
validate_identifier,
)
from deepsparse.tasks import SupportedTasks, dynamic_import_task
from deepsparse.timing import InferencePhases, Timer
from deepsparse.utils import InferenceStages, StagedTimer, TimerManager


__all__ = [
Expand Down Expand Up @@ -155,13 +155,16 @@ def __init__(
context: Optional[Context] = None,
executor: Optional[Union[ThreadPoolExecutor, int]] = None,
logger: Optional[Union[BaseLogger, str]] = None,
benchmark: bool = False,
_delay_engine_initialize: bool = False, # internal use only
):
self._benchmark = benchmark
self._model_path_orig = model_path
self._model_path = model_path
self._engine_type = engine_type
self._batch_size = batch_size
self._alias = alias
self.timer_manager = TimerManager(enabled=True, multi=benchmark)
self.context = context
self.logger = (
logger
Expand Down Expand Up @@ -212,21 +215,22 @@ def __init__(
category=MetricCategories.SYSTEM,
)

def __call__(self, *args, **kwargs) -> BaseModel:
def __call__(
self, *args, timer: Optional[StagedTimer] = None, **kwargs
) -> BaseModel:
if "engine_inputs" in kwargs:
raise ValueError(
"invalid kwarg engine_inputs. engine inputs determined "
f"by {self.__class__.__qualname__}.parse_inputs"
)
timer = Timer()

timer.start(InferencePhases.TOTAL_INFERENCE)
timer = timer or self.timer_manager.new_timer()
timer.start(InferenceStages.TOTAL_INFERENCE)

# ------ PREPROCESSING ------
timer.start(InferencePhases.PRE_PROCESS)
timer.start(InferenceStages.PRE_PROCESS)
# parse inputs into input_schema
pipeline_inputs = self.parse_inputs(*args, **kwargs)

self.log(
identifier="pipeline_inputs",
value=pipeline_inputs,
Expand All @@ -244,30 +248,25 @@ def __call__(self, *args, **kwargs) -> BaseModel:
engine_inputs, postprocess_kwargs = engine_inputs
else:
postprocess_kwargs = {}
timer.stop(InferencePhases.PRE_PROCESS)

timer.stop(InferenceStages.PRE_PROCESS)
self.log(
identifier="engine_inputs",
value=engine_inputs,
category=MetricCategories.DATA,
)
self.log(
identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.PRE_PROCESS}_seconds", # noqa E501
value=timer.time_delta(InferencePhases.PRE_PROCESS),
category=MetricCategories.SYSTEM,
)

# ------ INFERENCE ------
# split inputs into batches of size `self._batch_size`
timer.start(InferencePhases.ENGINE_FORWARD)
timer.start(InferenceStages.ENGINE_FORWARD)
batches = self.split_engine_inputs(engine_inputs, self._batch_size)

# submit split batches to engine threadpool
batch_outputs = list(self.executor.map(self.engine_forward, batches))

# join together the batches of size `self._batch_size`
engine_outputs = self.join_engine_outputs(batch_outputs)
timer.stop(InferencePhases.ENGINE_FORWARD)
timer.stop(InferenceStages.ENGINE_FORWARD)

self.log(
identifier=f"{SystemGroups.INFERENCE_DETAILS}/input_batch_size_total",
Expand All @@ -278,20 +277,14 @@ def __call__(self, *args, **kwargs) -> BaseModel:
value=len(batch_outputs) * self._batch_size,
category=MetricCategories.SYSTEM,
)

self.log(
identifier="engine_outputs",
value=engine_outputs,
category=MetricCategories.DATA,
)
self.log(
identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.ENGINE_FORWARD}_seconds", # noqa E501
value=timer.time_delta(InferencePhases.ENGINE_FORWARD),
category=MetricCategories.SYSTEM,
)

# ------ POSTPROCESSING ------
timer.start(InferencePhases.POST_PROCESS)
timer.start(InferenceStages.POST_PROCESS)
pipeline_outputs = self.process_engine_outputs(
engine_outputs, **postprocess_kwargs
)
Expand All @@ -300,24 +293,16 @@ def __call__(self, *args, **kwargs) -> BaseModel:
f"Outputs of {self.__class__} must be instances of "
f"{self.output_schema} found output of type {type(pipeline_outputs)}"
)
timer.stop(InferencePhases.POST_PROCESS)
timer.stop(InferencePhases.TOTAL_INFERENCE)

timer.stop(InferenceStages.POST_PROCESS)
self.log(
identifier="pipeline_outputs",
value=pipeline_outputs,
category=MetricCategories.DATA,
)
self.log(
identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.POST_PROCESS}_seconds", # noqa E501
value=timer.time_delta(InferencePhases.POST_PROCESS),
category=MetricCategories.SYSTEM,
)
self.log(
identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.TOTAL_INFERENCE}_seconds", # noqa E501
value=timer.time_delta(InferencePhases.TOTAL_INFERENCE),
category=MetricCategories.SYSTEM,
)

# ------ INFERENCE FINALIZATION ------
timer.stop(InferenceStages.TOTAL_INFERENCE)
self.log_inference_times(timer)

return pipeline_outputs

Expand Down Expand Up @@ -704,6 +689,15 @@ def engine_type(self) -> str:
"""
return self._engine_type

@property
def benchmark(self) -> bool:
    """
    :return: True if benchmark (multi-inference) timing mode is enabled
        for this pipeline, False otherwise
    """
    return self._benchmark

@benchmark.setter
def benchmark(self, value: bool):
    """
    Enable or disable benchmark timing mode. Also propagates the flag to
    the pipeline's TimerManager so it keeps (or stops keeping) per-run
    StagedTimer instances.

    :param value: new benchmark mode setting
    """
    self._benchmark = value
    self.timer_manager.multi = value

def to_config(self) -> "PipelineConfig":
"""
:return: PipelineConfig that can be used to reload this object
Expand Down Expand Up @@ -739,7 +733,7 @@ def log(
self,
identifier: str,
value: Any,
category: str,
category: Union[str, MetricCategories],
):
"""
Pass the logged data to the DeepSparse logger object (if present).
Expand Down Expand Up @@ -791,6 +785,17 @@ def engine_forward(self, engine_inputs: List[numpy.ndarray]) -> List[numpy.ndarr
"""
return self.engine(engine_inputs)

def log_inference_times(self, timer: StagedTimer):
    """
    Log the elapsed seconds of every stage recorded in the given timer
    as SYSTEM metrics under the prediction-latency group.

    :param timer: StagedTimer whose completed stage durations are logged
    """
    latency_group = SystemGroups.PREDICTION_LATENCY
    for stage_name, elapsed_seconds in timer.times.items():
        self.log(
            identifier=f"{latency_group}/{stage_name}_seconds",
            value=elapsed_seconds,
            category=MetricCategories.SYSTEM,
        )

def _initialize_engine(self) -> Union[Engine, ORTEngine]:
engine_type = self.engine_type.lower()

Expand Down
18 changes: 0 additions & 18 deletions src/deepsparse/timing/__init__.py

This file was deleted.

26 changes: 0 additions & 26 deletions src/deepsparse/timing/inference_phases.py

This file was deleted.

119 changes: 0 additions & 119 deletions src/deepsparse/timing/timer.py

This file was deleted.

1 change: 1 addition & 0 deletions src/deepsparse/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
from .cli_helpers import *
from .data import *
from .onnx import *
from .timer import *
Loading