Skip to content

Commit

Permalink
Merge pull request #654 from roboflow/regular-dynamic-zone
Browse files Browse the repository at this point in the history
Added the Ability to Use Rectilinear Polygons in the Dynamic Zone Block
  • Loading branch information
PawelPeczek-Roboflow authored Sep 20, 2024
2 parents 9594505 + 6401e6f commit e22b900
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 0 deletions.
4 changes: 4 additions & 0 deletions inference/core/workflows/core_steps/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@
from inference.core.workflows.core_steps.transformations.absolute_static_crop.v1 import (
AbsoluteStaticCropBlockV1,
)
from inference.core.workflows.core_steps.transformations.bounding_rect.v1 import (
BoundingRectBlockV1,
)
from inference.core.workflows.core_steps.transformations.byte_tracker.v1 import (
ByteTrackerBlockV1,
)
Expand Down Expand Up @@ -267,6 +270,7 @@
def load_blocks() -> List[Type[WorkflowBlock]]:
return [
TimeInZoneBlockV1,
BoundingRectBlockV1,
SegmentAnything2BlockV1,
DetectionsConsensusBlockV1,
ClipComparisonBlockV1,
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from typing import List, Literal, Optional, Tuple, Type

import cv2 as cv
import numpy as np
import supervision as sv
from pydantic import ConfigDict, Field

from inference.core.workflows.execution_engine.entities.base import OutputDefinition
from inference.core.workflows.execution_engine.entities.types import (
INSTANCE_SEGMENTATION_PREDICTION_KIND,
StepOutputSelector,
)
from inference.core.workflows.prototypes.block import (
BlockResult,
WorkflowBlock,
WorkflowBlockManifest,
)

OUTPUT_KEY: str = "detections_with_rect"
DETECTIONS_RECT_PARAM: str = "rect"
DETECTIONS_WIDTH_PARAM: str = "width"
DETECTIONS_HEIGHT_PARAM: str = "height"
DETECTIONS_ANGLE_PARAM: str = "angle"

SHORT_DESCRIPTION = "Find minimal bounding rectangle surrounding detection contour"
LONG_DESCRIPTION = """
The `BoundingRect` is a transformer block designed to simplify polygon
to the minimum boundig rectangle.
This block is best suited when Zone needs to be created based on shape of detected object
(i.e. basketball field, road segment, zebra crossing etc.)
Input detections should be filtered beforehand and contain only desired classes of interest.
Resulsts are stored in sv.Detections.data
"""


class BoundingRectManifest(WorkflowBlockManifest):
model_config = ConfigDict(
json_schema_extra={
"name": "Bounding Rectangle",
"version": "v1",
"short_description": SHORT_DESCRIPTION,
"long_description": LONG_DESCRIPTION,
"license": "Apache-2.0",
"block_type": "transformation",
}
)
type: Literal[f"roboflow_core/bounding_rect@v1"]
predictions: StepOutputSelector(
kind=[
INSTANCE_SEGMENTATION_PREDICTION_KIND,
]
) = Field( # type: ignore
description="",
examples=["$segmentation.predictions"],
)

@classmethod
def accepts_batch_input(cls) -> bool:
return False

@classmethod
def describe_outputs(cls) -> List[OutputDefinition]:
return [
OutputDefinition(
name=OUTPUT_KEY, kind=[INSTANCE_SEGMENTATION_PREDICTION_KIND]
),
]

@classmethod
def get_execution_engine_compatibility(cls) -> Optional[str]:
return ">=1.0.0,<2.0.0"


def calculate_minimum_bounding_rectangle(
mask: np.ndarray,
) -> Tuple[np.array, float, float, float]:
contours = sv.mask_to_polygons(mask)
largest_contour = max(contours, key=len)

rect = cv.minAreaRect(largest_contour)
box = cv.boxPoints(rect)
box = np.int0(box)

width, height = rect[1]
angle = rect[2]
return box, width, height, angle


class BoundingRectBlockV1(WorkflowBlock):
@classmethod
def get_manifest(cls) -> Type[WorkflowBlockManifest]:
return BoundingRectManifest

def run(
self,
predictions: sv.Detections,
) -> BlockResult:
if predictions.mask is None:
raise ValueError(
"Mask missing. This block operates on output from segmentation model."
)
to_be_merged = []
for i in range(len(predictions)):
# copy
det = predictions[i]

rect, width, height, angle = calculate_minimum_bounding_rectangle(
det.mask[0]
)

det[DETECTIONS_RECT_PARAM] = np.array([rect], dtype=np.float16)
det[DETECTIONS_WIDTH_PARAM] = np.array([width], dtype=np.float16)
det[DETECTIONS_HEIGHT_PARAM] = np.array([height], dtype=np.float16)
det[DETECTIONS_ANGLE_PARAM] = np.array([angle], dtype=np.float16)

to_be_merged.append(det)

return {OUTPUT_KEY: sv.Detections.merge(to_be_merged)}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import numpy as np
import pytest
import supervision as sv

from inference.core.env import WORKFLOWS_MAX_CONCURRENT_STEPS
from inference.core.managers.base import ModelManager
from inference.core.workflows.core_steps.common.entities import StepExecutionMode
from inference.core.workflows.errors import RuntimeInputError, StepExecutionError
from inference.core.workflows.execution_engine.core import ExecutionEngine
from tests.workflows.integration_tests.execution.workflows_gallery_collector.decorators import (
add_to_workflows_gallery,
)

BOUNDNG_RECTANGLE_WORKFLOW = {
"version": "1.0",
"inputs": [
{"type": "WorkflowImage", "name": "image"},
],
"steps": [
{
"type": "InstanceSegmentationModel",
"name": "detection",
"image": "$inputs.image",
"model_id": "yolov8n-seg-640",
},
{
"type": "roboflow_core/bounding_rect@v1",
"name": "bounding_rect",
"predictions": "$steps.detection.predictions",
}
],
"outputs": [
{"type": "JsonField", "name": "result", "selector": "$steps.bounding_rect.detections_with_rect"}
],
}


@add_to_workflows_gallery(
category="Basic Workflows",
use_case_title="Workflow with bounding rect",
use_case_description="""
This is the basic workflow that only contains a single object detection model and bounding rectangle extraction.
""",
workflow_definition=BOUNDNG_RECTANGLE_WORKFLOW,
workflow_name_in_app="fit-bounding-rectangle",
)
def test_rectangle_bounding_workflow(
model_manager: ModelManager,
dogs_image: np.ndarray,
) -> None:
# given
workflow_init_parameters = {
"workflows_core.model_manager": model_manager,
"workflows_core.api_key": None,
"workflows_core.step_execution_mode": StepExecutionMode.LOCAL,
}
execution_engine = ExecutionEngine.init(
workflow_definition=BOUNDNG_RECTANGLE_WORKFLOW,
init_parameters=workflow_init_parameters,
max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS,
)

# when
result = execution_engine.run(
runtime_parameters={
"image": [dogs_image],
}
)

# then
assert len(result) == 1, "One set ot outputs expected"
assert "result" in result[0], "Output must contain key 'result'"
assert isinstance(result[0]["result"], sv.Detections), "Output must be instance of sv.Detections"
assert len(result[0]["result"]) == 2, "Two dogs on the image"
assert "rect" in result[0]["result"].data, "'rect' data field must expected to be found in result"
assert "width" in result[0]["result"].data, "'width' data field must expected to be found in result"
assert "height" in result[0]["result"].data, "'height' data field must expected to be found in result"
assert "angle" in result[0]["result"].data, "'angle' data field must expected to be found in result"

assert np.allclose(result[0]["result"]["rect"][0], np.array([[322.0, 402.0], [325.0, 224.0], [586.0, 228.0], [583.0, 406.0]]))
assert np.allclose(result[0]["result"]["rect"][1], np.array([[219.0, 82.0], [352.0, 57.0], [409.0, 363.0], [276.0, 388.0]]))
assert np.allclose(result[0]["result"]["width"], np.array([261.5, 311.25]), atol=0.1)
assert np.allclose(result[0]["result"]["height"], np.array([178.4, 135.2]), atol=0.1)
assert np.allclose(result[0]["result"]["angle"], np.array([0.826, 79.5]), atol=0.1)
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import numpy as np
import pytest
import supervision as sv

from inference.core.workflows.core_steps.transformations.bounding_rect.v1 import (
BoundingRectBlockV1,
BoundingRectManifest,
calculate_minimum_bounding_rectangle,
)


def test_calculate_minimum_bounding_rectangle():
# given
polygon = np.array(
[
[10, 10],
[10, 1],
[20, 1],
[20, 10],
[15, 5]
]
)
mask = sv.polygon_to_mask(
polygon=polygon, resolution_wh=(np.max(polygon, axis=0) + 10)
)

# when
box, width, height, angle = calculate_minimum_bounding_rectangle(mask=mask)

# then
expected_box = np.array([[10, 1], [20, 1], [20, 10], [10, 10]])
assert np.allclose(box, expected_box), (
f"Expected bounding box to be {expected_box}, but got {box}"
)
assert np.isclose(width, 9), f"Expected width to be 9, but got {width}"
assert np.isclose(height, 10), f"Expected height to be 10, but got {height}"
assert angle == 90 or angle == -90, f"Expected angle to be 90 or -90, but got {angle}"


@pytest.mark.parametrize("type_alias", ["roboflow_core/bounding_rect@v1"])
def test_bounding_box_validation_when_valid_manifest_is_given(
type_alias: str,
) -> None:
# given
data = {
"type": type_alias,
"name": "bounding_box",
"predictions": "$steps.od_model.predictions",
}

# when
result = BoundingRectManifest.model_validate(data)

# then
assert result == BoundingRectManifest(
type=type_alias,
name="bounding_box",
predictions="$steps.od_model.predictions"
)


def test_bounding_box_block() -> None:
# given
block = BoundingRectBlockV1()
detections = sv.Detections(
xyxy=np.array([[10, 10, 100, 100]]),
mask=np.array(
[
sv.polygon_to_mask(
polygon=np.array([[10, 10], [10, 100], [100, 100], [100, 10]]),
resolution_wh=(1000, 1000)
)
]
),
)

output = block.run(
predictions=detections,
)

assert isinstance(output, dict)
assert "detections_with_rect" in output
assert output["detections_with_rect"].data["height"][0] == 90
assert output["detections_with_rect"].data["width"][0] == 90
assert output["detections_with_rect"].data["angle"][0] == 90
np.allclose(np.array([[10, 10], [10, 100], [100, 100], [100, 10]]), output["detections_with_rect"].data["rect"][0])
# check if the image is modified
assert detections != output["detections_with_rect"]

0 comments on commit e22b900

Please sign in to comment.