Skip to content

Commit 061011d

Browse files
authored
Checking _Rendezvous.done() when stopping Pub / Sub request generator. (#4554)
1 parent 840df1b commit 061011d

File tree

2 files changed

+66
-15
lines changed

2 files changed

+66
-15
lines changed

pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ def _request_generator_thread(self, policy):
223223
_LOGGER.debug('Sending request:\n%r', request)
224224
yield request
225225

226-
def _stop_request_generator(self, request_generator):
226+
def _stop_request_generator(self, request_generator, response_generator):
227227
"""Ensure a request generator is closed.
228228
229229
This **must** be done when recovering from a retry-able exception.
@@ -237,12 +237,23 @@ def _stop_request_generator(self, request_generator):
237237
Args:
238238
request_generator (Generator): A streaming pull request generator
239239
returned from :meth:`_request_generator_thread`.
240+
response_generator (grpc.Future): The gRPC bidirectional stream
241+
object that **was** consuming the ``request_generator``. (It
242+
will actually spawn a thread to consume the requests, but
243+
that thread will stop once the rendezvous has a status code
244+
set.)
240245
241246
Returns:
242247
bool: Indicates if the generator was successfully stopped. Will
243248
be :data:`True` unless the queue is not empty and the generator
244249
is running.
245250
"""
251+
if not response_generator.done():
252+
_LOGGER.debug(
253+
'Response generator must be done before stopping '
254+
'request generator.')
255+
return False
256+
246257
with self._put_lock:
247258
try:
248259
request_generator.close()
@@ -322,7 +333,8 @@ def _blocking_consume(self, policy):
322333
except Exception as exc:
323334
recover = policy.on_exception(exc)
324335
if recover:
325-
recover = self._stop_request_generator(request_generator)
336+
recover = self._stop_request_generator(
337+
request_generator, response_generator)
326338
if not recover:
327339
self._stop_no_join()
328340
return

pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -115,27 +115,32 @@ def test_blocking_consume_on_exception():
115115

116116

117117
def test_blocking_consume_two_exceptions():
118-
policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception'))
119-
policy.call_rpc.side_effect = (
120-
(mock.sentinel.A,),
121-
(mock.sentinel.B,),
122-
)
118+
policy = mock.Mock(spec=('call_rpc', 'on_exception'))
119+
123120
exc1 = NameError('Oh noes.')
124121
exc2 = ValueError('Something grumble.')
125-
policy.on_response.side_effect = (exc1, exc2)
122+
policy.on_exception.side_effect = OnException(acceptable=exc1)
123+
124+
response_generator1 = mock.MagicMock(spec=('__iter__', 'done'))
125+
response_generator1.__iter__.side_effect = exc1
126+
response_generator1.done.return_value = True
127+
response_generator2 = mock.MagicMock(spec=('__iter__', 'done'))
128+
response_generator2.__iter__.side_effect = exc2
129+
policy.call_rpc.side_effect = (response_generator1, response_generator2)
126130

127131
consumer = _consumer.Consumer()
128132
consumer._consumer_thread = mock.Mock(spec=threading.Thread)
129-
policy.on_exception.side_effect = OnException(acceptable=exc1)
130133

131134
# Establish that we get responses until we are sent the exiting event.
132135
consumer._blocking_consume(policy)
133136
assert consumer._consumer_thread is None
134137

135138
# Check mocks.
136139
assert policy.call_rpc.call_count == 2
137-
policy.on_response.assert_has_calls(
138-
[mock.call(mock.sentinel.A), mock.call(mock.sentinel.B)])
140+
response_generator1.__iter__.assert_called_once_with()
141+
response_generator1.done.assert_called_once_with()
142+
response_generator2.__iter__.assert_called_once_with()
143+
response_generator2.done.assert_not_called()
139144
policy.on_exception.assert_has_calls(
140145
[mock.call(exc1), mock.call(exc2)])
141146

@@ -179,6 +184,18 @@ def basic_queue_generator(queue, received):
179184
yield value
180185

181186

187+
def test_stop_request_generator_response_not_done():
188+
consumer = _consumer.Consumer()
189+
190+
response_generator = mock.Mock(spec=('done',))
191+
response_generator.done.return_value = False
192+
stopped = consumer._stop_request_generator(None, response_generator)
193+
assert stopped is False
194+
195+
# Check mocks.
196+
response_generator.done.assert_called_once_with()
197+
198+
182199
def test_stop_request_generator_not_running():
183200
# Model scenario tested:
184201
# - The request generator **is not** running
@@ -207,7 +224,10 @@ def test_stop_request_generator_not_running():
207224
# Make sure it **isn't** done.
208225
assert request_generator.gi_frame is not None
209226

210-
stopped = consumer._stop_request_generator(request_generator)
227+
response_generator = mock.Mock(spec=('done',))
228+
response_generator.done.return_value = True
229+
stopped = consumer._stop_request_generator(
230+
request_generator, response_generator)
211231
assert stopped is True
212232

213233
# Make sure it **is** done.
@@ -217,6 +237,9 @@ def test_stop_request_generator_not_running():
217237
assert queue_.get() == item2
218238
assert queue_.empty()
219239

240+
# Check mocks.
241+
response_generator.done.assert_called_once_with()
242+
220243

221244
def test_stop_request_generator_close_failure():
222245
# Model scenario tested:
@@ -229,11 +252,15 @@ def test_stop_request_generator_close_failure():
229252
request_generator = mock.Mock(spec=('close',))
230253
request_generator.close.side_effect = TypeError('Really, not a generator')
231254

232-
stopped = consumer._stop_request_generator(request_generator)
255+
response_generator = mock.Mock(spec=('done',))
256+
response_generator.done.return_value = True
257+
stopped = consumer._stop_request_generator(
258+
request_generator, response_generator)
233259
assert stopped is False
234260

235261
# Make sure close() was only called once.
236262
request_generator.close.assert_called_once_with()
263+
response_generator.done.assert_called_once_with()
237264

238265

239266
def test_stop_request_generator_queue_non_empty():
@@ -264,7 +291,10 @@ def test_stop_request_generator_queue_non_empty():
264291
assert received.empty()
265292
assert request_generator.gi_frame is not None
266293

267-
stopped = consumer._stop_request_generator(request_generator)
294+
response_generator = mock.Mock(spec=('done',))
295+
response_generator.done.return_value = True
296+
stopped = consumer._stop_request_generator(
297+
request_generator, response_generator)
268298
assert stopped is False
269299

270300
# Make sure the generator is **still** not finished.
@@ -279,6 +309,9 @@ def test_stop_request_generator_queue_non_empty():
279309
pass
280310
assert received.get() == item2
281311

312+
# Check mocks.
313+
response_generator.done.assert_called_once_with()
314+
282315

283316
def test_stop_request_generator_running():
284317
# Model scenario tested:
@@ -304,7 +337,10 @@ def test_stop_request_generator_running():
304337
assert received.empty()
305338
assert request_generator.gi_frame is not None
306339

307-
stopped = consumer._stop_request_generator(request_generator)
340+
response_generator = mock.Mock(spec=('done',))
341+
response_generator.done.return_value = True
342+
stopped = consumer._stop_request_generator(
343+
request_generator, response_generator)
308344
assert stopped is True
309345

310346
# Make sure it **is** done, though we may have to wait until
@@ -316,3 +352,6 @@ def test_stop_request_generator_running():
316352
assert request_generator.gi_frame is None
317353
assert received.get() == _helper_threads.STOP
318354
assert queue_.empty()
355+
356+
# Check mocks.
357+
response_generator.done.assert_called_once_with()

0 commit comments

Comments
 (0)