Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit f594e43

Browse files
authored
Switch the JSON byte producer from a pull to a push producer. (#8116)
1 parent: cfeb37f · commit: f594e43

File tree

5 files changed: 53 additions and 46 deletions.

changelog.d/8116.feature

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1 @@
1+
Iteratively encode JSON to avoid blocking the reactor.

synapse/http/server.py

Lines changed: 43 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -500,7 +500,7 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect):
500500
pass
501501

502502

503-
@implementer(interfaces.IPullProducer)
503+
@implementer(interfaces.IPushProducer)
504504
class _ByteProducer:
505505
"""
506506
Iteratively write bytes to the request.
@@ -515,52 +515,64 @@ def __init__(
515515
):
516516
self._request = request
517517
self._iterator = iterator
518+
self._paused = False
518519

519-
def start(self) -> None:
520-
self._request.registerProducer(self, False)
520+
# Register the producer and start producing data.
521+
self._request.registerProducer(self, True)
522+
self.resumeProducing()
521523

522524
def _send_data(self, data: List[bytes]) -> None:
523525
"""
524-
Send a list of strings as a response to the request.
526+
Send a list of bytes as a chunk of a response.
525527
"""
526528
if not data:
527529
return
528530
self._request.write(b"".join(data))
529531

532+
def pauseProducing(self) -> None:
533+
self._paused = True
534+
530535
def resumeProducing(self) -> None:
531536
# We've stopped producing in the meantime (note that this might be
532537
# re-entrant after calling write).
533538
if not self._request:
534539
return
535540

536-
# Get the next chunk and write it to the request.
537-
#
538-
# The output of the JSON encoder is coalesced until min_chunk_size is
539-
# reached. (This is because JSON encoders produce a very small output
540-
# per iteration.)
541-
#
542-
# Note that buffer stores a list of bytes (instead of appending to
543-
# bytes) to hopefully avoid many allocations.
544-
buffer = []
545-
buffered_bytes = 0
546-
while buffered_bytes < self.min_chunk_size:
547-
try:
548-
data = next(self._iterator)
549-
buffer.append(data)
550-
buffered_bytes += len(data)
551-
except StopIteration:
552-
# The entire JSON object has been serialized, write any
553-
# remaining data, finalize the producer and the request, and
554-
# clean-up any references.
555-
self._send_data(buffer)
556-
self._request.unregisterProducer()
557-
self._request.finish()
558-
self.stopProducing()
559-
return
560-
561-
self._send_data(buffer)
541+
self._paused = False
542+
543+
# Write until there's backpressure telling us to stop.
544+
while not self._paused:
545+
# Get the next chunk and write it to the request.
546+
#
547+
# The output of the JSON encoder is buffered and coalesced until
548+
# min_chunk_size is reached. This is because JSON encoders produce
549+
# very small output per iteration and the Request object converts
550+
# each call to write() to a separate chunk. Without this there would
551+
# be an explosion in bytes written (e.g. b"{" becoming "1\r\n{\r\n").
552+
#
553+
# Note that buffer stores a list of bytes (instead of appending to
554+
# bytes) to hopefully avoid many allocations.
555+
buffer = []
556+
buffered_bytes = 0
557+
while buffered_bytes < self.min_chunk_size:
558+
try:
559+
data = next(self._iterator)
560+
buffer.append(data)
561+
buffered_bytes += len(data)
562+
except StopIteration:
563+
# The entire JSON object has been serialized, write any
564+
# remaining data, finalize the producer and the request, and
565+
# clean-up any references.
566+
self._send_data(buffer)
567+
self._request.unregisterProducer()
568+
self._request.finish()
569+
self.stopProducing()
570+
return
571+
572+
self._send_data(buffer)
562573

563574
def stopProducing(self) -> None:
575+
# Clear a circular reference.
564576
self._request = None
565577

566578

@@ -620,8 +632,7 @@ def respond_with_json(
620632
if send_cors:
621633
set_cors_headers(request)
622634

623-
producer = _ByteProducer(request, encoder(json_object))
624-
producer.start()
635+
_ByteProducer(request, encoder(json_object))
625636
return NOT_DONE_YET
626637

627638

tests/rest/client/v1/test_login.py

Lines changed: 5 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -62,8 +62,7 @@ def test_POST_ratelimiting_per_address(self):
6262
"identifier": {"type": "m.id.user", "user": "kermit" + str(i)},
6363
"password": "monkey",
6464
}
65-
request_data = json.dumps(params)
66-
request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
65+
request, channel = self.make_request(b"POST", LOGIN_URL, params)
6766
self.render(request)
6867

6968
if i == 5:
@@ -76,14 +75,13 @@ def test_POST_ratelimiting_per_address(self):
7675
# than 1min.
7776
self.assertTrue(retry_after_ms < 6000)
7877

79-
self.reactor.advance(retry_after_ms / 1000.0)
78+
self.reactor.advance(retry_after_ms / 1000.0 + 1.0)
8079

8180
params = {
8281
"type": "m.login.password",
8382
"identifier": {"type": "m.id.user", "user": "kermit" + str(i)},
8483
"password": "monkey",
8584
}
86-
request_data = json.dumps(params)
8785
request, channel = self.make_request(b"POST", LOGIN_URL, params)
8886
self.render(request)
8987

@@ -111,8 +109,7 @@ def test_POST_ratelimiting_per_account(self):
111109
"identifier": {"type": "m.id.user", "user": "kermit"},
112110
"password": "monkey",
113111
}
114-
request_data = json.dumps(params)
115-
request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
112+
request, channel = self.make_request(b"POST", LOGIN_URL, params)
116113
self.render(request)
117114

118115
if i == 5:
@@ -132,7 +129,6 @@ def test_POST_ratelimiting_per_account(self):
132129
"identifier": {"type": "m.id.user", "user": "kermit"},
133130
"password": "monkey",
134131
}
135-
request_data = json.dumps(params)
136132
request, channel = self.make_request(b"POST", LOGIN_URL, params)
137133
self.render(request)
138134

@@ -160,8 +156,7 @@ def test_POST_ratelimiting_per_account_failed_attempts(self):
160156
"identifier": {"type": "m.id.user", "user": "kermit"},
161157
"password": "notamonkey",
162158
}
163-
request_data = json.dumps(params)
164-
request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
159+
request, channel = self.make_request(b"POST", LOGIN_URL, params)
165160
self.render(request)
166161

167162
if i == 5:
@@ -174,14 +169,13 @@ def test_POST_ratelimiting_per_account_failed_attempts(self):
174169
# than 1min.
175170
self.assertTrue(retry_after_ms < 6000)
176171

177-
self.reactor.advance(retry_after_ms / 1000.0)
172+
self.reactor.advance(retry_after_ms / 1000.0 + 1.0)
178173

179174
params = {
180175
"type": "m.login.password",
181176
"identifier": {"type": "m.id.user", "user": "kermit"},
182177
"password": "notamonkey",
183178
}
184-
request_data = json.dumps(params)
185179
request, channel = self.make_request(b"POST", LOGIN_URL, params)
186180
self.render(request)
187181

tests/rest/client/v2_alpha/test_register.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -160,7 +160,7 @@ def test_POST_ratelimiting_guest(self):
160160
else:
161161
self.assertEquals(channel.result["code"], b"200", channel.result)
162162

163-
self.reactor.advance(retry_after_ms / 1000.0)
163+
self.reactor.advance(retry_after_ms / 1000.0 + 1.0)
164164

165165
request, channel = self.make_request(b"POST", self.url + b"?kind=guest", b"{}")
166166
self.render(request)
@@ -186,7 +186,7 @@ def test_POST_ratelimiting(self):
186186
else:
187187
self.assertEquals(channel.result["code"], b"200", channel.result)
188188

189-
self.reactor.advance(retry_after_ms / 1000.0)
189+
self.reactor.advance(retry_after_ms / 1000.0 + 1.0)
190190

191191
request, channel = self.make_request(b"POST", self.url + b"?kind=guest", b"{}")
192192
self.render(request)

tests/storage/test_cleanup_extrems.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -353,6 +353,7 @@ def test_expiry_logic(self):
353353
self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion[
354354
"3"
355355
] = 300000
356+
356357
self.event_creator_handler._expire_rooms_to_exclude_from_dummy_event_insertion()
357358
# All entries within time frame
358359
self.assertEqual(
@@ -362,7 +363,7 @@ def test_expiry_logic(self):
362363
3,
363364
)
364365
# Oldest room to expire
365-
self.pump(1)
366+
self.pump(1.01)
366367
self.event_creator_handler._expire_rooms_to_exclude_from_dummy_event_insertion()
367368
self.assertEqual(
368369
len(

0 commit comments

Comments
 (0)