Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSocket internal implementation refactoring #1021

Merged
merged 11 commits into from
Jul 31, 2016
6 changes: 6 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ CHANGES

- Fix logger report for unix socket 8e8469b

- Rename aiohttp.websocket to aiohttp._ws_impl

- Rename aiohttp.MsgType tp aiohttp.WSMsgType

- Introduce aiohttp.WSMessage officially

0.22.3 (07-26-2016)
-------------------

Expand Down
11 changes: 8 additions & 3 deletions aiohttp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
from .parsers import * # noqa
from .streams import * # noqa
from .multipart import * # noqa
from .websocket_client import * # noqa
from .client_ws import ClientWebSocketResponse # noqa
from ._ws_impl import WSMsgType, WSCloseCode, Message, WebSocketError # noqa
from .file_sender import FileSender # noqa


MsgType = WSMsgType # backward compatibility


__all__ = (client.__all__ + # noqa
client_reqrep.__all__ + # noqa
errors.__all__ + # noqa
Expand All @@ -30,5 +34,6 @@
streams.__all__ + # noqa
multidict.__all__ + # noqa
multipart.__all__ + # noqa
websocket_client.__all__ + # noqa
('hdrs', 'FileSender'))
('hdrs', 'FileSender', 'WSMsgType', 'MsgType', 'WSCloseCode',
'WebSocketError', 'Message',
'ClientWebSocketResponse'))
153 changes: 75 additions & 78 deletions aiohttp/websocket.py → aiohttp/_ws_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,42 @@
from aiohttp import errors, hdrs
from aiohttp.log import ws_logger

from enum import IntEnum


__all__ = ('WebSocketParser', 'WebSocketWriter', 'do_handshake',
'Message', 'WebSocketError',
'MSG_TEXT', 'MSG_BINARY', 'MSG_CLOSE', 'MSG_PING', 'MSG_PONG')

# Frame opcodes defined in the spec.
OPCODE_CONTINUATION = 0x0
MSG_TEXT = OPCODE_TEXT = 0x1
MSG_BINARY = OPCODE_BINARY = 0x2
MSG_CLOSE = OPCODE_CLOSE = 0x8
MSG_PING = OPCODE_PING = 0x9
MSG_PONG = OPCODE_PONG = 0xa

CLOSE_OK = 1000
CLOSE_GOING_AWAY = 1001
CLOSE_PROTOCOL_ERROR = 1002
CLOSE_UNSUPPORTED_DATA = 1003
CLOSE_INVALID_TEXT = 1007
CLOSE_POLICY_VIOLATION = 1008
CLOSE_MESSAGE_TOO_BIG = 1009
CLOSE_MANDATORY_EXTENSION = 1010
CLOSE_INTERNAL_ERROR = 1011
CLOSE_SERVICE_RESTART = 1012
CLOSE_TRY_AGAIN_LATER = 1013

ALLOWED_CLOSE_CODES = (
CLOSE_OK,
CLOSE_GOING_AWAY,
CLOSE_PROTOCOL_ERROR,
CLOSE_UNSUPPORTED_DATA,
CLOSE_INVALID_TEXT,
CLOSE_POLICY_VIOLATION,
CLOSE_MESSAGE_TOO_BIG,
CLOSE_MANDATORY_EXTENSION,
CLOSE_INTERNAL_ERROR,
CLOSE_SERVICE_RESTART,
CLOSE_TRY_AGAIN_LATER,
)
'Message', 'WebSocketError', 'WSMsgType', 'WSCloseCode')


class WSCloseCode(IntEnum):
ok = 1000
going_away = 1001
protocol_error = 1002
unsupported_data = 1003
invalid_text = 1007
policy_violation = 1008
message_too_big = 1009
mandatory_extension = 1010
internal_error = 1011
service_restart = 1012
try_again_later = 1013


ALLOWED_CLOSE_CODES = {int(i) for i in WSCloseCode}


class WSMsgType(IntEnum):
continuation = 0x0
text = 0x1
binary = 0x2
ping = 0x9
pong = 0xa
close = 0x8
closed = 0x101
error = 0x102


WS_KEY = b'258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
WS_HDRS = (hdrs.UPGRADE,
hdrs.CONNECTION,
hdrs.SEC_WEBSOCKET_VERSION,
hdrs.SEC_WEBSOCKET_KEY,
hdrs.SEC_WEBSOCKET_PROTOCOL)


UNPACK_LEN2 = Struct('!H').unpack_from
Expand All @@ -81,6 +73,9 @@ def json(self, *, loads=json.loads):
return loads(self.data)


CLOSED_MESSAGE = Message(WSMsgType.closed, None, None)


class WebSocketError(Exception):
"""WebSocket protocol parser error."""

Expand All @@ -93,39 +88,40 @@ def WebSocketParser(out, buf):
while True:
fin, opcode, payload = yield from parse_frame(buf)

if opcode == OPCODE_CLOSE:
if opcode == WSMsgType.close:
if len(payload) >= 2:
close_code = UNPACK_CLOSE_CODE(payload[:2])[0]
if close_code not in ALLOWED_CLOSE_CODES and close_code < 3000:
if close_code < 3000 and close_code not in ALLOWED_CLOSE_CODES:
raise WebSocketError(
CLOSE_PROTOCOL_ERROR,
WSCloseCode.protocol_error,
'Invalid close code: {}'.format(close_code))
try:
close_message = payload[2:].decode('utf-8')
except UnicodeDecodeError as exc:
raise WebSocketError(
CLOSE_INVALID_TEXT,
WSCloseCode.invalid_text,
'Invalid UTF-8 text message') from exc
msg = Message(OPCODE_CLOSE, close_code, close_message)
msg = Message(WSMsgType.close, close_code, close_message)
elif payload:
raise WebSocketError(
CLOSE_PROTOCOL_ERROR,
WSCloseCode.protocol_error,
'Invalid close frame: {} {} {!r}'.format(
fin, opcode, payload))
else:
msg = Message(OPCODE_CLOSE, 0, '')
msg = Message(WSMsgType.close, 0, '')

out.feed_data(msg, 0)

elif opcode == OPCODE_PING:
out.feed_data(Message(OPCODE_PING, payload, ''), len(payload))
elif opcode == WSMsgType.ping:
out.feed_data(Message(WSMsgType.ping, payload, ''), len(payload))

elif opcode == OPCODE_PONG:
out.feed_data(Message(OPCODE_PONG, payload, ''), len(payload))
elif opcode == WSMsgType.pong:
out.feed_data(Message(WSMsgType.pong, payload, ''), len(payload))

elif opcode not in (OPCODE_TEXT, OPCODE_BINARY):
elif opcode not in (WSMsgType.text, WSMsgType.binary):
raise WebSocketError(
CLOSE_PROTOCOL_ERROR, "Unexpected opcode={!r}".format(opcode))
WSCloseCode.protocol_error,
"Unexpected opcode={!r}".format(opcode))
else:
# load text/binary
data = [payload]
Expand All @@ -135,58 +131,59 @@ def WebSocketParser(out, buf):

# We can receive ping/close in the middle of
# text message, Case 5.*
if _opcode == OPCODE_PING:
if _opcode == WSMsgType.ping:
out.feed_data(
Message(OPCODE_PING, payload, ''), len(payload))
Message(WSMsgType.ping, payload, ''), len(payload))
fin, _opcode, payload = yield from parse_frame(buf, True)
elif _opcode == OPCODE_CLOSE:
elif _opcode == WSMsgType.close:
if len(payload) >= 2:
close_code = UNPACK_CLOSE_CODE(payload[:2])[0]
if (close_code not in ALLOWED_CLOSE_CODES and
close_code < 3000):
raise WebSocketError(
CLOSE_PROTOCOL_ERROR,
WSCloseCode.protocol_error,
'Invalid close code: {}'.format(close_code))
try:
close_message = payload[2:].decode('utf-8')
except UnicodeDecodeError as exc:
raise WebSocketError(
CLOSE_INVALID_TEXT,
WSCloseCode.invalid_text,
'Invalid UTF-8 text message') from exc
msg = Message(OPCODE_CLOSE, close_code, close_message)
msg = Message(WSMsgType.close, close_code,
close_message)
elif payload:
raise WebSocketError(
CLOSE_PROTOCOL_ERROR,
WSCloseCode.protocol_error,
'Invalid close frame: {} {} {!r}'.format(
fin, opcode, payload))
else:
msg = Message(OPCODE_CLOSE, 0, '')
msg = Message(WSMsgType.close, 0, '')

out.feed_data(msg, 0)
fin, _opcode, payload = yield from parse_frame(buf, True)

if _opcode != OPCODE_CONTINUATION:
if _opcode != WSMsgType.continuation:
raise WebSocketError(
CLOSE_PROTOCOL_ERROR,
WSCloseCode.protocol_error,
'The opcode in non-fin frame is expected '
'to be zero, got {!r}'.format(_opcode))
else:
data.append(payload)

if opcode == OPCODE_TEXT:
if opcode == WSMsgType.text:
try:
text = b''.join(data).decode('utf-8')
out.feed_data(
Message(
OPCODE_TEXT, text, ''), len(text))
WSMsgType.text, text, ''), len(text))
except UnicodeDecodeError as exc:
raise WebSocketError(
CLOSE_INVALID_TEXT,
WSCloseCode.invalid_text,
'Invalid UTF-8 text message') from exc
else:
data = b''.join(data)
out.feed_data(
Message(OPCODE_BINARY, data, ''), len(data))
Message(WSMsgType.binary, data, ''), len(data))


native_byteorder = sys.byteorder
Expand Down Expand Up @@ -245,17 +242,17 @@ def parse_frame(buf, continuation=False):
# frame-rsv3 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
if rsv1 or rsv2 or rsv3:
raise WebSocketError(
CLOSE_PROTOCOL_ERROR,
WSCloseCode.protocol_error,
'Received frame with non-zero reserved bits')

if opcode > 0x7 and fin == 0:
raise WebSocketError(
CLOSE_PROTOCOL_ERROR,
WSCloseCode.protocol_error,
'Received fragmented control frame')

if fin == 0 and opcode == OPCODE_CONTINUATION and not continuation:
if fin == 0 and opcode == WSMsgType.continuation and not continuation:
raise WebSocketError(
CLOSE_PROTOCOL_ERROR,
WSCloseCode.protocol_error,
'Received new fragment frame with non-zero '
'opcode {!r}'.format(opcode))

Expand All @@ -265,7 +262,7 @@ def parse_frame(buf, continuation=False):
# Control frames MUST have a payload length of 125 bytes or less
if opcode > 0x7 and length > 125:
raise WebSocketError(
CLOSE_PROTOCOL_ERROR,
WSCloseCode.protocol_error,
"Control frame payload cannot be larger than 125 bytes")

# read payload
Expand Down Expand Up @@ -329,29 +326,29 @@ def pong(self, message=b''):
"""Send pong message."""
if isinstance(message, str):
message = message.encode('utf-8')
self._send_frame(message, OPCODE_PONG)
self._send_frame(message, WSMsgType.pong)

def ping(self, message=b''):
"""Send ping message."""
if isinstance(message, str):
message = message.encode('utf-8')
self._send_frame(message, OPCODE_PING)
self._send_frame(message, WSMsgType.ping)

def send(self, message, binary=False):
"""Send a frame over the websocket with message as its payload."""
if isinstance(message, str):
message = message.encode('utf-8')
if binary:
self._send_frame(message, OPCODE_BINARY)
self._send_frame(message, WSMsgType.binary)
else:
self._send_frame(message, OPCODE_TEXT)
self._send_frame(message, WSMsgType.text)

def close(self, code=1000, message=b''):
"""Close the websocket, sending the specified code and message."""
if isinstance(message, str):
message = message.encode('utf-8')
self._send_frame(
PACK_CLOSE_CODE(code) + message, opcode=OPCODE_CLOSE)
PACK_CLOSE_CODE(code) + message, opcode=WSMsgType.close)


def do_handshake(method, headers, transport, protocols=()):
Expand Down
4 changes: 2 additions & 2 deletions aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
from .client_reqrep import ClientRequest, ClientResponse
from .errors import WSServerHandshakeError
from .helpers import CookieJar
from .websocket import WS_KEY, WebSocketParser, WebSocketWriter
from .websocket_client import ClientWebSocketResponse
from ._ws_impl import WS_KEY, WebSocketParser, WebSocketWriter
from .client_ws import ClientWebSocketResponse
from . import hdrs, helpers


Expand Down
Loading