Merge pull request #679 from roboflow/feature/video_processing_in_inference_server

Video processing in `inference` server
PawelPeczek-Roboflow authored Sep 27, 2024
2 parents a74f740 + 64f2525 commit 6acb030
Showing 47 changed files with 4,109 additions and 50 deletions.
43 changes: 20 additions & 23 deletions development/stream_interface/workflows_demo.py
@@ -1,4 +1,3 @@
import os
from threading import Thread
from typing import List, Optional

@@ -7,12 +6,13 @@

from inference import InferencePipeline
from inference.core.interfaces.camera.entities import VideoFrame
from inference.core.interfaces.camera.video_source import BufferFillingStrategy, BufferConsumptionStrategy
from inference.core.interfaces.stream.watchdog import PipelineWatchDog, BasePipelineWatchDog
from inference.core.utils.drawing import create_tiles

STOP = False
ANNOTATOR = sv.BoundingBoxAnnotator()
TARGET_PROJECT = os.environ["TARGET_PROJECT"]
fps_monitor = sv.FPSMonitor()


def main() -> None:
@@ -32,36 +32,25 @@ def main() -> None:
"confidence": 0.5,
},
{
"type": "RoboflowDatasetUpload",
"name": "roboflow_dataset_upload",
"images": "$inputs.image",
"type": "roboflow_core/bounding_box_visualization@v1",
"name": "bbox_visualiser",
"predictions": "$steps.step_1.predictions",
"target_project": TARGET_PROJECT,
"usage_quota_name": "upload_quota_XXX",
"fire_and_forget": True,
},
{
"type": "RoboflowCustomMetadata",
"name": "metadata_upload",
"predictions": "$steps.step_1.predictions",
"field_name": "dummy",
"field_value": "dummy",
"fire_and_forget": True,
},
"image": "$inputs.image"
}
],
"outputs": [
{"type": "JsonField", "name": "predictions", "selector": "$steps.step_1.predictions"},
{"type": "JsonField", "name": "upload_error", "selector": "$steps.roboflow_dataset_upload.error_status"},
{"type": "JsonField", "name": "upload_message", "selector": "$steps.roboflow_dataset_upload.message"},
{"type": "JsonField", "name": "metadata_error", "selector": "$steps.metadata_upload.error_status"},
{"type": "JsonField", "name": "metadata_message", "selector": "$steps.metadata_upload.message"},
{"type": "JsonField", "name": "preview", "selector": "$steps.bbox_visualiser.image"},

],
}
pipeline = InferencePipeline.init_with_workflow(
video_reference=[os.environ["VIDEO_REFERENCE"]] * 2,
video_reference=["rtsp://localhost:8554/live.stream"],
workflow_specification=workflow_specification,
watchdog=watchdog,
on_prediction=workflows_sink,
source_buffer_filling_strategy=BufferFillingStrategy.DROP_OLDEST,
source_buffer_consumption_strategy=BufferConsumptionStrategy.EAGER,
)
control_thread = Thread(target=command_thread, args=(pipeline, watchdog))
control_thread.start()
@@ -91,17 +80,25 @@ def workflows_sink(
predictions: List[Optional[dict]],
video_frames: List[Optional[VideoFrame]],
) -> None:
fps_monitor.tick()
if not isinstance(predictions, list):
predictions = [predictions]
video_frames = [video_frames]
images_to_show = []
for prediction, frame in zip(predictions, video_frames):
if prediction is None or frame is None:
continue
detections: sv.Detections = prediction["predictions"]
visualised = ANNOTATOR.annotate(frame.image.copy(), detections)
images_to_show.append(visualised)
print(prediction["upload_message"], prediction["metadata_message"])
tiles = create_tiles(images=images_to_show)
cv2.imshow(f"Predictions", tiles)
cv2.waitKey(1)
if hasattr(fps_monitor, "fps"):
fps_value = fps_monitor.fps
else:
fps_value = fps_monitor()
print(f"FPS: {fps_value}")


if __name__ == '__main__':
11 changes: 10 additions & 1 deletion docker/config/cpu_http.py
@@ -1,5 +1,8 @@
from multiprocessing import Process

from inference.core.cache import cache
from inference.core.interfaces.http.http_api import HttpInterface
from inference.core.interfaces.stream_manager.manager_app.app import start
from inference.core.managers.active_learning import ActiveLearningManager, BackgroundTaskActiveLearningManager
from inference.core.managers.base import ModelManager
from inference.core.managers.decorators.fixed_size_cache import WithFixedSizeCache
@@ -9,7 +12,7 @@
import os
from prometheus_fastapi_instrumentator import Instrumentator

from inference.core.env import MAX_ACTIVE_MODELS, ACTIVE_LEARNING_ENABLED, LAMBDA
from inference.core.env import MAX_ACTIVE_MODELS, ACTIVE_LEARNING_ENABLED, LAMBDA, ENABLE_STREAM_API
from inference.models.utils import ROBOFLOW_MODEL_TYPES

model_registry = RoboflowModelRegistry(ROBOFLOW_MODEL_TYPES)
@@ -38,3 +41,9 @@
@app.on_event("startup")
async def _startup():
instrumentor.expose(app)

if ENABLE_STREAM_API:
stream_manager_process = Process(
target=start,
)
stream_manager_process.start()
11 changes: 10 additions & 1 deletion docker/config/gpu_http.py
@@ -1,9 +1,12 @@
import os
from multiprocessing import Process

from prometheus_fastapi_instrumentator import Instrumentator

from inference.core.cache import cache
from inference.core.env import MAX_ACTIVE_MODELS, ACTIVE_LEARNING_ENABLED, LAMBDA
from inference.core.env import MAX_ACTIVE_MODELS, ACTIVE_LEARNING_ENABLED, LAMBDA, ENABLE_STREAM_API
from inference.core.interfaces.http.http_api import HttpInterface
from inference.core.interfaces.stream_manager.manager_app.app import start
from inference.core.managers.active_learning import ActiveLearningManager, BackgroundTaskActiveLearningManager
from inference.core.managers.base import ModelManager
from inference.core.managers.decorators.fixed_size_cache import WithFixedSizeCache
@@ -41,3 +44,9 @@
@app.on_event("startup")
async def _startup():
instrumentor.expose(app)

if ENABLE_STREAM_API:
stream_manager_process = Process(
target=start,
)
stream_manager_process.start()
1 change: 1 addition & 0 deletions docker/dockerfiles/Dockerfile.onnx.cpu
@@ -73,5 +73,6 @@ ENV WORKFLOWS_MAX_CONCURRENT_STEPS=1
ENV API_LOGGING_ENABLED=True
ENV CORE_MODEL_SAM2_ENABLED=True
ENV CORE_MODEL_OWLV2_ENABLED=True
ENV ENABLE_STREAM_API=True

ENTRYPOINT uvicorn cpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT
1 change: 1 addition & 0 deletions docker/dockerfiles/Dockerfile.onnx.gpu
@@ -78,5 +78,6 @@ ENV API_LOGGING_ENABLED=True
ENV LMM_ENABLED=True
ENV CORE_MODEL_SAM2_ENABLED=True
ENV CORE_MODEL_OWLV2_ENABLED=True
ENV ENABLE_STREAM_API=True

ENTRYPOINT uvicorn gpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT
2 changes: 2 additions & 0 deletions docker/dockerfiles/Dockerfile.onnx.jetson.4.5.0
@@ -70,5 +70,7 @@ ENV WORKFLOWS_STEP_EXECUTION_MODE=local
ENV WORKFLOWS_MAX_CONCURRENT_STEPS=1
ENV API_LOGGING_ENABLED=True
ENV CORE_MODEL_TROCR_ENABLED=false
ENV RUNS_ON_JETSON=True
ENV ENABLE_STREAM_API=True

ENTRYPOINT uvicorn gpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT
2 changes: 2 additions & 0 deletions docker/dockerfiles/Dockerfile.onnx.jetson.4.6.1
@@ -85,5 +85,7 @@ ENV WORKFLOWS_STEP_EXECUTION_MODE=local
ENV WORKFLOWS_MAX_CONCURRENT_STEPS=1
ENV API_LOGGING_ENABLED=True
ENV CORE_MODEL_TROCR_ENABLED=false
ENV RUNS_ON_JETSON=True
ENV ENABLE_STREAM_API=True

ENTRYPOINT uvicorn gpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT
2 changes: 2 additions & 0 deletions docker/dockerfiles/Dockerfile.onnx.jetson.5.1.1
@@ -81,5 +81,7 @@ ENV WORKFLOWS_STEP_EXECUTION_MODE=local
ENV WORKFLOWS_MAX_CONCURRENT_STEPS=1
ENV API_LOGGING_ENABLED=True
ENV CORE_MODEL_TROCR_ENABLED=false
ENV RUNS_ON_JETSON=True
ENV ENABLE_STREAM_API=True

ENTRYPOINT uvicorn gpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT
142 changes: 142 additions & 0 deletions docs/workflows/video_processing/overview.md
@@ -0,0 +1,142 @@
# Video Processing with Workflows

We've begun our journey into video processing using Workflows. Over time, we've expanded the number of
video-specific blocks (e.g., the ByteTracker block) and continue to dedicate efforts toward improving
their performance and robustness. The current state of this work is as follows:

* We've introduced the `WorkflowVideoMetadata` input to store metadata related to video frames,
including FPS, timestamp, video source identifier, and file/stream flags. While this may not be the final approach
for handling video metadata, it allows us to build stateful video-processing blocks at this stage.
If your Workflow includes any blocks requiring input of kind `video_metadata`, you must define this input in
your Workflow (a minimal sketch follows this list). The metadata functions as a batch-oriented parameter, treated
by the Execution Engine in the same way as `WorkflowImage`.

* The `InferencePipeline` supports
[video processing with Workflows](/using_inference/inference_pipeline/#inferencepipeline-and-roboflow-workflows)
by automatically injecting `WorkflowVideoMetadata` into the `video_metadata` field. This allows you to seamlessly run
your Workflow using the `InferencePipeline` within the `inference` Python package.

* In the `0.21.0` release, we've initiated efforts to enable video processing management via the `inference` server API.
This means that eventually, no custom scripts will be required to process video using Workflows and `InferencePipeline`.
You'll simply call an endpoint, specify the video source and the workflow, and the server will handle the rest—allowing
you to focus on consuming the results.
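
Below is a minimal sketch of how a Workflow consuming `video_metadata` can be declared and run with
`InferencePipeline.init_with_workflow(...)`. The detection block identifier, model id, output names and video
reference are illustrative assumptions, not a prescribed configuration.

```python
from inference import InferencePipeline

# minimal sketch - block identifiers, model id and output names are assumptions
workflow_specification = {
    "version": "1.0",
    "inputs": [
        {"type": "WorkflowImage", "name": "image"},
        # required whenever any block in the Workflow consumes the `video_metadata` kind
        {"type": "WorkflowVideoMetadata", "name": "video_metadata"},
    ],
    "steps": [
        {
            "type": "roboflow_core/roboflow_object_detection_model@v1",  # assumed block identifier
            "name": "detector",
            "image": "$inputs.image",
            "model_id": "yolov8n-640",  # assumption: any object-detection model id works here
        },
        # a stateful video block (e.g. the ByteTracker block) would additionally consume
        # "$inputs.video_metadata" - exact block identifiers depend on the blocks in your version
    ],
    "outputs": [
        {"type": "JsonField", "name": "predictions", "selector": "$steps.detector.predictions"},
    ],
}


def sink(predictions, video_frame) -> None:
    # with multiple sources the sink receives lists, with a single source - plain values
    if not isinstance(predictions, list):
        predictions = [predictions]
    for result in predictions:
        if result is not None:
            print(result["predictions"])


pipeline = InferencePipeline.init_with_workflow(
    video_reference=["rtsp://localhost:8554/live.stream"],  # assumption: any file path or stream URL
    workflow_specification=workflow_specification,
    on_prediction=sink,
)
pipeline.start()
pipeline.join()
```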


## Video management API - comments and status update

This is an experimental feature, and breaking changes may be introduced over time. There is a list of
[known issues](https://github.com/roboflow/inference/issues?q=is%3Aopen+is%3Aissue+label%3A%22Video+Management+API+issues%22).
Please visit that page to raise new issues or comment on existing ones.

### Release `0.21.0`

* Added basic endpoints to `list`, `start`, `pause`, `resume`, `terminate` and `consume` results of `InferencePipelines`
running under the control of the `inference` server. The endpoints are enabled in the `inference` server Docker images for CPU, GPU
and Jetson devices. Running the inference server on those devices lets you call
[`http://127.0.0.1:9001/docs`](http://127.0.0.1:9001/docs) to retrieve the OpenAPI schemas for the endpoints.

* Added an HTTP client for the new endpoints to `InferenceHTTPClient` from `inference_sdk`. Below you will find examples of
how to use the client and the API to start processing videos today:


!!! Note

    The example uses the package in version `inference~=0.21.0`; due to the experimental nature of the feature,
    the code may evolve over time.

```python
from inference_sdk import InferenceHTTPClient

client = InferenceHTTPClient(
api_url="http://192.168.0.115:9001",
api_key="<ROBOFLOW-API-KEY>"
)

# to list active pipelines
client.list_inference_pipelines()

# start processing - single stream
client.start_inference_pipeline_with_workflow(
video_reference=["rtsp://192.168.0.73:8554/live0.stream"],
workspace_name="<YOUR-WORKSPACE>",
workflow_id="<YOUR-WORKFLOW-ID>",
results_buffer_size=5, # results are consumed from in-memory buffer - optionally you can control its size
)

# start processing - one RTSP stream and one camera
# a USB camera cannot easily be passed to Docker running on a MacBook, but on Jetson devices it works :)

client.start_inference_pipeline_with_workflow(
video_reference=["rtsp://192.168.0.73:8554/live0.stream", 0],
workspace_name="<YOUR-WORKSPACE>",
workflow_id="<YOUR-WORKFLOW-ID>",
    batch_collection_timeout=0.05,  # when consuming multiple video sources it is ADVISED to
    # set the batch collection timeout (defined in seconds - 0.05 = 50ms)
)

# start_inference_pipeline_with_workflow(...) will provide you with a pipeline_id which may be used to:

# * get pipeline status
client.get_inference_pipeline_status(
pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1",
)

# * pause pipeline
client.pause_inference_pipeline(
pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1",
)

# * resume pipeline
client.resume_inference_pipeline(
pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1",
)

# * terminate pipeline
client.terminate_inference_pipeline(
pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1",
)

# * consume pipeline results
client.consume_inference_pipeline_result(
pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1",
    excluded_fields=["workflow_output_field_to_exclude"]  # this is optional -
    # if you want to drop some outputs to save bandwidth, feel free to discard them
)
```


The client presented above may be used to preview workflow outputs in a very **naive** way. Let's assume
that the Workflow you defined runs an object-detection model and renders its output using Workflows visualisation
blocks, registering the output image in the `preview` field. You can use the following script to poll and display
processed video frames:

```python
import cv2
from inference_sdk import InferenceHTTPClient
from inference.core.utils.image_utils import load_image


client = InferenceHTTPClient(
api_url=f"http://127.0.0.1:9001",
api_key="<YOUR-API-KEY>",
)

while True:
result = client.consume_inference_pipeline_result(pipeline_id="<PIPELINE-ID>")
if not result["outputs"] or not result["outputs"][0]:
# "outputs" key contains list of workflow results - why list? InferencePipeline can
# run on multiple video sources at the same time - each "round" it attempts to
# grab single frame from all sources and run through Workflows Execution Engine
# * when sources are inactive that may not be possible, hence empty list can be returned
# * when one of the source do not provide frame (for instance due to batch collection timeout)
# outputs list may contain `None` values!
continue
# let's assume single source
source_result = result["outputs"][0]
image, _ = load_image(source_result["preview"]) # "preview" is the name of workflow output with image
cv2.imshow("frame", image)
cv2.waitKey(1)
```
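
In a real application you would likely pace this loop (for example, sleeping briefly whenever `outputs` comes back
empty) instead of spinning, since `consume_inference_pipeline_result(...)` issues a plain HTTP request on every
iteration and the results buffer may be drained faster than new frames arrive.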



4 changes: 4 additions & 0 deletions inference/core/env.py
@@ -424,3 +424,7 @@
DEDICATED_DEPLOYMENT_WORKSPACE_URL = os.environ.get(
"DEDICATED_DEPLOYMENT_WORKSPACE_URL", None
)

ENABLE_STREAM_API = str2bool(os.getenv("ENABLE_STREAM_API", "False"))

RUNS_ON_JETSON = str2bool(os.getenv("RUNS_ON_JETSON", "False"))
14 changes: 13 additions & 1 deletion inference/core/interfaces/camera/video_source.py
@@ -17,6 +17,7 @@
DEFAULT_BUFFER_SIZE,
DEFAULT_MAXIMUM_ADAPTIVE_FRAMES_DROPPED_IN_ROW,
DEFAULT_MINIMUM_ADAPTIVE_MODE_SAMPLES,
RUNS_ON_JETSON,
)
from inference.core.interfaces.camera.entities import (
SourceProperties,
@@ -132,7 +133,10 @@ def locked_executor(video_source: "VideoSource", *args, **kwargs) -> None:

class CV2VideoFrameProducer(VideoFrameProducer):
def __init__(self, video: Union[str, int]):
self.stream = cv2.VideoCapture(video)
if _consumes_camera_on_jetson(video=video):
self.stream = cv2.VideoCapture(video, cv2.CAP_V4L2)
else:
self.stream = cv2.VideoCapture(video)

def isOpened(self) -> bool:
return self.stream.isOpened()
@@ -165,6 +169,14 @@ def release(self):
self.stream.release()


def _consumes_camera_on_jetson(video: Union[str, int]) -> bool:
if not RUNS_ON_JETSON:
return False
if isinstance(video, int):
return True
return video.startswith("/dev/video")


class VideoSource:
@classmethod
def init(