Skip to content

Commit c066873

Browse files
authored
fix(spanner): handle errors during stream restart in snapshot (#1471)
**Handle errors during stream restart in snapshot.** **Root Cause:** When `_restart_on_unavailable` caught a `ServiceUnavailable` or a resumable `InternalServerError`, it attempted to re-initialize the iterator immediately within the `except` block. If this re-initialization itself failed (e.g., because the transient error persisted), the exception would propagate unhandled, breaking the retry loop. **Fix:** This change resets the iterator to `None` and `continue`s the loop, forcing the re-initialization to occur inside the `try` block. This ensures that subsequent errors during restart are properly caught and retried. **Testing:** Added unit tests to cover this specific behavior.
1 parent 3b1792a commit c066873

File tree

2 files changed

+62
-42
lines changed

2 files changed

+62
-42
lines changed

google/cloud/spanner_v1/snapshot.py

Lines changed: 12 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -146,27 +146,12 @@ def _restart_on_unavailable(
146146

147147
except ServiceUnavailable:
148148
del item_buffer[:]
149-
with trace_call(
150-
trace_name,
151-
session,
152-
attributes,
153-
observability_options=observability_options,
154-
metadata=metadata,
155-
) as span, MetricsCapture():
156-
request.resume_token = resume_token
157-
if transaction is not None:
158-
transaction_selector = transaction._build_transaction_selector_pb()
159-
request.transaction = transaction_selector
160-
attempt += 1
161-
iterator = method(
162-
request=request,
163-
metadata=request_id_manager.metadata_with_request_id(
164-
nth_request,
165-
attempt,
166-
metadata,
167-
span,
168-
),
169-
)
149+
request.resume_token = resume_token
150+
if transaction is not None:
151+
transaction_selector = transaction._build_transaction_selector_pb()
152+
request.transaction = transaction_selector
153+
attempt += 1
154+
iterator = None
170155
continue
171156

172157
except InternalServerError as exc:
@@ -177,27 +162,12 @@ def _restart_on_unavailable(
177162
if not resumable_error:
178163
raise
179164
del item_buffer[:]
180-
with trace_call(
181-
trace_name,
182-
session,
183-
attributes,
184-
observability_options=observability_options,
185-
metadata=metadata,
186-
) as span, MetricsCapture():
187-
request.resume_token = resume_token
188-
if transaction is not None:
189-
transaction_selector = transaction._build_transaction_selector_pb()
190-
attempt += 1
191-
request.transaction = transaction_selector
192-
iterator = method(
193-
request=request,
194-
metadata=request_id_manager.metadata_with_request_id(
195-
nth_request,
196-
attempt,
197-
metadata,
198-
span,
199-
),
200-
)
165+
request.resume_token = resume_token
166+
if transaction is not None:
167+
transaction_selector = transaction._build_transaction_selector_pb()
168+
attempt += 1
169+
request.transaction = transaction_selector
170+
iterator = None
201171
continue
202172

203173
if len(item_buffer) == 0:

tests/unit/test_snapshot.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,56 @@ def test_iteration_w_raw_raising_unavailable_after_token(self):
405405
self.assertEqual(request.resume_token, RESUME_TOKEN)
406406
self.assertNoSpans()
407407

408+
def test_iteration_w_raw_raising_unavailable_during_restart(self):
409+
from google.api_core.exceptions import ServiceUnavailable
410+
411+
FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
412+
LAST = (self._make_item(2),)
413+
before = _MockIterator(
414+
*FIRST, fail_after=True, error=ServiceUnavailable("testing")
415+
)
416+
after = _MockIterator(*LAST)
417+
request = mock.Mock(test="test", spec=["test", "resume_token"])
418+
# The second call (the first retry) raises ServiceUnavailable immediately.
419+
# The third call (the second retry) succeeds.
420+
restart = mock.Mock(
421+
spec=[],
422+
side_effect=[before, ServiceUnavailable("retry failed"), after],
423+
)
424+
database = _Database()
425+
database.spanner_api = build_spanner_api()
426+
session = _Session(database)
427+
derived = _build_snapshot_derived(session)
428+
resumable = self._call_fut(derived, restart, request, session=session)
429+
self.assertEqual(list(resumable), list(FIRST + LAST))
430+
self.assertEqual(len(restart.mock_calls), 3)
431+
self.assertEqual(request.resume_token, RESUME_TOKEN)
432+
self.assertNoSpans()
433+
434+
def test_iteration_w_raw_raising_resumable_internal_error_during_restart(self):
435+
FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
436+
LAST = (self._make_item(2),)
437+
before = _MockIterator(
438+
*FIRST,
439+
fail_after=True,
440+
error=INTERNAL_SERVER_ERROR_UNEXPECTED_EOS,
441+
)
442+
after = _MockIterator(*LAST)
443+
request = mock.Mock(test="test", spec=["test", "resume_token"])
444+
restart = mock.Mock(
445+
spec=[],
446+
side_effect=[before, INTERNAL_SERVER_ERROR_UNEXPECTED_EOS, after],
447+
)
448+
database = _Database()
449+
database.spanner_api = build_spanner_api()
450+
session = _Session(database)
451+
derived = _build_snapshot_derived(session)
452+
resumable = self._call_fut(derived, restart, request, session=session)
453+
self.assertEqual(list(resumable), list(FIRST + LAST))
454+
self.assertEqual(len(restart.mock_calls), 3)
455+
self.assertEqual(request.resume_token, RESUME_TOKEN)
456+
self.assertNoSpans()
457+
408458
def test_iteration_w_raw_w_multiuse(self):
409459
from google.cloud.spanner_v1 import (
410460
ReadRequest,

0 commit comments

Comments
 (0)