Video processing in inference server #679

Merged
Commits (29)
def327c
Add scratch of implementation
PawelPeczek-Roboflow Sep 12, 2024
8403d8e
Merge remote-tracking branch 'origin/byte-tracker-block' into feature…
PawelPeczek-Roboflow Sep 12, 2024
f620030
Merge remote-tracking branch 'origin/byte-tracker-block' into feature…
PawelPeczek-Roboflow Sep 12, 2024
e7578d5
WIP - added changes required for demo
PawelPeczek-Roboflow Sep 16, 2024
e157475
Resolved conflics with main
PawelPeczek-Roboflow Sep 25, 2024
371edef
Get rid of unwanted extensions
PawelPeczek-Roboflow Sep 25, 2024
0ca2739
Add dummy debiugging regarding speed
PawelPeczek-Roboflow Sep 25, 2024
1994111
Enable debug
PawelPeczek-Roboflow Sep 25, 2024
c9a8076
Start stream manager in GPU build
PawelPeczek-Roboflow Sep 25, 2024
8816972
Add basic tests
PawelPeczek-Roboflow Sep 25, 2024
a43f651
Add CLI extension and support for L4V2 VideoSource
PawelPeczek-Roboflow Sep 26, 2024
57888bc
Add CLI command option
PawelPeczek-Roboflow Sep 26, 2024
55ad939
Let re-use images
PawelPeczek-Roboflow Sep 26, 2024
8710650
Let re-use images
PawelPeczek-Roboflow Sep 26, 2024
7529505
WIP
PawelPeczek-Roboflow Sep 26, 2024
3b51db0
Improve API design and add SDK client
PawelPeczek-Roboflow Sep 26, 2024
b298533
Fix test
PawelPeczek-Roboflow Sep 26, 2024
727a511
Enable API by default
PawelPeczek-Roboflow Sep 26, 2024
d99075e
Apply fixes
PawelPeczek-Roboflow Sep 26, 2024
864c868
Apply fixes for tests
PawelPeczek-Roboflow Sep 27, 2024
813c595
Fix another sensitive info leak
PawelPeczek-Roboflow Sep 27, 2024
a0dae86
Apply fixes
PawelPeczek-Roboflow Sep 27, 2024
b434955
Merge branch 'main' into feature/video_processing_in_inference_server
PawelPeczek-Roboflow Sep 27, 2024
e555cf4
Make the linter happy
PawelPeczek-Roboflow Sep 27, 2024
6b1628e
Fix issues spotted while tesing
PawelPeczek-Roboflow Sep 27, 2024
668212c
Add tests
PawelPeczek-Roboflow Sep 27, 2024
19f1123
Kick-off docs
PawelPeczek-Roboflow Sep 27, 2024
38b2308
Resolved conflics with main
PawelPeczek-Roboflow Sep 27, 2024
64f2525
Add docs
PawelPeczek-Roboflow Sep 27, 2024
43 changes: 20 additions & 23 deletions development/stream_interface/workflows_demo.py
@@ -1,4 +1,3 @@
import os
from threading import Thread
from typing import List, Optional

@@ -7,12 +6,13 @@

from inference import InferencePipeline
from inference.core.interfaces.camera.entities import VideoFrame
from inference.core.interfaces.camera.video_source import BufferFillingStrategy, BufferConsumptionStrategy
from inference.core.interfaces.stream.watchdog import PipelineWatchDog, BasePipelineWatchDog
from inference.core.utils.drawing import create_tiles

STOP = False
ANNOTATOR = sv.BoundingBoxAnnotator()
TARGET_PROJECT = os.environ["TARGET_PROJECT"]
fps_monitor = sv.FPSMonitor()


def main() -> None:
@@ -32,36 +32,25 @@ def main() -> None:
"confidence": 0.5,
},
{
"type": "RoboflowDatasetUpload",
"name": "roboflow_dataset_upload",
"images": "$inputs.image",
"type": "roboflow_core/bounding_box_visualization@v1",
"name": "bbox_visualiser",
"predictions": "$steps.step_1.predictions",
"target_project": TARGET_PROJECT,
"usage_quota_name": "upload_quota_XXX",
"fire_and_forget": True,
},
{
"type": "RoboflowCustomMetadata",
"name": "metadata_upload",
"predictions": "$steps.step_1.predictions",
"field_name": "dummy",
"field_value": "dummy",
"fire_and_forget": True,
},
"image": "$inputs.image"
}
],
"outputs": [
{"type": "JsonField", "name": "predictions", "selector": "$steps.step_1.predictions"},
{"type": "JsonField", "name": "upload_error", "selector": "$steps.roboflow_dataset_upload.error_status"},
{"type": "JsonField", "name": "upload_message", "selector": "$steps.roboflow_dataset_upload.message"},
{"type": "JsonField", "name": "metadata_error", "selector": "$steps.metadata_upload.error_status"},
{"type": "JsonField", "name": "metadata_message", "selector": "$steps.metadata_upload.message"},
{"type": "JsonField", "name": "preview", "selector": "$steps.bbox_visualiser.image"},

],
}
pipeline = InferencePipeline.init_with_workflow(
video_reference=[os.environ["VIDEO_REFERENCE"]] * 2,
video_reference=["rtsp://localhost:8554/live.stream"],
workflow_specification=workflow_specification,
watchdog=watchdog,
on_prediction=workflows_sink,
source_buffer_filling_strategy=BufferFillingStrategy.DROP_OLDEST,
source_buffer_consumption_strategy=BufferConsumptionStrategy.EAGER,
)
control_thread = Thread(target=command_thread, args=(pipeline, watchdog))
control_thread.start()
@@ -91,17 +80,25 @@ def workflows_sink(
predictions: List[Optional[dict]],
video_frames: List[Optional[VideoFrame]],
) -> None:
fps_monitor.tick()
if not isinstance(predictions, list):
predictions = [predictions]
video_frames = [video_frames]
images_to_show = []
for prediction, frame in zip(predictions, video_frames):
if prediction is None or frame is None:
continue
detections: sv.Detections = prediction["predictions"]
visualised = ANNOTATOR.annotate(frame.image.copy(), detections)
images_to_show.append(visualised)
print(prediction["upload_message"], prediction["metadata_message"])
tiles = create_tiles(images=images_to_show)
cv2.imshow(f"Predictions", tiles)
cv2.waitKey(1)
if hasattr(fps_monitor, "fps"):
fps_value = fps_monitor.fps
else:
fps_value = fps_monitor()
print(f"FPS: {fps_value}")


if __name__ == '__main__':
11 changes: 10 additions & 1 deletion docker/config/cpu_http.py
@@ -1,5 +1,8 @@
from multiprocessing import Process

from inference.core.cache import cache
from inference.core.interfaces.http.http_api import HttpInterface
from inference.core.interfaces.stream_manager.manager_app.app import start
from inference.core.managers.active_learning import ActiveLearningManager, BackgroundTaskActiveLearningManager
from inference.core.managers.base import ModelManager
from inference.core.managers.decorators.fixed_size_cache import WithFixedSizeCache
@@ -9,7 +12,7 @@
import os
from prometheus_fastapi_instrumentator import Instrumentator

from inference.core.env import MAX_ACTIVE_MODELS, ACTIVE_LEARNING_ENABLED, LAMBDA
from inference.core.env import MAX_ACTIVE_MODELS, ACTIVE_LEARNING_ENABLED, LAMBDA, ENABLE_STREAM_API
from inference.models.utils import ROBOFLOW_MODEL_TYPES

model_registry = RoboflowModelRegistry(ROBOFLOW_MODEL_TYPES)
@@ -38,3 +41,9 @@
@app.on_event("startup")
async def _startup():
instrumentor.expose(app)

if ENABLE_STREAM_API:
stream_manager_process = Process(
target=start,
)
stream_manager_process.start()
11 changes: 10 additions & 1 deletion docker/config/gpu_http.py
@@ -1,9 +1,12 @@
import os
from multiprocessing import Process

from prometheus_fastapi_instrumentator import Instrumentator

from inference.core.cache import cache
from inference.core.env import MAX_ACTIVE_MODELS, ACTIVE_LEARNING_ENABLED, LAMBDA
from inference.core.env import MAX_ACTIVE_MODELS, ACTIVE_LEARNING_ENABLED, LAMBDA, ENABLE_STREAM_API
from inference.core.interfaces.http.http_api import HttpInterface
from inference.core.interfaces.stream_manager.manager_app.app import start
from inference.core.managers.active_learning import ActiveLearningManager, BackgroundTaskActiveLearningManager
from inference.core.managers.base import ModelManager
from inference.core.managers.decorators.fixed_size_cache import WithFixedSizeCache
@@ -41,3 +44,9 @@
@app.on_event("startup")
async def _startup():
instrumentor.expose(app)

if ENABLE_STREAM_API:
stream_manager_process = Process(
target=start,
)
stream_manager_process.start()
1 change: 1 addition & 0 deletions docker/dockerfiles/Dockerfile.onnx.cpu
@@ -73,5 +73,6 @@ ENV WORKFLOWS_MAX_CONCURRENT_STEPS=1
ENV API_LOGGING_ENABLED=True
ENV CORE_MODEL_SAM2_ENABLED=True
ENV CORE_MODEL_OWLV2_ENABLED=True
ENV ENABLE_STREAM_API=True

ENTRYPOINT uvicorn cpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT
1 change: 1 addition & 0 deletions docker/dockerfiles/Dockerfile.onnx.gpu
@@ -78,5 +78,6 @@ ENV API_LOGGING_ENABLED=True
ENV LMM_ENABLED=True
ENV CORE_MODEL_SAM2_ENABLED=True
ENV CORE_MODEL_OWLV2_ENABLED=True
ENV ENABLE_STREAM_API=True

ENTRYPOINT uvicorn gpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT
2 changes: 2 additions & 0 deletions docker/dockerfiles/Dockerfile.onnx.jetson.4.5.0
@@ -70,5 +70,7 @@ ENV WORKFLOWS_STEP_EXECUTION_MODE=local
ENV WORKFLOWS_MAX_CONCURRENT_STEPS=1
ENV API_LOGGING_ENABLED=True
ENV CORE_MODEL_TROCR_ENABLED=false
ENV RUNS_ON_JETSON=True
ENV ENABLE_STREAM_API=True

ENTRYPOINT uvicorn gpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT
2 changes: 2 additions & 0 deletions docker/dockerfiles/Dockerfile.onnx.jetson.4.6.1
@@ -85,5 +85,7 @@ ENV WORKFLOWS_STEP_EXECUTION_MODE=local
ENV WORKFLOWS_MAX_CONCURRENT_STEPS=1
ENV API_LOGGING_ENABLED=True
ENV CORE_MODEL_TROCR_ENABLED=false
ENV RUNS_ON_JETSON=True
ENV ENABLE_STREAM_API=True

ENTRYPOINT uvicorn gpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT
2 changes: 2 additions & 0 deletions docker/dockerfiles/Dockerfile.onnx.jetson.5.1.1
@@ -81,5 +81,7 @@ ENV WORKFLOWS_STEP_EXECUTION_MODE=local
ENV WORKFLOWS_MAX_CONCURRENT_STEPS=1
ENV API_LOGGING_ENABLED=True
ENV CORE_MODEL_TROCR_ENABLED=false
ENV RUNS_ON_JETSON=True
ENV ENABLE_STREAM_API=True

ENTRYPOINT uvicorn gpu_http:app --workers $NUM_WORKERS --host $HOST --port $PORT
142 changes: 142 additions & 0 deletions docs/workflows/video_processing/overview.md
@@ -0,0 +1,142 @@
# Video Processing with Workflows

We've begun our journey into video processing using Workflows. Over time, we've expanded the number of
video-specific blocks (e.g., the ByteTracker block) and continue to dedicate efforts toward improving
their performance and robustness. The current state of this work is as follows:

* We've introduced the `WorkflowVideoMetadata` input to store metadata related to video frames,
including FPS, timestamp, video source identifier, and file/stream flags. While this may not be the final approach
for handling video metadata, it allows us to build stateful video-processing blocks at this stage.
If your Workflow includes any blocks requiring input of kind `video_metadata`, you must define this input in
your Workflow (see the sketch after this list). The metadata functions as a batch-oriented parameter, treated by the
Execution Engine in the same way as `WorkflowImage`.

* The `InferencePipeline` supports
[video processing with Workflows](/using_inference/inference_pipeline/#inferencepipeline-and-roboflow-workflows)
by automatically injecting `WorkflowVideoMetadata` into the `video_metadata` field. This allows you to seamlessly run
your Workflow using the `InferencePipeline` within the `inference` Python package.

* In the `0.21.0` release, we've initiated efforts to enable video processing management via the `inference` server API.
This means that eventually, no custom scripts will be required to process video using Workflows and `InferencePipeline`.
You'll simply call an endpoint, specify the video source and the workflow, and the server will handle the rest—allowing
you to focus on consuming the results.
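
As a rough illustration of the first two points above, here is a minimal sketch of a Workflow that declares the
`video_metadata` input next to the image input and is run with `InferencePipeline.init_with_workflow`. The step list is
intentionally left empty, and the RTSP URL and sink are placeholders; the exact schema may evolve with this experimental
feature:

```python
from inference import InferencePipeline

# Minimal sketch of a Workflow definition declaring the video metadata input.
# Real specifications also contain "steps" consuming "$inputs.image" and
# "$inputs.video_metadata", plus "outputs" exposing their results (omitted here).
workflow_specification = {
    "version": "1.0",
    "inputs": [
        {"type": "WorkflowImage", "name": "image"},
        {"type": "WorkflowVideoMetadata", "name": "video_metadata"},
    ],
    "steps": [],
    "outputs": [],
}


def my_sink(predictions, video_frames) -> None:
    # consume Workflow outputs here - e.g. log, visualise or forward them
    print(predictions)


# InferencePipeline injects WorkflowVideoMetadata into the "video_metadata"
# field for every processed frame, so no manual wiring is required.
pipeline = InferencePipeline.init_with_workflow(
    video_reference=["rtsp://localhost:8554/live.stream"],
    workflow_specification=workflow_specification,
    on_prediction=my_sink,
)
pipeline.start()
pipeline.join()
```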


## Video management API - comments and status update

This is an experimental feature; breaking changes may be introduced over time. There is a list of
[known issues](https://github.com/roboflow/inference/issues?q=is%3Aopen+is%3Aissue+label%3A%22Video+Management+API+issues%22).
Please visit that page to raise new issues or comment on existing ones.

### Release `0.21.0`

* Added basic endpoints to `list`, `start`, `pause`, `resume`, `terminate` and `consume` results of `InferencePipelines`
running under the control of the `inference` server. The endpoints are enabled in the `inference` server Docker images for CPU, GPU
and Jetson devices. With the server running, you can open
[`http://127.0.0.1:9001/docs`](http://127.0.0.1:9001/docs) to retrieve the OpenAPI schemas for the endpoints.

* Added an HTTP client for the new endpoints to `InferenceHTTPClient` from `inference_sdk`. Below you will find examples of
how to use the client and the API to start processing videos today:


!!! Note

    The examples use the package in version `inference~=0.21.0`. Due to the experimental nature of the feature, the code
    may evolve over time.

```python
from inference_sdk import InferenceHTTPClient

client = InferenceHTTPClient(
api_url="http://192.168.0.115:9001",
api_key="<ROBOFLOW-API-KEY>"
)

# to list active pipelines
client.list_inference_pipelines()

# start processing - single stream
client.start_inference_pipeline_with_workflow(
video_reference=["rtsp://192.168.0.73:8554/live0.stream"],
workspace_name="<YOUR-WORKSPACE>",
workflow_id="<YOUR-WORKFLOW-ID>",
results_buffer_size=5, # results are consumed from in-memory buffer - optionally you can control its size
)

# start processing - one RTSP stream and one camera
# USB camera cannot be passed easily to docker running on MacBook, but on Jetson devices it works :)

client.start_inference_pipeline_with_workflow(
video_reference=["rtsp://192.168.0.73:8554/live0.stream", 0],
workspace_name="<YOUR-WORKSPACE>",
workflow_id="<YOUR-WORKFLOW-ID>",
batch_collection_timeout=0.05, # for consumption of multiple video sources it is ADVISED to
# set batch collection timeout (defined as fraction of seconds - 0.05 = 50ms)
)

# start_inference_pipeline_with_workflow(...) will provide you with a pipeline_id which may be used to:

# * get pipeline status
client.get_inference_pipeline_status(
pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1",
)

# * pause pipeline
client.pause_inference_pipeline(
pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1",
)

# * resume pipeline
client.resume_inference_pipeline(
pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1",
)

# * terminate pipeline
client.terminate_inference_pipeline(
pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1",
)

# * consume pipeline results
client.consume_inference_pipeline_result(
pipeline_id="182452f4-a2c1-4537-92e1-ec64d1e42de1",
excluded_fields=["workflow_output_field_to_exclude"] # this is optional
# if you wanted to get rid of some outputs to save bandwidth - feel free to discard them
)
```


The client presented above may be used to preview workflow outputs in a very **naive** way. Let's assume
that the Workflow you defined runs an object-detection model and renders its output using Workflows visualisation
blocks, registering the output image in the `preview` field. You can use the following script to poll and display processed video
frames:

```python
import cv2
from inference_sdk import InferenceHTTPClient
from inference.core.utils.image_utils import load_image


client = InferenceHTTPClient(
    api_url="http://127.0.0.1:9001",
api_key="<YOUR-API-KEY>",
)

while True:
result = client.consume_inference_pipeline_result(pipeline_id="<PIPELINE-ID>")
    if not result["outputs"] or not result["outputs"][0]:
        # the "outputs" key contains a list of Workflow results - why a list? InferencePipeline can
        # run on multiple video sources at the same time - in each "round" it attempts to
        # grab a single frame from every source and run it through the Workflows Execution Engine
        # * when sources are inactive, that may not be possible, hence an empty list can be returned
        # * when one of the sources does not provide a frame (for instance due to batch collection timeout),
        #   the outputs list may contain `None` values!
        continue
# let's assume single source
source_result = result["outputs"][0]
image, _ = load_image(source_result["preview"]) # "preview" is the name of workflow output with image
cv2.imshow("frame", image)
cv2.waitKey(1)
```



4 changes: 4 additions & 0 deletions inference/core/env.py
@@ -424,3 +424,7 @@
DEDICATED_DEPLOYMENT_WORKSPACE_URL = os.environ.get(
"DEDICATED_DEPLOYMENT_WORKSPACE_URL", None
)

ENABLE_STREAM_API = str2bool(os.getenv("ENABLE_STREAM_API", "False"))

RUNS_ON_JETSON = str2bool(os.getenv("RUNS_ON_JETSON", "False"))
14 changes: 13 additions & 1 deletion inference/core/interfaces/camera/video_source.py
@@ -17,6 +17,7 @@
DEFAULT_BUFFER_SIZE,
DEFAULT_MAXIMUM_ADAPTIVE_FRAMES_DROPPED_IN_ROW,
DEFAULT_MINIMUM_ADAPTIVE_MODE_SAMPLES,
RUNS_ON_JETSON,
)
from inference.core.interfaces.camera.entities import (
SourceProperties,
@@ -132,7 +133,10 @@ def locked_executor(video_source: "VideoSource", *args, **kwargs) -> None:

class CV2VideoFrameProducer(VideoFrameProducer):
def __init__(self, video: Union[str, int]):
self.stream = cv2.VideoCapture(video)
if _consumes_camera_on_jetson(video=video):
self.stream = cv2.VideoCapture(video, cv2.CAP_V4L2)
else:
self.stream = cv2.VideoCapture(video)

def isOpened(self) -> bool:
return self.stream.isOpened()
@@ -165,6 +169,14 @@ def release(self):
self.stream.release()


def _consumes_camera_on_jetson(video: Union[str, int]) -> bool:
if not RUNS_ON_JETSON:
return False
if isinstance(video, int):
return True
return video.startswith("/dev/video")


class VideoSource:
@classmethod
def init(