@@ -500,7 +500,7 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect):
500500 pass
501501
502502
503- @implementer (interfaces .IPullProducer )
503+ @implementer (interfaces .IPushProducer )
504504class _ByteProducer :
505505 """
506506 Iteratively write bytes to the request.
@@ -515,52 +515,62 @@ def __init__(
515515 ):
516516 self ._request = request
517517 self ._iterator = iterator
518+ self ._paused = False
518519
519- def start (self ) -> None :
520- self ._request .registerProducer (self , False )
520+ # Register the producer and start producing data.
521+ self ._request .registerProducer (self , True )
522+ self .resumeProducing ()
521523
522524 def _send_data (self , data : List [bytes ]) -> None :
523525 """
524- Send a list of strings as a response to the request .
526+ Send a list of bytes as a chunk of a response .
525527 """
526528 if not data :
527529 return
528530 self ._request .write (b"" .join (data ))
529531
532+ def pauseProducing (self ) -> None :
533+ self ._paused = True
534+
530535 def resumeProducing (self ) -> None :
531536 # We've stopped producing in the meantime (note that this might be
532537 # re-entrant after calling write).
533538 if not self ._request :
534539 return
535540
536- # Get the next chunk and write it to the request.
537- #
538- # The output of the JSON encoder is coalesced until min_chunk_size is
539- # reached. (This is because JSON encoders produce a very small output
540- # per iteration.)
541- #
542- # Note that buffer stores a list of bytes (instead of appending to
543- # bytes) to hopefully avoid many allocations.
544- buffer = []
545- buffered_bytes = 0
546- while buffered_bytes < self .min_chunk_size :
547- try :
548- data = next (self ._iterator )
549- buffer .append (data )
550- buffered_bytes += len (data )
551- except StopIteration :
552- # The entire JSON object has been serialized, write any
553- # remaining data, finalize the producer and the request, and
554- # clean-up any references.
555- self ._send_data (buffer )
556- self ._request .unregisterProducer ()
557- self ._request .finish ()
558- self .stopProducing ()
559- return
560-
561- self ._send_data (buffer )
541+ self ._paused = False
542+
543+ # Write until there's backpressure telling us to stop.
544+ while not self ._paused :
545+ # Get the next chunk and write it to the request.
546+ #
547+ # The output of the JSON encoder is coalesced until min_chunk_size is
548+ # reached. (This is because JSON encoders produce a very small output
549+ # per iteration.)
550+ #
551+ # Note that buffer stores a list of bytes (instead of appending to
552+ # bytes) to hopefully avoid many allocations.
553+ buffer = []
554+ buffered_bytes = 0
555+ while buffered_bytes < self .min_chunk_size :
556+ try :
557+ data = next (self ._iterator )
558+ buffer .append (data )
559+ buffered_bytes += len (data )
560+ except StopIteration :
561+ # The entire JSON object has been serialized, write any
562+ # remaining data, finalize the producer and the request, and
563+ # clean-up any references.
564+ self ._send_data (buffer )
565+ self ._request .unregisterProducer ()
566+ self ._request .finish ()
567+ self .stopProducing ()
568+ return
569+
570+ self ._send_data (buffer )
562571
563572 def stopProducing (self ) -> None :
573+ # Clear a circular reference.
564574 self ._request = None
565575
566576
@@ -620,8 +630,7 @@ def respond_with_json(
620630 if send_cors :
621631 set_cors_headers (request )
622632
623- producer = _ByteProducer (request , encoder (json_object ))
624- producer .start ()
633+ _ByteProducer (request , encoder (json_object ))
625634 return NOT_DONE_YET
626635
627636
0 commit comments