This repository was archived by the owner on Jun 3, 2025. It is now read-only.

Commit 79c4ada

markurtz and Benjamin authored
StagedTimer + TimerManager Impl + Pipeline.__call__ integration (#1062)
* Standardize pipeline timing to enable pipeline benchmarking
* code review suggestion 1
* refactor to associate single StagedTimer per inference to avoid race conditions with prop
* quality, generalization, cleanup, docstrings
* fix top level init file
* additional cleanup
* update pipeline timer to use contextvars
* make timer_manager for a pipeline as read only
* add full doc strings
* add in unit tests
* fixes for quality

Co-authored-by: Benjamin <ben@neuralmagic.com>
1 parent ac4062c · commit 79c4ada

File tree: 10 files changed, +570 −366 lines

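For context, the user-facing surface added by this commit can be exercised roughly as in the sketch below. The task name and model stub are placeholders, and it assumes `Pipeline.create(...)` forwards extra kwargs (here, the new `benchmark` flag) to `Pipeline.__init__`:

```python
from deepsparse import Pipeline

# hypothetical task and model stub, for illustration only
pipeline = Pipeline.create(
    task="text_classification",
    model_path="zoo:example-model-stub",
    benchmark=True,  # new in this commit: keep one StagedTimer per inference
)

pipeline(sequences="timing test input")

# read back per-stage wall-clock times via the new read-only accessors
timer = pipeline.current_timer
if timer is not None:
    for stage, seconds in timer.times.items():
        print(f"{stage}: {seconds:.4f}s")
```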

src/deepsparse/__init__.py

Lines changed: 0 additions & 1 deletion

@@ -32,7 +32,6 @@
 )
 from .engine import *
 from .tasks import *
-from .timing import *
 from .pipeline import *
 from .loggers import *
 from .version import __version__, is_release

src/deepsparse/pipeline.py

Lines changed: 119 additions & 100 deletions
@@ -36,7 +36,7 @@
     validate_identifier,
 )
 from deepsparse.tasks import SupportedTasks, dynamic_import_task
-from deepsparse.timing import InferencePhases, Timer
+from deepsparse.utils import InferenceStages, StagedTimer, TimerManager


 __all__ = [
@@ -155,13 +155,16 @@ def __init__(
         context: Optional[Context] = None,
         executor: Optional[Union[ThreadPoolExecutor, int]] = None,
         logger: Optional[Union[BaseLogger, str]] = None,
+        benchmark: bool = False,
         _delay_engine_initialize: bool = False,  # internal use only
     ):
+        self._benchmark = benchmark
         self._model_path_orig = model_path
         self._model_path = model_path
         self._engine_type = engine_type
         self._batch_size = batch_size
         self._alias = alias
+        self._timer_manager = TimerManager(enabled=True, multi=benchmark)
         self.context = context
         self.logger = (
             logger
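`StagedTimer` itself is not part of this diff; judging only from how it is used here (`start`, `stop`, and a `times` mapping read by `log_inference_times` further down), a minimal sketch could look like the following. The attribute names `_starts`/`_times` are assumptions:

```python
import time


class StagedTimer:
    """Records wall-clock durations for named stages of one inference.

    Minimal sketch inferred from usage in this diff (start/stop/times);
    the real implementation lives in deepsparse.utils.
    """

    def __init__(self):
        self._starts = {}  # stage name -> perf_counter value at start()
        self._times = {}   # stage name -> elapsed seconds after stop()

    def start(self, stage: str):
        self._starts[stage] = time.perf_counter()

    def stop(self, stage: str):
        self._times[stage] = time.perf_counter() - self._starts.pop(stage)

    @property
    def times(self) -> dict:
        return dict(self._times)
```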
@@ -213,111 +216,89 @@ def __init__(
         )

     def __call__(self, *args, **kwargs) -> BaseModel:
-        if "engine_inputs" in kwargs:
-            raise ValueError(
-                "invalid kwarg engine_inputs. engine inputs determined "
-                f"by {self.__class__.__qualname__}.parse_inputs"
-            )
-        timer = Timer()
-
-        timer.start(InferencePhases.TOTAL_INFERENCE)
-
-        # ------ PREPROCESSING ------
-        timer.start(InferencePhases.PRE_PROCESS)
-        # parse inputs into input_schema
-        pipeline_inputs = self.parse_inputs(*args, **kwargs)
-
-        self.log(
-            identifier="pipeline_inputs",
-            value=pipeline_inputs,
-            category=MetricCategories.DATA,
-        )
+        with self.timer_manager.new_timer_context() as timer:
+            if "engine_inputs" in kwargs:
+                raise ValueError(
+                    "invalid kwarg engine_inputs. engine inputs determined "
+                    f"by {self.__class__.__qualname__}.parse_inputs"
+                )

-        if not isinstance(pipeline_inputs, self.input_schema):
-            raise RuntimeError(
-                f"Unable to parse {self.__class__} inputs into a "
-                f"{self.input_schema} object. Inputs parsed to {type(pipeline_inputs)}"
+            # ------ PREPROCESSING ------
+            timer.start(InferenceStages.PRE_PROCESS)
+            # parse inputs into input_schema
+            pipeline_inputs = self.parse_inputs(*args, **kwargs)
+            self.log(
+                identifier="pipeline_inputs",
+                value=pipeline_inputs,
+                category=MetricCategories.DATA,
             )
-        # batch size of the inputs may be `> self._batch_size` at this point
-        engine_inputs: List[numpy.ndarray] = self.process_inputs(pipeline_inputs)
-        if isinstance(engine_inputs, tuple):
-            engine_inputs, postprocess_kwargs = engine_inputs
-        else:
-            postprocess_kwargs = {}
-        timer.stop(InferencePhases.PRE_PROCESS)

-        self.log(
-            identifier="engine_inputs",
-            value=engine_inputs,
-            category=MetricCategories.DATA,
-        )
-        self.log(
-            identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.PRE_PROCESS}_seconds",  # noqa E501
-            value=timer.time_delta(InferencePhases.PRE_PROCESS),
-            category=MetricCategories.SYSTEM,
-        )
-
-        # ------ INFERENCE ------
-        # split inputs into batches of size `self._batch_size`
-        timer.start(InferencePhases.ENGINE_FORWARD)
-        batches = self.split_engine_inputs(engine_inputs, self._batch_size)
-
-        # submit split batches to engine threadpool
-        batch_outputs = list(self.executor.map(self.engine_forward, batches))
-
-        # join together the batches of size `self._batch_size`
-        engine_outputs = self.join_engine_outputs(batch_outputs)
-        timer.stop(InferencePhases.ENGINE_FORWARD)
-
-        self.log(
-            identifier=f"{SystemGroups.INFERENCE_DETAILS}/input_batch_size_total",
-            # to get the batch size of the inputs, we need to look
-            # to multiply the engine batch size (self._batch_size)
-            # by the number of batches processed by the engine during
-            # a single inference call
-            value=len(batch_outputs) * self._batch_size,
-            category=MetricCategories.SYSTEM,
-        )
+            if not isinstance(pipeline_inputs, self.input_schema):
+                raise RuntimeError(
+                    f"Unable to parse {self.__class__} inputs into a "
+                    f"{self.input_schema} object. "
+                    f"Inputs parsed to {type(pipeline_inputs)}"
+                )
+            # batch size of the inputs may be `> self._batch_size` at this point
+            engine_inputs: List[numpy.ndarray] = self.process_inputs(pipeline_inputs)
+            if isinstance(engine_inputs, tuple):
+                engine_inputs, postprocess_kwargs = engine_inputs
+            else:
+                postprocess_kwargs = {}
+
+            timer.stop(InferenceStages.PRE_PROCESS)
+            self.log(
+                identifier="engine_inputs",
+                value=engine_inputs,
+                category=MetricCategories.DATA,
+            )

-        self.log(
-            identifier="engine_outputs",
-            value=engine_outputs,
-            category=MetricCategories.DATA,
-        )
-        self.log(
-            identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.ENGINE_FORWARD}_seconds",  # noqa E501
-            value=timer.time_delta(InferencePhases.ENGINE_FORWARD),
-            category=MetricCategories.SYSTEM,
-        )
+            # ------ INFERENCE ------
+            # split inputs into batches of size `self._batch_size`
+            timer.start(InferenceStages.ENGINE_FORWARD)
+            batches = self.split_engine_inputs(engine_inputs, self._batch_size)
+
+            # submit split batches to engine threadpool
+            batch_outputs = list(self.executor.map(self.engine_forward, batches))
+
+            # join together the batches of size `self._batch_size`
+            engine_outputs = self.join_engine_outputs(batch_outputs)
+            timer.stop(InferenceStages.ENGINE_FORWARD)
+
+            self.log(
+                identifier=f"{SystemGroups.INFERENCE_DETAILS}/input_batch_size_total",
+                # to get the batch size of the inputs, we need to look
+                # to multiply the engine batch size (self._batch_size)
+                # by the number of batches processed by the engine during
+                # a single inference call
+                value=len(batch_outputs) * self._batch_size,
+                category=MetricCategories.SYSTEM,
+            )
+            self.log(
+                identifier="engine_outputs",
+                value=engine_outputs,
+                category=MetricCategories.DATA,
+            )

-        # ------ POSTPROCESSING ------
-        timer.start(InferencePhases.POST_PROCESS)
-        pipeline_outputs = self.process_engine_outputs(
-            engine_outputs, **postprocess_kwargs
-        )
-        if not isinstance(pipeline_outputs, self.output_schema):
-            raise ValueError(
-                f"Outputs of {self.__class__} must be instances of "
-                f"{self.output_schema} found output of type {type(pipeline_outputs)}"
+            # ------ POSTPROCESSING ------
+            timer.start(InferenceStages.POST_PROCESS)
+            pipeline_outputs = self.process_engine_outputs(
+                engine_outputs, **postprocess_kwargs
+            )
+            if not isinstance(pipeline_outputs, self.output_schema):
+                raise ValueError(
+                    f"Outputs of {self.__class__} must be instances of "
+                    f"{self.output_schema} found output of type "
+                    f"{type(pipeline_outputs)}"
+                )
+            timer.stop(InferenceStages.POST_PROCESS)
+            self.log(
+                identifier="pipeline_outputs",
+                value=pipeline_outputs,
+                category=MetricCategories.DATA,
             )
-        timer.stop(InferencePhases.POST_PROCESS)
-        timer.stop(InferencePhases.TOTAL_INFERENCE)

-        self.log(
-            identifier="pipeline_outputs",
-            value=pipeline_outputs,
-            category=MetricCategories.DATA,
-        )
-        self.log(
-            identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.POST_PROCESS}_seconds",  # noqa E501
-            value=timer.time_delta(InferencePhases.POST_PROCESS),
-            category=MetricCategories.SYSTEM,
-        )
-        self.log(
-            identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.TOTAL_INFERENCE}_seconds",  # noqa E501
-            value=timer.time_delta(InferencePhases.TOTAL_INFERENCE),
-            category=MetricCategories.SYSTEM,
-        )
+        self.log_inference_times(timer)

         return pipeline_outputs
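The commit message says the pipeline timer was moved to `contextvars`: the `with self.timer_manager.new_timer_context() as timer:` block above hands each `__call__` its own `StagedTimer`, which is what avoids race conditions when one pipeline is shared across threads. `TimerManager` is not shown in this diff either; a rough sketch of that design, reusing the `StagedTimer` sketch above and treating all internals as assumptions, might be:

```python
from contextlib import contextmanager
from contextvars import ContextVar


class TimerManager:
    """Hands each in-flight inference its own StagedTimer (sketch only).

    `enabled`, `multi`, `current`, `latest`, and `new_timer_context` mirror
    the names used in the diff; every implementation detail is assumed.
    """

    def __init__(self, enabled: bool = True, multi: bool = False):
        self.enabled = enabled
        self.multi = multi  # True: keep all timers (benchmark); False: keep latest
        self.timers = []
        # per-context slot; each thread/task sees only its own timer
        self._current = ContextVar("staged_timer", default=None)

    @property
    def current(self):
        # the StagedTimer of the inference running in this context, if any
        return self._current.get()

    @property
    def latest(self):
        # the most recently completed StagedTimer, if any
        return self.timers[-1] if self.timers else None

    @contextmanager
    def new_timer_context(self):
        timer = StagedTimer()  # from the sketch above
        token = self._current.set(timer)
        try:
            yield timer
        finally:
            self._current.reset(token)
            if self.multi:
                self.timers.append(timer)
            else:
                self.timers = [timer]
```

With `multi=False` (the default when `benchmark=False`), only the latest completed timer is retained, which matches `current_timer` falling back to `timer_manager.latest` in the property added below.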

@@ -704,6 +685,31 @@ def engine_type(self) -> str:
         """
         return self._engine_type

+    @property
+    def timer_manager(self) -> TimerManager:
+        return self._timer_manager
+
+    @property
+    def current_timer(self) -> Optional[StagedTimer]:
+        """
+        :return: current timer for the pipeline, if any
+        """
+        timer = self.timer_manager.current
+
+        if timer is None:
+            timer = self.timer_manager.latest
+
+        return timer
+
+    @property
+    def benchmark(self) -> bool:
+        return self._benchmark
+
+    @benchmark.setter
+    def benchmark(self, value: bool):
+        self._benchmark = value
+        self.timer_manager.multi = value
+
     def to_config(self) -> "PipelineConfig":
         """
         :return: PipelineConfig that can be used to reload this object
@@ -739,7 +745,7 @@ def log(
         self,
         identifier: str,
         value: Any,
-        category: str,
+        category: Union[str, MetricCategories],
     ):
         """
         Pass the logged data to the DeepSparse logger object (if present).
@@ -791,6 +797,19 @@ def engine_forward(self, engine_inputs: List[numpy.ndarray]) -> List[numpy.ndarr
         """
         return self.engine(engine_inputs)

+    def log_inference_times(self, timer: StagedTimer):
+        """
+        logs stage times in the given timer
+
+        :param timer: timer to log
+        """
+        for stage, time in timer.times.items():
+            self.log(
+                identifier=f"{SystemGroups.PREDICTION_LATENCY}/{stage}_seconds",
+                value=time,
+                category=MetricCategories.SYSTEM,
+            )
+
     def _initialize_engine(self) -> Union[Engine, ORTEngine]:
         engine_type = self.engine_type.lower()
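With this helper, every stage recorded by the timer is logged under `SystemGroups.PREDICTION_LATENCY`. Assuming `SystemGroups.PREDICTION_LATENCY` resolves to the string `prediction_latency` and `InferenceStages` keeps lowercase stage names like the old `InferencePhases`, the emitted identifiers would look roughly like:

prediction_latency/pre_process_seconds
prediction_latency/engine_forward_seconds
prediction_latency/post_process_seconds

Note that the explicit TOTAL_INFERENCE start/stop calls were removed from `__call__` in this diff, so whether a total-inference time still appears here depends on whether `StagedTimer` records one itself.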

src/deepsparse/timing/__init__.py

Lines changed: 0 additions & 18 deletions
This file was deleted.

src/deepsparse/timing/inference_phases.py

Lines changed: 0 additions & 26 deletions
This file was deleted.
