|
| 1 | +import io |
1 | 2 | from datetime import datetime |
2 | 3 | from unittest import TestCase |
| 4 | +from unittest.mock import MagicMock, patch |
3 | 5 |
|
4 | 6 | import requests_mock |
5 | 7 |
|
6 | 8 | from csfunctions import MetaData, Service |
7 | 9 | from csfunctions.service.base import BaseService, Unauthorized |
| 10 | +from csfunctions.service.file_upload import FileUploadService |
| 11 | +from csfunctions.service.file_upload_schemas import PresignedWriteUrls |
8 | 12 |
|
9 | 13 |
|
10 | 14 | class TestNumberGeneratorService(TestCase): |
@@ -103,3 +107,191 @@ def test_request(self, mock_request: requests_mock.Mocker): |
103 | 107 | self.assertEqual("GET", last_request.method) |
104 | 108 | self.assertEqual(f"Bearer {service_token}", last_request.headers["Authorization"]) |
105 | 109 | self.assertEqual({}, response) |
| 110 | + |
| 111 | + |
| 112 | +class TestFileUploadService(TestCase): |
| 113 | + def setUp(self): |
| 114 | + self.metadata = MetaData.model_validate( |
| 115 | + { |
| 116 | + "request_id": "req-1", |
| 117 | + "app_lang": "en", |
| 118 | + "app_user": "tester", |
| 119 | + "request_datetime": datetime(2000, 1, 1), |
| 120 | + "transaction_id": "txn-1", |
| 121 | + "instance_url": "https://instance.contact-cloud.com", |
| 122 | + "service_url": "https://some_service_url", |
| 123 | + "service_token": "some_service_token", |
| 124 | + "db_service_url": None, |
| 125 | + } |
| 126 | + ) |
| 127 | + self.service = FileUploadService(metadata=self.metadata) |
| 128 | + |
| 129 | + def test_create_new_file(self): |
| 130 | + # Patch self.service.request to return a valid response |
| 131 | + with patch.object(self.service, "request", return_value={"file_object_id": "file123"}) as mock_request: |
| 132 | + file_id = self.service._create_new_file("test.txt", "parent1", "tester") |
| 133 | + self.assertEqual(file_id, "file123") |
| 134 | + mock_request.assert_called_once() |
| 135 | + args, kwargs = mock_request.call_args |
| 136 | + self.assertIn("endpoint", kwargs) |
| 137 | + self.assertEqual(kwargs["endpoint"], "/file_upload/create") |
| 138 | + self.assertEqual(kwargs["method"], "POST") |
| 139 | + self.assertIn("json", kwargs) |
| 140 | + self.assertEqual(kwargs["json"]["filename"], "test.txt") |
| 141 | + self.assertEqual(kwargs["json"]["parent_object_id"], "parent1") |
| 142 | + |
| 143 | + def test_get_presigned_write_urls(self): |
| 144 | + mock_response = { |
| 145 | + "blob_id": "blob123", |
| 146 | + "urls": ["https://upload.url/1", "https://upload.url/2"], |
| 147 | + "chunksize": 1024, |
| 148 | + "headers": {"Authorization": "Bearer token"}, |
| 149 | + } |
| 150 | + with patch.object(self.service, "request", return_value=mock_response) as mock_request: |
| 151 | + result = self.service._get_presigned_write_urls("file123", 2048, "lockid", "tester") |
| 152 | + self.assertEqual(result.blob_id, "blob123") |
| 153 | + self.assertEqual(result.chunksize, 1024) |
| 154 | + self.assertEqual(result.urls, ["https://upload.url/1", "https://upload.url/2"]) |
| 155 | + mock_request.assert_called_once() |
| 156 | + args, kwargs = mock_request.call_args |
| 157 | + self.assertIn("endpoint", kwargs) |
| 158 | + self.assertTrue("generate_presigned_url" in kwargs["endpoint"]) |
| 159 | + self.assertEqual(kwargs["method"], "POST") |
| 160 | + self.assertEqual(kwargs["json"]["filesize"], 2048) |
| 161 | + self.assertEqual(kwargs["json"]["lock_id"], "lockid") |
| 162 | + |
| 163 | + def test_upload_from_stream(self): |
| 164 | + # Test with multiple URLs and ETags |
| 165 | + presigned = PresignedWriteUrls( |
| 166 | + blob_id="blob123", |
| 167 | + urls=["https://upload.url/1", "https://upload.url/2"], |
| 168 | + chunksize=2, |
| 169 | + headers={"Authorization": "Bearer token"}, |
| 170 | + ) |
| 171 | + # Simulate a file-like object with 4 bytes, so 2 chunks |
| 172 | + stream = io.BytesIO(b"abcd") |
| 173 | + # Each call to requests.put returns a different ETag |
| 174 | + mock_response1 = MagicMock() |
| 175 | + mock_response1.headers = {"ETag": "etag1"} |
| 176 | + mock_response1.raise_for_status = MagicMock() |
| 177 | + mock_response2 = MagicMock() |
| 178 | + mock_response2.headers = {"ETag": "etag2"} |
| 179 | + mock_response2.raise_for_status = MagicMock() |
| 180 | + with patch("requests.put", side_effect=[mock_response1, mock_response2]) as mock_put: |
| 181 | + updated, sha256 = self.service._upload_from_stream(presigned, stream) |
| 182 | + self.assertEqual(updated.etags, ["etag1", "etag2"]) |
| 183 | + self.assertEqual(len(sha256), 64) # sha256 hex length |
| 184 | + import hashlib |
| 185 | + |
| 186 | + expected_hash = hashlib.sha256(b"abcd").hexdigest() |
| 187 | + self.assertEqual(sha256, expected_hash) |
| 188 | + self.assertEqual(mock_put.call_count, 2) |
| 189 | + # Check each call |
| 190 | + call_args_list = mock_put.call_args_list |
| 191 | + self.assertEqual(call_args_list[0][0][0], "https://upload.url/1") |
| 192 | + self.assertEqual(call_args_list[0][1]["data"], b"ab") |
| 193 | + self.assertEqual(call_args_list[1][0][0], "https://upload.url/2") |
| 194 | + self.assertEqual(call_args_list[1][1]["data"], b"cd") |
| 195 | + for call_args in call_args_list: |
| 196 | + self.assertEqual(call_args[1]["headers"], {"Authorization": "Bearer token"}) |
| 197 | + |
| 198 | + def test_get_stream_size(self): |
| 199 | + stream = io.BytesIO(b"abcde") |
| 200 | + size = self.service._get_stream_size(stream) |
| 201 | + self.assertEqual(size, 5) |
| 202 | + # Check that stream position is unchanged |
| 203 | + self.assertEqual(stream.tell(), 0) |
| 204 | + |
| 205 | + def test_complete_upload(self): |
| 206 | + presigned = PresignedWriteUrls( |
| 207 | + blob_id="blob123", urls=["https://upload.url/1"], chunksize=4, headers={"Authorization": "Bearer token"} |
| 208 | + ) |
| 209 | + with patch.object(self.service, "request", return_value=None) as mock_request: |
| 210 | + self.service._complete_upload( |
| 211 | + file_object_id="file123", |
| 212 | + filesize=4, |
| 213 | + lock_id="lockid", |
| 214 | + presigned_urls=presigned, |
| 215 | + persno="tester", |
| 216 | + sha256="deadbeef", |
| 217 | + ) |
| 218 | + mock_request.assert_called_once() |
| 219 | + args, kwargs = mock_request.call_args |
| 220 | + self.assertIn("endpoint", kwargs) |
| 221 | + self.assertTrue("complete" in kwargs["endpoint"]) |
| 222 | + self.assertEqual(kwargs["method"], "POST") |
| 223 | + self.assertEqual(kwargs["json"].get("file_object_id", "file123"), "file123") |
| 224 | + self.assertEqual(kwargs["json"]["sha256"], "deadbeef") |
| 225 | + |
| 226 | + def test_upload_new_file(self): |
| 227 | + # Patch _create_new_file and upload_file_content |
| 228 | + with ( |
| 229 | + patch.object(self.service, "_create_new_file", return_value="file123") as mock_create, |
| 230 | + patch.object(self.service, "upload_file_content", return_value=None) as mock_upload, |
| 231 | + ): |
| 232 | + stream = io.BytesIO(b"abc") |
| 233 | + file_id = self.service.upload_new_file("parent1", "test.txt", stream) |
| 234 | + self.assertEqual(file_id, "file123") |
| 235 | + mock_create.assert_called_once_with( |
| 236 | + filename="test.txt", |
| 237 | + parent_object_id="parent1", |
| 238 | + persno="tester", |
| 239 | + check_access=True, |
| 240 | + ) |
| 241 | + mock_upload.assert_called_once() |
| 242 | + args, kwargs = mock_upload.call_args |
| 243 | + self.assertEqual(kwargs["file_object_id"], "file123") |
| 244 | + self.assertEqual(kwargs["stream"].getvalue(), b"abc") |
| 245 | + |
| 246 | + def test_upload_file_content(self): |
| 247 | + # Patch internal methods to isolate upload_file_content logic |
| 248 | + with ( |
| 249 | + patch.object(self.service, "_get_stream_size", return_value=4) as mock_size, |
| 250 | + patch.object(self.service, "_get_presigned_write_urls") as mock_presigned, |
| 251 | + patch.object(self.service, "_upload_from_stream") as mock_upload, |
| 252 | + patch.object(self.service, "_complete_upload") as mock_complete, |
| 253 | + ): |
| 254 | + # Setup mocks |
| 255 | + mock_presigned.return_value = PresignedWriteUrls( |
| 256 | + blob_id="blob123", |
| 257 | + urls=["https://upload.url/1", "https://upload.url/2"], |
| 258 | + chunksize=2, |
| 259 | + headers={"Authorization": "Bearer token"}, |
| 260 | + ) |
| 261 | + mock_upload.return_value = ( |
| 262 | + PresignedWriteUrls( |
| 263 | + blob_id="blob123", |
| 264 | + urls=["https://upload.url/1", "https://upload.url/2"], |
| 265 | + chunksize=2, |
| 266 | + headers={"Authorization": "Bearer token"}, |
| 267 | + etags=["etag1", "etag2"], |
| 268 | + ), |
| 269 | + "deadbeef", |
| 270 | + ) |
| 271 | + stream = io.BytesIO(b"abcd") |
| 272 | + # Call method |
| 273 | + self.service.upload_file_content( |
| 274 | + file_object_id="file123", |
| 275 | + stream=stream, |
| 276 | + persno="tester", |
| 277 | + check_access=True, |
| 278 | + filesize=None, |
| 279 | + delete_derived_files=False, |
| 280 | + ) |
| 281 | + mock_size.assert_called_once_with(stream) |
| 282 | + mock_presigned.assert_called_once_with( |
| 283 | + file_object_id="file123", |
| 284 | + filesize=4, |
| 285 | + lock_id=mock_presigned.call_args[1]["lock_id"], |
| 286 | + persno="tester", |
| 287 | + check_access=True, |
| 288 | + ) |
| 289 | + mock_upload.assert_called_once_with(presigned_urls=mock_presigned.return_value, stream=stream) |
| 290 | + mock_complete.assert_called_once() |
| 291 | + args, kwargs = mock_complete.call_args |
| 292 | + self.assertEqual(kwargs["file_object_id"], "file123") |
| 293 | + self.assertEqual(kwargs["filesize"], 4) |
| 294 | + self.assertEqual(kwargs["presigned_urls"].etags, ["etag1", "etag2"]) |
| 295 | + self.assertEqual(kwargs["persno"], "tester") |
| 296 | + self.assertEqual(kwargs["sha256"], "deadbeef") |
| 297 | + self.assertEqual(kwargs["delete_derived_files"], False) |
0 commit comments