Skip to content

Commit 8f3cf8e

Browse files
Narek MkhitaryanNarek Mkhitaryan
authored andcommitted
added Databricks support in attach_items_from_integrated_storage
1 parent 98d5876 commit 8f3cf8e

File tree

6 files changed

+205
-34
lines changed

6 files changed

+205
-34
lines changed

src/superannotate/lib/app/interface/sdk_interface.py

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2919,32 +2919,80 @@ def attach_items_from_integrated_storage(
29192919
project: NotEmptyStr,
29202920
integration: Union[NotEmptyStr, IntegrationEntity],
29212921
folder_path: Optional[NotEmptyStr] = None,
2922+
*,
2923+
query: Optional[NotEmptyStr] = None,
2924+
item_name_column: Optional[NotEmptyStr] = None,
2925+
custom_item_name: Optional[NotEmptyStr] = None,
2926+
component_mapping: Optional[Dict[str, str]] = None,
29222927
):
2923-
"""Link images from integrated external storage to SuperAnnotate.
2928+
"""Link items from integrated external storage (AWS, GCP, Azure, or Databricks) to SuperAnnotate.
29242929
29252930
:param project: project name or folder path where items should be attached (e.g., “project1/folder1”).
29262931
:type project: str
29272932
2928-
:param integration: existing integration name or metadata dict to pull items from.
2929-
Mandatory keys in integration metadata’s dict is “name”.
2933+
:param integration: The existing integration name or metadata dict to pull items from.
2934+
The only mandatory key in the integration metadata dict is “name”.
29302935
:type integration: str or dict
29312936
29322937
:param folder_path: Points to an exact folder/directory within given storage.
2933-
If None, items are fetched from the root directory.
2938+
If None, items are fetched from the root directory.
29342939
:type folder_path: str
2940+
2941+
:param query: (Only for Databricks). The SQL query to retrieve specific columns from Databricks.
2942+
Required for Databricks integrations (together with component_mapping); the query results are used for mapping and uploading.
2943+
:type query: Optional[str]
2944+
2945+
:param item_name_column: (Only for Databricks). The column name from the SQL query whose values
2946+
will be used as item names. If this is provided, custom_item_name cannot be used; exactly one of the two is required for Databricks.
2947+
The column must exist in the query result.
2948+
:type item_name_column: Optional[str]
2949+
2950+
:param custom_item_name: (Only for Databricks). A manually defined prefix for item names.
2951+
A random 10-character suffix will be appended to ensure uniqueness.
2952+
If this is provided, item_name_column cannot be used.
2953+
:type custom_item_name: Optional[str]
2954+
2955+
:param component_mapping: (Only for Databricks). A dictionary mapping Databricks
2956+
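columns to SuperAnnotate component IDs. Map a column to “_item_category” to use it as the item category (the project must have item categories enabled).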
2957+
:type component_mapping: Optional[dict]
2958+
2959+
2960+
Request Example:
2961+
::
2962+
2963+
client.attach_items_from_integrated_storage(
2964+
project="project_name",
2965+
integration="databricks_integration",
2966+
query="SELECT * FROM integration_data LIMIT 10",
2967+
item_name_column="prompt",
2968+
component_mapping={
2969+
"category": "_item_category",
2970+
"prompt_id": "id",
2971+
"prompt": "prompt"
2972+
}
2973+
)
2974+
29352975
"""
29362976
project, folder = self.controller.get_project_folder_by_path(project)
29372977
_integration = None
29382978
if isinstance(integration, str):
29392979
integration = IntegrationEntity(name=integration)
29402980
for i in self.controller.integrations.list().data:
2941-
if integration.name == i.name:
2981+
if integration.name.lower() == i.name.lower():
29422982
_integration = i
29432983
break
29442984
else:
29452985
raise AppException("Integration not found.")
2986+
29462987
response = self.controller.integrations.attach_items(
2947-
project, folder, _integration, folder_path
2988+
project=project,
2989+
folder=folder,
2990+
integration=_integration,
2991+
folder_path=folder_path,
2992+
query=query,
2993+
item_name_column=item_name_column,
2994+
custom_item_name=custom_item_name,
2995+
component_mapping=component_mapping,
29482996
)
29492997
if response.errors:
29502998
raise AppException(response.errors)

src/superannotate/lib/core/serviceproviders.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,7 @@ def attach_items(
593593
folder: entities.FolderEntity,
594594
integration: entities.IntegrationEntity,
595595
folder_name: str = None,
596+
options: Dict[str, str] = None,
596597
) -> ServiceResponse:
597598
raise NotImplementedError
598599

Lines changed: 119 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1-
from typing import List
1+
from typing import Dict
2+
from typing import Optional
23

4+
from lib.core.conditions import Condition
5+
from lib.core.conditions import CONDITION_EQ as EQ
36
from lib.core.entities import FolderEntity
47
from lib.core.entities import IntegrationEntity
58
from lib.core.entities import ProjectEntity
9+
from lib.core.entities.integrations import IntegrationTypeEnum
10+
from lib.core.enums import ProjectType
611
from lib.core.exceptions import AppException
712
from lib.core.reporter import Reporter
813
from lib.core.response import Response
@@ -33,46 +38,133 @@ def __init__(
3338
service_provider: BaseServiceProvider,
3439
integration: IntegrationEntity,
3540
folder_path: str = None,
41+
query: Optional[str] = None,
42+
item_name_column: Optional[str] = None,
43+
custom_item_name: Optional[str] = None,
44+
component_mapping: Optional[Dict[str, str]] = None,
3645
):
37-
3846
super().__init__(reporter)
3947
self._project = project
4048
self._folder = folder
4149
self._integration = integration
4250
self._service_provider = service_provider
4351
self._folder_path = folder_path
52+
self._query = query
53+
self._item_name_column = item_name_column
54+
self._custom_item_name = custom_item_name
55+
self._component_mapping = component_mapping
4456

4557
@property
4658
def _upload_path(self):
4759
return f"{self._project.name}{f'/{self._folder.name}' if self._folder.name != 'root' else ''}"
4860

4961
def execute(self) -> Response:
50-
integrations: List[
51-
IntegrationEntity
52-
] = self._service_provider.integrations.list().data.integrations
53-
integration_name_lower = self._integration.name.lower()
54-
integration = next(
55-
(i for i in integrations if i.name.lower() == integration_name_lower), None
56-
)
57-
if integration:
58-
self.reporter.log_info(
59-
"Attaching file(s) from "
60-
f"{integration.root}{f'/{self._folder_path}' if self._folder_path else ''} "
61-
f"to {self._upload_path}. This may take some time."
62+
# TODO: add Snowflake support in a future iteration.
63+
if self._integration.type == IntegrationTypeEnum.SNOWFLAKE:
64+
raise AppException(
65+
"Attaching items is not supported with Snowflake integration."
6266
)
63-
attached = self._service_provider.integrations.attach_items(
64-
project=self._project,
65-
folder=self._folder,
66-
integration=integration,
67-
folder_name=self._folder_path,
67+
68+
options = {} # using only for Databricks and Snowflake
69+
multimodal_integrations = [
70+
IntegrationTypeEnum.DATABRICKS,
71+
IntegrationTypeEnum.SNOWFLAKE,
72+
]
73+
if self._integration.type in multimodal_integrations:
74+
if self._project.type != ProjectType.MULTIMODAL:
75+
raise AppException(
76+
f"{self._integration.name} integration is supported only for Multimodal projects."
77+
)
78+
79+
if self._item_name_column and self._custom_item_name:
80+
raise AppException(
81+
"‘item_name_column and custom_item_name cannot be used simultaneously."
82+
)
83+
84+
if not self._item_name_column and not self._custom_item_name:
85+
raise AppException(
86+
"Either item_name_column or custom_item_name is required."
87+
)
88+
89+
if not all((self._query, self._component_mapping)):
90+
raise AppException(
91+
f"{self._integration.name} integration requires both a query and component_mapping."
92+
)
93+
94+
category_setting: bool = bool(
95+
next(
96+
(
97+
setting.value
98+
for setting in self._service_provider.projects.list_settings(
99+
self._project
100+
).data
101+
if setting.attribute == "CategorizeItems"
102+
),
103+
None,
104+
)
68105
)
69-
if not attached:
70-
self._response.errors = AppException(
71-
f"An error occurred for {self._integration.name}. Please make sure: "
72-
"\n - The bucket exists."
73-
"\n - The connection is valid."
74-
"\n - The path to a specified directory is correct."
106+
if (
107+
not category_setting
108+
and "_item_category" in self._component_mapping.values()
109+
):
110+
raise AppException(
111+
"Item Category must be enabled for a project to use _item_category"
75112
)
76-
else:
77-
self._response.errors = AppException("Integration not found.")
113+
114+
item_category_column = next(
115+
(
116+
k
117+
for k, v in self._component_mapping.items()
118+
if v == "_item_category"
119+
),
120+
None,
121+
)
122+
if item_category_column:
123+
self._component_mapping.pop(item_category_column)
124+
125+
sa_components = [
126+
c.name.lower()
127+
for c in self._service_provider.annotation_classes.list(
128+
condition=Condition("project_id", self._project.id, EQ)
129+
).data
130+
]
131+
132+
for i in self._component_mapping.values():
133+
if i.lower() not in sa_components:
134+
raise AppException(
135+
f"Component mapping contains invalid component ID: `{i}`"
136+
)
137+
138+
options["query"] = self._query
139+
options["item_name"] = (
140+
self._custom_item_name
141+
if self._custom_item_name
142+
else self._item_name_column
143+
)
144+
options["prefix"] = True if self._custom_item_name else False
145+
options["column_class_map"] = self._component_mapping
146+
if item_category_column:
147+
options["item_category"] = item_category_column
148+
149+
self.reporter.log_info(
150+
"Attaching file(s) from "
151+
f"{self._integration.root}{f'/{self._folder_path}' if self._folder_path else ''} "
152+
f"to {self._upload_path}. This may take some time."
153+
)
154+
attache_response = self._service_provider.integrations.attach_items(
155+
project=self._project,
156+
folder=self._folder,
157+
integration=self._integration,
158+
folder_name=self._folder_path
159+
if self._integration.type not in multimodal_integrations
160+
else None,
161+
options=options if options else None,
162+
)
163+
if not attache_response.ok:
164+
self._response.errors = AppException(
165+
f"An error occurred for {self._integration.name}. Please make sure: "
166+
"\n - The bucket exists."
167+
"\n - The connection is valid."
168+
"\n - The path to a specified directory is correct."
169+
)
78170
return self._response

src/superannotate/lib/infrastructure/controller.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1138,7 +1138,11 @@ def attach_items(
11381138
project: ProjectEntity,
11391139
folder: FolderEntity,
11401140
integration: IntegrationEntity,
1141-
folder_path: str,
1141+
folder_path: str = None,
1142+
query: Optional[str] = None,
1143+
item_name_column: Optional[str] = None,
1144+
custom_item_name: Optional[str] = None,
1145+
component_mapping: Optional[Dict[str, str]] = None,
11421146
):
11431147
use_case = usecases.AttachIntegrations(
11441148
reporter=Reporter(),
@@ -1147,6 +1151,10 @@ def attach_items(
11471151
folder=folder,
11481152
integration=integration,
11491153
folder_path=folder_path,
1154+
query=query,
1155+
item_name_column=item_name_column,
1156+
custom_item_name=custom_item_name,
1157+
component_mapping=component_mapping,
11501158
)
11511159
return use_case.execute()
11521160

src/superannotate/lib/infrastructure/services/integration.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Dict
2+
13
from lib.core import entities
24
from lib.core.service_types import IntegrationListResponse
35
from lib.core.serviceproviders import BaseIntegrationService
@@ -23,6 +25,7 @@ def attach_items(
2325
folder: entities.FolderEntity,
2426
integration: entities.IntegrationEntity,
2527
folder_name: str = None,
28+
options: Dict[str, str] = None,
2629
):
2730
data = {
2831
"team_id": project.team_id,
@@ -32,6 +35,8 @@ def attach_items(
3235
}
3336
if folder_name:
3437
data["customer_folder_name"] = folder_name
38+
if options:
39+
data["options"] = options
3540
return self.client.request(
3641
self.URL_ATTACH_INTEGRATIONS.format(project.team_id), "post", data=data
3742
)

tests/test_.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from superannotate import SAClient
2+
3+
sa = SAClient()
4+
5+
def test_sa():
6+
sa.attach_items_from_integrated_storage(
7+
"databricks",
8+
"Nshantest",
9+
query="SELECT * FROM sa_db_integration.sadataset.sa_integration_data_to_sa LIMIT 10",
10+
# item_name_column="category",
11+
custom_item_name="test_SDK",
12+
component_mapping={
13+
"prompt_id": "_item_category",
14+
"category": "category",
15+
"prompt": "prompt"
16+
}
17+
)

0 commit comments

Comments
 (0)