Skip to content

Commit 22a12e4

Browse files
fix(timeout): respect insertion timeout from config (#2048)
Co-authored-by: Joseph Soultanis <joe@kapa.ai>
1 parent 5a4b983 commit 22a12e4

3 files changed

Lines changed: 103 additions & 2 deletions

File tree

mock_tests/conftest.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,85 @@ def metadata_capture_collection(
281281
return weaviate_client.collections.use("MetadataCaptureCollection"), service
282282

283283

284+
BATCH_INSERT_TIMEOUT = 5
285+
286+
287+
class MockBatchDeadlineCaptureWeaviateService(weaviate_pb2_grpc.WeaviateServicer):
288+
captured_time_remaining: float = -1.0
289+
290+
def BatchObjects(
291+
self, request: batch_pb2.BatchObjectsRequest, context: grpc.ServicerContext
292+
) -> batch_pb2.BatchObjectsReply:
293+
self.captured_time_remaining = context.time_remaining()
294+
return batch_pb2.BatchObjectsReply()
295+
296+
297+
@pytest.fixture(scope="function")
298+
def weaviate_batch_insert_timeout_client(
299+
weaviate_mock: HTTPServer, start_grpc_server: grpc.Server
300+
) -> Generator[weaviate.WeaviateClient, None, None]:
301+
weaviate_mock.expect_request(f"/v1/schema/{mock_class['class']}").respond_with_json(mock_class)
302+
client = weaviate.connect_to_local(
303+
host=MOCK_IP,
304+
port=MOCK_PORT,
305+
grpc_port=MOCK_PORT_GRPC,
306+
additional_config=weaviate.classes.init.AdditionalConfig(
307+
timeout=weaviate.classes.init.Timeout(insert=BATCH_INSERT_TIMEOUT)
308+
),
309+
)
310+
yield client
311+
client.close()
312+
313+
314+
@pytest.fixture(scope="function")
315+
def batch_deadline_capture_collection(
316+
weaviate_batch_insert_timeout_client: weaviate.WeaviateClient,
317+
start_grpc_server: grpc.Server,
318+
) -> tuple[weaviate.collections.Collection, MockBatchDeadlineCaptureWeaviateService]:
319+
service = MockBatchDeadlineCaptureWeaviateService()
320+
weaviate_pb2_grpc.add_WeaviateServicer_to_server(service, start_grpc_server)
321+
return (
322+
weaviate_batch_insert_timeout_client.collections.use(mock_class["class"]),
323+
service,
324+
)
325+
326+
327+
BATCH_INSERT_TIMEOUT_SHORT = 0.5
328+
329+
330+
@pytest.fixture(scope="function")
331+
def weaviate_batch_insert_timeout_short_client(
332+
weaviate_mock: HTTPServer, start_grpc_server: grpc.Server
333+
) -> Generator[weaviate.WeaviateClient, None, None]:
334+
weaviate_mock.expect_request(f"/v1/schema/{mock_class['class']}").respond_with_json(mock_class)
335+
client = weaviate.connect_to_local(
336+
host=MOCK_IP,
337+
port=MOCK_PORT,
338+
grpc_port=MOCK_PORT_GRPC,
339+
additional_config=weaviate.classes.init.AdditionalConfig(
340+
timeout=weaviate.classes.init.Timeout(insert=BATCH_INSERT_TIMEOUT_SHORT)
341+
),
342+
)
343+
yield client
344+
client.close()
345+
346+
347+
@pytest.fixture(scope="function")
348+
def batch_slow_response_collection(
349+
weaviate_batch_insert_timeout_short_client: weaviate.WeaviateClient,
350+
start_grpc_server: grpc.Server,
351+
) -> weaviate.collections.Collection:
352+
class MockWeaviateService(weaviate_pb2_grpc.WeaviateServicer):
353+
def BatchObjects(
354+
self, request: batch_pb2.BatchObjectsRequest, context: grpc.ServicerContext
355+
) -> batch_pb2.BatchObjectsReply:
356+
time.sleep(BATCH_INSERT_TIMEOUT_SHORT + 1)
357+
return batch_pb2.BatchObjectsReply()
358+
359+
weaviate_pb2_grpc.add_WeaviateServicer_to_server(MockWeaviateService(), start_grpc_server)
360+
return weaviate_batch_insert_timeout_short_client.collections.use(mock_class["class"])
361+
362+
284363
class MockRetriesWeaviateService(weaviate_pb2_grpc.WeaviateServicer):
285364
search_count = 0
286365
tenants_count = 0

mock_tests/test_timeouts.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import weaviate
44
from weaviate.exceptions import WeaviateQueryError, WeaviateTimeoutError
55

6+
from .conftest import BATCH_INSERT_TIMEOUT, MockBatchDeadlineCaptureWeaviateService
7+
68

79
def test_timeout_rest_query(timeouts_collection: weaviate.collections.Collection):
810
with pytest.raises(WeaviateTimeoutError):
@@ -24,3 +26,24 @@ def test_timeout_grpc_insert(timeouts_collection: weaviate.collections.Collectio
2426
with pytest.raises(WeaviateQueryError) as recwarn:
2527
timeouts_collection.data.insert_many([{"what": "ever"}])
2628
assert "DEADLINE_EXCEEDED" in str(recwarn)
29+
30+
31+
def test_batch_fixed_size_deadline_uses_insert_timeout(
32+
batch_deadline_capture_collection: tuple[
33+
weaviate.collections.Collection, MockBatchDeadlineCaptureWeaviateService
34+
],
35+
):
36+
collection, service = batch_deadline_capture_collection
37+
with collection.batch.fixed_size(batch_size=1) as batch:
38+
batch.add_object({"what": "ever"})
39+
assert abs(service.captured_time_remaining - BATCH_INSERT_TIMEOUT) < 1
40+
41+
42+
def test_batch_fixed_size_times_out_when_insert_exceeded(
43+
batch_slow_response_collection: weaviate.collections.Collection,
44+
):
45+
with batch_slow_response_collection.batch.fixed_size(batch_size=1) as batch:
46+
batch.add_object({"what": "ever"})
47+
failed = batch_slow_response_collection.batch.failed_objects
48+
assert len(failed) == 1
49+
assert "Deadline Exceeded" in failed[0].message

weaviate/collections/batch/base.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
TBatchInput = TypeVar("TBatchInput")
5555
TBatchReturn = TypeVar("TBatchReturn")
5656
MAX_CONCURRENT_REQUESTS = 10
57-
DEFAULT_REQUEST_TIMEOUT = 180
5857
CONCURRENT_REQUESTS_DYNAMIC_VECTORIZER = 2
5958
BATCH_TIME_TARGET = 10
6059
VECTORIZER_BATCHING_STEP_SIZE = 48 # cohere max batch size is 96
@@ -612,7 +611,7 @@ def __send_batch(
612611
self.__batch_grpc.objects(
613612
connection=self.__connection,
614613
objects=[obj._to_internal() for obj in objs],
615-
timeout=DEFAULT_REQUEST_TIMEOUT,
614+
timeout=self.__connection.timeout_config.insert,
616615
max_retries=MAX_RETRIES,
617616
)
618617
)

0 commit comments

Comments
 (0)