33import shutil
44import unittest
55import zipfile
6- from unittest .mock import MagicMock , patch
76
8- import requests
97import responses
108
119from roboflow .adapters .rfapi import RoboflowError , get_search_export , start_search_export
@@ -80,28 +78,11 @@ def test_error_response(self):
8078 get_search_export (self .API_KEY , self .WORKSPACE , "exp1" )
8179
8280
83- class TestWorkspaceSearchExportValidation (unittest .TestCase ):
84- def _make_workspace (self ):
85- from roboflow .core .workspace import Workspace
86-
87- info = {
88- "workspace" : {
89- "name" : "Test" ,
90- "url" : "test-ws" ,
91- "projects" : [],
92- "members" : [],
93- }
94- }
95- return Workspace (info , api_key = "test_key" , default_workspace = "test-ws" , model_format = "yolov8" )
96-
97- def test_mutual_exclusion (self ):
98- ws = self ._make_workspace ()
99- with self .assertRaises (ValueError ) as ctx :
100- ws .search_export (query = "*" , dataset = "ds" , annotation_group = "ag" )
101- self .assertIn ("mutually exclusive" , str (ctx .exception ))
102-
81+ class TestWorkspaceSearchExport (unittest .TestCase ):
82+ API_KEY = "test_key"
83+ WORKSPACE = "test-ws"
84+ DOWNLOAD_URL = "https://example.com/export.zip"
10385
104- class TestWorkspaceSearchExportFlow (unittest .TestCase ):
10586 @staticmethod
10687 def _build_zip_bytes (files ):
10788 buffer = io .BytesIO ()
@@ -116,27 +97,33 @@ def _make_workspace(self):
11697 info = {
11798 "workspace" : {
11899 "name" : "Test" ,
119- "url" : "test-ws" ,
100+ "url" : self . WORKSPACE ,
120101 "projects" : [],
121102 "members" : [],
122103 }
123104 }
124- return Workspace (info , api_key = "test_key" , default_workspace = "test-ws" , model_format = "yolov8" )
105+ return Workspace (info , api_key = self . API_KEY , default_workspace = self . WORKSPACE , model_format = "yolov8" )
125106
126- @patch ("roboflow.core.workspace.rfapi" )
127- @patch ("roboflow.core.workspace.requests" )
128- def test_full_flow (self , mock_requests , mock_rfapi ):
129- ws = self ._make_workspace ()
107+ def _register_responses (self , zip_bytes = b"" , download_status = 200 ):
108+ export_url = f"{ API_URL } /{ self .WORKSPACE } /search/export?api_key={ self .API_KEY } "
109+ responses .add (responses .POST , export_url , json = {"success" : True , "link" : "exp_abc" }, status = 202 )
110+
111+ poll_url = f"{ API_URL } /{ self .WORKSPACE } /search/export/exp_abc?api_key={ self .API_KEY } "
112+ responses .add (responses .GET , poll_url , json = {"ready" : True , "link" : self .DOWNLOAD_URL }, status = 200 )
130113
131- mock_rfapi .start_search_export .return_value = "exp_abc"
132- mock_rfapi .get_search_export .return_value = {"ready" : True , "link" : "https://example.com/export.zip" }
114+ responses .add (responses .GET , self .DOWNLOAD_URL , body = zip_bytes , status = download_status )
133115
116+ def test_mutual_exclusion (self ):
117+ ws = self ._make_workspace ()
118+ with self .assertRaises (ValueError ) as ctx :
119+ ws .search_export (query = "*" , dataset = "ds" , annotation_group = "ag" )
120+ self .assertIn ("mutually exclusive" , str (ctx .exception ))
121+
122+ @responses .activate
123+ def test_full_flow (self ):
124+ ws = self ._make_workspace ()
134125 fake_zip = self ._build_zip_bytes ({"images/sample.jpg" : "fake-image-data" })
135- mock_response = MagicMock ()
136- mock_response .headers = {"content-length" : str (len (fake_zip ))}
137- mock_response .raise_for_status .return_value = None
138- mock_response .iter_content .return_value = [fake_zip [:1024 ], fake_zip [1024 :]]
139- mock_requests .get .return_value = mock_response
126+ self ._register_responses (fake_zip )
140127
141128 location = "./test_search_export_output"
142129 try :
@@ -146,84 +133,25 @@ def test_full_flow(self, mock_requests, mock_rfapi):
146133 self .assertEqual (result , expected_location )
147134 self .assertTrue (os .path .exists (os .path .join (expected_location , "images" , "sample.jpg" )))
148135 self .assertFalse (os .path .exists (os .path .join (expected_location , "roboflow.zip" )))
149-
150- mock_rfapi .start_search_export .assert_called_once_with (
151- api_key = "test_key" ,
152- workspace_url = "test-ws" ,
153- query = "*" ,
154- format = "coco" ,
155- dataset = None ,
156- annotation_group = None ,
157- name = None ,
158- )
159- mock_rfapi .get_search_export .assert_called_once_with (
160- api_key = "test_key" ,
161- workspace_url = "test-ws" ,
162- export_id = "exp_abc" ,
163- )
164- mock_response .raise_for_status .assert_called_once ()
165- mock_response .iter_content .assert_called_once_with (chunk_size = 1024 )
166136 finally :
167137 if os .path .exists (location ):
168138 shutil .rmtree (location )
169139
170- @patch ("roboflow.core.workspace.rfapi" )
171- @patch ("roboflow.core.workspace.requests" )
172- def test_full_flow_without_content_length_still_streams (self , mock_requests , mock_rfapi ):
173- ws = self ._make_workspace ()
174-
175- mock_rfapi .start_search_export .return_value = "exp_abc"
176- mock_rfapi .get_search_export .return_value = {"ready" : True , "link" : "https://example.com/export.zip" }
177-
178- fake_zip = self ._build_zip_bytes ({"annotations/instances.json" : "{}" })
179- mock_response = MagicMock ()
180- mock_response .headers = {}
181- mock_response .raise_for_status .return_value = None
182- mock_response .iter_content .return_value = [fake_zip ]
183- mock_requests .get .return_value = mock_response
184-
185- location = "./test_search_export_no_content_length"
186- try :
187- result = ws .search_export (query = "*" , format = "coco" , location = location )
188- expected_location = os .path .abspath (location )
189- self .assertEqual (result , expected_location )
190- self .assertTrue (os .path .exists (os .path .join (expected_location , "annotations" , "instances.json" )))
191- mock_response .iter_content .assert_called_once_with (chunk_size = 1024 )
192- finally :
193- if os .path .exists (location ):
194- shutil .rmtree (location )
195-
196- @patch ("roboflow.core.workspace.rfapi" )
197- @patch ("roboflow.core.workspace.requests" )
198- def test_download_http_error_raises_roboflow_error (self , mock_requests , mock_rfapi ):
140+ @responses .activate
141+ def test_download_http_error (self ):
199142 ws = self ._make_workspace ()
143+ self ._register_responses (download_status = 403 )
200144
201- mock_rfapi .start_search_export .return_value = "exp_abc"
202- mock_rfapi .get_search_export .return_value = {"ready" : True , "link" : "https://example.com/export.zip" }
203-
204- mock_response = MagicMock ()
205- mock_response .raise_for_status .side_effect = requests .HTTPError ("403 Client Error" )
206- mock_requests .get .return_value = mock_response
207-
208- with self .assertRaises (RoboflowError ) as context :
145+ with self .assertRaises (RoboflowError ) as ctx :
209146 ws .search_export (query = "*" , format = "coco" , location = "./test_search_export_http_error" )
210147
211- self .assertIn ("Failed to download search export" , str (context .exception ))
148+ self .assertIn ("Failed to download search export" , str (ctx .exception ))
212149
213- @patch ("roboflow.core.workspace.rfapi" )
214- @patch ("roboflow.core.workspace.requests" )
215- def test_no_extract (self , mock_requests , mock_rfapi ):
150+ @responses .activate
151+ def test_no_extract (self ):
216152 ws = self ._make_workspace ()
217-
218- mock_rfapi .start_search_export .return_value = "exp_abc"
219- mock_rfapi .get_search_export .return_value = {"ready" : True , "link" : "https://example.com/export.zip" }
220-
221153 fake_zip = self ._build_zip_bytes ({"images/sample.jpg" : "fake-image-data" })
222- mock_response = MagicMock ()
223- mock_response .headers = {"content-length" : str (len (fake_zip ))}
224- mock_response .raise_for_status .return_value = None
225- mock_response .iter_content .return_value = [fake_zip ]
226- mock_requests .get .return_value = mock_response
154+ self ._register_responses (fake_zip )
227155
228156 location = "./test_search_export_no_extract"
229157 try :
0 commit comments