2222from google .api_core import exceptions
2323
2424_LOGGER = logging .getLogger (__name__ )
25- _BIDIRECTIONAL_CONSUMER_NAME = ' Thread-ConsumeBidirectionalStream'
25+ _BIDIRECTIONAL_CONSUMER_NAME = " Thread-ConsumeBidirectionalStream"
2626
2727
2828class _RequestQueueGenerator (object ):
@@ -79,6 +79,7 @@ class _RequestQueueGenerator(object):
7979 easily restarting streams that require some initial configuration
8080 request.
8181 """
82+
8283 def __init__ (self , queue , period = 1 , initial_request = None ):
8384 self ._queue = queue
8485 self ._period = period
@@ -107,8 +108,8 @@ def __iter__(self):
107108 except queue .Empty :
108109 if not self ._is_active ():
109110 _LOGGER .debug (
110- ' Empty queue and inactive call, exiting request '
111- 'generator.' )
111+ " Empty queue and inactive call, exiting request " "generator."
112+ )
112113 return
113114 else :
114115 # call is still active, keep waiting for queue items.
@@ -117,16 +118,17 @@ def __iter__(self):
117118 # The consumer explicitly sent "None", indicating that the request
118119 # should end.
119120 if item is None :
120- _LOGGER .debug (' Cleanly exiting request generator.' )
121+ _LOGGER .debug (" Cleanly exiting request generator." )
121122 return
122123
123124 if not self ._is_active ():
124125 # We have an item, but the call is closed. We should put the
125126 # item back on the queue so that the next call can consume it.
126127 self ._queue .put (item )
127128 _LOGGER .debug (
128- 'Inactive call, replacing item on queue and exiting '
129- 'request generator.' )
129+ "Inactive call, replacing item on queue and exiting "
130+ "request generator."
131+ )
130132 return
131133
132134 yield item
@@ -164,6 +166,7 @@ class BidiRpc(object):
164166 yield. This is useful if an initial request is needed to start the
165167 stream.
166168 """
169+
167170 def __init__ (self , start_rpc , initial_request = None ):
168171 self ._start_rpc = start_rpc
169172 self ._initial_request = initial_request
@@ -192,17 +195,18 @@ def _on_call_done(self, future):
192195 def open (self ):
193196 """Opens the stream."""
194197 if self .is_active :
195- raise ValueError (' Can not open an already open stream.' )
198+ raise ValueError (" Can not open an already open stream." )
196199
197200 request_generator = _RequestQueueGenerator (
198- self ._request_queue , initial_request = self ._initial_request )
201+ self ._request_queue , initial_request = self ._initial_request
202+ )
199203 call = self ._start_rpc (iter (request_generator ))
200204
201205 request_generator .call = call
202206
203207 # TODO: api_core should expose the future interface for wrapped
204208 # callables as well.
205- if hasattr (call , ' _wrapped' ): # pragma: NO COVER
209+ if hasattr (call , " _wrapped" ): # pragma: NO COVER
206210 call ._wrapped .add_done_callback (self ._on_call_done )
207211 else :
208212 call .add_done_callback (self ._on_call_done )
@@ -232,8 +236,7 @@ def send(self, request):
232236 request (protobuf.Message): The request to send.
233237 """
234238 if self .call is None :
235- raise ValueError (
236- 'Can not send() on an RPC that has never been open()ed.' )
239+ raise ValueError ("Can not send() on an RPC that has never been open()ed." )
237240
238241 # Don't use self.is_active(), as ResumableBidiRpc will overload it
239242 # to mean something semantically different.
@@ -254,8 +257,7 @@ def recv(self):
254257 protobuf.Message: The received message.
255258 """
256259 if self .call is None :
257- raise ValueError (
258- 'Can not recv() on an RPC that has never been open()ed.' )
260+ raise ValueError ("Can not recv() on an RPC that has never been open()ed." )
259261
260262 return next (self .call )
261263
@@ -309,6 +311,7 @@ def should_recover(exc):
309311 True if the stream should be recovered. This will be called
310312 whenever an error is encountered on the stream.
311313 """
314+
312315 def __init__ (self , start_rpc , should_recover , initial_request = None ):
313316 super (ResumableBidiRpc , self ).__init__ (start_rpc , initial_request )
314317 self ._should_recover = should_recover
@@ -334,14 +337,14 @@ def _on_call_done(self, future):
334337 if not self ._should_recover (future ):
335338 self ._finalize (future )
336339 else :
337- _LOGGER .debug (' Re-opening stream from gRPC callback.' )
340+ _LOGGER .debug (" Re-opening stream from gRPC callback." )
338341 self ._reopen ()
339342
340343 def _reopen (self ):
341344 with self ._operational_lock :
342345 # Another thread already managed to re-open this stream.
343346 if self .call is not None and self .call .is_active ():
344- _LOGGER .debug (' Stream was already re-established.' )
347+ _LOGGER .debug (" Stream was already re-established." )
345348 return
346349
347350 self .call = None
@@ -362,11 +365,11 @@ def _reopen(self):
362365 # If re-opening or re-calling the method fails for any reason,
363366 # consider it a terminal error and finalize the stream.
364367 except Exception as exc :
365- _LOGGER .debug (' Failed to re-open stream due to %s' , exc )
368+ _LOGGER .debug (" Failed to re-open stream due to %s" , exc )
366369 self ._finalize (exc )
367370 raise
368371
369- _LOGGER .info (' Re-established stream' )
372+ _LOGGER .info (" Re-established stream" )
370373
371374 def _recoverable (self , method , * args , ** kwargs ):
372375 """Wraps a method to recover the stream and retry on error.
@@ -388,18 +391,15 @@ def _recoverable(self, method, *args, **kwargs):
388391
389392 except Exception as exc :
390393 with self ._operational_lock :
391- _LOGGER .debug (
392- 'Call to retryable %r caused %s.' , method , exc )
394+ _LOGGER .debug ("Call to retryable %r caused %s." , method , exc )
393395
394396 if not self ._should_recover (exc ):
395397 self .close ()
396- _LOGGER .debug (
397- 'Not retrying %r due to %s.' , method , exc )
398+ _LOGGER .debug ("Not retrying %r due to %s." , method , exc )
398399 self ._finalize (exc )
399400 raise exc
400401
401- _LOGGER .debug (
402- 'Re-opening stream from retryable %r.' , method )
402+ _LOGGER .debug ("Re-opening stream from retryable %r." , method )
403403 self ._reopen ()
404404
405405 def _send (self , request ):
@@ -414,8 +414,7 @@ def _send(self, request):
414414 call = self .call
415415
416416 if call is None :
417- raise ValueError (
418- 'Can not send() on an RPC that has never been open()ed.' )
417+ raise ValueError ("Can not send() on an RPC that has never been open()ed." )
419418
420419 # Don't use self.is_active(), as ResumableBidiRpc will overload it
421420 # to mean something semantically different.
@@ -434,8 +433,7 @@ def _recv(self):
434433 call = self .call
435434
436435 if call is None :
437- raise ValueError (
438- 'Can not recv() on an RPC that has never been open()ed.' )
436+ raise ValueError ("Can not recv() on an RPC that has never been open()ed." )
439437
440438 return next (call )
441439
@@ -493,6 +491,7 @@ def on_response(response):
493491 on_response (Callable[[protobuf.Message], None]): The callback to
494492 be called for every response on the stream.
495493 """
494+
496495 def __init__ (self , bidi_rpc , on_response ):
497496 self ._bidi_rpc = bidi_rpc
498497 self ._on_response = on_response
@@ -522,43 +521,47 @@ def _thread_main(self):
522521 # Python 2.7.
523522 with self ._wake :
524523 if self ._paused :
525- _LOGGER .debug (' paused, waiting for waking.' )
524+ _LOGGER .debug (" paused, waiting for waking." )
526525 self ._wake .wait ()
527- _LOGGER .debug (' woken.' )
526+ _LOGGER .debug (" woken." )
528527
529- _LOGGER .debug (' waiting for recv.' )
528+ _LOGGER .debug (" waiting for recv." )
530529 response = self ._bidi_rpc .recv ()
531- _LOGGER .debug (' recved response.' )
530+ _LOGGER .debug (" recved response." )
532531 self ._on_response (response )
533532
534533 except exceptions .GoogleAPICallError as exc :
535534 _LOGGER .debug (
536- '%s caught error %s and will exit. Generally this is due to '
537- 'the RPC itself being cancelled and the error will be '
538- 'surfaced to the calling code.' ,
539- _BIDIRECTIONAL_CONSUMER_NAME , exc , exc_info = True )
535+ "%s caught error %s and will exit. Generally this is due to "
536+ "the RPC itself being cancelled and the error will be "
537+ "surfaced to the calling code." ,
538+ _BIDIRECTIONAL_CONSUMER_NAME ,
539+ exc ,
540+ exc_info = True ,
541+ )
540542
541543 except Exception as exc :
542544 _LOGGER .exception (
543- '%s caught unexpected exception %s and will exit.' ,
544- _BIDIRECTIONAL_CONSUMER_NAME , exc )
545+ "%s caught unexpected exception %s and will exit." ,
546+ _BIDIRECTIONAL_CONSUMER_NAME ,
547+ exc ,
548+ )
545549
546550 else :
547- _LOGGER .error (
548- 'The bidirectional RPC exited.' )
551+ _LOGGER .error ("The bidirectional RPC exited." )
549552
550- _LOGGER .info (' %s exiting' , _BIDIRECTIONAL_CONSUMER_NAME )
553+ _LOGGER .info (" %s exiting" , _BIDIRECTIONAL_CONSUMER_NAME )
551554
552555 def start (self ):
553556 """Start the background thread and begin consuming the thread."""
554557 with self ._operational_lock :
555558 thread = threading .Thread (
556- name = _BIDIRECTIONAL_CONSUMER_NAME ,
557- target = self . _thread_main )
559+ name = _BIDIRECTIONAL_CONSUMER_NAME , target = self . _thread_main
560+ )
558561 thread .daemon = True
559562 thread .start ()
560563 self ._thread = thread
561- _LOGGER .debug (' Started helper thread %s' , thread .name )
564+ _LOGGER .debug (" Started helper thread %s" , thread .name )
562565
563566 def stop (self ):
564567 """Stop consuming the stream and shutdown the background thread."""
0 commit comments