
Commit 14d6f2e

added stream_buffer configuration parameter for controlling the maximum size of stream buffers #release-note
lsbardel committed Nov 5, 2016
1 parent 182b7b7 commit 14d6f2e
Showing 4 changed files with 68 additions and 23 deletions.
50 changes: 27 additions & 23 deletions pulsar/apps/wsgi/formdata.py
@@ -1,23 +1,21 @@
 import email.parser
 import asyncio
-from http.client import HTTPMessage, _MAXLINE, _MAXHEADERS
+import json
 
+from http.client import HTTPMessage, _MAXHEADERS
 from io import BytesIO
 from urllib.parse import parse_qs
 from base64 import b64encode
 from functools import reduce
 from cgi import valid_boundary, parse_header
 
 from pulsar import HttpException, BadRequest, isawaitable, ensure_future
-from pulsar.utils.system import json
+from pulsar.utils.system import convert_bytes
 from pulsar.utils.structures import MultiValueDict, mapping_iterator
 from pulsar.utils.httpurl import (DEFAULT_CHARSET, ENCODE_BODY_METHODS,
                                   JSON_CONTENT_TYPES, parse_options_header)
 
 
-ONEMB = 2**20
-# Default max size for body when not streaming
-DEFAULT_MAXSIZE = 10*ONEMB
 
 FORM_ENCODED_TYPES = ('application/x-www-form-urlencoded',
                       'application/x-url-encoded')
 BODY_DATA = 0
@@ -34,9 +32,10 @@ class HttpBodyReader:
     _expect_sent = None
     _waiting = None
 
-    def __init__(self, headers, parser, transport, **kw):
+    def __init__(self, headers, parser, transport, limit, **kw):
         self.headers = headers
         self.parser = parser
+        self.limit = limit
         self.reader = asyncio.StreamReader(**kw)
         self.reader.set_transport(transport)
         self.feed_data = self.reader.feed_data
@@ -69,9 +68,14 @@ def read(self, n=-1):
         self.can_continue()
         return self.reader.read(n=n)
 
-    def readline(self):
-        self.can_continue()
-        return self.reader.readline()
+    async def readline(self):
+        try:
+            line = await self.reader.readuntil(b'\n')
+        except asyncio.streams.LimitOverrunError as exc:
+            line = await self.read(exc.consumed) + await self.readline()
+        if len(line) > self.limit:
+            raise_large_body_error(self.limit)
+        return line
 
     def readexactly(self, n):
         self.can_continue()
@@ -123,6 +127,7 @@ def __init__(self, environ, options, stream):
         self.environ = environ
         self.options = options
         self.stream = stream
+        self.limit = environ['pulsar.cfg'].stream_buffer
         self.result = (MultiValueDict(), MultiValueDict())
 
     @property
@@ -163,26 +168,21 @@ async def _consume(self, fp, boundary):
 
         while terminator != lastpart:
             nbytes = -1
-            data = None
             current = None
 
             if terminator:
                 headers = await parse_headers(fp)
                 current = MultipartPart(self, headers)
 
-                if nbytes > 0:
-                    data = await fp.read(nbytes)
-                else:
-                    data = b''
+                data = await fp.read(nbytes) if nbytes > 0 else b''
 
                 if current.name:
                     current.feed_data(data)
                 else:
                     current = None
 
             while 1:
-                line = await fp.readline()
-                line = line or lastpart
+                line = await fp.readline() or lastpart
                 if line.startswith(sep):
                     terminator = line.rstrip()
                     if terminator in (nextpart, lastpart):
@@ -201,10 +201,8 @@
 class BytesDecoder(FormDecoder):
 
     def parse(self, mem_limit=None, **kw):
-        mem_limit = mem_limit or DEFAULT_MAXSIZE
-        if self.content_length > mem_limit:
-            raise HttpException("Request to big. Increase MAXMEM.",
-                                status=LARGE_BODY_CODE)
+        if self.content_length > self.limit:
+            raise_large_body_error(self.limit)
         inp = self.environ.get('wsgi.input') or BytesIO()
         data = inp.read()
 
@@ -359,8 +357,6 @@ async def parse_headers(fp, _class=HTTPMessage):
     headers = []
     while True:
         line = await fp.readline()
-        if len(line) > _MAXLINE:
-            raise HttpException("header line")
        headers.append(line)
        if len(headers) > _MAXHEADERS:
            raise HttpException("got more than %d headers" % _MAXHEADERS)
@@ -370,6 +366,14 @@
     return email.parser.Parser(_class=_class).parsestr(hstring)
 
 
+def raise_large_body_error(limit):
+    raise HttpException(
+        "Request content length too large. Limit is %s" %
+        convert_bytes(limit),
+        status=LARGE_BODY_CODE
+    )
+
+
 class BytesProducer:
 
     def __init__(self, bytes):
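Note on the readline rewrite above: asyncio.StreamReader.readuntil() raises LimitOverrunError once a single line outgrows the reader's internal buffer limit, and exc.consumed reports how many bytes are already buffered; the new method drains those bytes with read() and recurses until the terminating newline arrives, then rejects the assembled line if it exceeds the configured stream_buffer limit. A minimal standalone sketch of the same recovery pattern (an illustration, not code from this commit; the IncompleteReadError branch for EOF without a newline is an extra safeguard the commit does not add):

import asyncio


async def readline_unbounded(reader, hard_limit):
    # Read one b'\n'-terminated line from an asyncio.StreamReader whose
    # internal limit may be smaller than the line itself.
    try:
        line = await reader.readuntil(b'\n')
    except asyncio.LimitOverrunError as exc:
        # exc.consumed bytes are already buffered; drain them, keep reading
        line = (await reader.read(exc.consumed) +
                await readline_unbounded(reader, hard_limit))
    except asyncio.IncompleteReadError as exc:
        line = exc.partial  # EOF arrived before a newline
    if len(line) > hard_limit:
        raise ValueError('line exceeds %d bytes' % hard_limit)
    return line


async def main():
    reader = asyncio.StreamReader(limit=16)  # tiny limit forces the overrun
    reader.feed_data(b'x' * 40 + b'\n')
    reader.feed_eof()
    print(len(await readline_unbounded(reader, hard_limit=1024)))  # 41


asyncio.run(main())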
1 change: 1 addition & 0 deletions pulsar/apps/wsgi/server.py
@@ -246,6 +246,7 @@ def data_received(self, data):
             self._body_reader = HttpBodyReader(headers,
                                                parser,
                                                self.transport,
+                                               self.cfg.stream_buffer,
                                                loop=self._loop)
             ensure_future(self._response(self.wsgi_environ()),
                           loop=self._loop)
13 changes: 13 additions & 0 deletions pulsar/utils/config.py
@@ -976,6 +976,19 @@ class ExecutionId(Global):
     """
 
 
+class StreamBuffer(Global):
+    name = "stream_buffer"
+    flags = ["--stream-buffer"]
+    validator = validate_pos_int
+    type = int
+    default = 2 ** 24
+    desc = """Buffer limit for stream readers
+
+        When data in buffer exceeds this size, the framework throws buffer
+        limit errors
+        """
+
+
 ############################################################################
 # Worker Processes
 section_docs['Worker Processes'] = """
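Usage note: stream_buffer goes through the standard pulsar config machinery, so it can be supplied as a keyword to a pulsar application or via the --stream-buffer command-line flag declared above. A hedged sketch (the hello callable and the 2 ** 25 value are illustrative only; the tests below pass stream_buffer=2**18 to server() in the same way):

from pulsar.apps import wsgi


def hello(environ, start_response):
    # Trivial WSGI callable, here only to give the server something to run
    data = b'Hello World!\n'
    start_response('200 OK', [('Content-Type', 'text/plain'),
                              ('Content-Length', str(len(data)))])
    return [data]


if __name__ == '__main__':
    # Double the 16MB default; bodies or lines beyond this limit trigger
    # the large-body error added in formdata.py
    wsgi.WSGIServer(callable=hello, stream_buffer=2 ** 25).start()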
27 changes: 27 additions & 0 deletions tests/http/base.py
@@ -62,6 +62,7 @@ async def setUpClass(cls):
         s = server(bind='127.0.0.1:0', concurrency=concurrency,
                    name='httpbin-%s' % cls.__name__.lower(),
                    keep_alive=30, key_file=key_file, cert_file=cert_file,
+                   stream_buffer=2**18,
                    workers=1)
         cfg = await send('arbiter', 'run', s)
         cls.app = cfg.app()
@@ -306,6 +307,32 @@ async def test_upload_files(self):
         self.assertEqual(data['files'], {'test': ['simple file']})
         self.assertEqual(data['args']['numero'], ['1', '2'])
 
+    async def test_upload_large_files(self):
+        from asyncio.streams import _DEFAULT_LIMIT
+        http = self._client
+        file_data = 'A' * _DEFAULT_LIMIT + 'B' * 1024
+        data = (('bla', 'foo'), ('unz', 'whatz'),
+                ('numero', '1'), ('numero', '2'))
+        response = await http.put(self.httpbin('upload'), data=data,
+                                  files={'test': file_data})
+        self.assertEqual(response.status_code, 200)
+        ct = response.request.headers['content-type']
+        self.assertTrue(ct.startswith('multipart/form-data; boundary='))
+        data = response.json()
+        self.assertEqual(data['files']['test'][0], file_data)
+        self.assertEqual(data['args']['numero'], ['1', '2'])
+
+    async def test_upload_too_large_files(self):
+        http = self._client
+        file_data = 'A' * (2 ** 18 + 2 ** 12)
+        data = (('bla', 'foo'), ('unz', 'whatz'),
+                ('numero', '1'), ('numero', '2'))
+        response = await http.put(self.httpbin('upload'), data=data,
+                                  files={'test': file_data})
+        self.assertEqual(response.status_code, 403)
+        self.assertEqual(response.content,
+                         b'Request content length too large. Limit is 256.0KB')
+
     def test_HttpResponse(self):
         r = HttpResponse(loop=get_event_loop())
         self.assertEqual(r.request, None)

1 comment on commit 14d6f2e

@lsbardel
Member Author

This fixes #253 by adding the new config parameter stream_buffer, set at 16MB by default.
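For the record: 2 ** 24 = 16,777,216 bytes, the 16MB default; the tests override it with stream_buffer=2**18 = 262,144 bytes, which convert_bytes renders as the 256.0KB in the expected error body.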
