|
9 | 9 | import re |
10 | 10 | import time |
11 | 11 | import traceback |
| 12 | +from collections import defaultdict |
12 | 13 | from dataclasses import dataclass |
13 | 14 | from itertools import islice |
14 | 15 | from operator import itemgetter |
|
28 | 29 | import superannotate_schemas |
29 | 30 | from lib.core.conditions import Condition |
30 | 31 | from lib.core.conditions import CONDITION_EQ as EQ |
| 32 | +from lib.core.entities import AttachmentEntity |
31 | 33 | from lib.core.entities import BaseItemEntity |
32 | 34 | from lib.core.entities import ConfigEntity |
33 | 35 | from lib.core.entities import FolderEntity |
|
46 | 48 | from lib.core.serviceproviders import UploadAnnotationsResponse |
47 | 49 | from lib.core.types import PriorityScoreEntity |
48 | 50 | from lib.core.usecases.base import BaseReportableUseCase |
| 51 | +from lib.core.usecases.folders import CreateFolderUseCase |
| 52 | +from lib.core.usecases.items import AttachItems |
49 | 53 | from lib.core.utils import run_async |
50 | 54 | from lib.core.video_convertor import VideoFrameGenerator |
51 | 55 | from lib.infrastructure.utils import divide_to_chunks |
@@ -1812,3 +1816,283 @@ def execute(self): |
1812 | 1816 | self.download_annotation_classes(self.destination) |
1813 | 1817 | self._response.data = os.path.abspath(self.destination) |
1814 | 1818 | return self._response |
| 1819 | + |
| 1820 | + |
class UploadMultiModalAnnotationsUseCase(BaseReportableUseCase):
    """
    Upload pre-built annotation dicts to a Multimodal project.

    Annotations are grouped by (sanitized) folder name, missing folders are
    created, missing items are attached, and each folder's payloads are fed
    through an asyncio queue to a small pool of upload workers.
    """

    # Max number of item names per listing request.
    CHUNK_SIZE = 500
    # Upper bound (bytes) for a single upload chunk.
    CHUNK_SIZE_MB = 10 * 1024 * 1024
    # URI length budget (bytes) with headroom for the fixed URL parts.
    URI_THRESHOLD = 4 * 1024 - 120

    def __init__(
        self,
        reporter: Reporter,
        project: ProjectEntity,
        root_folder: FolderEntity,
        annotations: List[dict],
        service_provider: BaseServiceProvider,
        user: UserEntity,
        keep_status: bool = False,
        transform_version: str = None,
    ):
        super().__init__(reporter)
        self._project = project
        self._root_folder = root_folder
        self._annotations = annotations
        self._service_provider = service_provider
        self._keep_status = keep_status
        self._report = Report([], [], [], [])
        self._user = user
        # Created lazily so the queue binds to the running event loop.
        self._files_queue = None
        self._transform_version = (
            "llmJsonV2" if transform_version is None else transform_version
        )

    @property
    def files_queue(self):
        # Lazy singleton queue shared by the distributor and the workers.
        if self._files_queue is None:
            self._files_queue = asyncio.Queue()
        return self._files_queue

    def validate_project_type(self):
        # This use case only supports Multimodal projects.
        if self._project.type != constants.ProjectType.MULTIMODAL.value:
            raise AppException("Unsupported project type.")

    @staticmethod
    def _validate_json(json_data: dict) -> bool:
        """An annotation is minimally valid when it carries metadata.name."""
        return "metadata" in json_data and "name" in json_data["metadata"]

    def list_items(
        self, folder: FolderEntity, item_names: List[str]
    ) -> List[BaseItemEntity]:
        """List existing items by name, querying in CHUNK_SIZE batches."""
        existing_items = []
        for i in range(0, len(item_names), self.CHUNK_SIZE):
            items_to_check = item_names[i : i + self.CHUNK_SIZE]  # noqa: E203
            data = self._service_provider.item_service.list(
                self._project.id,
                folder.id,
                Filter("name", items_to_check, OperatorEnum.IN),
            )
            existing_items.extend(data)
        return existing_items

    async def distribute_queues(self, items_to_upload: List[ItemToUpload]):
        """
        Serialize every annotation to measure its size and enqueue it for the
        workers; oversized or unserializable annotations are reported failed.
        A trailing ``None`` sentinel signals the workers to stop.
        """
        data = [[i, False] for i in items_to_upload]
        items_count = len(items_to_upload)
        processed_count = 0
        while processed_count < items_count:
            for idx, (item_to_upload, processed) in enumerate(data):
                if processed:
                    continue
                try:
                    file = io.StringIO()
                    json.dump(
                        item_to_upload.annotation_json,
                        file,
                        allow_nan=False,
                    )
                    file.seek(0, os.SEEK_END)
                    item_to_upload.file_size = file.tell()
                    if item_to_upload.file_size > BIG_FILE_THRESHOLD:
                        # Too big for the small-annotation path: report once
                        # and move on. (Previously a `continue` inside a
                        # `while True` re-appended this name forever.)
                        self._report.failed_annotations.append(
                            item_to_upload.name
                        )
                        self.reporter.update_progress()
                    else:
                        # TODO add validation
                        self._files_queue.put_nowait(item_to_upload)
                except Exception as e:
                    name = item_to_upload.name
                    if isinstance(e, ValueError):
                        # json.dump raises ValueError on NaN/Inf payloads.
                        logger.debug(f"Invalid annotation {name}: {e}")
                    else:
                        logger.debug(traceback.format_exc())
                    self._report.failed_annotations.append(name)
                    self.reporter.update_progress()
                # Mark processed exactly once, on success and failure alike
                # (was duplicated in the except path, double-counting items).
                data[idx][1] = True  # noqa
                processed_count += 1
        self._files_queue.put_nowait(None)

    async def run_workers(
        self, folder: FolderEntity, items_to_upload: List[ItemToUpload]
    ):
        """Run the queue distributor concurrently with 3 upload workers."""
        await asyncio.gather(
            self.distribute_queues(items_to_upload),
            *[
                upload_small_annotations(
                    project=self._project,
                    folder=folder,
                    queue=self.files_queue,
                    service_provider=self._service_provider,
                    reporter=self.reporter,
                    report=self._report,
                    transform_version=self._transform_version,
                )
                for _ in range(3)
            ],
        )

    def get_or_create_folder(self, folder_name: str) -> FolderEntity:
        """
        Return the folder named ``folder_name``, creating it when missing;
        ``None`` maps to the root folder.
        """
        if folder_name is None:
            return self._root_folder
        response = self._service_provider.folders.get_by_name(
            self._project, folder_name
        )
        if response.status != 404:
            # Non-404 errors surface here; on success reuse the existing
            # folder instead of falling through to a duplicate creation.
            response.raise_for_status()
            return response.data
        response = CreateFolderUseCase(
            project=self._project,
            folder=FolderEntity(name=folder_name),
            service_provider=self._service_provider,
        ).execute()
        if response.errors:
            raise AppException(response.errors)
        return response.data

    def attach_items(
        self, folder: FolderEntity, item_names: List[str]
    ) -> List[BaseItemEntity]:
        """
        Attach placeholder items (empty URL) for the given names.

        @param folder: destination folder
        @param item_names: names of items to attach
        @return: entities of the successfully attached items
        """
        response = AttachItems(
            reporter=self.reporter,
            project=self._project,
            folder=folder,
            attachments=[
                AttachmentEntity(name=item_name, url="") for item_name in item_names
            ],
            service_provider=self._service_provider,
        ).execute()
        if response.errors:
            raise AppException(response.errors)
        attached_item_names = response.data[0]
        return self.list_items(folder, attached_item_names)

    def set_defaults(self, annotation: dict):
        """Stamp the annotation with the acting user's lastAction metadata."""
        annotation["metadata"]["lastAction"] = {
            # NOTE(review): was `self._project.team_id` — a team id is not an
            # email; the stored acting user was otherwise unused.
            "email": self._user.email,
            "timestamp": int(round(time.time() * 1000)),
        }
        return annotation

    @staticmethod
    def serialize_folder_name(val):
        """Lowercase the name and replace filesystem-unsafe characters."""
        special_characters = r"/\:*?”<>|\""
        replacement_char = "_"
        translation_table = str.maketrans(
            dict.fromkeys(special_characters, replacement_char)
        )
        if str(val).strip():
            val = val.strip().translate(translation_table)
            return val.lower()
        # Blank names pass through unchanged (the "no folder" marker).
        return val

    def execute(self):
        if self.is_valid():
            serialized_original_folder_map = {}
            failed, skipped, uploaded = [], [], []
            distributed_items: Dict[str, Dict[str, Any]] = defaultdict(
                dict
            )  # serialized folder name -> item_name -> annotation
            valid_items_count = 0
            for annotation in self._annotations:
                if self._validate_json(annotation):
                    # `or ""` also guards an explicit null folder_name value.
                    folder_name = (
                        annotation["metadata"].get("folder_name") or ""
                    ).strip()
                    serialized_folder_name = self.serialize_folder_name(folder_name)
                    distributed_items[serialized_folder_name][
                        annotation["metadata"]["name"]
                    ] = annotation
                    valid_items_count += 1
                    if serialized_folder_name not in serialized_original_folder_map:
                        serialized_original_folder_map[
                            serialized_folder_name
                        ] = folder_name
                else:
                    failed.append(annotation)
            logger.info(
                f"Uploading {valid_items_count}/{len(self._annotations)} "
                f"annotations to the project {self._project.name}."
            )
            if not self._root_folder.is_root:
                # Annotations without a folder_name land under the "" key
                # (never None), so the no-folder case is keyed by "".
                if len(distributed_items) > 1 or "" not in distributed_items:
                    raise AppException(
                        "You can't include a folder when uploading from within a folder."
                    )
            for folder_name, name_annotation_map in distributed_items.items():
                folder = (
                    self.get_or_create_folder(
                        serialized_original_folder_map[folder_name]
                    )
                    if folder_name
                    else self._root_folder
                )
                existing_items = self.list_items(
                    folder, list(name_annotation_map.keys())
                )
                items_to_create = name_annotation_map.keys() - {
                    i.name for i in existing_items
                }
                attached_items = self.attach_items(folder, list(items_to_create))
                name_item_map = {
                    **{i.name: i for i in existing_items},
                    **{i.name: i for i in attached_items},
                }

                items_to_upload: List[ItemToUpload] = []
                for name, annotation in name_annotation_map.items():
                    item = name_item_map.get(name)
                    if item:
                        annotation = self.set_defaults(annotation)
                        items_to_upload.append(
                            ItemToUpload(item=item, annotation_json=annotation)
                        )
                    else:
                        # Item neither existed nor could be attached.
                        skipped.append(name)
                self.reporter.start_progress(
                    len(items_to_upload),
                    description=f"Uploading annotations to folder {folder.name}",
                )
                try:
                    run_async(self.run_workers(folder, items_to_upload))
                except AppException:
                    logger.debug(traceback.format_exc())
                    self._response.errors = AppException("Can't upload annotations.")
                failed_annotations = self._report.failed_annotations
                failed.extend(failed_annotations)
                uploaded_annotations = list(
                    {i.item.name for i in items_to_upload}
                    - set(failed_annotations).union(skipped)
                )
                workflow = self._service_provider.work_management.get_workflow(
                    self._project.workflow_id
                )
                uploaded.extend(uploaded_annotations)
                if workflow.is_system():
                    # System workflows: move fresh uploads to InProgress
                    # unless the caller asked to keep existing statuses.
                    if uploaded_annotations and not self._keep_status:
                        statuses_changed = set_annotation_statuses_in_progress(
                            service_provider=self._service_provider,
                            project=self._project,
                            folder=folder,
                            item_names=uploaded_annotations,
                        )
                        if not statuses_changed:
                            self._response.errors = AppException(
                                "Failed to change status."
                            )

                self.reporter.finish_progress()
                # Reset per-folder failures so the next folder starts clean.
                self._report.failed_annotations = []

            log_report(self._report)

            self._response.data = {
                "succeeded": uploaded,
                "failed": failed,
                "skipped": skipped,
            }
        return self._response
0 commit comments