diff --git a/inference/core/workflows/core_steps/transformations/dynamic_crop/v1.py b/inference/core/workflows/core_steps/transformations/dynamic_crop/v1.py
index bba42ec00..1b88285e8 100644
--- a/inference/core/workflows/core_steps/transformations/dynamic_crop/v1.py
+++ b/inference/core/workflows/core_steps/transformations/dynamic_crop/v1.py
@@ -1,6 +1,8 @@
 from dataclasses import replace
-from typing import Dict, List, Literal, Optional, Type, Union
+from typing import Dict, List, Literal, Optional, Tuple, Type, Union
 
+import cv2
+import numpy as np
 import supervision as sv
 from pydantic import AliasChoices, ConfigDict, Field
 
@@ -13,13 +15,17 @@
     WorkflowImageData,
 )
 from inference.core.workflows.execution_engine.entities.types import (
+    FLOAT_ZERO_TO_ONE_KIND,
     IMAGE_KIND,
     INSTANCE_SEGMENTATION_PREDICTION_KIND,
     KEYPOINT_DETECTION_PREDICTION_KIND,
     OBJECT_DETECTION_PREDICTION_KIND,
+    RGB_COLOR_KIND,
+    STRING_KIND,
     StepOutputImageSelector,
     StepOutputSelector,
     WorkflowImageSelector,
+    WorkflowParameterSelector,
 )
 from inference.core.workflows.prototypes.block import (
     BlockResult,
@@ -34,6 +40,11 @@
 workflow. For example, you could use an ObjectDetection block to detect objects, then
 the DynamicCropBlock block to crop objects, then an OCR block to run character recognition on
 each of the individual cropped regions.
+
+In addition, for instance segmentation predictions (which provide a segmentation mask for each
+bounding box) it is possible to remove the background in the crops, outside of the detected
+instances. To enable that functionality, set `mask_opacity` to a positive value and optionally
+tune `background_color`.
 """
 
 
@@ -67,6 +78,36 @@ class BlockManifest(WorkflowBlockManifest):
         examples=["$steps.my_object_detection_model.predictions"],
         validation_alias=AliasChoices("predictions", "detections"),
     )
+    mask_opacity: Union[
+        WorkflowParameterSelector(kind=[FLOAT_ZERO_TO_ONE_KIND]),
+        float,
+    ] = Field(
+        default=0.0,
+        le=1.0,
+        ge=0.0,
+        description="For instance segmentation, mask_opacity can be used to control background removal. "
+        "Opacity 1.0 removes the background, while 0.0 leaves the crop unchanged.",
+        json_schema_extra={
+            "relevant_for": {
+                "predictions": {
+                    "kind": [INSTANCE_SEGMENTATION_PREDICTION_KIND.name],
+                    "required": True,
+                },
+            }
+        },
+    )
+    background_color: Union[
+        WorkflowParameterSelector(kind=[STRING_KIND]),
+        StepOutputSelector(kind=[RGB_COLOR_KIND]),
+        str,
+        Tuple[int, int, int],
+    ] = Field(
+        default=(0, 0, 0),
+        description="For background removal based on segmentation mask, a new background color can be selected. "
+        "Can be a hex string (like '#431112'), an RGB string (like '(128, 32, 64)') or an RGB tuple "
+        "(like (18, 17, 67)).",
+        examples=["#431112", "$inputs.bg_color", (18, 17, 67)],
+    )
 
     @classmethod
     def accepts_batch_input(cls) -> bool:
@@ -97,9 +138,16 @@ def run(
         self,
         images: Batch[WorkflowImageData],
         predictions: Batch[sv.Detections],
+        mask_opacity: float,
+        background_color: Union[str, Tuple[int, int, int]],
     ) -> BlockResult:
         return [
-            crop_image(image=image, detections=detections)
+            crop_image(
+                image=image,
+                detections=detections,
+                mask_opacity=mask_opacity,
+                background_color=background_color,
+            )
             for image, detections in zip(images, predictions)
         ]
 
@@ -107,6 +155,8 @@ def run(
 def crop_image(
     image: WorkflowImageData,
     detections: sv.Detections,
+    mask_opacity: float,
+    background_color: Union[str, Tuple[int, int, int]],
     detection_id_key: str = DETECTION_ID_KEY,
 ) -> List[Dict[str, WorkflowImageData]]:
     if len(detections) == 0:
@@ -117,10 +167,24 @@
             f"in data dictionary."
         )
     crops = []
-    for (x_min, y_min, x_max, y_max), detection_id in zip(
-        detections.xyxy.round().astype(dtype=int), detections[detection_id_key]
+    for idx, ((x_min, y_min, x_max, y_max), detection_id) in enumerate(
+        zip(detections.xyxy.round().astype(dtype=int), detections[detection_id_key])
     ):
         cropped_image = image.numpy_image[y_min:y_max, x_min:x_max]
+        if not cropped_image.size:
+            crops.append({"crops": None})
+            continue
+        if mask_opacity > 0 and detections.mask is not None:
+            detection_mask = detections.mask[idx]
+            cropped_mask = np.stack(
+                [detection_mask[y_min:y_max, x_min:x_max]] * 3, axis=-1
+            )
+            cropped_image = overlay_crop_with_mask(
+                crop=cropped_image,
+                mask=cropped_mask,
+                mask_opacity=mask_opacity,
+                background_color=background_color,
+            )
         parent_metadata = ImageParentMetadata(
             parent_id=detection_id,
             origin_coordinates=OriginCoordinatesSystem(
@@ -141,13 +205,51 @@
             parent_id=image.workflow_root_ancestor_metadata.parent_id,
             origin_coordinates=workflow_root_ancestor_coordinates,
         )
-        if cropped_image.size:
-            result = WorkflowImageData(
-                parent_metadata=parent_metadata,
-                workflow_root_ancestor_metadata=workflow_root_ancestor_metadata,
-                numpy_image=cropped_image,
-            )
-        else:
-            result = None
+        result = WorkflowImageData(
+            parent_metadata=parent_metadata,
+            workflow_root_ancestor_metadata=workflow_root_ancestor_metadata,
+            numpy_image=cropped_image,
+        )
         crops.append({"crops": result})
     return crops
+
+
+def overlay_crop_with_mask(
+    crop: np.ndarray,
+    mask: np.ndarray,
+    mask_opacity: float,
+    background_color: Union[str, Tuple[int, int, int]],
+) -> np.ndarray:
+    bgr_color = convert_color_to_bgr_tuple(color=background_color)
+    background = (np.ones_like(crop) * bgr_color).astype(np.uint8)
+    blended_crop = np.where(mask > 0, crop, background)
+    return cv2.addWeighted(blended_crop, mask_opacity, crop, 1.0 - mask_opacity, 0)
+
+
+def convert_color_to_bgr_tuple(
+    color: Union[str, Tuple[int, int, int]]
+) -> Tuple[int, int, int]:
+    if isinstance(color, str):
+        return convert_string_color_to_bgr_tuple(color=color)
+    if isinstance(color, tuple) and len(color) == 3:
+        return color[::-1]
+    raise ValueError(f"Invalid color format: {color}")
+
+
+def convert_string_color_to_bgr_tuple(color: str) -> Tuple[int, int, int]:
+    if color.startswith("#") and len(color) == 7:
+        try:
+            return tuple(int(color[i : i + 2], 16) for i in (5, 3, 1))
+        except ValueError as e:
+            raise ValueError(f"Invalid hex color format: {color}") from e
+    if color.startswith("#") and len(color) == 4:
+        try:
+            return tuple(int(color[i] + color[i], 16) for i in (3, 2, 1))
+        except ValueError as e:
+            raise ValueError(f"Invalid hex color format: {color}") from e
+    if color.startswith("(") and color.endswith(")"):
+        try:
+            return tuple(map(int, color[1:-1].split(",")))[::-1]
+        except ValueError as e:
+            raise ValueError(f"Invalid tuple color format: {color}") from e
+    raise ValueError(f"Invalid color format: {color}")
diff --git a/tests/workflows/integration_tests/execution/test_workflow_with_masked_crop.py b/tests/workflows/integration_tests/execution/test_workflow_with_masked_crop.py
new file mode 100644
index 000000000..6e1684319
--- /dev/null
+++ b/tests/workflows/integration_tests/execution/test_workflow_with_masked_crop.py
@@ -0,0 +1,140 @@
+import numpy as np
+
+from inference.core.env import WORKFLOWS_MAX_CONCURRENT_STEPS
+from inference.core.managers.base import ModelManager
+from inference.core.workflows.core_steps.common.entities import StepExecutionMode
+from inference.core.workflows.execution_engine.core import ExecutionEngine
+from tests.workflows.integration_tests.execution.workflows_gallery_collector.decorators import (
+    add_to_workflows_gallery,
+)
+
+MASKED_CROP_WORKFLOW = {
+    "version": "1.0",
+    "inputs": [
+        {"type": "WorkflowImage", "name": "image"},
+        {
+            "type": "WorkflowParameter",
+            "name": "model_id",
+            "default_value": "yolov8n-seg-640",
+        },
+        {
+            "type": "WorkflowParameter",
+            "name": "confidence",
+            "default_value": 0.4,
+        },
+    ],
+    "steps": [
+        {
+            "type": "roboflow_core/roboflow_instance_segmentation_model@v1",
+            "name": "segmentation",
+            "image": "$inputs.image",
+            "model_id": "$inputs.model_id",
+            "confidence": "$inputs.confidence",
+        },
+        {
+            "type": "roboflow_core/dynamic_crop@v1",
+            "name": "cropping",
+            "image": "$inputs.image",
+            "predictions": "$steps.segmentation.predictions",
+            "mask_opacity": 1.0,
+        },
+    ],
+    "outputs": [
+        {
+            "type": "JsonField",
+            "name": "crops",
+            "selector": "$steps.cropping.crops",
+        },
+        {
+            "type": "JsonField",
+            "name": "predictions",
+            "selector": "$steps.segmentation.predictions",
+        },
+    ],
+}
+
+
+@add_to_workflows_gallery(
+    category="Workflows with data transformations",
+    use_case_title="Instance Segmentation results with background subtracted",
+    use_case_description="""
+This example showcases how to extract all instances detected by an instance segmentation model
+as separate crops without background.
+    """,
+    workflow_definition=MASKED_CROP_WORKFLOW,
+    workflow_name_in_app="segmentation-plus-masked-crop",
+)
+def test_workflow_with_masked_crop(
+    model_manager: ModelManager,
+    dogs_image: np.ndarray,
+    roboflow_api_key: str,
+) -> None:
+    # given
+    workflow_init_parameters = {
+        "workflows_core.model_manager": model_manager,
+        "workflows_core.api_key": roboflow_api_key,
+        "workflows_core.step_execution_mode": StepExecutionMode.LOCAL,
+    }
+    execution_engine = ExecutionEngine.init(
+        workflow_definition=MASKED_CROP_WORKFLOW,
+        init_parameters=workflow_init_parameters,
+        max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS,
+    )
+
+    # when
+    result = execution_engine.run(
+        runtime_parameters={
+            "image": dogs_image,
+        }
+    )
+
+    assert isinstance(result, list), "Expected list to be delivered"
+    assert len(result) == 1, "Expected 1 element in the output for one input image"
+    assert set(result[0].keys()) == {
+        "crops",
+        "predictions",
+    }, "Expected all declared outputs to be delivered"
+    assert len(result[0]["crops"]) == 2, "Expected 2 crops for two dogs detected"
+    crop_image = result[0]["crops"][0].numpy_image
+    (x_min, y_min, x_max, y_max) = (
+        result[0]["predictions"].xyxy[0].round().astype(dtype=int)
+    )
+    crop_mask = result[0]["predictions"].mask[0][y_min:y_max, x_min:x_max]
+    pixels_outside_mask = np.where(
+        np.stack([crop_mask] * 3, axis=-1) == 0,
+        crop_image,
+        np.zeros_like(crop_image),
+    )
+    pixels_sum = pixels_outside_mask.sum()
+    assert pixels_sum == 0, "Expected everything black outside mask"
+
+
+def test_workflow_with_masked_crop_when_nothing_gets_predicted(
+    model_manager: ModelManager,
+    dogs_image: np.ndarray,
+    roboflow_api_key: str,
+) -> None:
+    # given
+    workflow_init_parameters = {
+        "workflows_core.model_manager": model_manager,
+        "workflows_core.api_key": roboflow_api_key,
+        "workflows_core.step_execution_mode": StepExecutionMode.LOCAL,
+    }
+    execution_engine = ExecutionEngine.init(
+        workflow_definition=MASKED_CROP_WORKFLOW,
+        init_parameters=workflow_init_parameters,
+        max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS,
+    )
+
+    # when
+    result = execution_engine.run(
+        runtime_parameters={"image": dogs_image, "confidence": 0.99}
+    )
+
+    assert isinstance(result, list), "Expected list to be delivered"
+    assert len(result) == 1, "Expected 1 element in the output for one input image"
+    assert set(result[0].keys()) == {
+        "crops",
+        "predictions",
+    }, "Expected all declared outputs to be delivered"
+    assert len(result[0]["crops"]) == 0, "Expected 0 crops detected"
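
The integration test above doubles as a usage recipe. Outside of the test harness the same engine calls work directly; a minimal sketch (assuming a `model_manager` and an `api_key` are available in scope — both names here are placeholders — and reusing the `MASKED_CROP_WORKFLOW` definition above):

    import cv2
    from inference.core.env import WORKFLOWS_MAX_CONCURRENT_STEPS
    from inference.core.workflows.core_steps.common.entities import StepExecutionMode
    from inference.core.workflows.execution_engine.core import ExecutionEngine

    engine = ExecutionEngine.init(
        workflow_definition=MASKED_CROP_WORKFLOW,
        init_parameters={
            "workflows_core.model_manager": model_manager,  # assumed: constructed elsewhere
            "workflows_core.api_key": api_key,              # assumed: your Roboflow API key
            "workflows_core.step_execution_mode": StepExecutionMode.LOCAL,
        },
        max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS,
    )
    result = engine.run(runtime_parameters={"image": cv2.imread("dogs.jpg")})
    for i, crop in enumerate(result[0]["crops"]):
        if crop is None:  # zero-size detections yield None crops (see crop_image above)
            continue
        cv2.imwrite(f"crop_{i}.png", crop.numpy_image)  # background already subtracted
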
diff --git a/tests/workflows/unit_tests/core_steps/transformations/test_crop.py b/tests/workflows/unit_tests/core_steps/transformations/test_crop.py
index b42dc3a6a..4175d76af 100644
--- a/tests/workflows/unit_tests/core_steps/transformations/test_crop.py
+++ b/tests/workflows/unit_tests/core_steps/transformations/test_crop.py
@@ -1,3 +1,5 @@
+from typing import Tuple, Union
+
 import numpy as np
 import pytest
 import supervision as sv
@@ -5,6 +7,7 @@
 
 from inference.core.workflows.core_steps.transformations.dynamic_crop.v1 import (
     BlockManifest,
+    convert_color_to_bgr_tuple,
     crop_image,
 )
 from inference.core.workflows.execution_engine.entities.base import (
@@ -18,8 +21,13 @@
     "type_alias", ["roboflow_core/dynamic_crop@v1", "DynamicCrop", "Crop"]
 )
 @pytest.mark.parametrize("images_field_alias", ["images", "image"])
+@pytest.mark.parametrize(
+    "background_color", ["$steps.some.color", "$inputs.color", (10, 20, 30), "#fff"]
+)
 def test_crop_validation_when_valid_manifest_is_given(
-    type_alias: str, images_field_alias: str
+    type_alias: str,
+    images_field_alias: str,
+    background_color: Union[str, Tuple[int, int, int]],
 ) -> None:
     # given
     data = {
@@ -27,6 +35,8 @@
         "name": "some",
         images_field_alias: "$inputs.image",
         "predictions": "$steps.detection.predictions",
+        "mask_opacity": 0.3,
+        "background_color": background_color,
     }
 
     # when
@@ -38,6 +48,8 @@
         name="some",
         images="$inputs.image",
         predictions="$steps.detection.predictions",
+        mask_opacity=0.3,
+        background_color=background_color,
     )
 
 
@@ -55,6 +67,24 @@ def test_crop_validation_when_invalid_image_is_given() -> None:
         _ = BlockManifest.model_validate(data)
 
 
+@pytest.mark.parametrize("mask_opacity", [-0.1, 1.1])
+def test_crop_validation_when_invalid_mask_opacity_is_given(
+    mask_opacity: float,
+) -> None:
+    # given
+    data = {
+        "type": "Crop",
+        "name": "some",
+        "images": "$inputs.image",
+        "predictions": "$steps.detection.predictions",
+        "mask_opacity": mask_opacity,
+    }
+
+    # when
+    with pytest.raises(ValidationError):
+        _ = BlockManifest.model_validate(data)
+
+
 def test_crop_image() -> None:
     # given
     np_image = np.zeros((1000, 1000, 3), dtype=np.uint8)
@@ -78,7 +108,9 @@
     )
 
     # when
-    result = crop_image(image=image, detections=detections)
+    result = crop_image(
+        image=image, detections=detections, mask_opacity=0.0, background_color=(0, 0, 0)
+    )
 
     # then
     assert len(result) == 3, "Expected 3 crops to be created"
@@ -136,7 +168,9 @@ def test_crop_image_on_empty_detections() -> None:
     detections = sv.Detections.empty()
 
     # when
-    result = crop_image(image=image, detections=detections)
+    result = crop_image(
+        image=image, detections=detections, mask_opacity=0.0, background_color=(0, 0, 0)
+    )
 
     # then
     assert result == [], "Expected empty list"
@@ -162,7 +196,9 @@ def test_crop_image_on_zero_size_detections() -> None:
     )
 
     # when
-    result = crop_image(image=image, detections=detections)
+    result = crop_image(
+        image=image, detections=detections, mask_opacity=0.0, background_color=(0, 0, 0)
+    )
 
     # then
     assert len(result) == 3, "Expected 3 outputs"
@@ -193,4 +229,159 @@ def test_crop_image_when_detections_without_ids_provided() -> None:
 
     # when
     with pytest.raises(ValueError):
-        _ = crop_image(image=image, detections=detections)
+        _ = crop_image(
+            image=image,
+            detections=detections,
+            mask_opacity=0.0,
+            background_color=(0, 0, 0),
+        )
+
+
+def test_convert_color_to_bgr_tuple_when_valid_tuple_given() -> None:
+    # when
+    result = convert_color_to_bgr_tuple(color=(255, 0, 0))
+
+    # then
+    assert result == (0, 0, 255), "Expected RGB to be converted into BGR"
+
+
+def test_convert_color_to_bgr_tuple_when_invalid_tuple_given() -> None:
+    # when
+    with pytest.raises(ValueError):
+        _ = convert_color_to_bgr_tuple(color=(256, 0, 0, 0))
+
+
+def test_convert_color_to_bgr_tuple_when_valid_hex_string_given() -> None:
+    # when
+    result = convert_color_to_bgr_tuple(color="#ff000A")
+
+    # then
+    assert result == (10, 0, 255), "Expected RGB to be converted into BGR"
+
+
+def test_convert_color_to_bgr_tuple_when_valid_short_hex_string_given() -> None:
+    # when
+    result = convert_color_to_bgr_tuple(color="#f0A")
+
+    # then
+    assert result == (170, 0, 255), "Expected RGB to be converted into BGR"
+
+
+def test_convert_color_to_bgr_tuple_when_invalid_hex_string_given() -> None:
+    # when
+    with pytest.raises(ValueError):
+        _ = convert_color_to_bgr_tuple(color="#invalid")
+
+
+def test_convert_color_to_bgr_tuple_when_tuple_string_given() -> None:
+    # when
+    result = convert_color_to_bgr_tuple(color="(255, 0, 128)")
+
+    # then
+    assert result == (128, 0, 255), "Expected RGB to be converted into BGR"
+
+
+def test_convert_color_to_bgr_tuple_when_invalid_tuple_string_given() -> None:
+    # when
+    with pytest.raises(ValueError):
+        _ = convert_color_to_bgr_tuple(color="(255, 0, a)")
+
+
+def test_convert_color_to_bgr_tuple_when_invalid_value() -> None:
+    # when
+    with pytest.raises(ValueError):
+        _ = convert_color_to_bgr_tuple(color="invalid")
+
+
+def test_crop_image_when_background_removal_requested_and_mask_not_found() -> None:
+    # given
+    np_image = np.zeros((1000, 1000, 3), dtype=np.uint8)
+    np_image[0:20, 0:20] = 39
+    np_image[80:120, 80:120] = 49
+    np_image[450:550, 450:550] = 59
+    image = WorkflowImageData(
+        parent_metadata=ImageParentMetadata(parent_id="origin_image"),
+        numpy_image=np_image,
+    )
+    detections = sv.Detections(
+        xyxy=np.array(
+            [[0, 0, 20, 20], [80, 80, 120, 120], [450, 450, 550, 550]], dtype=np.float64
+        ),
+        class_id=np.array([1, 1, 1]),
+        confidence=np.array([0.5, 0.5, 0.5], dtype=np.float64),
+        data={
+            "detection_id": np.array(["one", "two", "three"]),
+            "class_name": np.array(["cat", "cat", "cat"]),
+        },
+    )
+
+    # when
+    result = crop_image(
+        image=image, detections=detections, mask_opacity=1.0, background_color=(0, 0, 0)
+    )
+
+    # then
+    assert len(result) == 3, "Expected 3 crops to be created"
+    assert (
+        result[0]["crops"].numpy_image == (np.ones((20, 20, 3), dtype=np.uint8) * 39)
+    ).all(), "Image must have expected size and color"
+    assert (
+        result[1]["crops"].numpy_image == (np.ones((40, 40, 3), dtype=np.uint8) * 49)
+    ).all(), "Image must have expected size and color"
+    assert (
+        result[2]["crops"].numpy_image == (np.ones((100, 100, 3), dtype=np.uint8) * 59)
+    ).all(), "Image must have expected size and color"
+
+
+def test_crop_image_when_background_removal_requested_and_mask_found() -> None:
+    # given
+    np_image = np.zeros((1000, 1000, 3), dtype=np.uint8)
+    np_image[0:20, 0:20] = 39
+    np_image[80:120, 80:120] = 49
+    np_image[450:550, 450:550] = 59
+    mask = np.zeros((3, 1000, 1000), dtype=np.bool_)
+    mask[0, 0:15, 0:15] = 1
+    mask[1, 80:90, 80:90] = 1
+    mask[2, 450:460, 450:460] = 1
+    image = WorkflowImageData(
+        parent_metadata=ImageParentMetadata(parent_id="origin_image"),
+        numpy_image=np_image,
+    )
+    detections = sv.Detections(
+        xyxy=np.array(
+            [[0, 0, 20, 20], [80, 80, 120, 120], [450, 450, 550, 550]], dtype=np.float64
+        ),
+        class_id=np.array([1, 1, 1]),
+        mask=mask,
+        confidence=np.array([0.5, 0.5, 0.5], dtype=np.float64),
+        data={
+            "detection_id": np.array(["one", "two", "three"]),
+            "class_name": np.array(["cat", "cat", "cat"]),
+        },
+    )
+
+    # when
+    result = crop_image(
+        image=image,
+        detections=detections,
+        mask_opacity=1.0,
+        background_color=(127, 127, 127),
+    )
+
+    # then
+    assert len(result) == 3, "Expected 3 crops to be created"
+    expected_first_crop = np.ones((20, 20, 3), dtype=np.uint8) * 127
+    expected_first_crop[0:15, 0:15, :] = 39
+    assert (
+        result[0]["crops"].numpy_image == expected_first_crop
+    ).all(), "Image must have expected size and color"
+    expected_second_crop = np.ones((40, 40, 3), dtype=np.uint8) * 127
+    expected_second_crop[0:10, 0:10, :] = 49
+    assert (
+        result[1]["crops"].numpy_image == expected_second_crop
+    ).all(), "Image must have expected size and color"
+    expected_third_crop = np.ones((100, 100, 3), dtype=np.uint8) * 127
+    expected_third_crop[0:10, 0:10, :] = 59
+    assert (
+        result[2]["crops"].numpy_image == expected_third_crop
+    ).all(), "Image must have expected size and color"
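
For quick reference, the colour helper introduced by this patch accepts three spellings and always returns BGR (OpenCV's channel order). The expected conversions, mirroring the unit tests above, can be exercised in isolation:

    from inference.core.workflows.core_steps.transformations.dynamic_crop.v1 import (
        convert_color_to_bgr_tuple,
    )

    assert convert_color_to_bgr_tuple(color=(255, 0, 0)) == (0, 0, 255)        # RGB tuple
    assert convert_color_to_bgr_tuple(color="#ff000A") == (10, 0, 255)         # long hex
    assert convert_color_to_bgr_tuple(color="#f0A") == (170, 0, 255)           # short hex, digits doubled
    assert convert_color_to_bgr_tuple(color="(255, 0, 128)") == (128, 0, 255)  # RGB string

    # Any other format raises ValueError.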