diff --git a/development/stream_interface/workflows_demo.py b/development/stream_interface/workflows_demo.py index 583002061..8ea822b79 100644 --- a/development/stream_interface/workflows_demo.py +++ b/development/stream_interface/workflows_demo.py @@ -1,4 +1,3 @@ -import os from threading import Thread from typing import List, Optional @@ -7,12 +6,13 @@ from inference import InferencePipeline from inference.core.interfaces.camera.entities import VideoFrame +from inference.core.interfaces.camera.video_source import BufferFillingStrategy, BufferConsumptionStrategy from inference.core.interfaces.stream.watchdog import PipelineWatchDog, BasePipelineWatchDog from inference.core.utils.drawing import create_tiles STOP = False ANNOTATOR = sv.BoundingBoxAnnotator() -TARGET_PROJECT = os.environ["TARGET_PROJECT"] +fps_monitor = sv.FPSMonitor() def main() -> None: @@ -32,36 +32,25 @@ def main() -> None: "confidence": 0.5, }, { - "type": "RoboflowDatasetUpload", - "name": "roboflow_dataset_upload", - "images": "$inputs.image", + "type": "roboflow_core/bounding_box_visualization@v1", + "name": "bbox_visualiser", "predictions": "$steps.step_1.predictions", - "target_project": TARGET_PROJECT, - "usage_quota_name": "upload_quota_XXX", - "fire_and_forget": True, - }, - { - "type": "RoboflowCustomMetadata", - "name": "metadata_upload", - "predictions": "$steps.step_1.predictions", - "field_name": "dummy", - "field_value": "dummy", - "fire_and_forget": True, - }, + "image": "$inputs.image" + } ], "outputs": [ {"type": "JsonField", "name": "predictions", "selector": "$steps.step_1.predictions"}, - {"type": "JsonField", "name": "upload_error", "selector": "$steps.roboflow_dataset_upload.error_status"}, - {"type": "JsonField", "name": "upload_message", "selector": "$steps.roboflow_dataset_upload.message"}, - {"type": "JsonField", "name": "metadata_error", "selector": "$steps.metadata_upload.error_status"}, - {"type": "JsonField", "name": "metadata_message", "selector": "$steps.metadata_upload.message"}, + {"type": "JsonField", "name": "preview", "selector": "$steps.bbox_visualiser.image"}, + ], } pipeline = InferencePipeline.init_with_workflow( - video_reference=[os.environ["VIDEO_REFERENCE"]] * 2, + video_reference=["rtsp://localhost:8554/live.stream"], workflow_specification=workflow_specification, watchdog=watchdog, on_prediction=workflows_sink, + source_buffer_filling_strategy=BufferFillingStrategy.DROP_OLDEST, + source_buffer_consumption_strategy=BufferConsumptionStrategy.EAGER, ) control_thread = Thread(target=command_thread, args=(pipeline, watchdog)) control_thread.start() @@ -91,6 +80,10 @@ def workflows_sink( predictions: List[Optional[dict]], video_frames: List[Optional[VideoFrame]], ) -> None: + fps_monitor.tick() + if not isinstance(predictions, list): + predictions = [predictions] + video_frames = [video_frames] images_to_show = [] for prediction, frame in zip(predictions, video_frames): if prediction is None or frame is None: @@ -98,10 +91,14 @@ def workflows_sink( detections: sv.Detections = prediction["predictions"] visualised = ANNOTATOR.annotate(frame.image.copy(), detections) images_to_show.append(visualised) - print(prediction["upload_message"], prediction["metadata_message"]) tiles = create_tiles(images=images_to_show) cv2.imshow(f"Predictions", tiles) cv2.waitKey(1) + if hasattr(fps_monitor, "fps"): + fps_value = fps_monitor.fps + else: + fps_value = fps_monitor() + print(f"FPS: {fps_value}") if __name__ == '__main__': diff --git a/docker/config/cpu_http.py b/docker/config/cpu_http.py 
index 862dce75e..bd8302239 100644 --- a/docker/config/cpu_http.py +++ b/docker/config/cpu_http.py @@ -1,5 +1,8 @@ +from multiprocessing import Process + from inference.core.cache import cache from inference.core.interfaces.http.http_api import HttpInterface +from inference.core.interfaces.stream_manager.manager_app.app import start from inference.core.managers.active_learning import ActiveLearningManager, BackgroundTaskActiveLearningManager from inference.core.managers.base import ModelManager from inference.core.managers.decorators.fixed_size_cache import WithFixedSizeCache @@ -9,7 +12,7 @@ import os from prometheus_fastapi_instrumentator import Instrumentator -from inference.core.env import MAX_ACTIVE_MODELS, ACTIVE_LEARNING_ENABLED, LAMBDA +from inference.core.env import MAX_ACTIVE_MODELS, ACTIVE_LEARNING_ENABLED, LAMBDA, ENABLE_STREAM_API from inference.models.utils import ROBOFLOW_MODEL_TYPES model_registry = RoboflowModelRegistry(ROBOFLOW_MODEL_TYPES) @@ -38,3 +41,9 @@ @app.on_event("startup") async def _startup(): instrumentor.expose(app) + +if ENABLE_STREAM_API: + stream_manager_process = Process( + target=start, + ) + stream_manager_process.start() diff --git a/docker/config/gpu_http.py b/docker/config/gpu_http.py index ceec15720..bea5b6833 100644 --- a/docker/config/gpu_http.py +++ b/docker/config/gpu_http.py @@ -1,9 +1,12 @@ import os +from multiprocessing import Process + from prometheus_fastapi_instrumentator import Instrumentator from inference.core.cache import cache -from inference.core.env import MAX_ACTIVE_MODELS, ACTIVE_LEARNING_ENABLED, LAMBDA +from inference.core.env import MAX_ACTIVE_MODELS, ACTIVE_LEARNING_ENABLED, LAMBDA, ENABLE_STREAM_API from inference.core.interfaces.http.http_api import HttpInterface +from inference.core.interfaces.stream_manager.manager_app.app import start from inference.core.managers.active_learning import ActiveLearningManager, BackgroundTaskActiveLearningManager from inference.core.managers.base import ModelManager from inference.core.managers.decorators.fixed_size_cache import WithFixedSizeCache @@ -41,3 +44,9 @@ @app.on_event("startup") async def _startup(): instrumentor.expose(app) + +if ENABLE_STREAM_API: + stream_manager_process = Process( + target=start, + ) + stream_manager_process.start() diff --git a/docker/dockerfiles/Dockerfile.onnx.cpu b/docker/dockerfiles/Dockerfile.onnx.cpu index 23a88e297..5a72e494a 100644 --- a/docker/dockerfiles/Dockerfile.onnx.cpu +++ b/docker/dockerfiles/Dockerfile.onnx.cpu @@ -73,5 +73,6 @@ ENV WORKFLOWS_MAX_CONCURRENT_STEPS=1 ENV API_LOGGING_ENABLED=True ENV CORE_MODEL_SAM2_ENABLED=True ENV CORE_MODEL_OWLV2_ENABLED=True +ENV ENABLE_STREAM_API=True ENTRYPOINT uvicorn cpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT \ No newline at end of file diff --git a/docker/dockerfiles/Dockerfile.onnx.gpu b/docker/dockerfiles/Dockerfile.onnx.gpu index 83bede9f2..8ebf3ed1c 100644 --- a/docker/dockerfiles/Dockerfile.onnx.gpu +++ b/docker/dockerfiles/Dockerfile.onnx.gpu @@ -78,5 +78,6 @@ ENV API_LOGGING_ENABLED=True ENV LMM_ENABLED=True ENV CORE_MODEL_SAM2_ENABLED=True ENV CORE_MODEL_OWLV2_ENABLED=True +ENV ENABLE_STREAM_API=True ENTRYPOINT uvicorn gpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT \ No newline at end of file diff --git a/docker/dockerfiles/Dockerfile.onnx.jetson.4.5.0 b/docker/dockerfiles/Dockerfile.onnx.jetson.4.5.0 index 3b6fa882c..309390f07 100644 --- a/docker/dockerfiles/Dockerfile.onnx.jetson.4.5.0 +++ b/docker/dockerfiles/Dockerfile.onnx.jetson.4.5.0 @@ -70,5 +70,7 @@ ENV 
WORKFLOWS_STEP_EXECUTION_MODE=local ENV WORKFLOWS_MAX_CONCURRENT_STEPS=1 ENV API_LOGGING_ENABLED=True ENV CORE_MODEL_TROCR_ENABLED=false +ENV RUNS_ON_JETSON=True +ENV ENABLE_STREAM_API=True ENTRYPOINT uvicorn gpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT \ No newline at end of file diff --git a/docker/dockerfiles/Dockerfile.onnx.jetson.4.6.1 b/docker/dockerfiles/Dockerfile.onnx.jetson.4.6.1 index a83a1ca2d..848c9083e 100644 --- a/docker/dockerfiles/Dockerfile.onnx.jetson.4.6.1 +++ b/docker/dockerfiles/Dockerfile.onnx.jetson.4.6.1 @@ -85,5 +85,7 @@ ENV WORKFLOWS_STEP_EXECUTION_MODE=local ENV WORKFLOWS_MAX_CONCURRENT_STEPS=1 ENV API_LOGGING_ENABLED=True ENV CORE_MODEL_TROCR_ENABLED=false +ENV RUNS_ON_JETSON=True +ENV ENABLE_STREAM_API=True ENTRYPOINT uvicorn gpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT \ No newline at end of file diff --git a/docker/dockerfiles/Dockerfile.onnx.jetson.5.1.1 b/docker/dockerfiles/Dockerfile.onnx.jetson.5.1.1 index 877b30882..3b2677ac5 100644 --- a/docker/dockerfiles/Dockerfile.onnx.jetson.5.1.1 +++ b/docker/dockerfiles/Dockerfile.onnx.jetson.5.1.1 @@ -81,5 +81,7 @@ ENV WORKFLOWS_STEP_EXECUTION_MODE=local ENV WORKFLOWS_MAX_CONCURRENT_STEPS=1 ENV API_LOGGING_ENABLED=True ENV CORE_MODEL_TROCR_ENABLED=false +ENV RUNS_ON_JETSON=True +ENV ENABLE_STREAM_API=True ENTRYPOINT uvicorn gpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT \ No newline at end of file diff --git a/docs/workflows/video_processing/overview.md b/docs/workflows/video_processing/overview.md new file mode 100644 index 000000000..20d46fa2d --- /dev/null +++ b/docs/workflows/video_processing/overview.md @@ -0,0 +1,142 @@ +# Video Processing with Workflows + +We've begun our journey into video processing using Workflows. Over time, we've expanded the number of +video-specific blocks (e.g., the ByteTracker block) and continue to dedicate efforts toward improving +their performance and robustness. The current state of this work is as follows: + +* We've introduced the `WorkflowVideoMetadata` input to store metadata related to video frames, +including FPS, timestamp, video source identifier, and file/stream flags. While this may not be the final approach +for handling video metadata, it allows us to build stateful video-processing blocks at this stage. +If your Workflow includes any blocks requiring input of kind `video_metadata`, you must define this input in +your Workflow (an illustrative declaration appears in the example specification shown later in this document). The metadata functions as a batch-oriented parameter, treated by the Execution Engine in the same +way as `WorkflowImage`. + +* The `InferencePipeline` supports +[video processing with Workflows](/using_inference/inference_pipeline/#inferencepipeline-and-roboflow-workflows) +by automatically injecting `WorkflowVideoMetadata` into the `video_metadata` field. This allows you to seamlessly run +your Workflow using the `InferencePipeline` within the `inference` Python package. + +* In the `0.21.0` release, we've initiated efforts to enable video processing management via the `inference` server API. +This means that eventually, no custom scripts will be required to process video using Workflows and `InferencePipeline`. +You'll simply call an endpoint, specify the video source and the workflow, and the server will handle the rest—allowing +you to focus on consuming the results. + + +## Video management API - comments and status update + +This is an experimental feature; breaking changes may be introduced over time.
There is a list of +[known issues](https://github.com/roboflow/inference/issues?q=is%3Aopen+is%3Aissue+label%3A%22Video+Management+API+issues%22). +Please visit the page to raise new issues or comment on existing ones. + +### Release `0.21.0` + +* Added basic endpoints to `list`, `start`, `pause`, `resume`, `terminate` and `consume` results of `InferencePipelines` +running under control of the `inference` server. Endpoints are enabled in `inference` server docker images for CPU, GPU +and Jetson devices. Running the inference server there lets you call +[`http://127.0.0.1:9001/docs`](http://127.0.0.1:9001/docs) to retrieve the OpenAPI schemas for the endpoints. + +* Added an HTTP client for the new endpoints to `InferenceHTTPClient` from `inference_sdk`. Below you may find examples of +how to use the client and API to start processing videos today: + + +!!! Note + + The examples use the package in version `inference~=0.21.0`; due to the experimental nature of the feature, the code + may evolve over time. + +```python +from inference_sdk import InferenceHTTPClient + +client = InferenceHTTPClient( + api_url="http://192.168.0.115:9001", + api_key="" +) + +# to list active pipelines +client.list_inference_pipelines() + +# start processing - single stream +client.start_inference_pipeline_with_workflow( + video_reference=["rtsp://192.168.0.73:8554/live0.stream"], + workspace_name="", + workflow_id="", + results_buffer_size=5, # results are consumed from in-memory buffer - optionally you can control its size +) + +# start processing - one RTSP stream and one camera +# USB camera cannot be passed easily to docker running on MacBook, but on Jetson devices it works :) + +client.start_inference_pipeline_with_workflow( + video_reference=["rtsp://192.168.0.73:8554/live0.stream", 0], + workspace_name="", + workflow_id="", + batch_collection_timeout=0.05, # for consumption of multiple video sources it is ADVISED to + # set batch collection timeout (defined as fraction of seconds - 0.05 = 50ms) +) + +# start_inference_pipeline_with_workflow(...) will provide you with a pipeline_id which may be used to: + +# * get pipeline status +client.get_inference_pipeline_status( + pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1", +) + +# * pause pipeline +client.pause_inference_pipeline( + pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1", +) + +# * resume pipeline +client.resume_inference_pipeline( + pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1", +) + +# * terminate pipeline +client.terminate_inference_pipeline( + pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1", +) + +# * consume pipeline results +client.consume_inference_pipeline_result( + pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1", + excluded_fields=["workflow_output_field_to_exclude"] # this is optional + # if you wanted to get rid of some outputs to save bandwidth - feel free to discard them +) +``` + + +The client presented above may be used to preview workflow outputs in a very **naive** way. Let's assume +that the Workflow you defined runs an object-detection model and renders its output using Workflows visualisation +blocks, registering the output image in the `preview` field.
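A minimal, illustrative specification matching that assumption is sketched below. It is not part of this change: the model ID, step names and thresholds are placeholders, and the visualisation step mirrors the one used in `development/stream_interface/workflows_demo.py` above. The `video_metadata` input is included only to show the declaration mentioned in the overview; it is needed only when some block consumes inputs of kind `video_metadata`.

```python
# Illustrative workflow specification - model ID, names and thresholds are placeholders.
workflow_specification = {
    "version": "1.0",
    "inputs": [
        {"type": "WorkflowImage", "name": "image"},
        # Declare this input only when a block consumes `video_metadata`-kind inputs
        # (e.g. stateful video blocks such as the ByteTracker block):
        {"type": "WorkflowVideoMetadata", "name": "video_metadata"},
    ],
    "steps": [
        {
            "type": "ObjectDetectionModel",
            "name": "step_1",
            "image": "$inputs.image",
            "model_id": "yolov8n-640",  # placeholder model
            "confidence": 0.5,
        },
        {
            "type": "roboflow_core/bounding_box_visualization@v1",
            "name": "bbox_visualiser",
            "predictions": "$steps.step_1.predictions",
            "image": "$inputs.image",
        },
    ],
    "outputs": [
        {"type": "JsonField", "name": "predictions", "selector": "$steps.step_1.predictions"},
        {"type": "JsonField", "name": "preview", "selector": "$steps.bbox_visualiser.image"},
    ],
}
```

The server-side initialisation payload added in this PR (`WorkflowConfiguration`) accepts such a dict through its `workflow_specification` field, as an alternative to the `workspace_name`/`workflow_id` pair.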
You can use the following script to pool and display processed video +frames: + +```python +import cv2 +from inference_sdk import InferenceHTTPClient +from inference.core.utils.image_utils import load_image + + +client = InferenceHTTPClient( + api_url=f"http://127.0.0.1:9001", + api_key="", +) + +while True: + result = client.consume_inference_pipeline_result(pipeline_id="") + if not result["outputs"] or not result["outputs"][0]: + # "outputs" key contains list of workflow results - why list? InferencePipeline can + # run on multiple video sources at the same time - each "round" it attempts to + # grab single frame from all sources and run through Workflows Execution Engine + # * when sources are inactive that may not be possible, hence empty list can be returned + # * when one of the source do not provide frame (for instance due to batch collection timeout) + # outputs list may contain `None` values! + continue + # let's assume single source + source_result = result["outputs"][0] + image, _ = load_image(source_result["preview"]) # "preview" is the name of workflow output with image + cv2.imshow("frame", image) + cv2.waitKey(1) +``` + + + diff --git a/inference/core/env.py b/inference/core/env.py index ec45655c3..f91edf096 100644 --- a/inference/core/env.py +++ b/inference/core/env.py @@ -424,3 +424,7 @@ DEDICATED_DEPLOYMENT_WORKSPACE_URL = os.environ.get( "DEDICATED_DEPLOYMENT_WORKSPACE_URL", None ) + +ENABLE_STREAM_API = str2bool(os.getenv("ENABLE_STREAM_API", "False")) + +RUNS_ON_JETSON = str2bool(os.getenv("RUNS_ON_JETSON", "False")) diff --git a/inference/core/interfaces/camera/video_source.py b/inference/core/interfaces/camera/video_source.py index 1516d18d9..bae167651 100644 --- a/inference/core/interfaces/camera/video_source.py +++ b/inference/core/interfaces/camera/video_source.py @@ -17,6 +17,7 @@ DEFAULT_BUFFER_SIZE, DEFAULT_MAXIMUM_ADAPTIVE_FRAMES_DROPPED_IN_ROW, DEFAULT_MINIMUM_ADAPTIVE_MODE_SAMPLES, + RUNS_ON_JETSON, ) from inference.core.interfaces.camera.entities import ( SourceProperties, @@ -132,7 +133,10 @@ def locked_executor(video_source: "VideoSource", *args, **kwargs) -> None: class CV2VideoFrameProducer(VideoFrameProducer): def __init__(self, video: Union[str, int]): - self.stream = cv2.VideoCapture(video) + if _consumes_camera_on_jetson(video=video): + self.stream = cv2.VideoCapture(video, cv2.CAP_V4L2) + else: + self.stream = cv2.VideoCapture(video) def isOpened(self) -> bool: return self.stream.isOpened() @@ -165,6 +169,14 @@ def release(self): self.stream.release() +def _consumes_camera_on_jetson(video: Union[str, int]) -> bool: + if not RUNS_ON_JETSON: + return False + if isinstance(video, int): + return True + return video.startswith("/dev/video") + + class VideoSource: @classmethod def init( diff --git a/inference/core/interfaces/http/http_api.py b/inference/core/interfaces/http/http_api.py index 4a72ca564..18e5de6d3 100644 --- a/inference/core/interfaces/http/http_api.py +++ b/inference/core/interfaces/http/http_api.py @@ -1,4 +1,5 @@ import base64 +import os import traceback from functools import partial, wraps from time import sleep @@ -109,6 +110,7 @@ CORE_MODELS_ENABLED, DEDICATED_DEPLOYMENT_WORKSPACE_URL, DISABLE_WORKFLOW_ENDPOINTS, + ENABLE_STREAM_API, LAMBDA, LEGACY_ROUTE_ENABLED, LMM_ENABLED, @@ -154,6 +156,30 @@ orjson_response, serialise_workflow_result, ) +from inference.core.interfaces.stream_manager.api.entities import ( + CommandResponse, + ConsumePipelineResponse, + InferencePipelineStatusResponse, + ListPipelinesResponse, +) +from 
inference.core.interfaces.stream_manager.api.errors import ( + ProcessesManagerAuthorisationError, + ProcessesManagerClientError, + ProcessesManagerInvalidPayload, + ProcessesManagerNotFoundError, +) +from inference.core.interfaces.stream_manager.api.stream_manager_client import ( + StreamManagerClient, +) +from inference.core.interfaces.stream_manager.manager_app.entities import ( + ConsumeResultsPayload, + InitialisePipelinePayload, +) +from inference.core.interfaces.stream_manager.manager_app.errors import ( + CommunicationProtocolError, + MalformedPayloadError, + MessageToBigError, +) from inference.core.managers.base import ModelManager from inference.core.roboflow_api import ( get_roboflow_dataset_type, @@ -283,7 +309,20 @@ async def wrapped_route(*args, **kwargs): "inner_error_message": str(error.inner_error), }, ) - except RoboflowAPINotAuthorizedError: + except ( + ProcessesManagerInvalidPayload, + MalformedPayloadError, + MessageToBigError, + ) as error: + resp = JSONResponse( + status_code=400, + content={ + "message": error.public_message, + "error_type": error.__class__.__name__, + "inner_error_type": error.inner_error_type, + }, + ) + except (RoboflowAPINotAuthorizedError, ProcessesManagerAuthorisationError): resp = JSONResponse( status_code=401, content={ @@ -302,6 +341,16 @@ async def wrapped_route(*args, **kwargs): }, ) traceback.print_exc() + except ProcessesManagerNotFoundError as error: + resp = JSONResponse( + status_code=404, + content={ + "message": error.public_message, + "error_type": error.__class__.__name__, + "inner_error_type": error.inner_error_type, + }, + ) + traceback.print_exc() except ( InvalidEnvironmentVariableError, MissingServiceSecretError, @@ -367,6 +416,19 @@ async def wrapped_route(*args, **kwargs): }, ) traceback.print_exc() + except ( + ProcessesManagerClientError, + CommunicationProtocolError, + ) as error: + resp = JSONResponse( + status_code=500, + content={ + "message": error.public_message, + "error_type": error.__class__.__name__, + "inner_error_type": error.inner_error_type, + }, + ) + traceback.print_exc() except Exception: resp = JSONResponse(status_code=500, content={"message": "Internal error."}) traceback.print_exc() @@ -560,6 +622,17 @@ def _unauthorized_response(msg): self.app = app self.model_manager = model_manager + self.stream_manager_client: Optional[StreamManagerClient] = None + + if ENABLE_STREAM_API: + operations_timeout = os.getenv("STREAM_MANAGER_OPERATIONS_TIMEOUT") + if operations_timeout is not None: + operations_timeout = float(operations_timeout) + self.stream_manager_client = StreamManagerClient.init( + host=os.getenv("STREAM_MANAGER_HOST", "127.0.0.1"), + port=int(os.getenv("STREAM_MANAGER_PORT", "7070")), + operations_timeout=operations_timeout, + ) async def process_inference_request( inference_request: InferenceRequest, **kwargs @@ -1184,6 +1257,96 @@ async def validate_workflow( ) return WorkflowValidationStatus(status="ok") + if ENABLE_STREAM_API: + + @app.get( + "/inference_pipelines/list", + response_model=ListPipelinesResponse, + summary="[EXPERIMENTAL] List active InferencePipelines", + description="[EXPERIMENTAL] Listing all active InferencePipelines processing videos", + ) + @with_route_exceptions + async def list_pipelines(_: Request) -> ListPipelinesResponse: + return await self.stream_manager_client.list_pipelines() + + @app.get( + "/inference_pipelines/{pipeline_id}/status", + response_model=InferencePipelineStatusResponse, + summary="[EXPERIMENTAL] Get status of InferencePipeline", + 
description="[EXPERIMENTAL] Get status of InferencePipeline", + ) + @with_route_exceptions + async def get_status(pipeline_id: str) -> InferencePipelineStatusResponse: + return await self.stream_manager_client.get_status( + pipeline_id=pipeline_id + ) + + @app.post( + "/inference_pipelines/initialise", + response_model=CommandResponse, + summary="[EXPERIMENTAL] Starts new InferencePipeline", + description="[EXPERIMENTAL] Starts new InferencePipeline", + ) + @with_route_exceptions + async def initialise(request: InitialisePipelinePayload) -> CommandResponse: + return await self.stream_manager_client.initialise_pipeline( + initialisation_request=request + ) + + @app.post( + "/inference_pipelines/{pipeline_id}/pause", + response_model=CommandResponse, + summary="[EXPERIMENTAL] Pauses the InferencePipeline", + description="[EXPERIMENTAL] Pauses the InferencePipeline", + ) + @with_route_exceptions + async def pause(pipeline_id: str) -> CommandResponse: + return await self.stream_manager_client.pause_pipeline( + pipeline_id=pipeline_id + ) + + @app.post( + "/inference_pipelines/{pipeline_id}/resume", + response_model=CommandResponse, + summary="[EXPERIMENTAL] Resumes the InferencePipeline", + description="[EXPERIMENTAL] Resumes the InferencePipeline", + ) + @with_route_exceptions + async def resume(pipeline_id: str) -> CommandResponse: + return await self.stream_manager_client.resume_pipeline( + pipeline_id=pipeline_id + ) + + @app.post( + "/inference_pipelines/{pipeline_id}/terminate", + response_model=CommandResponse, + summary="[EXPERIMENTAL] Terminates the InferencePipeline", + description="[EXPERIMENTAL] Terminates the InferencePipeline", + ) + @with_route_exceptions + async def terminate(pipeline_id: str) -> CommandResponse: + return await self.stream_manager_client.terminate_pipeline( + pipeline_id=pipeline_id + ) + + @app.get( + "/inference_pipelines/{pipeline_id}/consume", + response_model=ConsumePipelineResponse, + summary="[EXPERIMENTAL] Consumes InferencePipeline result", + description="[EXPERIMENTAL] Consumes InferencePipeline result", + ) + @with_route_exceptions + async def consume( + pipeline_id: str, + request: Optional[ConsumeResultsPayload] = None, + ) -> ConsumePipelineResponse: + if request is None: + request = ConsumeResultsPayload() + return await self.stream_manager_client.consume_pipeline_result( + pipeline_id=pipeline_id, + excluded_fields=request.excluded_fields, + ) + if CORE_MODELS_ENABLED: if CORE_MODEL_CLIP_ENABLED: diff --git a/inference/core/interfaces/stream/inference_pipeline.py b/inference/core/interfaces/stream/inference_pipeline.py index 59fd41dcb..07756f224 100644 --- a/inference/core/interfaces/stream/inference_pipeline.py +++ b/inference/core/interfaces/stream/inference_pipeline.py @@ -444,6 +444,7 @@ def init_with_workflow( workflows_thread_pool_workers: int = 4, cancel_thread_pool_tasks_on_exit: bool = True, video_metadata_input_name: str = "video_metadata", + batch_collection_timeout: Optional[float] = None, ) -> "InferencePipeline": """ This class creates the abstraction for making inferences from given workflow against video stream. @@ -502,6 +503,10 @@ def init_with_workflow( video_metadata_input_name (str): Name of input for video metadata defined in `workflow_specification` or Workflow definition saved on the Roboflow Platform. `InferencePipeline` will be injecting video frames metadata to workflows through that parameter name. + batch_collection_timeout (Optional[float]): Parameter of multiplex_videos(...) 
dictating how long the process + grabbing frames from multiple sources can wait for the batch to be filled before yielding already collected + frames. Please set this value in PRODUCTION to avoid performance drops when a specific source shows + unstable latency. Visit `multiplex_videos(...)` for more information about the multiplexing process. Other ENV variables involved in low-level configuration: * INFERENCE_PIPELINE_PREDICTIONS_QUEUE_SIZE - size of buffer for predictions that are ready for dispatching * INFERENCE_PIPELINE_RESTART_ATTEMPT_DELAY - delay for restarts on stream connection drop @@ -598,6 +603,7 @@ def init_with_workflow( source_buffer_filling_strategy=source_buffer_filling_strategy, source_buffer_consumption_strategy=source_buffer_consumption_strategy, video_source_properties=video_source_properties, + batch_collection_timeout=batch_collection_timeout, ) @classmethod @@ -915,7 +921,7 @@ def _use_sink( payload=payload, status_update_handlers=self._status_update_handlers, ) - logger.warning(f"Error in results dispatching - {error}") + logger.exception(f"Error in results dispatching - {error}") def _generate_frames( self, diff --git a/inference/core/interfaces/stream/model_handlers/workflows.py b/inference/core/interfaces/stream/model_handlers/workflows.py index 1e2eeb082..d6052db9f 100644 --- a/inference/core/interfaces/stream/model_handlers/workflows.py +++ b/inference/core/interfaces/stream/model_handlers/workflows.py @@ -1,4 +1,3 @@ -import asyncio from typing import List, Optional from inference.core.interfaces.camera.entities import VideoFrame diff --git a/inference/core/interfaces/stream/sinks.py b/inference/core/interfaces/stream/sinks.py index 3e9affbef..cd6d2a4bb 100644 --- a/inference/core/interfaces/stream/sinks.py +++ b/inference/core/interfaces/stream/sinks.py @@ -1,8 +1,9 @@ import json import socket +from collections import deque from datetime import datetime from functools import partial -from typing import Callable, List, Optional, Tuple, Union +from typing import Callable, Dict, List, Optional, Set, Tuple, Union import cv2 import numpy as np @@ -361,8 +362,7 @@ def multi_sink( sink(predictions, video_frame) except Exception as error: logger.error( - f"Could not sent prediction with frame_id={video_frame.frame_id} to sink " - f"due to error: {error}." + f"Could not send prediction to sink due to error: {error}."
) @@ -539,3 +539,32 @@ def __enter__(self) -> "VideoFileSink": def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.release() + + +class InMemoryBufferSink: + + @classmethod + def init(cls, queue_size: int): + return cls(queue_size=queue_size) + + def __init__(self, queue_size: int): + self._buffer = deque(maxlen=queue_size) + + def on_prediction( + self, + predictions: Union[dict, List[Optional[dict]]], + video_frame: Union[VideoFrame, List[Optional[VideoFrame]]], + ) -> None: + if not isinstance(predictions, list): + predictions = [predictions] + if not isinstance(video_frame, list): + video_frame = [video_frame] + self._buffer.append((predictions, video_frame)) + + def empty(self) -> bool: + return len(self._buffer) == 0 + + def consume_prediction( + self, + ) -> Tuple[List[Optional[dict]], List[Optional[VideoFrame]]]: + return self._buffer.popleft() diff --git a/inference/core/interfaces/stream_manager/__init__.py b/inference/core/interfaces/stream_manager/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/inference/core/interfaces/stream_manager/api/__init__.py b/inference/core/interfaces/stream_manager/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/inference/core/interfaces/stream_manager/api/entities.py b/inference/core/interfaces/stream_manager/api/entities.py new file mode 100644 index 000000000..7ae7ec10f --- /dev/null +++ b/inference/core/interfaces/stream_manager/api/entities.py @@ -0,0 +1,37 @@ +from datetime import datetime +from typing import Dict, List, Optional + +from pydantic import BaseModel, Field + + +class CommandContext(BaseModel): + request_id: Optional[str] = Field( + description="Server-side request ID", default=None + ) + pipeline_id: Optional[str] = Field( + description="Identifier of pipeline connected to operation", default=None + ) + + +class CommandResponse(BaseModel): + status: str = Field(description="Operation status") + context: CommandContext = Field(description="Context of the command.") + + +class InferencePipelineStatusResponse(CommandResponse): + report: dict + + +class ListPipelinesResponse(CommandResponse): + pipelines: List[str] = Field(description="List IDs of active pipelines") + + +class FrameMetadata(BaseModel): + frame_timestamp: datetime + frame_id: int + source_id: Optional[int] + + +class ConsumePipelineResponse(CommandResponse): + outputs: List[dict] + frames_metadata: List[FrameMetadata] diff --git a/inference/core/interfaces/stream_manager/api/errors.py b/inference/core/interfaces/stream_manager/api/errors.py new file mode 100644 index 000000000..e4781ede8 --- /dev/null +++ b/inference/core/interfaces/stream_manager/api/errors.py @@ -0,0 +1,52 @@ +from typing import Optional + + +class ProcessesManagerClientError(Exception): + + def __init__( + self, + private_message: str, + public_message: Optional[str] = None, + inner_error: Optional[Exception] = None, + ): + super().__init__(private_message) + self._public_message = public_message + self._inner_error = inner_error + + @property + def public_message(self) -> str: + return self._public_message + + @property + def inner_error_type(self) -> Optional[str]: + if self._inner_error is None: + return None + return self._inner_error.__class__.__name__ + + @property + def inner_error(self) -> Optional[Exception]: + return self._inner_error + + +class ConnectivityError(ProcessesManagerClientError): + pass + + +class ProcessesManagerInternalError(ProcessesManagerClientError): + pass + + +class 
ProcessesManagerOperationError(ProcessesManagerClientError): + pass + + +class ProcessesManagerInvalidPayload(ProcessesManagerClientError): + pass + + +class ProcessesManagerNotFoundError(ProcessesManagerClientError): + pass + + +class ProcessesManagerAuthorisationError(ProcessesManagerClientError): + pass diff --git a/inference/core/interfaces/stream_manager/api/stream_manager_client.py b/inference/core/interfaces/stream_manager/api/stream_manager_client.py new file mode 100644 index 000000000..d544101a0 --- /dev/null +++ b/inference/core/interfaces/stream_manager/api/stream_manager_client.py @@ -0,0 +1,351 @@ +import asyncio +import json +from asyncio import StreamReader, StreamWriter +from enum import Enum +from json import JSONDecodeError +from typing import List, Optional, Tuple + +from inference.core import logger +from inference.core.interfaces.stream_manager.api.entities import ( + CommandContext, + CommandResponse, + ConsumePipelineResponse, + FrameMetadata, + InferencePipelineStatusResponse, + ListPipelinesResponse, +) +from inference.core.interfaces.stream_manager.api.errors import ( + ConnectivityError, + ProcessesManagerAuthorisationError, + ProcessesManagerClientError, + ProcessesManagerInternalError, + ProcessesManagerInvalidPayload, + ProcessesManagerNotFoundError, + ProcessesManagerOperationError, +) +from inference.core.interfaces.stream_manager.manager_app.entities import ( + ERROR_TYPE_KEY, + PIPELINE_ID_KEY, + REQUEST_ID_KEY, + RESPONSE_KEY, + STATUS_KEY, + TYPE_KEY, + CommandType, + ErrorType, + InitialisePipelinePayload, + OperationStatus, +) +from inference.core.interfaces.stream_manager.manager_app.errors import ( + CommunicationProtocolError, + MalformedHeaderError, + MalformedPayloadError, + MessageToBigError, + TransmissionChannelClosed, +) + +BUFFER_SIZE = 16384 +HEADER_SIZE = 4 + +ERRORS_MAPPING = { + ErrorType.INTERNAL_ERROR.value: ProcessesManagerInternalError, + ErrorType.INVALID_PAYLOAD.value: ProcessesManagerInvalidPayload, + ErrorType.NOT_FOUND.value: ProcessesManagerNotFoundError, + ErrorType.OPERATION_ERROR.value: ProcessesManagerOperationError, + ErrorType.AUTHORISATION_ERROR.value: ProcessesManagerAuthorisationError, +} + + +class StreamManagerClient: + @classmethod + def init( + cls, + host: str, + port: int, + operations_timeout: Optional[float] = None, + header_size: int = HEADER_SIZE, + buffer_size: int = BUFFER_SIZE, + ) -> "StreamManagerClient": + return cls( + host=host, + port=port, + operations_timeout=operations_timeout, + header_size=header_size, + buffer_size=buffer_size, + ) + + def __init__( + self, + host: str, + port: int, + operations_timeout: Optional[float], + header_size: int, + buffer_size: int, + ): + self._host = host + self._port = port + self._operations_timeout = operations_timeout + self._header_size = header_size + self._buffer_size = buffer_size + + async def list_pipelines(self) -> ListPipelinesResponse: + command = { + TYPE_KEY: CommandType.LIST_PIPELINES, + } + response = await self._handle_command(command=command) + status = response[RESPONSE_KEY][STATUS_KEY] + context = CommandContext( + request_id=response.get(REQUEST_ID_KEY), + pipeline_id=response.get(PIPELINE_ID_KEY), + ) + pipelines = response[RESPONSE_KEY]["pipelines"] + return ListPipelinesResponse( + status=status, + context=context, + pipelines=pipelines, + ) + + async def initialise_pipeline( + self, initialisation_request: InitialisePipelinePayload + ) -> CommandResponse: + command = initialisation_request.dict(exclude_none=True) + command[TYPE_KEY] = 
CommandType.INIT + response = await self._handle_command(command=command) + return build_response(response=response) + + async def terminate_pipeline(self, pipeline_id: str) -> CommandResponse: + command = { + TYPE_KEY: CommandType.TERMINATE, + PIPELINE_ID_KEY: pipeline_id, + } + response = await self._handle_command(command=command) + return build_response(response=response) + + async def pause_pipeline(self, pipeline_id: str) -> CommandResponse: + command = { + TYPE_KEY: CommandType.MUTE, + PIPELINE_ID_KEY: pipeline_id, + } + response = await self._handle_command(command=command) + return build_response(response=response) + + async def resume_pipeline(self, pipeline_id: str) -> CommandResponse: + command = { + TYPE_KEY: CommandType.RESUME, + PIPELINE_ID_KEY: pipeline_id, + } + response = await self._handle_command(command=command) + return build_response(response=response) + + async def get_status(self, pipeline_id: str) -> InferencePipelineStatusResponse: + command = { + TYPE_KEY: CommandType.STATUS, + PIPELINE_ID_KEY: pipeline_id, + } + response = await self._handle_command(command=command) + status = response[RESPONSE_KEY][STATUS_KEY] + context = CommandContext( + request_id=response.get(REQUEST_ID_KEY), + pipeline_id=response.get(PIPELINE_ID_KEY), + ) + report = response[RESPONSE_KEY]["report"] + return InferencePipelineStatusResponse( + status=status, + context=context, + report=report, + ) + + async def consume_pipeline_result( + self, + pipeline_id: str, + excluded_fields: List[str], + ) -> ConsumePipelineResponse: + command = { + TYPE_KEY: CommandType.CONSUME_RESULT, + PIPELINE_ID_KEY: pipeline_id, + "excluded_fields": excluded_fields, + } + response = await self._handle_command(command=command) + status = response[RESPONSE_KEY][STATUS_KEY] + context = CommandContext( + request_id=response.get(REQUEST_ID_KEY), + pipeline_id=response.get(PIPELINE_ID_KEY), + ) + return ConsumePipelineResponse( + status=status, + context=context, + outputs=response[RESPONSE_KEY]["outputs"], + frames_metadata=[ + FrameMetadata.model_validate(f) + for f in response[RESPONSE_KEY]["frames_metadata"] + ], + ) + + async def _handle_command(self, command: dict) -> dict: + response = await send_command( + host=self._host, + port=self._port, + command=command, + header_size=self._header_size, + buffer_size=self._buffer_size, + timeout=self._operations_timeout, + ) + if is_request_unsuccessful(response=response): + dispatch_error(error_response=response) + return response + + +async def send_command( + host: str, + port: int, + command: dict, + header_size: int, + buffer_size: int, + timeout: Optional[float] = None, +) -> dict: + try: + reader, writer = await establish_socket_connection( + host=host, port=port, timeout=timeout + ) + await send_message( + writer=writer, message=command, header_size=header_size, timeout=timeout + ) + data = await receive_message( + reader, header_size=header_size, buffer_size=buffer_size, timeout=timeout + ) + writer.close() + await writer.wait_closed() + return json.loads(data) + except JSONDecodeError as error: + raise MalformedPayloadError( + private_message=f"Could not decode response. 
Cause: {error}", + public_message=f"Could not decode response from InferencePipeline Manager", + inner_error=error, + ) from error + except (OSError, asyncio.TimeoutError) as error: + raise ConnectivityError( + private_message=f"Could not communicate with InferencePipeline Manager", + public_message="Could not establish communication with InferencePipeline Manager", + inner_error=error, + ) from error + + +async def establish_socket_connection( + host: str, port: int, timeout: Optional[float] = None +) -> Tuple[StreamReader, StreamWriter]: + return await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout) + + +async def send_message( + writer: StreamWriter, + message: dict, + header_size: int, + timeout: Optional[float] = None, +) -> None: + try: + body = json.dumps(message, default=_json_serializer).encode("utf-8") + header = len(body).to_bytes(length=header_size, byteorder="big") + payload = header + body + writer.write(payload) + await asyncio.wait_for(writer.drain(), timeout=timeout) + except (TypeError, ValueError) as error: + raise MalformedPayloadError( + private_message=f"Could not serialise message. Details: {error}", + public_message="Could not serialise payload of command that should be sent to InferencePipeline Manager", + inner_error=error, + ) from error + except OverflowError as error: + raise MessageToBigError( + private_message=f"Could not send message due to size overflow. Details: {error}", + public_message="InferencePipeline Manager command payload to big.", + inner_error=error, + ) from error + except asyncio.TimeoutError as error: + raise ConnectivityError( + private_message=f"Could not communicate with InferencePipeline Manager. Error: {error}", + public_message="Could not communicate with InferencePipeline Manager.", + inner_error=error, + ) from error + except Exception as error: + raise CommunicationProtocolError( + private_message=f"Could not send message to InferencePipeline Manager. Cause: {error}", + public_message="Unknown communication error while sending message to InferencePipeline Manager.", + inner_error=error, + ) from error + + +def _json_serializer(o: object) -> str: + if isinstance(o, Enum): + return o.value + raise ValueError(f"Could not serialise object: {o}") + + +async def receive_message( + reader: StreamReader, + header_size: int, + buffer_size: int, + timeout: Optional[float] = None, +) -> bytes: + header = await asyncio.wait_for(reader.read(header_size), timeout=timeout) + if len(header) != header_size: + raise MalformedHeaderError( + private_message="Header size missmatch", + public_message="Internal error in communication with InferencePipeline Manager. Violation of " + "communication protocol - malformed header of message.", + ) + payload_size = int.from_bytes(bytes=header, byteorder="big") + received = b"" + while len(received) < payload_size: + chunk = await asyncio.wait_for(reader.read(buffer_size), timeout=timeout) + if len(chunk) == 0: + raise TransmissionChannelClosed( + private_message="Socket was closed to read before payload was decoded.", + public_message="Internal error in communication with InferencePipeline Manager. 
Could not receive full " + "message.", + ) + received += chunk + return received + + +def is_request_unsuccessful(response: dict) -> bool: + return ( + response.get(RESPONSE_KEY, {}).get(STATUS_KEY, OperationStatus.FAILURE.value) + != OperationStatus.SUCCESS.value + ) + + +def dispatch_error(error_response: dict) -> None: + response_payload = error_response.get(RESPONSE_KEY, {}) + error_type = response_payload.get(ERROR_TYPE_KEY) + error_class = response_payload.get("error_class", "N/A") + error_message = response_payload.get("error_message", "N/A") + public_error_message = response_payload.get("public_error_message", "N/A") + logger.error( + f"Error with command handling raised by InferencePipeline Manager. " + f"error_type={error_type} error_class={error_class} " + f"error_message={error_message}" + ) + if error_type in ERRORS_MAPPING: + raise ERRORS_MAPPING[error_type]( + private_message=f"Error with command handling raised by InferencePipeline Manager. " + f"Error type: {error_type}. Details: {error_message}", + public_message=f"Error with command handling raised by InferencePipeline Manager. " + f"Error type: {error_type}. Details: {public_error_message}", + ) + raise ProcessesManagerClientError( + private_message=f"Unknown error with command handling raised by InferencePipeline Manager. " + f"Error type: {error_type}. Details: {error_message}", + public_message=f"Unknown error with command handling raised by InferencePipeline Manager. " + f"Raised error type: {error_type}. Details: {public_error_message}", + ) + + +def build_response(response: dict) -> CommandResponse: + status = response[RESPONSE_KEY][STATUS_KEY] + context = CommandContext( + request_id=response.get(REQUEST_ID_KEY), + pipeline_id=response.get(PIPELINE_ID_KEY), + ) + return CommandResponse( + status=status, + context=context, + ) diff --git a/inference/core/interfaces/stream_manager/manager_app/__init__.py b/inference/core/interfaces/stream_manager/manager_app/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/inference/core/interfaces/stream_manager/manager_app/app.py b/inference/core/interfaces/stream_manager/manager_app/app.py new file mode 100644 index 000000000..5cbef9811 --- /dev/null +++ b/inference/core/interfaces/stream_manager/manager_app/app.py @@ -0,0 +1,286 @@ +import os +import signal +import socket +import sys +from functools import partial +from multiprocessing import Process, Queue +from socketserver import BaseRequestHandler, BaseServer +from types import FrameType +from typing import Any, Dict, Optional, Tuple +from uuid import uuid4 + +from inference.core import logger +from inference.core.interfaces.stream_manager.manager_app.communication import ( + receive_socket_data, + send_data_trough_socket, +) +from inference.core.interfaces.stream_manager.manager_app.entities import ( + PIPELINE_ID_KEY, + STATUS_KEY, + TYPE_KEY, + CommandType, + ErrorType, + OperationStatus, +) +from inference.core.interfaces.stream_manager.manager_app.errors import ( + MalformedPayloadError, +) +from inference.core.interfaces.stream_manager.manager_app.inference_pipeline_manager import ( + InferencePipelineManager, +) +from inference.core.interfaces.stream_manager.manager_app.serialisation import ( + describe_error, + prepare_error_response, + prepare_response, +) +from inference.core.interfaces.stream_manager.manager_app.tcp_server import ( + RoboflowTCPServer, +) + +PROCESSES_TABLE: Dict[str, Tuple[Process, Queue, Queue]] = {} +HEADER_SIZE = 4 +SOCKET_BUFFER_SIZE = 16384 +HOST = 
os.getenv("STREAM_MANAGER_HOST", "127.0.0.1") +PORT = int(os.getenv("STREAM_MANAGER_PORT", "7070")) +SOCKET_TIMEOUT = float(os.getenv("STREAM_MANAGER_SOCKET_TIMEOUT", "5.0")) + + +class InferencePipelinesManagerHandler(BaseRequestHandler): + def __init__( + self, + request: socket.socket, + client_address: Any, + server: BaseServer, + processes_table: Dict[str, Tuple[Process, Queue, Queue]], + ): + self._processes_table = processes_table # in this case it's required to set the state of class before superclass init - as it invokes handle() + super().__init__(request, client_address, server) + + def handle(self) -> None: + pipeline_id: Optional[str] = None + request_id = str(uuid4()) + try: + data = receive_socket_data( + source=self.request, + header_size=HEADER_SIZE, + buffer_size=SOCKET_BUFFER_SIZE, + ) + data[TYPE_KEY] = CommandType(data[TYPE_KEY]) + if data[TYPE_KEY] is CommandType.LIST_PIPELINES: + return self._list_pipelines(request_id=request_id) + if data[TYPE_KEY] is CommandType.INIT: + return self._initialise_pipeline(request_id=request_id, command=data) + pipeline_id = data[PIPELINE_ID_KEY] + if data[TYPE_KEY] is CommandType.TERMINATE: + self._terminate_pipeline( + request_id=request_id, pipeline_id=pipeline_id, command=data + ) + else: + response = handle_command( + processes_table=self._processes_table, + request_id=request_id, + pipeline_id=pipeline_id, + command=data, + ) + serialised_response = prepare_response( + request_id=request_id, response=response, pipeline_id=pipeline_id + ) + send_data_trough_socket( + target=self.request, + header_size=HEADER_SIZE, + data=serialised_response, + request_id=request_id, + pipeline_id=pipeline_id, + ) + except (KeyError, ValueError, MalformedPayloadError) as error: + logger.error( + f"Invalid payload in processes manager. error={error} request_id={request_id}..." + ) + payload = prepare_error_response( + request_id=request_id, + error=error, + error_type=ErrorType.INVALID_PAYLOAD, + pipeline_id=pipeline_id, + ) + send_data_trough_socket( + target=self.request, + header_size=HEADER_SIZE, + data=payload, + request_id=request_id, + pipeline_id=pipeline_id, + ) + except Exception as error: + logger.error( + f"Internal error in processes manager. error={error} request_id={request_id}..." 
+ ) + payload = prepare_error_response( + request_id=request_id, + error=error, + error_type=ErrorType.INTERNAL_ERROR, + pipeline_id=pipeline_id, + ) + send_data_trough_socket( + target=self.request, + header_size=HEADER_SIZE, + data=payload, + request_id=request_id, + pipeline_id=pipeline_id, + ) + + def _list_pipelines(self, request_id: str) -> None: + serialised_response = prepare_response( + request_id=request_id, + response={ + "pipelines": list(self._processes_table.keys()), + STATUS_KEY: OperationStatus.SUCCESS, + }, + pipeline_id=None, + ) + send_data_trough_socket( + target=self.request, + header_size=HEADER_SIZE, + data=serialised_response, + request_id=request_id, + ) + + def _initialise_pipeline(self, request_id: str, command: dict) -> None: + pipeline_id = str(uuid4()) + command_queue = Queue() + responses_queue = Queue() + inference_pipeline_manager = InferencePipelineManager.init( + pipeline_id=pipeline_id, + command_queue=command_queue, + responses_queue=responses_queue, + ) + inference_pipeline_manager.start() + self._processes_table[pipeline_id] = ( + inference_pipeline_manager, + command_queue, + responses_queue, + ) + command_queue.put((request_id, command)) + response = get_response_ignoring_thrash( + responses_queue=responses_queue, matching_request_id=request_id + ) + serialised_response = prepare_response( + request_id=request_id, response=response, pipeline_id=pipeline_id + ) + send_data_trough_socket( + target=self.request, + header_size=HEADER_SIZE, + data=serialised_response, + request_id=request_id, + pipeline_id=pipeline_id, + ) + + def _terminate_pipeline( + self, request_id: str, pipeline_id: str, command: dict + ) -> None: + response = handle_command( + processes_table=self._processes_table, + request_id=request_id, + pipeline_id=pipeline_id, + command=command, + ) + if response[STATUS_KEY] is OperationStatus.SUCCESS: + logger.info( + f"Joining inference pipeline. pipeline_id={pipeline_id} request_id={request_id}" + ) + join_inference_pipeline( + processes_table=self._processes_table, pipeline_id=pipeline_id + ) + logger.info( + f"Joined inference pipeline. 
pipeline_id={pipeline_id} request_id={request_id}" + ) + serialised_response = prepare_response( + request_id=request_id, response=response, pipeline_id=pipeline_id + ) + send_data_trough_socket( + target=self.request, + header_size=HEADER_SIZE, + data=serialised_response, + request_id=request_id, + pipeline_id=pipeline_id, + ) + + +def handle_command( + processes_table: Dict[str, Tuple[Process, Queue, Queue]], + request_id: str, + pipeline_id: str, + command: dict, +) -> dict: + if pipeline_id not in processes_table: + return describe_error( + exception=None, + error_type=ErrorType.NOT_FOUND, + public_error_message=f"Could not found InferencePipeline with id={pipeline_id}.", + ) + _, command_queue, responses_queue = processes_table[pipeline_id] + command_queue.put((request_id, command)) + return get_response_ignoring_thrash( + responses_queue=responses_queue, matching_request_id=request_id + ) + + +def get_response_ignoring_thrash( + responses_queue: Queue, matching_request_id: str +) -> dict: + while True: + response = responses_queue.get() + if response[0] == matching_request_id: + return response[1] + logger.warning( + f"Dropping response for request_id={response[0]} with payload={response[1]}" + ) + + +def execute_termination( + signal_number: int, + frame: FrameType, + processes_table: Dict[str, Tuple[Process, Queue, Queue]], +) -> None: + pipeline_ids = list(processes_table.keys()) + for pipeline_id in pipeline_ids: + logger.info(f"Terminating pipeline: {pipeline_id}") + processes_table[pipeline_id][0].terminate() + logger.info(f"Pipeline: {pipeline_id} terminated.") + logger.info(f"Joining pipeline: {pipeline_id}") + processes_table[pipeline_id][0].join() + logger.info(f"Pipeline: {pipeline_id} joined.") + logger.info(f"Termination handler completed.") + sys.exit(0) + + +def join_inference_pipeline( + processes_table: Dict[str, Tuple[Process, Queue, Queue]], pipeline_id: str +) -> None: + inference_pipeline_manager, command_queue, responses_queue = processes_table[ + pipeline_id + ] + inference_pipeline_manager.join() + del processes_table[pipeline_id] + + +def start() -> None: + signal.signal( + signal.SIGINT, partial(execute_termination, processes_table=PROCESSES_TABLE) + ) + signal.signal( + signal.SIGTERM, partial(execute_termination, processes_table=PROCESSES_TABLE) + ) + with RoboflowTCPServer( + server_address=(HOST, PORT), + handler_class=partial( + InferencePipelinesManagerHandler, processes_table=PROCESSES_TABLE + ), + socket_operations_timeout=SOCKET_TIMEOUT, + ) as tcp_server: + logger.info( + f"Inference Pipeline Processes Manager is ready to accept connections at {(HOST, PORT)}" + ) + tcp_server.serve_forever() + + +if __name__ == "__main__": + start() diff --git a/inference/core/interfaces/stream_manager/manager_app/communication.py b/inference/core/interfaces/stream_manager/manager_app/communication.py new file mode 100644 index 000000000..a220a1609 --- /dev/null +++ b/inference/core/interfaces/stream_manager/manager_app/communication.py @@ -0,0 +1,83 @@ +import json +import socket +from typing import Optional + +from inference.core import logger +from inference.core.interfaces.stream_manager.manager_app.entities import ErrorType +from inference.core.interfaces.stream_manager.manager_app.errors import ( + MalformedHeaderError, + MalformedPayloadError, + TransmissionChannelClosed, +) +from inference.core.interfaces.stream_manager.manager_app.serialisation import ( + prepare_error_response, +) + + +def receive_socket_data( + source: socket.socket, header_size: int, 
buffer_size: int +) -> dict: + header = source.recv(header_size) + if len(header) != header_size: + raise MalformedHeaderError( + private_message=f"Expected header size: {header_size}, received: {header}", + public_message=f"Expected header size: {header_size}, received: {header}", + ) + payload_size = int.from_bytes(bytes=header, byteorder="big") + if payload_size <= 0: + raise MalformedHeaderError( + private_message=f"Header is indicating non positive payload size: {payload_size}", + public_message=f"Header is indicating non positive payload size: {payload_size}", + ) + received = b"" + while len(received) < payload_size: + chunk = source.recv(buffer_size) + if len(chunk) == 0: + raise TransmissionChannelClosed( + private_message="Socket was closed to read before payload was decoded.", + public_message="Socket was closed to read before payload was decoded.", + ) + received += chunk + try: + return json.loads(received) + except ValueError as error: + raise MalformedPayloadError( + public_message="Received payload that is not in a JSON format", + private_message="Received payload that is not in a JSON format", + inner_error=error, + ) + + +def send_data_trough_socket( + target: socket.socket, + header_size: int, + data: bytes, + request_id: str, + recover_from_overflow: bool = True, + pipeline_id: Optional[str] = None, +) -> None: + try: + data_size = len(data) + header = data_size.to_bytes(length=header_size, byteorder="big") + payload = header + data + target.sendall(payload) + except OverflowError as error: + if not recover_from_overflow: + logger.error(f"OverflowError was suppressed. {error}") + return None + error_response = prepare_error_response( + request_id=request_id, + error=error, + error_type=ErrorType.INTERNAL_ERROR, + pipeline_id=pipeline_id, + ) + send_data_trough_socket( + target=target, + header_size=header_size, + data=error_response, + request_id=request_id, + recover_from_overflow=False, + pipeline_id=pipeline_id, + ) + except Exception as error: + logger.error(f"Could not send the response through socket. 
Error: {error}") diff --git a/inference/core/interfaces/stream_manager/manager_app/entities.py b/inference/core/interfaces/stream_manager/manager_app/entities.py new file mode 100644 index 000000000..a9a0b26c7 --- /dev/null +++ b/inference/core/interfaces/stream_manager/manager_app/entities.py @@ -0,0 +1,88 @@ +from enum import Enum +from typing import Any, Dict, List, Literal, Optional, Union + +from pydantic import BaseModel, Field + +from inference.core.interfaces.camera.video_source import ( + BufferConsumptionStrategy, + BufferFillingStrategy, +) + +STATUS_KEY = "status" +TYPE_KEY = "type" +ERROR_TYPE_KEY = "error_type" +REQUEST_ID_KEY = "request_id" +PIPELINE_ID_KEY = "pipeline_id" +COMMAND_KEY = "command" +RESPONSE_KEY = "response" +ENCODING = "utf-8" + + +class OperationStatus(str, Enum): + SUCCESS = "success" + FAILURE = "failure" + + +class ErrorType(str, Enum): + INTERNAL_ERROR = "internal_error" + INVALID_PAYLOAD = "invalid_payload" + NOT_FOUND = "not_found" + OPERATION_ERROR = "operation_error" + AUTHORISATION_ERROR = "authorisation_error" + + +class CommandType(str, Enum): + INIT = "init" + MUTE = "mute" + RESUME = "resume" + STATUS = "status" + TERMINATE = "terminate" + LIST_PIPELINES = "list_pipelines" + CONSUME_RESULT = "consume_result" + + +class VideoConfiguration(BaseModel): + type: Literal["VideoConfiguration"] + video_reference: Union[str, int, List[Union[str, int]]] + max_fps: Optional[Union[float, int]] = None + source_buffer_filling_strategy: Optional[BufferFillingStrategy] = ( + BufferFillingStrategy.DROP_OLDEST + ) + source_buffer_consumption_strategy: Optional[BufferConsumptionStrategy] = ( + BufferConsumptionStrategy.EAGER + ) + video_source_properties: Optional[Dict[str, float]] = None + batch_collection_timeout: Optional[float] = None + + +class MemorySinkConfiguration(BaseModel): + type: Literal["MemorySinkConfiguration"] + results_buffer_size: int = 64 + + +class WorkflowConfiguration(BaseModel): + type: Literal["WorkflowConfiguration"] + workflow_specification: Optional[dict] = None + workspace_name: Optional[str] = None + workflow_id: Optional[str] = None + image_input_name: str = "image" + workflows_parameters: Optional[Dict[str, Any]] = None + workflows_thread_pool_workers: int = 4 + cancel_thread_pool_tasks_on_exit: bool = True + video_metadata_input_name: str = "video_metadata" + + +class InitialisePipelinePayload(BaseModel): + video_configuration: VideoConfiguration + processing_configuration: WorkflowConfiguration + sink_configuration: MemorySinkConfiguration = MemorySinkConfiguration( + type="MemorySinkConfiguration" + ) + api_key: Optional[str] = None + + +class ConsumeResultsPayload(BaseModel): + excluded_fields: List[str] = Field( + default_factory=list, + description="List of workflow output fields to be filtered out from response", + ) diff --git a/inference/core/interfaces/stream_manager/manager_app/errors.py b/inference/core/interfaces/stream_manager/manager_app/errors.py new file mode 100644 index 000000000..725e62aad --- /dev/null +++ b/inference/core/interfaces/stream_manager/manager_app/errors.py @@ -0,0 +1,44 @@ +from typing import Optional + + +class CommunicationProtocolError(Exception): + + def __init__( + self, + private_message: str, + public_message: Optional[str] = None, + inner_error: Optional[Exception] = None, + ): + super().__init__(private_message) + self._public_message = public_message + self._inner_error = inner_error + + @property + def public_message(self) -> str: + return self._public_message + + @property + def 
inner_error_type(self) -> Optional[str]: + if self._inner_error is None: + return None + return self._inner_error.__class__.__name__ + + @property + def inner_error(self) -> Optional[Exception]: + return self._inner_error + + +class MessageToBigError(CommunicationProtocolError): + pass + + +class MalformedHeaderError(CommunicationProtocolError): + pass + + +class TransmissionChannelClosed(CommunicationProtocolError): + pass + + +class MalformedPayloadError(CommunicationProtocolError): + pass diff --git a/inference/core/interfaces/stream_manager/manager_app/inference_pipeline_manager.py b/inference/core/interfaces/stream_manager/manager_app/inference_pipeline_manager.py new file mode 100644 index 000000000..2c01dcf57 --- /dev/null +++ b/inference/core/interfaces/stream_manager/manager_app/inference_pipeline_manager.py @@ -0,0 +1,352 @@ +import os +import signal +from dataclasses import asdict +from multiprocessing import Process, Queue +from types import FrameType +from typing import Optional, Tuple + +from pydantic import ValidationError + +from inference.core import logger +from inference.core.exceptions import ( + MissingApiKeyError, + RoboflowAPINotAuthorizedError, + RoboflowAPINotNotFoundError, +) +from inference.core.interfaces.camera.exceptions import StreamOperationNotAllowedError +from inference.core.interfaces.http.orjson_utils import ( + serialise_single_workflow_result_element, + serialise_workflow_result, +) +from inference.core.interfaces.stream.inference_pipeline import InferencePipeline +from inference.core.interfaces.stream.sinks import InMemoryBufferSink +from inference.core.interfaces.stream.watchdog import ( + BasePipelineWatchDog, + PipelineWatchDog, +) +from inference.core.interfaces.stream_manager.manager_app.entities import ( + STATUS_KEY, + TYPE_KEY, + CommandType, + ErrorType, + InitialisePipelinePayload, + OperationStatus, +) +from inference.core.interfaces.stream_manager.manager_app.serialisation import ( + describe_error, +) + + +def ignore_signal(signal_number: int, frame: FrameType) -> None: + pid = os.getpid() + logger.info( + f"Ignoring signal {signal_number} in InferencePipelineManager in process:{pid}" + ) + + +class InferencePipelineManager(Process): + @classmethod + def init( + cls, pipeline_id: str, command_queue: Queue, responses_queue: Queue + ) -> "InferencePipelineManager": + return cls( + pipeline_id=pipeline_id, + command_queue=command_queue, + responses_queue=responses_queue, + ) + + def __init__(self, pipeline_id: str, command_queue: Queue, responses_queue: Queue): + super().__init__() + self._pipeline_id = pipeline_id + self._command_queue = command_queue + self._responses_queue = responses_queue + self._inference_pipeline: Optional[InferencePipeline] = None + self._watchdog: Optional[PipelineWatchDog] = None + self._stop = False + self._buffer_sink: Optional[InMemoryBufferSink] = None + + def run(self) -> None: + signal.signal(signal.SIGINT, ignore_signal) + signal.signal(signal.SIGTERM, self._handle_termination_signal) + while not self._stop: + command: Optional[Tuple[str, dict]] = self._command_queue.get() + if command is None: + break + request_id, payload = command + self._handle_command(request_id=request_id, payload=payload) + + def _handle_command(self, request_id: str, payload: dict) -> None: + try: + logger.info(f"Processing request={request_id}...") + command_type = CommandType(payload[TYPE_KEY]) + if command_type is CommandType.INIT: + return self._initialise_pipeline(request_id=request_id, payload=payload) + if command_type is 
CommandType.TERMINATE: + return self._terminate_pipeline(request_id=request_id) + if command_type is CommandType.MUTE: + return self._mute_pipeline(request_id=request_id) + if command_type is CommandType.RESUME: + return self._resume_pipeline(request_id=request_id) + if command_type is CommandType.STATUS: + return self._get_pipeline_status(request_id=request_id) + if command_type is CommandType.CONSUME_RESULT: + return self._consume_results(request_id=request_id, payload=payload) + raise NotImplementedError( + f"Command type `{command_type}` cannot be handled" + ) + except KeyError as error: + self._handle_error( + request_id=request_id, + error=error, + public_error_message="Invalid command sent to InferencePipeline manager - malformed payload", + error_type=ErrorType.INVALID_PAYLOAD, + ) + except NotImplementedError as error: + self._handle_error( + request_id=request_id, + error=error, + public_error_message=f"Invalid command sent to InferencePipeline manager - {error}", + error_type=ErrorType.INVALID_PAYLOAD, + ) + except Exception as error: + self._handle_error( + request_id=request_id, + error=error, + public_error_message="Unknown internal error. Raise this issue providing as " + "much of a context as possible: https://github.com/roboflow/inference/issues", + error_type=ErrorType.INTERNAL_ERROR, + ) + + def _initialise_pipeline(self, request_id: str, payload: dict) -> None: + try: + parsed_payload = InitialisePipelinePayload.model_validate(payload) + watchdog = BasePipelineWatchDog() + buffer_sink = InMemoryBufferSink.init( + queue_size=parsed_payload.sink_configuration.results_buffer_size, + ) + self._buffer_sink = buffer_sink + self._inference_pipeline = InferencePipeline.init_with_workflow( + video_reference=parsed_payload.video_configuration.video_reference, + workflow_specification=parsed_payload.processing_configuration.workflow_specification, + workspace_name=parsed_payload.processing_configuration.workspace_name, + workflow_id=parsed_payload.processing_configuration.workflow_id, + api_key=parsed_payload.api_key, + image_input_name=parsed_payload.processing_configuration.image_input_name, + workflows_parameters=parsed_payload.processing_configuration.workflows_parameters, + on_prediction=self._buffer_sink.on_prediction, + max_fps=parsed_payload.video_configuration.max_fps, + watchdog=watchdog, + source_buffer_filling_strategy=parsed_payload.video_configuration.source_buffer_filling_strategy, + source_buffer_consumption_strategy=parsed_payload.video_configuration.source_buffer_consumption_strategy, + video_source_properties=parsed_payload.video_configuration.video_source_properties, + workflows_thread_pool_workers=parsed_payload.processing_configuration.workflows_thread_pool_workers, + cancel_thread_pool_tasks_on_exit=parsed_payload.processing_configuration.cancel_thread_pool_tasks_on_exit, + video_metadata_input_name=parsed_payload.processing_configuration.video_metadata_input_name, + batch_collection_timeout=parsed_payload.video_configuration.batch_collection_timeout, + ) + self._watchdog = watchdog + self._inference_pipeline.start(use_main_thread=False) + self._responses_queue.put( + (request_id, {STATUS_KEY: OperationStatus.SUCCESS}) + ) + logger.info(f"Pipeline initialised. 
request_id={request_id}...") + except ( + ValidationError, + MissingApiKeyError, + KeyError, + NotImplementedError, + ) as error: + self._handle_error( + request_id=request_id, + error=error, + public_error_message="Could not decode InferencePipeline initialisation command payload.", + error_type=ErrorType.INVALID_PAYLOAD, + ) + except RoboflowAPINotAuthorizedError as error: + self._handle_error( + request_id=request_id, + error=error, + public_error_message="Invalid API key used or API key is missing. " + "Visit https://docs.roboflow.com/api-reference/authentication#retrieve-an-api-key", + error_type=ErrorType.AUTHORISATION_ERROR, + ) + except RoboflowAPINotNotFoundError as error: + self._handle_error( + request_id=request_id, + error=error, + public_error_message="Requested Roboflow resources (models / workflows etc.) not available or " + "wrong API key used.", + error_type=ErrorType.NOT_FOUND, + ) + + def _terminate_pipeline(self, request_id: str) -> None: + if self._inference_pipeline is None: + self._responses_queue.put( + (request_id, {STATUS_KEY: OperationStatus.SUCCESS}) + ) + self._stop = True + return None + try: + self._execute_termination() + logger.info(f"Pipeline terminated. request_id={request_id}...") + self._responses_queue.put( + (request_id, {STATUS_KEY: OperationStatus.SUCCESS}) + ) + except StreamOperationNotAllowedError as error: + self._handle_error( + request_id=request_id, + error=error, + public_error_message="Cannot get pipeline status in the current state of InferencePipeline.", + error_type=ErrorType.OPERATION_ERROR, + ) + + def _handle_termination_signal(self, signal_number: int, frame: FrameType) -> None: + try: + pid = os.getpid() + logger.info(f"Terminating pipeline in process:{pid}...") + if self._inference_pipeline is not None: + self._execute_termination() + self._command_queue.put(None) + logger.info(f"Termination successful in process:{pid}...") + except Exception as error: + logger.warning(f"Could not terminate pipeline gracefully. Error: {error}") + + def _execute_termination(self) -> None: + self._inference_pipeline.terminate() + self._inference_pipeline.join() + self._stop = True + + def _mute_pipeline(self, request_id: str) -> None: + if self._inference_pipeline is None: + return self._handle_error( + request_id=request_id, + public_error_message="Cannot retrieve InferencePipeline status. Internal Error. Service misconfigured.", + error_type=ErrorType.OPERATION_ERROR, + ) + try: + self._inference_pipeline.mute_stream() + logger.info(f"Pipeline muted. request_id={request_id}...") + self._responses_queue.put( + (request_id, {STATUS_KEY: OperationStatus.SUCCESS}) + ) + except StreamOperationNotAllowedError as error: + self._handle_error( + request_id=request_id, + error=error, + public_error_message="Cannot get pipeline status in the current state of InferencePipeline.", + error_type=ErrorType.OPERATION_ERROR, + ) + + def _resume_pipeline(self, request_id: str) -> None: + if self._inference_pipeline is None: + return self._handle_error( + request_id=request_id, + public_error_message="Cannot retrieve InferencePipeline status. Internal Error. Service misconfigured.", + error_type=ErrorType.OPERATION_ERROR, + ) + try: + self._inference_pipeline.resume_stream() + logger.info(f"Pipeline resumed. 
request_id={request_id}...") + self._responses_queue.put( + (request_id, {STATUS_KEY: OperationStatus.SUCCESS}) + ) + except StreamOperationNotAllowedError as error: + self._handle_error( + request_id=request_id, + error=error, + public_error_message="Cannot get pipeline status in the current state of InferencePipeline.", + error_type=ErrorType.OPERATION_ERROR, + ) + + def _get_pipeline_status(self, request_id: str) -> None: + if self._watchdog is None: + return self._handle_error( + request_id=request_id, + error_type=ErrorType.OPERATION_ERROR, + public_error_message="Cannot retrieve InferencePipeline status. Internal Error. Service misconfigured.", + ) + try: + report = self._watchdog.get_report() + if report is None: + return self._handle_error( + request_id=request_id, + error_type=ErrorType.OPERATION_ERROR, + public_error_message="Cannot retrieve InferencePipeline status. Try again later.", + ) + response_payload = { + STATUS_KEY: OperationStatus.SUCCESS, + "report": asdict(report), + } + self._responses_queue.put((request_id, response_payload)) + logger.info(f"Pipeline status returned. request_id={request_id}...") + except StreamOperationNotAllowedError as error: + self._handle_error( + request_id=request_id, + error=error, + public_error_message="Cannot get pipeline status in the current state of InferencePipeline.", + error_type=ErrorType.OPERATION_ERROR, + ) + + def _consume_results(self, request_id: str, payload: dict) -> None: + try: + if self._buffer_sink.empty(): + response_payload = { + STATUS_KEY: OperationStatus.SUCCESS, + "outputs": [], + "frames_metadata": [], + } + self._responses_queue.put((request_id, response_payload)) + return None + excluded_fields = payload.get("excluded_fields") + predictions, frames = self._buffer_sink.consume_prediction() + predictions = [ + ( + serialise_single_workflow_result_element( + result_element=result_element, + excluded_fields=excluded_fields, + ) + if result_element is not None + else None + ) + for result_element in predictions + ] + frames_metadata = [] + for frame in frames: + if frame is None: + frames_metadata.append(None) + else: + frames_metadata.append( + { + "frame_timestamp": frame.frame_timestamp.isoformat(), + "frame_id": frame.frame_id, + "source_id": frame.source_id, + } + ) + response_payload = { + STATUS_KEY: OperationStatus.SUCCESS, + "outputs": predictions, + "frames_metadata": frames_metadata, + } + self._responses_queue.put((request_id, response_payload)) + except Exception as error: + self._handle_error( + request_id=request_id, + error=error, + public_error_message="Unexpected error with InferencePipeline results consumption.", + error_type=ErrorType.OPERATION_ERROR, + ) + + def _handle_error( + self, + request_id: str, + error: Optional[Exception] = None, + public_error_message: Optional[str] = None, + error_type: ErrorType = ErrorType.INTERNAL_ERROR, + ): + logger.exception( + f"Could not handle Command. 
request_id={request_id}, error={error}, error_type={error_type}" + ) + response_payload = describe_error( + error, error_type=error_type, public_error_message=public_error_message + ) + self._responses_queue.put((request_id, response_payload)) diff --git a/inference/core/interfaces/stream_manager/manager_app/serialisation.py b/inference/core/interfaces/stream_manager/manager_app/serialisation.py new file mode 100644 index 000000000..46ca8e8aa --- /dev/null +++ b/inference/core/interfaces/stream_manager/manager_app/serialisation.py @@ -0,0 +1,63 @@ +import json +from datetime import date, datetime +from enum import Enum +from typing import Any, Optional + +from inference.core.interfaces.stream_manager.manager_app.entities import ( + ENCODING, + ERROR_TYPE_KEY, + PIPELINE_ID_KEY, + REQUEST_ID_KEY, + RESPONSE_KEY, + STATUS_KEY, + ErrorType, + OperationStatus, +) + + +def serialise_to_json(obj: Any) -> Any: + if isinstance(obj, (datetime, date)): + return obj.isoformat() + if issubclass(type(obj), Enum): + return obj.value + raise TypeError(f"Type {type(obj)} not serializable") + + +def describe_error( + exception: Optional[Exception] = None, + error_type: ErrorType = ErrorType.INTERNAL_ERROR, + public_error_message: Optional[str] = None, +) -> dict: + payload = { + STATUS_KEY: OperationStatus.FAILURE, + ERROR_TYPE_KEY: error_type, + } + if exception is not None: + payload["error_class"] = exception.__class__.__name__ + payload["error_message"] = str(exception) + if public_error_message is not None: + payload["public_error_message"] = public_error_message + return payload + + +def prepare_error_response( + request_id: str, error: Exception, error_type: ErrorType, pipeline_id: Optional[str] +) -> bytes: + error_description = describe_error(exception=error, error_type=error_type) + return prepare_response( + request_id=request_id, response=error_description, pipeline_id=pipeline_id + ) + + +def prepare_response( + request_id: str, response: dict, pipeline_id: Optional[str] +) -> bytes: + payload = json.dumps( + { + REQUEST_ID_KEY: request_id, + RESPONSE_KEY: response, + PIPELINE_ID_KEY: pipeline_id, + }, + default=serialise_to_json, + ) + return payload.encode(ENCODING) diff --git a/inference/core/interfaces/stream_manager/manager_app/tcp_server.py b/inference/core/interfaces/stream_manager/manager_app/tcp_server.py new file mode 100644 index 000000000..5f951b1b6 --- /dev/null +++ b/inference/core/interfaces/stream_manager/manager_app/tcp_server.py @@ -0,0 +1,19 @@ +import socket +from socketserver import BaseRequestHandler, TCPServer +from typing import Any, Optional, Tuple, Type + + +class RoboflowTCPServer(TCPServer): + def __init__( + self, + server_address: Tuple[str, int], + handler_class: Type[BaseRequestHandler], + socket_operations_timeout: Optional[float] = None, + ): + TCPServer.__init__(self, server_address, handler_class) + self._socket_operations_timeout = socket_operations_timeout + + def get_request(self) -> Tuple[socket.socket, Any]: + connection, address = self.socket.accept() + connection.settimeout(self._socket_operations_timeout) + return connection, address diff --git a/inference/core/version.py b/inference/core/version.py index d124e3d98..1c73c483d 100644 --- a/inference/core/version.py +++ b/inference/core/version.py @@ -1,4 +1,4 @@ -__version__ = "0.20.1" +__version__ = "0.21.0" if __name__ == "__main__": diff --git a/inference_cli/lib/container_adapter.py b/inference_cli/lib/container_adapter.py index 55349d4a4..82748e28c 100644 --- 
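The serialisation and communication helpers introduced above frame every stream manager message the same way: a big-endian integer header carrying the payload length, followed by a UTF-8 encoded JSON document. Below is a minimal raw-socket client sketch of that framing; the host, port, header size and buffer size are assumptions borrowed from the values used in the tests later in this diff, not documented defaults.

```python
# Illustrative sketch of the length-prefixed JSON framing used by the stream manager socket.
# HOST, PORT, HEADER_SIZE and BUFFER_SIZE below mirror values seen in the unit tests and are
# assumptions, not guaranteed server defaults.
import json
import socket

HOST, PORT = "127.0.0.1", 7070   # assumed manager address
HEADER_SIZE = 4                  # bytes of the big-endian payload-length header
BUFFER_SIZE = 16438


def send_manager_command(command: dict) -> dict:
    payload = json.dumps(command).encode("utf-8")
    with socket.create_connection((HOST, PORT), timeout=5.0) as connection:
        # frame: <payload length as 4-byte big-endian int><UTF-8 JSON payload>
        connection.sendall(len(payload).to_bytes(HEADER_SIZE, byteorder="big") + payload)
        header = connection.recv(HEADER_SIZE)
        response_size = int.from_bytes(header, byteorder="big")
        received = b""
        while len(received) < response_size:
            chunk = connection.recv(BUFFER_SIZE)
            if not chunk:
                raise ConnectionError("Socket closed before the full response was received")
            received += chunk
    # responses carry request_id / pipeline_id alongside the actual command response
    return json.loads(received)


if __name__ == "__main__":
    print(send_manager_command({"type": "list_pipelines"}))
```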
a/inference_cli/lib/container_adapter.py +++ b/inference_cli/lib/container_adapter.py @@ -1,7 +1,9 @@ +import os import subprocess from typing import Dict, List, Optional, Union import typer +from docker.errors import ImageNotFound from docker.models.containers import Container from rich.progress import Progress, TaskID @@ -84,6 +86,9 @@ def find_running_inference_containers() -> List[Container]: def get_image() -> str: + jetpack_version = os.getenv("JETSON_JETPACK") + if jetpack_version: + return _get_jetpack_image(jetpack_version=jetpack_version) try: subprocess.check_output("nvidia-smi") print("GPU detected. Using a GPU image.") @@ -93,6 +98,16 @@ def get_image() -> str: return "roboflow/roboflow-inference-server-cpu:latest" +def _get_jetpack_image(jetpack_version: str) -> str: + if jetpack_version.startswith("4.5"): + return "roboflow/roboflow-inference-server-jetson-4.5.0:latest" + if jetpack_version.startswith("4.6"): + return "roboflow/roboflow-inference-server-jetson-4.6.1:latest" + if jetpack_version.startswith("5.1"): + return "roboflow/roboflow-inference-server-jetson-5.1.1:latest" + raise RuntimeError(f"Jetpack version: {jetpack_version} not supported") + + def start_inference_container( image: Optional[str] = None, port: int = 9001, @@ -104,6 +119,7 @@ def start_inference_container( api_key: Optional[str] = None, env_file_path: Optional[str] = None, development: bool = False, + use_local_images: bool = False, ) -> None: containers = find_running_inference_containers() if len(containers) > 0: @@ -117,11 +133,15 @@ def start_inference_container( device_requests = None privileged = False + docker_run_kwargs = {} if "gpu" in image: privileged = True device_requests = [ docker.types.DeviceRequest(device_ids=["all"], capabilities=[["gpu"]]) ] + if "jetson" in image: + privileged = True + docker_run_kwargs = {"runtime": "nvidia"} environment = prepare_container_environment( port=port, project=project, @@ -132,7 +152,7 @@ def start_inference_container( env_file_path=env_file_path, development=development, ) - pull_image(image) + pull_image(image, use_local_images=use_local_images) print(f"Starting inference server container...") ports = {"9001": port} if development: @@ -146,6 +166,7 @@ def start_inference_container( ports=ports, device_requests=device_requests, environment=environment, + **docker_run_kwargs, ) @@ -213,10 +234,17 @@ def check_inference_server_status(): print("No inference server container running.") -def pull_image(image: str) -> None: +def pull_image(image: str, use_local_images: bool = False) -> None: docker_client = docker.from_env() - print(f"Pulling image: {image}") progress_tasks = {} + try: + _ = docker_client.images.get(image) + if use_local_images: + print(f"Using locally cached image: {image}") + return None + except ImageNotFound: + pass + print(f"Pulling image: {image}") with Progress() as progress: logs_stream = docker_client.api.pull(image, stream=True, decode=True) for line in logs_stream: diff --git a/inference_cli/server.py b/inference_cli/server.py index 8ea1d800a..d5c442df4 100644 --- a/inference_cli/server.py +++ b/inference_cli/server.py @@ -67,6 +67,21 @@ def start( "https://.roboflow.run endpoint", ), ] = False, + image: Annotated[ + Optional[str], + typer.Option( + "--image", + help="Specific docker image you would like to run with the command (useful for development of custom " + "builds of inference server)", + ), + ] = None, + use_local_images: Annotated[ + bool, + typer.Option( + "--use-local-images/--not-use-local-images", +
help="Flag to allow using local images (if set False image is always attempted to be pulled)", + ), + ] = False, ) -> None: try: @@ -77,11 +92,13 @@ def start( try: start_inference_container( + image=image, port=port, project=rf_env, env_file_path=env_file_path, development=development, api_key=api_key, + use_local_images=use_local_images, ) except Exception as container_error: typer.echo(container_error) diff --git a/inference_sdk/http/client.py b/inference_sdk/http/client.py index 109b1ae69..f036b00da 100644 --- a/inference_sdk/http/client.py +++ b/inference_sdk/http/client.py @@ -1,5 +1,5 @@ from contextlib import contextmanager -from typing import Any, Dict, Generator, List, Optional, Tuple, Union +from typing import Any, Dict, Generator, List, Literal, Optional, Tuple, Union import aiohttp import numpy as np @@ -65,7 +65,7 @@ deduct_api_key_from_string, inject_images_into_payload, ) -from inference_sdk.utils.decorators import deprecated +from inference_sdk.utils.decorators import deprecated, experimental SUCCESSFUL_STATUS_CODE = 200 DEFAULT_HEADERS = { @@ -79,6 +79,11 @@ } CLIP_ARGUMENT_TYPES = {"image", "text"} +BufferFillingStrategy = Literal[ + "WAIT", "DROP_OLDEST", "ADAPTIVE_DROP_OLDEST", "DROP_LATEST", "ADAPTIVE_DROP_LATEST" +] +BufferConsumptionStrategy = Literal["LAZY", "EAGER"] + def wrap_errors(function: callable) -> callable: def decorate(*args, **kwargs) -> Any: @@ -1239,6 +1244,165 @@ async def infer_from_yolo_world_async( max_concurrent_requests=self.__inference_configuration.max_concurrent_requests, ) + @experimental( + info="Video processing in inference server is under development. Breaking changes are possible." + ) + @wrap_errors + def start_inference_pipeline_with_workflow( + self, + video_reference: Union[str, int, List[Union[str, int]]], + workflow_specification: Optional[dict] = None, + workspace_name: Optional[str] = None, + workflow_id: Optional[str] = None, + image_input_name: str = "image", + workflows_parameters: Optional[Dict[str, Any]] = None, + workflows_thread_pool_workers: int = 4, + cancel_thread_pool_tasks_on_exit: bool = True, + video_metadata_input_name: str = "video_metadata", + max_fps: Optional[Union[float, int]] = None, + source_buffer_filling_strategy: Optional[BufferFillingStrategy] = "DROP_OLDEST", + source_buffer_consumption_strategy: Optional[ + BufferConsumptionStrategy + ] = "EAGER", + video_source_properties: Optional[Dict[str, float]] = None, + batch_collection_timeout: Optional[float] = None, + results_buffer_size: int = 64, + ) -> dict: + named_workflow_specified = (workspace_name is not None) and ( + workflow_id is not None + ) + if not (named_workflow_specified != (workflow_specification is not None)): + raise InvalidParameterError( + "Parameters (`workspace_name`, `workflow_id`) can be used mutually exclusive with " + "`workflow_specification`, but at least one must be set." 
+ ) + payload = { + "api_key": self.__api_key, + "video_configuration": { + "type": "VideoConfiguration", + "video_reference": video_reference, + "max_fps": max_fps, + "source_buffer_filling_strategy": source_buffer_filling_strategy, + "source_buffer_consumption_strategy": source_buffer_consumption_strategy, + "video_source_properties": video_source_properties, + "batch_collection_timeout": batch_collection_timeout, + }, + "processing_configuration": { + "type": "WorkflowConfiguration", + "workflow_specification": workflow_specification, + "workspace_name": workspace_name, + "workflow_id": workflow_id, + "image_input_name": image_input_name, + "workflows_parameters": workflows_parameters, + "workflows_thread_pool_workers": workflows_thread_pool_workers, + "cancel_thread_pool_tasks_on_exit": cancel_thread_pool_tasks_on_exit, + "video_metadata_input_name": video_metadata_input_name, + }, + "sink_configuration": { + "type": "MemorySinkConfiguration", + "results_buffer_size": results_buffer_size, + }, + } + response = requests.post( + f"{self.__api_url}/inference_pipelines/initialise", + json=payload, + ) + response.raise_for_status() + return response.json() + + @experimental( + info="Video processing in inference server is under development. Breaking changes are possible." + ) + @wrap_errors + def list_inference_pipelines(self) -> List[dict]: + payload = {"api_key": self.__api_key} + response = requests.get( + f"{self.__api_url}/inference_pipelines/list", + json=payload, + ) + api_key_safe_raise_for_status(response=response) + return response.json() + + @experimental( + info="Video processing in inference server is under development. Breaking changes are possible." + ) + @wrap_errors + def get_inference_pipeline_status(self, pipeline_id: str) -> dict: + self._ensure_pipeline_id_not_empty(pipeline_id=pipeline_id) + payload = {"api_key": self.__api_key} + response = requests.get( + f"{self.__api_url}/inference_pipelines/{pipeline_id}/status", + json=payload, + ) + api_key_safe_raise_for_status(response=response) + return response.json() + + @experimental( + info="Video processing in inference server is under development. Breaking changes are possible." + ) + @wrap_errors + def pause_inference_pipeline(self, pipeline_id: str) -> dict: + self._ensure_pipeline_id_not_empty(pipeline_id=pipeline_id) + payload = {"api_key": self.__api_key} + response = requests.post( + f"{self.__api_url}/inference_pipelines/{pipeline_id}/pause", + json=payload, + ) + api_key_safe_raise_for_status(response=response) + return response.json() + + @experimental( + info="Video processing in inference server is under development. Breaking changes are possible." + ) + @wrap_errors + def resume_inference_pipeline(self, pipeline_id: str) -> dict: + self._ensure_pipeline_id_not_empty(pipeline_id=pipeline_id) + payload = {"api_key": self.__api_key} + response = requests.post( + f"{self.__api_url}/inference_pipelines/{pipeline_id}/resume", + json=payload, + ) + api_key_safe_raise_for_status(response=response) + return response.json() + + @experimental( + info="Video processing in inference server is under development. Breaking changes are possible." 
+ ) + @wrap_errors + def terminate_inference_pipeline(self, pipeline_id: str) -> dict: + self._ensure_pipeline_id_not_empty(pipeline_id=pipeline_id) + payload = {"api_key": self.__api_key} + response = requests.post( + f"{self.__api_url}/inference_pipelines/{pipeline_id}/terminate", + json=payload, + ) + api_key_safe_raise_for_status(response=response) + return response.json() + + @experimental( + info="Video processing in inference server is under development. Breaking changes are possible." + ) + @wrap_errors + def consume_inference_pipeline_result( + self, + pipeline_id: str, + excluded_fields: Optional[List[str]] = None, + ) -> dict: + self._ensure_pipeline_id_not_empty(pipeline_id=pipeline_id) + if excluded_fields is None: + excluded_fields = [] + payload = {"api_key": self.__api_key, "excluded_fields": excluded_fields} + response = requests.get( + f"{self.__api_url}/inference_pipelines/{pipeline_id}/consume", + json=payload, + ) + api_key_safe_raise_for_status(response=response) + return response.json() + + def _ensure_pipeline_id_not_empty(self, pipeline_id: str) -> None: + if not pipeline_id: + raise InvalidParameterError("Empty `pipeline_id` parameter detected") + def _post_images( self, inference_input: Union[ImagesReference, List[ImagesReference]], diff --git a/inference_sdk/utils/decorators.py b/inference_sdk/utils/decorators.py index d2e854dc4..13580aa19 100644 --- a/inference_sdk/utils/decorators.py +++ b/inference_sdk/utils/decorators.py @@ -18,3 +18,19 @@ def wrapper(*args, **kwargs): return wrapper return decorator + + +def experimental(info: str): + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + warnings.warn( + f"{func.__name__} is experimental: {info}", + category=InferenceSDKDeprecationWarning, + stacklevel=2, + ) + return func(*args, **kwargs) + + return wrapper + + return decorator diff --git a/mkdocs.yml b/mkdocs.yml index 0849a7862..c4a421acc 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -77,6 +77,8 @@ nav: - Blocks Overview: workflows/blocks_connections.md - Blocks Gallery: workflows/blocks.md - Examples: workflows/gallery_index.md + - Video Processing: workflows/video_processing/overview.md + - User Guide: - Running Workflows: workflows/modes_of_running.md - Workflows Definitions: workflows/definitions.md diff --git a/tests/inference/integration_tests/test_video_processing_endpoints.py b/tests/inference/integration_tests/test_video_processing_endpoints.py new file mode 100644 index 000000000..d821a2fd4 --- /dev/null +++ b/tests/inference/integration_tests/test_video_processing_endpoints.py @@ -0,0 +1,18 @@ +import os + +import requests + +API_KEY = os.environ.get("API_KEY") + + +def test_list_pipeline_endpoint_being_enabled(server_url: str) -> None: + # when + response = requests.get( + f"{server_url}/inference_pipelines/list", + json={ + "api_key": API_KEY, + } + ) + + # then + response.raise_for_status() diff --git a/tests/inference/unit_tests/core/interfaces/stream/test_sinks.py b/tests/inference/unit_tests/core/interfaces/stream/test_sinks.py index 5b6292508..34689a3e3 100644 --- a/tests/inference/unit_tests/core/interfaces/stream/test_sinks.py +++ b/tests/inference/unit_tests/core/interfaces/stream/test_sinks.py @@ -14,6 +14,7 @@ from inference.core.interfaces.camera.entities import VideoFrame from inference.core.interfaces.stream.sinks import ( ImageWithSourceID, + InMemoryBufferSink, UDPSink, active_learning_sink, multi_sink, @@ -394,3 +395,71 @@ def test_active_learning_sink_with_batch_input() -> None: 
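The client methods added above are experimental wrappers over the server's `/inference_pipelines/*` endpoints. The sketch below shows one way they might be used end to end; the `InferenceHTTPClient` constructor, the placeholder workspace, workflow and camera references, and the assumption that the pipeline id is returned under `context.pipeline_id` are not guaranteed by this diff.

```python
# Hedged usage sketch for the experimental video-processing methods added to the SDK client.
# The `InferenceHTTPClient` constructor, the placeholder identifiers and the response shapes
# (pipeline id under "context", results under "outputs") are assumptions, not guarantees.
import time

from inference_sdk import InferenceHTTPClient

client = InferenceHTTPClient(api_url="http://127.0.0.1:9001", api_key="<YOUR-API-KEY>")

# start a workflow-backed pipeline against a video source
response = client.start_inference_pipeline_with_workflow(
    video_reference="rtsp://<your-camera>/stream",
    workspace_name="<your-workspace>",
    workflow_id="<your-workflow-id>",
    max_fps=30,
)
pipeline_id = response["context"]["pipeline_id"]  # assumed response shape

try:
    # poll the in-memory buffer for results
    for _ in range(100):
        result = client.consume_inference_pipeline_result(pipeline_id=pipeline_id)
        if result["outputs"]:  # an empty list means no new frames were processed yet
            print(result["outputs"])
        time.sleep(0.1)
finally:
    client.terminate_inference_pipeline(pipeline_id=pipeline_id)
```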
prediction_type="object-detection", disable_preproc_auto_orient=False, ) + + +def test_in_memory_buffer_sink_for_singular_input() -> None: + # given + sink = InMemoryBufferSink.init(queue_size=2) + video_frames = [ + VideoFrame( + image=np.ones((128, 128, 3), dtype=np.uint8) * 255, + frame_id=i, + frame_timestamp=datetime.now(), + ) + for i in range(3) + ] + + # when + sink.on_prediction(predictions={"some": 1}, video_frame=video_frames[0]) + sink.on_prediction(predictions={"some": 2}, video_frame=video_frames[1]) + sink.on_prediction(predictions={"some": 3}, video_frame=video_frames[2]) + + result_1 = sink.consume_prediction() + result_2 = sink.consume_prediction() + empty_status = sink.empty() + + # then + assert result_1[0] == [ + {"some": 2} + ], "Expected to be second dict wrapped in list (first to be lost by queue size)" + assert result_2[0] == [ + {"some": 3} + ], "Expected to be third dict wrapped in list (first to be lost by queue size)" + assert empty_status is True, "Expected buffer to be purged during test" + + +def test_in_memory_buffer_sink_for_batch_input() -> None: + # given + sink = InMemoryBufferSink.init(queue_size=2) + video_frames = [ + [ + VideoFrame( + image=np.ones((128, 128, 3), dtype=np.uint8) * 255, + frame_id=i, + frame_timestamp=datetime.now(), + source_id=0, + ), + None, + ] + for i in range(3) + ] + + # when + sink.on_prediction(predictions=[{"some": 1}, None], video_frame=video_frames[0]) + sink.on_prediction(predictions=[{"some": 2}, None], video_frame=video_frames[1]) + sink.on_prediction(predictions=[{"some": 3}, None], video_frame=video_frames[2]) + + result_1 = sink.consume_prediction() + result_2 = sink.consume_prediction() + empty_status = sink.empty() + + # then + assert result_1[0] == [ + {"some": 2}, + None, + ], "Expected to be second dict wrapped in list (first to be lost by queue size)" + assert result_2[0] == [ + {"some": 3}, + None, + ], "Expected to be third dict wrapped in list (first to be lost by queue size)" + assert empty_status is True, "Expected buffer to be purged during test" diff --git a/tests/inference/unit_tests/core/interfaces/stream_manager/__init__.py b/tests/inference/unit_tests/core/interfaces/stream_manager/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/inference/unit_tests/core/interfaces/stream_manager/api/__init__.py b/tests/inference/unit_tests/core/interfaces/stream_manager/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/inference/unit_tests/core/interfaces/stream_manager/api/test_stream_manager_client.py b/tests/inference/unit_tests/core/interfaces/stream_manager/api/test_stream_manager_client.py new file mode 100644 index 000000000..5bddf64fc --- /dev/null +++ b/tests/inference/unit_tests/core/interfaces/stream_manager/api/test_stream_manager_client.py @@ -0,0 +1,732 @@ +import asyncio +import json +from typing import Type +from unittest import mock +from unittest.mock import AsyncMock + +import pytest + +from inference.core.interfaces.stream_manager.api import stream_manager_client +from inference.core.interfaces.stream_manager.api.entities import ( + CommandContext, + CommandResponse, + InferencePipelineStatusResponse, + ListPipelinesResponse, +) +from inference.core.interfaces.stream_manager.api.errors import ( + ConnectivityError, + ProcessesManagerAuthorisationError, + ProcessesManagerClientError, + ProcessesManagerInternalError, + ProcessesManagerInvalidPayload, + ProcessesManagerNotFoundError, + ProcessesManagerOperationError, +) +from 
inference.core.interfaces.stream_manager.api.stream_manager_client import ( + StreamManagerClient, + build_response, + dispatch_error, + is_request_unsuccessful, + receive_message, + send_command, + send_message, +) +from inference.core.interfaces.stream_manager.manager_app.entities import ( + CommandType, + InitialisePipelinePayload, + VideoConfiguration, + WorkflowConfiguration, +) +from inference.core.interfaces.stream_manager.manager_app.errors import ( + CommunicationProtocolError, + MalformedHeaderError, + MalformedPayloadError, + MessageToBigError, + TransmissionChannelClosed, +) + + +def test_build_response_when_all_optional_fields_are_filled() -> None: + # given + response = { + "response": {"status": "failure"}, + "request_id": "my_request", + "pipeline_id": "my_pipeline", + } + + # when + result = build_response(response=response) + + # then + assert result == CommandResponse( + status="failure", + context=CommandContext(request_id="my_request", pipeline_id="my_pipeline"), + ), "Assembled response must indicate failure and context with request id and pipeline id denoted" + + +def test_build_response_when_all_optional_fields_are_missing() -> None: + # given + response = { + "response": {"status": "failure"}, + } + + # when + result = build_response(response=response) + + # then + assert result == CommandResponse( + status="failure", + context=CommandContext(request_id=None, pipeline_id=None), + ), "Assembled response must indicate failure and empty context" + + +@pytest.mark.parametrize( + "error_type, expected_error", + [ + ("internal_error", ProcessesManagerInternalError), + ("invalid_payload", ProcessesManagerInvalidPayload), + ("not_found", ProcessesManagerNotFoundError), + ("operation_error", ProcessesManagerOperationError), + ("authorisation_error", ProcessesManagerAuthorisationError), + ], +) +def test_dispatch_error_when_known_error_is_detected( + error_type: str, expected_error: Type[Exception] +) -> None: + # given + error_response = { + "response": { + "status": "failure", + "error_type": error_type, + } + } + + # when + with pytest.raises(expected_error): + dispatch_error(error_response=error_response) + + +def test_dispatch_error_when_unknown_error_is_detected() -> None: + # given + error_response = { + "response": { + "status": "failure", + "error_type": "unknown", + } + } + + # when + with pytest.raises(ProcessesManagerClientError): + dispatch_error(error_response=error_response) + + +def test_dispatch_error_when_malformed_payload_is_detected() -> None: + # given + error_response = {"response": {"status": "failure"}} + + # when + with pytest.raises(ProcessesManagerClientError): + dispatch_error(error_response=error_response) + + +def test_is_request_unsuccessful_when_successful_response_given() -> None: + # given + response = {"response": {"status": "success"}} + + # when + result = is_request_unsuccessful(response=response) + + # then + assert ( + result is False + ), "Success status denoted should be assumed as sign of request success" + + +def test_is_request_unsuccessful_when_unsuccessful_response_given() -> None: + # given + error_response = { + "response": { + "status": "failure", + "error_type": "not_found", + } + } + + # when + result = is_request_unsuccessful(response=error_response) + + # then + assert result is True, "Explicitly failed response is indication of failed response" + + +def test_is_request_unsuccessful_when_malformed_response_given() -> None: + # given + response = {"response": {"some": "data"}} + + # when + result = 
is_request_unsuccessful(response=response) + + # then + assert ( + result is True + ), "When success is not clearly demonstrated - failure is to be assumed" + + +class DummyStreamReader: + def __init__(self, read_buffer_content: bytes): + self._read_buffer_content = read_buffer_content + + async def read(self, n: int = -1) -> bytes: + if n == -1: + n = len(self._read_buffer_content) + to_return = self._read_buffer_content[:n] + self._read_buffer_content = self._read_buffer_content[n:] + return to_return + + +@pytest.mark.asyncio +async def test_receive_message_when_malformed_header_sent() -> None: + # given + header = 3 + reader = DummyStreamReader( + read_buffer_content=header.to_bytes(length=1, byteorder="big") + ) + + # when + with pytest.raises(MalformedHeaderError): + _ = await receive_message(reader=reader, header_size=4, buffer_size=512) + + +@pytest.mark.asyncio +async def test_receive_message_when_payload_to_be_read_in_single_piece() -> None: + # given + data = b"DO OR NOT DO, THERE IS NO TRY" + payload = len(data).to_bytes(length=4, byteorder="big") + data + reader = DummyStreamReader(read_buffer_content=payload) + + # when + result = await receive_message( + reader=reader, header_size=4, buffer_size=len(payload) + ) + + # then + assert ( + result == b"DO OR NOT DO, THERE IS NO TRY" + ), "Result must be exact to the data in payload" + + +@pytest.mark.asyncio +async def test_receive_message_when_payload_to_be_read_in_multiple_pieces() -> None: + # given + data = b"DO OR NOT DO, THERE IS NO TRY" + payload = len(data).to_bytes(length=4, byteorder="big") + data + reader = DummyStreamReader(read_buffer_content=payload) + + # when + result = await receive_message(reader=reader, header_size=4, buffer_size=1) + + # then + assert ( + result == b"DO OR NOT DO, THERE IS NO TRY" + ), "Result must be exact to the data in payload" + + +@pytest.mark.asyncio +async def test_receive_message_when_not_all_declared_bytes_received() -> None: + # given + data = b"DO OR NOT DO, THERE IS NO TRY" + payload = len(data).to_bytes(length=4, byteorder="big") + data[:5] + reader = DummyStreamReader(read_buffer_content=payload) + + # when + with pytest.raises(TransmissionChannelClosed): + _ = await receive_message(reader=reader, header_size=4, buffer_size=1) + + +@pytest.mark.asyncio +async def test_send_message_when_content_cannot_be_serialised() -> None: + # given + writer = AsyncMock() + + # when + with pytest.raises(MalformedPayloadError): + await send_message(writer=writer, message=set([1, 2, 3]), header_size=4) + + +@pytest.mark.asyncio +async def test_send_message_when_message_is_to_long_up_to_header_length() -> None: + # given + writer = AsyncMock() + message = {"data": [i for i in range(1024)]} + + # when + with pytest.raises(MessageToBigError): + await send_message(writer=writer, message=message, header_size=1) + + +@pytest.mark.asyncio +async def test_send_message_when_communication_problem_arises() -> None: + # given + writer = AsyncMock() + writer.drain.side_effect = IOError() + message = {"data": "some"} + + # when + with pytest.raises(CommunicationProtocolError): + await send_message(writer=writer, message=message, header_size=4) + + +@pytest.mark.asyncio +async def test_send_message_when_communication_succeeds() -> None: + # given + writer = AsyncMock() + message = {"data": "some"} + serialised_message = json.dumps(message).encode("utf-8") + expected_payload = ( + len(serialised_message).to_bytes(length=4, byteorder="big") + serialised_message + ) + + # when + await send_message(writer=writer, 
message=message, header_size=4) + + # then + writer.write.assert_called_once_with(expected_payload) + + +class DummyStreamWriter: + def __init__(self, operation_delay: float = 0.0): + self._write_buffer_content = b"" + self._operation_delay = operation_delay + + def get_content(self) -> bytes: + return self._write_buffer_content + + def write(self, payload: bytes) -> None: + self._write_buffer_content += payload + + async def drain(self) -> None: + await asyncio.sleep(self._operation_delay) + + def close(self) -> None: + pass + + async def wait_closed(self) -> None: + await asyncio.sleep(self._operation_delay) + + +@pytest.mark.asyncio +@mock.patch.object(stream_manager_client, "establish_socket_connection") +async def test_send_command_when_connectivity_problem_arises( + establish_socket_connection_mock: AsyncMock, +) -> None: + # given + establish_socket_connection_mock.side_effect = ConnectionError() + + # when + with pytest.raises(ConnectivityError): + _ = await send_command( + host="127.0.0.1", + port=7070, + command={}, + header_size=4, + buffer_size=16438, + timeout=0.1, + ) + + +@pytest.mark.asyncio +@pytest.mark.timeout(30) +@mock.patch.object(stream_manager_client, "establish_socket_connection") +async def test_send_command_when_timeout_is_raised( + establish_socket_connection_mock: AsyncMock, +) -> None: + # given + reader = DummyStreamReader(read_buffer_content=b"") + establish_socket_connection_mock.return_value = ( + reader, + DummyStreamWriter(operation_delay=1.0), + ) + + # when + with pytest.raises(ConnectivityError): + _ = await send_command( + host="127.0.0.1", + port=7070, + command={}, + header_size=4, + buffer_size=16438, + timeout=0.1, + ) + + +@pytest.mark.asyncio +@mock.patch.object(stream_manager_client, "establish_socket_connection") +async def test_send_command_when_communication_successful( + establish_socket_connection_mock: AsyncMock, +) -> None: + # given + reader = assembly_socket_reader( + message={"response": {"status": "success"}}, header_size=4 + ) + writer = DummyStreamWriter() + establish_socket_connection_mock.return_value = (reader, writer) + command = { + "type": CommandType.TERMINATE, + "pipeline_id": "my_pipeline", + } + + # when + result = await send_command( + host="127.0.0.1", port=7070, command=command, header_size=4, buffer_size=16438 + ) + + # then + assert result == {"response": {"status": "success"}} + assert_correct_command_sent( + writer=writer, + command=command, + header_size=4, + message="Expected to send termination command successfully", + ) + + +@pytest.mark.asyncio +@mock.patch.object(stream_manager_client, "establish_socket_connection") +async def test_send_command_when_response_payload_could_not_be_decoded( + establish_socket_connection_mock: AsyncMock, +) -> None: + # given + response_message = b"FOR SURE NOT A JSON" + response_payload = ( + len(response_message).to_bytes(length=4, byteorder="big") + response_message + ) + reader = DummyStreamReader(read_buffer_content=response_payload) + establish_socket_connection_mock.return_value = ( + reader, + DummyStreamWriter(operation_delay=1.0), + ) + command = { + "type": CommandType.TERMINATE, + "pipeline_id": "my_pipeline", + } + + # when + with pytest.raises(MalformedPayloadError): + _ = await send_command( + host="127.0.0.1", + port=7070, + command=command, + header_size=4, + buffer_size=16438, + ) + + +@pytest.mark.asyncio +@mock.patch.object(stream_manager_client, "establish_socket_connection") +async def test_stream_manager_client_can_successfully_list_pipelines( + 
establish_socket_connection_mock: AsyncMock, +) -> None: + # given + reader = assembly_socket_reader( + message={ + "request_id": "my_request", + "response": {"status": "success", "pipelines": ["a", "b", "c"]}, + }, + header_size=4, + ) + writer = DummyStreamWriter() + establish_socket_connection_mock.return_value = (reader, writer) + expected_command = {"type": CommandType.LIST_PIPELINES} + client = StreamManagerClient.init( + host="127.0.0.1", + port=7070, + operations_timeout=1.0, + header_size=4, + buffer_size=16438, + ) + + # when + result = await client.list_pipelines() + + # then + assert result == ListPipelinesResponse( + status="success", + context=CommandContext(request_id="my_request", pipeline_id=None), + pipelines=["a", "b", "c"], + ) + assert_correct_command_sent( + writer=writer, + command=expected_command, + header_size=4, + message="Expected list pipelines command to be sent", + ) + + +@pytest.mark.asyncio +@mock.patch.object(stream_manager_client, "establish_socket_connection") +async def test_stream_manager_client_can_successfully_initialise_pipeline( + establish_socket_connection_mock: AsyncMock, +) -> None: + # given + reader = assembly_socket_reader( + message={ + "request_id": "my_request", + "pipeline_id": "new_pipeline", + "response": {"status": "success"}, + }, + header_size=4, + ) + writer = DummyStreamWriter() + establish_socket_connection_mock.return_value = (reader, writer) + initialisation_request = InitialisePipelinePayload( + video_configuration=VideoConfiguration( + type="VideoConfiguration", + video_reference="rtsp://128.0.0.1", + ), + processing_configuration=WorkflowConfiguration( + type="WorkflowConfiguration", + workspace_name="some", + workflow_id="other", + ), + api_key="", + ) + client = StreamManagerClient.init( + host="127.0.0.1", + port=7070, + operations_timeout=1.0, + header_size=4, + buffer_size=16438, + ) + + # when + result = await client.initialise_pipeline( + initialisation_request=initialisation_request + ) + + # then + assert result == CommandResponse( + status="success", + context=CommandContext(request_id="my_request", pipeline_id="new_pipeline"), + ) + + +@pytest.mark.asyncio +@mock.patch.object(stream_manager_client, "establish_socket_connection") +async def test_stream_manager_client_can_successfully_terminate_pipeline( + establish_socket_connection_mock: AsyncMock, +) -> None: + # given + reader = assembly_socket_reader( + message={ + "request_id": "my_request", + "pipeline_id": "my_pipeline", + "response": {"status": "success"}, + }, + header_size=4, + ) + writer = DummyStreamWriter() + establish_socket_connection_mock.return_value = (reader, writer) + expected_command = {"type": CommandType.TERMINATE, "pipeline_id": "my_pipeline"} + client = StreamManagerClient.init( + host="127.0.0.1", + port=7070, + operations_timeout=1.0, + header_size=4, + buffer_size=16438, + ) + + # when + result = await client.terminate_pipeline(pipeline_id="my_pipeline") + + # then + assert result == CommandResponse( + status="success", + context=CommandContext(request_id="my_request", pipeline_id="my_pipeline"), + ) + assert_correct_command_sent( + writer=writer, + command=expected_command, + header_size=4, + message="Expected termination command to be sent", + ) + + +@pytest.mark.asyncio +@mock.patch.object(stream_manager_client, "establish_socket_connection") +async def test_stream_manager_client_can_successfully_pause_pipeline( + establish_socket_connection_mock: AsyncMock, +) -> None: + # given + reader = assembly_socket_reader( + message={ + 
"request_id": "my_request", + "pipeline_id": "my_pipeline", + "response": {"status": "success"}, + }, + header_size=4, + ) + writer = DummyStreamWriter() + establish_socket_connection_mock.return_value = (reader, writer) + expected_command = {"type": CommandType.MUTE, "pipeline_id": "my_pipeline"} + client = StreamManagerClient.init( + host="127.0.0.1", + port=7070, + operations_timeout=1.0, + header_size=4, + buffer_size=16438, + ) + + # when + result = await client.pause_pipeline(pipeline_id="my_pipeline") + + # then + assert result == CommandResponse( + status="success", + context=CommandContext(request_id="my_request", pipeline_id="my_pipeline"), + ) + assert_correct_command_sent( + writer=writer, + command=expected_command, + header_size=4, + message="Expected pause command to be sent", + ) + + +@pytest.mark.asyncio +@mock.patch.object(stream_manager_client, "establish_socket_connection") +async def test_stream_manager_client_can_successfully_resume_pipeline( + establish_socket_connection_mock: AsyncMock, +) -> None: + # given + reader = assembly_socket_reader( + message={ + "request_id": "my_request", + "pipeline_id": "my_pipeline", + "response": {"status": "success"}, + }, + header_size=4, + ) + writer = DummyStreamWriter() + establish_socket_connection_mock.return_value = (reader, writer) + expected_command = {"type": CommandType.RESUME, "pipeline_id": "my_pipeline"} + client = StreamManagerClient.init( + host="127.0.0.1", + port=7070, + operations_timeout=1.0, + header_size=4, + buffer_size=16438, + ) + + # when + result = await client.resume_pipeline(pipeline_id="my_pipeline") + + # then + assert result == CommandResponse( + status="success", + context=CommandContext(request_id="my_request", pipeline_id="my_pipeline"), + ) + assert_correct_command_sent( + writer=writer, + command=expected_command, + header_size=4, + message="Expected resume command to be sent", + ) + + +@pytest.mark.asyncio +@mock.patch.object(stream_manager_client, "establish_socket_connection") +async def test_stream_manager_client_can_successfully_get_pipeline_status( + establish_socket_connection_mock: AsyncMock, +) -> None: + # given + reader = assembly_socket_reader( + message={ + "request_id": "my_request", + "pipeline_id": "my_pipeline", + "response": {"status": "success", "report": {"my": "report"}}, + }, + header_size=4, + ) + writer = DummyStreamWriter() + establish_socket_connection_mock.return_value = (reader, writer) + expected_command = {"type": CommandType.STATUS, "pipeline_id": "my_pipeline"} + client = StreamManagerClient.init( + host="127.0.0.1", + port=7070, + operations_timeout=1.0, + header_size=4, + buffer_size=16438, + ) + + # when + result = await client.get_status(pipeline_id="my_pipeline") + + # then + assert result == InferencePipelineStatusResponse( + status="success", + context=CommandContext(request_id="my_request", pipeline_id="my_pipeline"), + report={"my": "report"}, # this is mock data + ) + assert_correct_command_sent( + writer=writer, + command=expected_command, + header_size=4, + message="Expected get info command to be sent", + ) + + +@pytest.mark.asyncio +@mock.patch.object(stream_manager_client, "establish_socket_connection") +async def test_stream_manager_client_can_dispatch_error_response( + establish_socket_connection_mock: AsyncMock, +) -> None: + # given + reader = assembly_socket_reader( + message={ + "request_id": "my_request", + "pipeline_id": "my_pipeline", + "response": {"status": "failure", "error_type": "not_found"}, + }, + header_size=4, + ) + writer = 
DummyStreamWriter() + establish_socket_connection_mock.return_value = (reader, writer) + expected_command = {"type": CommandType.RESUME, "pipeline_id": "my_pipeline"} + client = StreamManagerClient.init( + host="127.0.0.1", + port=7070, + operations_timeout=1.0, + header_size=4, + buffer_size=16438, + ) + + # when + with pytest.raises(ProcessesManagerNotFoundError): + _ = await client.resume_pipeline(pipeline_id="my_pipeline") + + # then + + assert_correct_command_sent( + writer=writer, + command=expected_command, + header_size=4, + message="Expected resume command to be sent", + ) + + +def assembly_socket_reader(message: dict, header_size: int) -> DummyStreamReader: + serialised = json.dumps(message).encode("utf-8") + response_payload = ( + len(serialised).to_bytes(length=header_size, byteorder="big") + serialised + ) + return DummyStreamReader(read_buffer_content=response_payload) + + +def assert_correct_command_sent( + writer: DummyStreamWriter, command: dict, header_size: int, message: str +) -> None: + serialised_command = json.dumps(command).encode("utf-8") + payload = ( + len(serialised_command).to_bytes(length=header_size, byteorder="big") + + serialised_command + ) + assert writer.get_content() == payload, message diff --git a/tests/inference/unit_tests/core/interfaces/stream_manager/manager_app/__init__.py b/tests/inference/unit_tests/core/interfaces/stream_manager/manager_app/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/inference/unit_tests/core/interfaces/stream_manager/manager_app/test_communucation.py b/tests/inference/unit_tests/core/interfaces/stream_manager/manager_app/test_communucation.py new file mode 100644 index 000000000..a0c4d8092 --- /dev/null +++ b/tests/inference/unit_tests/core/interfaces/stream_manager/manager_app/test_communucation.py @@ -0,0 +1,190 @@ +""" +In this module, tests are written to mock, avoiding issues with trying to find empty socket in tests +""" + +import json +import random +from unittest.mock import MagicMock + +import pytest + +from inference.core.interfaces.stream_manager.manager_app.communication import ( + receive_socket_data, + send_data_trough_socket, +) +from inference.core.interfaces.stream_manager.manager_app.errors import ( + MalformedHeaderError, + MalformedPayloadError, + TransmissionChannelClosed, +) + + +def test_receive_socket_data_when_header_is_malformed() -> None: + # given + socket = MagicMock() + socket.recv.side_effect = [b"A"] + + # when + with pytest.raises(MalformedHeaderError): + _ = receive_socket_data( + source=socket, + header_size=4, + buffer_size=512, + ) + + +def test_receive_socket_data_when_header_cannot_be_decoded_as_valid_value() -> None: + # given + socket = MagicMock() + zero = 0 + socket.recv.side_effect = [zero.to_bytes(length=4, byteorder="big")] + + # when + with pytest.raises(MalformedHeaderError): + _ = receive_socket_data( + source=socket, + header_size=4, + buffer_size=512, + ) + + +def test_receive_socket_data_when_header_indicated_invalid_payload_length() -> None: + # given + socket = MagicMock() + data = json.dumps({"some": "data"}).encode("utf-8") + header = len(data) + 32 + socket.recv.side_effect = [header.to_bytes(length=4, byteorder="big"), data, b""] + + # when + with pytest.raises(TransmissionChannelClosed): + _ = receive_socket_data( + source=socket, + header_size=4, + buffer_size=len(data), + ) + + +def test_receive_socket_data_when_malformed_payload_given() -> None: + # given + socket = MagicMock() + data = "FOR SURE NOT A JSON :)".encode("utf-8") + 
header = len(data) + socket.recv.side_effect = [header.to_bytes(length=4, byteorder="big"), data] + + # when + with pytest.raises(MalformedPayloadError): + _ = receive_socket_data( + source=socket, + header_size=4, + buffer_size=len(data), + ) + + +def test_receive_socket_data_complete_successfully_despite_fragmented_message() -> None: + # given + socket = MagicMock() + data = json.dumps({"some": "data"}).encode("utf-8") + header = len(data) + socket.recv.side_effect = [ + header.to_bytes(length=4, byteorder="big"), + data[:-3], + data[-3:], + ] + + # when + result = receive_socket_data( + source=socket, + header_size=4, + buffer_size=len(data) - 3, + ) + + # then + assert result == {"some": "data"}, "Decoded date must be equal to input payload" + + +def test_receive_socket_data_when_timeout_error_should_be_reraised() -> None: + # given + socket = MagicMock() + data = json.dumps({"some": "data"}).encode("utf-8") + header = len(data) + socket.recv.side_effect = [header.to_bytes(length=4, byteorder="big"), TimeoutError] + + # when + with pytest.raises(TimeoutError): + _ = receive_socket_data( + source=socket, + header_size=4, + buffer_size=len(data), + ) + + +def test_send_data_trough_socket_when_operation_succeeds() -> None: + # given + socket = MagicMock() + payload = json.dumps({"my": "data"}).encode("utf-8") + + # when + send_data_trough_socket( + target=socket, + header_size=4, + data=payload, + request_id="my_request", + pipeline_id="my_pipeline", + ) + + # then + socket.sendall.assert_called_once_with( + len(payload).to_bytes(length=4, byteorder="big") + payload + ) + + +def test_send_data_trough_socket_when_payload_overflow_happens() -> None: + # given + socket = MagicMock() + payload = json.dumps( + {"my": "data", "list": [random.randint(0, 100) for _ in range(128)]} + ).encode("utf-8") + expected_error_payload = json.dumps( + { + "request_id": "my_request", + "response": { + "status": "failure", + "error_type": "internal_error", + "error_class": "OverflowError", + "error_message": "int too big to convert", + }, + "pipeline_id": "my_pipeline", + } + ).encode("utf-8") + + # when + send_data_trough_socket( + target=socket, + header_size=1, + data=payload, + request_id="my_request", + pipeline_id="my_pipeline", + ) + + # then + socket.sendall.assert_called_once_with( + len(expected_error_payload).to_bytes(length=1, byteorder="big") + + expected_error_payload + ) + + +def test_send_data_trough_socket_when_connection_error_occurs() -> None: + # given + socket = MagicMock() + payload = json.dumps({"my": "data"}).encode("utf-8") + + # when + send_data_trough_socket( + target=socket, + header_size=4, + data=payload, + request_id="my_request", + pipeline_id="my_pipeline", + ) + + # then: Nothing happens - error just logged diff --git a/tests/inference/unit_tests/core/interfaces/stream_manager/manager_app/test_inference_pipeline_manager.py b/tests/inference/unit_tests/core/interfaces/stream_manager/manager_app/test_inference_pipeline_manager.py new file mode 100644 index 000000000..6781895e0 --- /dev/null +++ b/tests/inference/unit_tests/core/interfaces/stream_manager/manager_app/test_inference_pipeline_manager.py @@ -0,0 +1,569 @@ +""" +Unit tests in this module are realised using `InferencePipeline` mock - and within single process, submitting +command queues upfront, and then handling one-by-one in the same process. 
+""" + +from datetime import datetime +from multiprocessing import Queue +from unittest import mock +from unittest.mock import MagicMock + +import numpy as np +import pytest + +from inference.core.exceptions import ( + RoboflowAPINotAuthorizedError, + RoboflowAPINotNotFoundError, +) +from inference.core.interfaces.camera.entities import VideoFrame +from inference.core.interfaces.camera.exceptions import StreamOperationNotAllowedError +from inference.core.interfaces.camera.video_source import ( + BufferConsumptionStrategy, + BufferFillingStrategy, +) +from inference.core.interfaces.stream.sinks import InMemoryBufferSink +from inference.core.interfaces.stream_manager.manager_app import ( + inference_pipeline_manager, +) +from inference.core.interfaces.stream_manager.manager_app.entities import ( + CommandType, + ErrorType, + InitialisePipelinePayload, + OperationStatus, + VideoConfiguration, + WorkflowConfiguration, +) +from inference.core.interfaces.stream_manager.manager_app.inference_pipeline_manager import ( + InferencePipelineManager, +) + + +@pytest.mark.timeout(30) +@mock.patch.object(inference_pipeline_manager.InferencePipeline, "init_with_workflow") +def test_inference_pipeline_manager_when_init_pipeline_operation_is_requested( + pipeline_init_mock: MagicMock, +) -> None: + # given + pipeline_init_mock.return_value = MagicMock() + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + init_payload = assembly_valid_init_payload() + + # when + command_queue.put(("1", init_payload)) + command_queue.put(("2", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + status_2 = responses_queue.get() + + # then + assert status_1 == ( + "1", + {"status": OperationStatus.SUCCESS}, + ), "Initialisation operation must succeed" + assert status_2 == ( + "2", + {"status": OperationStatus.SUCCESS}, + ), "Termination operation must succeed" + actual_video_reference = pipeline_init_mock.call_args[1]["video_reference"] + assert actual_video_reference == "rtsp://128.0.0.1" + + +@pytest.mark.timeout(30) +@mock.patch.object(inference_pipeline_manager.InferencePipeline, "init_with_workflow") +def test_inference_pipeline_manager_when_init_pipeline_operation_is_requested_but_invalid_payload_sent( + pipeline_init_mock: MagicMock, +) -> None: + # given + pipeline_init_mock.return_value = MagicMock() + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + init_payload = assembly_valid_init_payload() + del init_payload["video_configuration"]["video_reference"] + + # when + command_queue.put(("1", init_payload)) + command_queue.put(("2", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + status_2 = responses_queue.get() + + # then + assert ( + status_1[0] == "1" + ), "First request should be reported in responses_queue at first" + assert ( + status_1[1]["status"] == OperationStatus.FAILURE + ), "Init operation should fail" + assert ( + status_1[1]["error_type"] == ErrorType.INVALID_PAYLOAD + ), "Invalid Payload error is expected" + assert status_2 == ( + "2", + {"status": OperationStatus.SUCCESS}, + ), "Termination of pipeline must happen" + + +@pytest.mark.timeout(30) +@mock.patch.object(inference_pipeline_manager.InferencePipeline, "init_with_workflow") +def 
test_inference_pipeline_manager_when_init_pipeline_operation_is_requested_but_roboflow_operation_not_authorised( + pipeline_init_mock: MagicMock, +) -> None: + # given + pipeline_init_mock.side_effect = RoboflowAPINotAuthorizedError() + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + init_payload = assembly_valid_init_payload() + + # when + command_queue.put(("1", init_payload)) + command_queue.put(("2", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + status_2 = responses_queue.get() + + # then + assert ( + status_1[0] == "1" + ), "First request should be reported in responses_queue at first" + assert ( + status_1[1]["status"] == OperationStatus.FAILURE + ), "Init operation should fail" + assert ( + status_1[1]["error_type"] == ErrorType.AUTHORISATION_ERROR + ), "Authorisation error is expected" + assert status_2 == ( + "2", + {"status": OperationStatus.SUCCESS}, + ), "Termination of pipeline must happen" + + +@pytest.mark.timeout(30) +@mock.patch.object(inference_pipeline_manager.InMemoryBufferSink, "init") +@mock.patch.object(inference_pipeline_manager.InferencePipeline, "init_with_workflow") +def test_inference_pipeline_manager_consumption( + pipeline_init_mock: MagicMock, + buffer_init: MagicMock, +) -> None: + # given + pipeline_init_mock.return_value = MagicMock() + filled_buffer = InMemoryBufferSink(queue_size=10) + filled_buffer.on_prediction( + predictions=[None, {"some": "value"}], + video_frame=[ + None, + VideoFrame( + image=np.zeros((3, 3)), + frame_id=0, + frame_timestamp=datetime.now(), + source_id=1, + ), + ], + ) + filled_buffer.on_prediction( + predictions=[None, {"other": "value"}], + video_frame=[ + None, + VideoFrame( + image=np.zeros((3, 3)), + frame_id=1, + frame_timestamp=datetime.now(), + source_id=1, + ), + ], + ) + buffer_init.return_value = filled_buffer + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + init_payload = assembly_valid_init_payload() + + # when + command_queue.put(("1", init_payload)) + command_queue.put(("2", {"type": CommandType.CONSUME_RESULT})) + command_queue.put(("3", {"type": CommandType.CONSUME_RESULT})) + command_queue.put(("4", {"type": CommandType.CONSUME_RESULT})) + command_queue.put(("5", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + status_2 = responses_queue.get() + status_3 = responses_queue.get() + status_4 = responses_queue.get() + status_5 = responses_queue.get() + + # then + assert ( + status_1[0] == "1" + ), "First request should be reported in responses_queue at first" + assert ( + status_1[1]["status"] == OperationStatus.SUCCESS + ), "Init operation should succeed" + assert status_2[0] == "2", "2nd request should be reported in responses_queue 2nd" + assert status_2[1]["status"] == OperationStatus.SUCCESS, "Operation should succeed" + assert status_2[1]["outputs"] == [ + None, + {"some": "value"}, + ], "Operation should yield first buffer result" + assert status_3[0] == "3", "3rd request should be reported in responses_queue 3rd" + assert status_3[1]["status"] == OperationStatus.SUCCESS, "Operation should succeed" + assert status_3[1]["outputs"] == [ + None, + {"other": "value"}, + ], "Operation should yield second buffer result" + assert status_4[0] == "4", "4th 
request should be reported in responses_queue 4th" + assert status_4[1]["status"] == OperationStatus.SUCCESS, "Operation should succeed" + assert status_4[1]["outputs"] == [], "Operation should yield empty result" + assert status_5[0] == "5", "5th request should be reported in responses_queue 5th" + assert status_5[1]["status"] == OperationStatus.SUCCESS, "Operation should succeed" + + +@pytest.mark.timeout(30) +@mock.patch.object(inference_pipeline_manager.InferencePipeline, "init_with_workflow") +def test_inference_pipeline_manager_when_init_pipeline_operation_is_requested_but_model_not_found( + pipeline_init_mock: MagicMock, +) -> None: + # given + pipeline_init_mock.side_effect = RoboflowAPINotNotFoundError() + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + init_payload = assembly_valid_init_payload() + + # when + command_queue.put(("1", init_payload)) + command_queue.put(("2", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + status_2 = responses_queue.get() + + # then + assert ( + status_1[0] == "1" + ), "First request should be reported in responses_queue at first" + assert ( + status_1[1]["status"] == OperationStatus.FAILURE + ), "Init operation should fail" + assert ( + status_1[1]["error_type"] == ErrorType.NOT_FOUND + ), "Not found error is expected" + assert status_2 == ( + "2", + {"status": OperationStatus.SUCCESS}, + ), "Termination of pipeline must happen" + + +@pytest.mark.timeout(30) +@mock.patch.object(inference_pipeline_manager.InferencePipeline, "init_with_workflow") +def test_inference_pipeline_manager_when_init_pipeline_operation_is_requested_but_unknown_error_appears( + pipeline_init_mock: MagicMock, +) -> None: + # given + pipeline_init_mock.side_effect = Exception() + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + init_payload = assembly_valid_init_payload() + + # when + command_queue.put(("1", init_payload)) + command_queue.put(("2", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + status_2 = responses_queue.get() + + # then + assert ( + status_1[0] == "1" + ), "First request should be reported in responses_queue at first" + assert ( + status_1[1]["status"] == OperationStatus.FAILURE + ), "Init operation should fail" + assert ( + status_1[1]["error_type"] == ErrorType.INTERNAL_ERROR + ), "Internal error is expected" + assert status_2 == ( + "2", + {"status": OperationStatus.SUCCESS}, + ), "Termination of pipeline must happen" + + +@pytest.mark.timeout(30) +def test_inference_pipeline_manager_when_attempted_to_get_status_of_not_initialised_pipeline() -> ( + None +): + # given + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + + # when + command_queue.put(("1", {"type": CommandType.STATUS})) + command_queue.put(("2", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + status_2 = responses_queue.get() + + # then + assert ( + status_1[0] == "1" + ), "First request should be reported in responses_queue at first" + assert ( + status_1[1]["status"] == OperationStatus.FAILURE + ), "Init operation should fail" + assert ( + 
status_1[1]["error_type"] == ErrorType.OPERATION_ERROR + ), "Operation error is expected" + assert status_2 == ( + "2", + {"status": OperationStatus.SUCCESS}, + ), "Termination of pipeline must happen" + + +@pytest.mark.timeout(30) +def test_inference_pipeline_manager_when_attempted_to_pause_of_not_initialised_pipeline() -> ( + None +): + # given + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + + # when + command_queue.put(("1", {"type": CommandType.MUTE})) + command_queue.put(("2", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + status_2 = responses_queue.get() + + # then + assert ( + status_1[0] == "1" + ), "First request should be reported in responses_queue at first" + assert ( + status_1[1]["status"] == OperationStatus.FAILURE + ), "Init operation should fail" + assert ( + status_1[1]["error_type"] == ErrorType.OPERATION_ERROR + ), "Operation error is expected" + assert status_2 == ( + "2", + {"status": OperationStatus.SUCCESS}, + ), "Termination of pipeline must happen" + + +@pytest.mark.timeout(30) +def test_inference_pipeline_manager_when_attempted_to_resume_of_not_initialised_pipeline() -> ( + None +): + # given + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + + # when + command_queue.put(("1", {"type": CommandType.RESUME})) + command_queue.put(("2", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + status_2 = responses_queue.get() + + # then + assert ( + status_1[0] == "1" + ), "First request should be reported in responses_queue at first" + assert ( + status_1[1]["status"] == OperationStatus.FAILURE + ), "Init operation should fail" + assert ( + status_1[1]["error_type"] == ErrorType.OPERATION_ERROR + ), "Operation error is expected" + assert status_2 == ( + "2", + {"status": OperationStatus.SUCCESS}, + ), "Termination of pipeline must happen" + + +@pytest.mark.timeout(30) +@mock.patch.object(inference_pipeline_manager.InferencePipeline, "init_with_workflow") +def test_inference_pipeline_manager_when_attempted_to_init_pause_resume_actions_successfully( + pipeline_init_mock: MagicMock, +) -> None: + # given + pipeline_init_mock.return_value = MagicMock() + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + init_payload = assembly_valid_init_payload() + + # when + command_queue.put(("1", init_payload)) + command_queue.put(("2", {"type": CommandType.MUTE})) + command_queue.put(("3", {"type": CommandType.RESUME})) + command_queue.put(("4", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + status_2 = responses_queue.get() + status_3 = responses_queue.get() + status_4 = responses_queue.get() + + # then + assert status_1 == ( + "1", + {"status": OperationStatus.SUCCESS}, + ), "Initialisation operation must succeed" + assert status_2 == ( + "2", + {"status": OperationStatus.SUCCESS}, + ), "Pause of pipeline must happen" + assert status_3 == ( + "3", + {"status": OperationStatus.SUCCESS}, + ), "Resume of pipeline must happen" + assert status_4 == ( + "4", + {"status": OperationStatus.SUCCESS}, + ), "Termination of pipeline must happen" + + 
+@pytest.mark.timeout(30) +@mock.patch.object(inference_pipeline_manager.InferencePipeline, "init_with_workflow") +def test_inference_pipeline_manager_when_attempted_to_resume_running_sprint_causing_not_allowed_transition( + pipeline_init_mock: MagicMock, +) -> None: + # given + pipeline_init_mock.return_value = MagicMock() + pipeline_init_mock.return_value.resume_stream.side_effect = ( + StreamOperationNotAllowedError() + ) + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + init_payload = assembly_valid_init_payload() + + # when + command_queue.put(("1", init_payload)) + command_queue.put(("2", {"type": CommandType.RESUME})) + command_queue.put(("3", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + status_2 = responses_queue.get() + status_3 = responses_queue.get() + + # then + assert status_1 == ( + "1", + {"status": OperationStatus.SUCCESS}, + ), "Initialisation operation must succeed" + assert status_2[0] == "2", "Second result must refer to request `2`" + assert ( + status_2[1]["status"] is OperationStatus.FAILURE + ), "Second request should fail, as we requested forbidden action" + assert status_2[1]["error_type"] == ErrorType.OPERATION_ERROR + assert status_3 == ( + "3", + {"status": OperationStatus.SUCCESS}, + ), "Termination of pipeline must happen" + + +def assembly_valid_init_payload() -> dict: + specification = { + "version": "1.0", + "inputs": [ + { + "type": "InferenceImage", + "name": "image", + "video_metadata_input_name": "test", + }, + ], + "steps": [ + { + "type": "ObjectDetectionModel", + "name": "people_detector", + "image": "$inputs.image", + "model_id": "yolov8n-640", + "confidence": 0.5, + "iou_threshold": 0.7, + "class_filter": ["person"], + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "predictions", + "selector": "$steps.people_detector.predictions", + } + ], + } + valid_init_payload = InitialisePipelinePayload( + video_configuration=VideoConfiguration( + type="VideoConfiguration", + video_reference="rtsp://128.0.0.1", + source_buffer_filling_strategy=BufferFillingStrategy.DROP_OLDEST, + source_buffer_consumption_strategy=BufferConsumptionStrategy.EAGER, + ), + processing_configuration=WorkflowConfiguration( + type="WorkflowConfiguration", + workflow_specification=specification, + ), + api_key="", + ).dict() + valid_init_payload["type"] = CommandType.INIT + return valid_init_payload diff --git a/tests/inference/unit_tests/core/interfaces/stream_manager/manager_app/test_serialisation.py b/tests/inference/unit_tests/core/interfaces/stream_manager/manager_app/test_serialisation.py new file mode 100644 index 000000000..20f04239f --- /dev/null +++ b/tests/inference/unit_tests/core/interfaces/stream_manager/manager_app/test_serialisation.py @@ -0,0 +1,126 @@ +import datetime +import json +from enum import Enum + +from inference.core.interfaces.stream_manager.manager_app.entities import ( + ErrorType, + OperationStatus, +) +from inference.core.interfaces.stream_manager.manager_app.serialisation import ( + describe_error, + prepare_error_response, + serialise_to_json, +) + + +def test_serialise_to_json_when_datetime_object_given() -> None: + # given + timestamp = datetime.datetime( + year=2020, month=10, day=13, hour=10, minute=30, second=12 + ) + + # when + serialised = json.dumps({"time": timestamp}, default=serialise_to_json) + + # then + + assert ( + "2020-10-13T10:30:12" in 
serialised + ), "Timestamp in format YYYY-MM-DDTHH:MM:HH must be present in serialised json" + + +def test_serialise_to_json_when_date_object_given() -> None: + # given + timestamp = datetime.date(year=2020, month=10, day=13) + + # when + serialised = json.dumps({"time": timestamp}, default=serialise_to_json) + + # then + + assert ( + "2020-10-13" in serialised + ), "Date in format YYYY-MM-DD must be present in serialised json" + + +class ExampleEnum(Enum): + SOME = "some" + OTHER = "other" + + +def test_serialise_to_json_when_enum_object_given() -> None: + # given + data = ExampleEnum.SOME + + # when + serialised = json.dumps({"payload": data}, default=serialise_to_json) + + # then + + assert "some" in serialised, "Enum value `some` must be present in serialised json" + + +def test_serialise_to_json_when_no_special_content_given() -> None: + # given + data = {"some": 1, "other": True} + + # when + serialised = json.dumps(data, default=serialise_to_json) + result = json.loads(serialised) + + # then + + assert result == data, "After deserialization, data must be recovered 100%" + + +def test_describe_error_when_exception_is_provided_as_context() -> None: + # given + exception = ValueError("Some value error") + + # when + result = describe_error(exception=exception, error_type=ErrorType.INVALID_PAYLOAD) + + # then + assert result == { + "status": OperationStatus.FAILURE, + "error_type": ErrorType.INVALID_PAYLOAD, + "error_class": "ValueError", + "error_message": "Some value error", + } + + +def test_describe_error_when_exception_is_not_provided() -> None: + # when + result = describe_error(exception=None, error_type=ErrorType.INVALID_PAYLOAD) + + # then + assert result == { + "status": OperationStatus.FAILURE, + "error_type": ErrorType.INVALID_PAYLOAD, + } + + +def test_prepare_error_response() -> None: + # given + exception = ValueError("Some value error") + + # when + error_response = prepare_error_response( + request_id="my_request", + error=exception, + error_type=ErrorType.INTERNAL_ERROR, + pipeline_id="my_pipeline", + ) + decoded_response = json.loads(error_response.decode("utf-8")) + + # then + assert decoded_response == { + "request_id": "my_request", + "response": { + "status": "failure", + "error_type": "internal_error", + "error_class": "ValueError", + "error_message": "Some value error", + }, + "pipeline_id": "my_pipeline", + } diff --git a/tests/inference/unit_tests/core/interfaces/stream_manager/manager_app/test_tcp_server.py b/tests/inference/unit_tests/core/interfaces/stream_manager/manager_app/test_tcp_server.py new file mode 100644 index 000000000..ac5eaab6e --- /dev/null +++ b/tests/inference/unit_tests/core/interfaces/stream_manager/manager_app/test_tcp_server.py @@ -0,0 +1,27 @@ +from unittest.mock import MagicMock + +from inference.core.interfaces.stream_manager.manager_app.tcp_server import ( + RoboflowTCPServer, +) + + +def test_roboflow_server_applies_connection_timeout() -> None: + # given + server = RoboflowTCPServer( + server_address=("127.0.0.1", 7070), + handler_class=MagicMock, + socket_operations_timeout=1.5, + ) + connection, address = MagicMock(), MagicMock() + server.socket = MagicMock() + server.socket.accept.return_value = (connection, address) + + # when + result = server.get_request() + + # then + connection.settimeout.assert_called_once_with(1.5) + assert result == ( + connection, + address, + ), "Method must return accepted connection and address, as per TCPServer interface requirement" diff --git 
a/tests/inference/unit_tests/enterprise/stream_management/manager/test_app.py b/tests/inference/unit_tests/enterprise/stream_management/manager/test_app.py index 9fc7a4439..13d86eaad 100644 --- a/tests/inference/unit_tests/enterprise/stream_management/manager/test_app.py +++ b/tests/inference/unit_tests/enterprise/stream_management/manager/test_app.py @@ -147,6 +147,9 @@ def test_execute_termination(exit_mock: MagicMock) -> None: } # when + # initial command makes sure the error handling is set and there is no time-hazard with execute_termination(...) + command_queue.put(("unknown", {})) + _ = responses_queue.get() execute_termination(9, MagicMock(), processes_table=processes_table) # then diff --git a/tests/inference_sdk/unit_tests/http/test_client.py b/tests/inference_sdk/unit_tests/http/test_client.py index 39287bbac..6385706e5 100644 --- a/tests/inference_sdk/unit_tests/http/test_client.py +++ b/tests/inference_sdk/unit_tests/http/test_client.py @@ -3762,3 +3762,356 @@ def test_infer_from_workflow_when_custom_workflow_with_both_parameters_and_exclu }, "excluded_fields": ["some"], }, "Request payload must contain api key and inputs" + + +def test_list_inference_pipelines(requests_mock: Mocker) -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + requests_mock.get( + f"{api_url}/inference_pipelines/list", + json={ + "status": "success", + "context": {"request_id": "52f5df39-b7de-4a56-8c42-b979d365cfa0", + "pipeline_id": None}, + "pipelines": ["acd62146-edca-4253-8eeb-40c88906cd70"] + }, + ) + + # when + result = http_client.list_inference_pipelines() + + # then + assert result == { + "status": "success", + "context": {"request_id": "52f5df39-b7de-4a56-8c42-b979d365cfa0", + "pipeline_id": None}, + "pipelines": ["acd62146-edca-4253-8eeb-40c88906cd70"] + } + assert requests_mock.request_history[0].json() == {"api_key": "my-api-key"}, \ + "Expected payload to contain API key" + + +def test_list_inference_pipelines_on_auth_error(requests_mock: Mocker) -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + requests_mock.get( + f"{api_url}/inference_pipelines/list", + status_code=401, + ) + + # when + with pytest.raises(HTTPCallErrorError): + _ = http_client.list_inference_pipelines() + + +def test_get_inference_pipeline_status(requests_mock: Mocker) -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + requests_mock.get( + f"{api_url}/inference_pipelines/my-pipeline/status", + json={ + "status": "success", + }, + ) + + # when + result = http_client.get_inference_pipeline_status(pipeline_id="my-pipeline") + + # then + assert result == { + "status": "success", + } + assert requests_mock.request_history[0].json() == {"api_key": "my-api-key"}, \ + "Expected payload to contain API key" + + +def test_get_inference_pipeline_status_when_pipeline_id_empty(requests_mock: Mocker) -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + + # when + with pytest.raises(InvalidParameterError): + _ = http_client.get_inference_pipeline_status(pipeline_id="") + + +def test_get_inference_pipeline_status_when_pipeline_id_not_found(requests_mock: Mocker) -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + requests_mock.get( + 
f"{api_url}/inference_pipelines/my-pipeline/status", + status_code=404, + ) + + # when + with pytest.raises(HTTPCallErrorError): + _ = http_client.get_inference_pipeline_status(pipeline_id="my-pipeline") + + +def test_pause_inference_pipeline(requests_mock: Mocker) -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + requests_mock.post( + f"{api_url}/inference_pipelines/my-pipeline/pause", + json={ + "status": "success", + }, + ) + + # when + result = http_client.pause_inference_pipeline(pipeline_id="my-pipeline") + + # then + assert result == { + "status": "success", + } + assert requests_mock.request_history[0].json() == {"api_key": "my-api-key"}, \ + "Expected payload to contain API key" + + +def test_pause_inference_pipeline_when_pipeline_id_empty() -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + + # when + with pytest.raises(InvalidParameterError): + _ = http_client.pause_inference_pipeline(pipeline_id="") + + +def test_pause_inference_pipeline_when_pipeline_id_not_found(requests_mock: Mocker) -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + requests_mock.post( + f"{api_url}/inference_pipelines/my-pipeline/pause", + status_code=404, + ) + + # when + with pytest.raises(HTTPCallErrorError): + _ = http_client.pause_inference_pipeline(pipeline_id="my-pipeline") + + +def test_resume_inference_pipeline(requests_mock: Mocker) -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + requests_mock.post( + f"{api_url}/inference_pipelines/my-pipeline/resume", + json={ + "status": "success", + }, + ) + + # when + result = http_client.resume_inference_pipeline(pipeline_id="my-pipeline") + + # then + assert result == { + "status": "success", + } + assert requests_mock.request_history[0].json() == {"api_key": "my-api-key"}, \ + "Expected payload to contain API key" + + +def test_resume_inference_pipeline_when_pipeline_id_empty() -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + + # when + with pytest.raises(InvalidParameterError): + _ = http_client.resume_inference_pipeline(pipeline_id="") + + +def test_resume_inference_pipeline_when_pipeline_id_not_found(requests_mock: Mocker) -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + requests_mock.post( + f"{api_url}/inference_pipelines/my-pipeline/resume", + status_code=404, + ) + + # when + with pytest.raises(HTTPCallErrorError): + _ = http_client.resume_inference_pipeline(pipeline_id="my-pipeline") + + +def test_terminate_inference_pipeline(requests_mock: Mocker) -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + requests_mock.post( + f"{api_url}/inference_pipelines/my-pipeline/terminate", + json={ + "status": "success", + }, + ) + + # when + result = http_client.terminate_inference_pipeline(pipeline_id="my-pipeline") + + # then + assert result == { + "status": "success", + } + assert requests_mock.request_history[0].json() == {"api_key": "my-api-key"}, \ + "Expected payload to contain API key" + + +def test_terminate_inference_pipeline_when_pipeline_id_empty() -> None: + # given + api_url = "http://some.com" + http_client = 
InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + + # when + with pytest.raises(InvalidParameterError): + _ = http_client.terminate_inference_pipeline(pipeline_id="") + + +def test_terminate_inference_pipeline_when_pipeline_id_not_found(requests_mock: Mocker) -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + requests_mock.post( + f"{api_url}/inference_pipelines/my-pipeline/terminate", + status_code=404, + ) + + # when + with pytest.raises(HTTPCallErrorError): + _ = http_client.terminate_inference_pipeline(pipeline_id="my-pipeline") + + +def test_consume_inference_pipeline_result(requests_mock: Mocker) -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + requests_mock.get( + f"{api_url}/inference_pipelines/my-pipeline/consume", + json={ + "status": "success", + }, + ) + + # when + result = http_client.consume_inference_pipeline_result( + pipeline_id="my-pipeline", + excluded_fields=["a"], + ) + + # then + assert result == { + "status": "success", + } + assert requests_mock.request_history[0].json() == {"api_key": "my-api-key", "excluded_fields": ["a"]}, \ + "Expected payload to contain API key" + + +def test_consume_inference_pipeline_result_when_pipeline_id_empty() -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + + # when + with pytest.raises(InvalidParameterError): + _ = http_client.consume_inference_pipeline_result(pipeline_id="") + + +def test_consume_inference_pipeline_result_when_pipeline_id_not_found(requests_mock: Mocker) -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + requests_mock.get( + f"{api_url}/inference_pipelines/my-pipeline/consume", + status_code=404, + ) + + # when + with pytest.raises(HTTPCallErrorError): + _ = http_client.consume_inference_pipeline_result(pipeline_id="my-pipeline") + + +def test_start_inference_pipeline_with_workflow_when_configuration_does_not_specify_workflow() -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + + # when + with pytest.raises(InvalidParameterError): + http_client.start_inference_pipeline_with_workflow(video_reference="rtsp://some/stream") + + +def test_start_inference_pipeline_with_workflow_when_configuration_does_over_specify_workflow() -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + + # when + with pytest.raises(InvalidParameterError): + http_client.start_inference_pipeline_with_workflow( + video_reference="rtsp://some/stream", + workflow_specification={}, + workspace_name="some", + workflow_id="some", + ) + + +def test_start_inference_pipeline_with_workflow_when_configuration_is_valid(requests_mock: Mocker) -> None: + # given + api_url = "http://some.com" + http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) + requests_mock.post( + f"{api_url}/inference_pipelines/initialise", + json={ + "status": "success", + }, + ) + + # when + result = http_client.start_inference_pipeline_with_workflow( + video_reference="rtsp://some/stream", + workspace_name="some", + workflow_id="other", + ) + + # then + assert result == { + "status": "success", + } + assert requests_mock.request_history[0].json() == { + "api_key": "my-api-key", + "video_configuration": { + "type": 
"VideoConfiguration", + "video_reference": "rtsp://some/stream", + "max_fps": None, + "source_buffer_filling_strategy": "DROP_OLDEST", + "source_buffer_consumption_strategy": "EAGER", + "video_source_properties": None, + "batch_collection_timeout": None, + }, + "processing_configuration": { + "type": "WorkflowConfiguration", + "workflow_specification": None, + "workspace_name": "some", + "workflow_id": "other", + "image_input_name": "image", + "workflows_parameters": None, + "workflows_thread_pool_workers": 4, + "cancel_thread_pool_tasks_on_exit": True, + "video_metadata_input_name": "video_metadata", + }, + "sink_configuration": { + "type": "MemorySinkConfiguration", + "results_buffer_size": 64, + }, + } + + diff --git a/tests/workflows/unit_tests/core_steps/analytics/test_path_deviation.py b/tests/workflows/unit_tests/core_steps/analytics/test_path_deviation.py index 8d9fe8a27..3d9c98e2a 100644 --- a/tests/workflows/unit_tests/core_steps/analytics/test_path_deviation.py +++ b/tests/workflows/unit_tests/core_steps/analytics/test_path_deviation.py @@ -7,9 +7,7 @@ from inference.core.workflows.core_steps.analytics.path_deviation.v1 import ( PathDeviationAnalyticsBlockV1, ) -from inference.core.workflows.execution_engine.entities.base import ( - VideoMetadata, -) +from inference.core.workflows.execution_engine.entities.base import VideoMetadata def test_path_deviation_exact_path(): @@ -60,9 +58,7 @@ def test_path_deviation_exact_path(): triggering_anchor=sv.Position.CENTER, # Default reference_path=reference_path, ) - frechet_distance = result["path_deviation_detections"][ - "path_deviation" - ][0] + frechet_distance = result["path_deviation_detections"]["path_deviation"][0] frechet_distances.append(frechet_distance) # Then @@ -128,9 +124,7 @@ def test_path_deviation_with_deviation(): triggering_anchor=sv.Position.CENTER, # Default reference_path=reference_path, ) - frechet_distance = result["path_deviation_detections"][ - "path_deviation" - ][0] + frechet_distance = result["path_deviation_detections"]["path_deviation"][0] frechet_distances.append(frechet_distance) # Then @@ -183,9 +177,7 @@ def test_path_deviation_multiple_objects(): triggering_anchor=sv.Position.CENTER, # Default reference_path=reference_path, ) - frechet_distance = result["path_deviation_detections"][ - "path_deviation" - ][0] + frechet_distance = result["path_deviation_detections"]["path_deviation"][0] frechet_distances.append(frechet_distance) # Then