Skip to content

Commit

Permalink
Fix encoding errors (celery#198)
Browse files Browse the repository at this point in the history
* Don't default content_encoding to utf-8 for bytes

This is not an acceptable default as the content may not be
valid utf-8, and even if it is, the producer likely does not
expect the message to be decoded by the consumer.

* Fix encoding of messages with multibyte characters

Body length was previously calculated from the length of the
unencoded string, which may be less than the byte length of the
encoded body when it contains multibyte sequences. This caused
the body of the frame to be truncated.

* Respect content_encoding when encoding messages

Previously the content_encoding was ignored and messages
were always encoded as utf-8. This caused messages that
declared a different content_encoding to be decoded incorrectly
by any consumer that properly respected it when decoding.
  • Loading branch information
evanunderscore authored and auvipy committed Aug 13, 2018
1 parent 868e5d3 commit d879de2
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 6 deletions.
14 changes: 9 additions & 5 deletions amqp/method_framing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from . import spec
from .basic_message import Message
from .exceptions import UnexpectedFrame
from .five import range
from .five import range, text_t
from .platform import pack, pack_into, unpack_from
from .utils import str_to_bytes

Expand Down Expand Up @@ -87,7 +87,7 @@ def on_frame(frame):

def frame_writer(connection, transport,
pack=pack, pack_into=pack_into, range=range, len=len,
bytes=bytes, str_to_bytes=str_to_bytes):
bytes=bytes, str_to_bytes=str_to_bytes, text_t=text_t):
"""Create closure that writes frames."""
write = transport.write

Expand All @@ -103,8 +103,12 @@ def write_frame(type_, channel, method_sig, args, content):
properties = None
args = str_to_bytes(args)
if content:
properties = content._serialize_properties()
body = content.body
if isinstance(body, text_t):
encoding = content.properties.setdefault(
'content_encoding', 'utf-8')
body = body.encode(encoding)
properties = content._serialize_properties()
bodylen = len(body)
framelen = (
len(args) +
Expand Down Expand Up @@ -137,7 +141,7 @@ def write_frame(type_, channel, method_sig, args, content):
framelen = len(frame)
write(pack('>BHI%dsB' % framelen,
3, channel, framelen,
str_to_bytes(frame), 0xce))
frame, 0xce))

else:
# ## FAST: pack into buffer and single write
Expand All @@ -162,7 +166,7 @@ def write_frame(type_, channel, method_sig, args, content):
if bodylen > 0:
framelen = bodylen
pack_into('>BHI%dsB' % framelen, buf, offset,
3, channel, framelen, str_to_bytes(body), 0xce)
3, channel, framelen, body, 0xce)
offset += 8 + framelen

write(view[:offset])
Expand Down
1 change: 0 additions & 1 deletion amqp/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,6 @@ def _serialize_properties(self):
flags = []
sformat, svalues = [], []
props = self.properties
props.setdefault('content_encoding', 'utf-8')
for key, proptype in self.PROPERTIES:
val = props.get(key, None)
if val is not None:
Expand Down
33 changes: 33 additions & 0 deletions t/unit/test_method_framing.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,48 @@ def test_write_fast_content(self):
frame = 2, 1, spec.Basic.Publish, b'x' * 10, msg
self.g(*frame)
self.write.assert_called()
assert 'content_encoding' not in msg.properties

def test_write_slow_content(self):
    """A 2048-byte ``bytes`` body must not gain a ``content_encoding``."""
    # NOTE(review): 'utf-8' is passed as content_type, not
    # content_encoding -- confirm this is intentional.
    message = Message(body=b'y' * 2048, content_type='utf-8')
    self.g(2, 1, spec.Basic.Publish, b'x' * 10, message)
    self.write.assert_called()
    # Passing the message through self.g must not add an encoding.
    assert 'content_encoding' not in message.properties

def test_write_zero_len_body(self):
    """An empty ``bytes`` body is written without a ``content_encoding``."""
    message = Message(body=b'', content_type='application/octet-stream')
    self.g(2, 1, spec.Basic.Publish, b'x' * 10, message)
    self.write.assert_called()
    # Even with nothing to encode, no default encoding may be recorded.
    assert 'content_encoding' not in message.properties

def test_write_fast_unicode(self):
    """A short text body is written UTF-8 encoded inside a memoryview."""
    message = Message(body='\N{CHECK MARK}')
    self.g(2, 1, spec.Basic.Publish, b'x' * 10, message)
    self.write.assert_called()
    # First positional argument of the most recent write() call.
    positional = self.write.call_args[0]
    written = positional[0]
    assert isinstance(written, memoryview)
    encoded_mark = '\N{CHECK MARK}'.encode('utf-8')
    assert encoded_mark in written.tobytes()
    # The message must end up declaring the encoding that was used.
    assert message.properties['content_encoding'] == 'utf-8'

def test_write_slow_unicode(self):
    """A long text body with a multibyte character is written as bytes."""
    message = Message(body='y' * 2048 + '\N{CHECK MARK}')
    self.g(2, 1, spec.Basic.Publish, b'x' * 10, message)
    self.write.assert_called()
    # First positional argument of the most recent write() call.
    positional = self.write.call_args[0]
    written = positional[0]
    assert isinstance(written, bytes)
    # The trailing multibyte character must be present in the payload.
    encoded_mark = '\N{CHECK MARK}'.encode('utf-8')
    assert encoded_mark in written
    assert message.properties['content_encoding'] == 'utf-8'

def test_write_non_utf8(self):
    """An explicit ``content_encoding`` is used to encode a text body."""
    message = Message(body='body', content_encoding='utf-16')
    self.g(2, 1, spec.Basic.Publish, b'x' * 10, message)
    self.write.assert_called()
    # First positional argument of the most recent write() call.
    positional = self.write.call_args[0]
    written = positional[0]
    assert isinstance(written, memoryview)
    utf16_body = 'body'.encode('utf-16')
    assert utf16_body in written.tobytes()
    # The caller-supplied encoding must be left untouched.
    assert message.properties['content_encoding'] == 'utf-16'

0 comments on commit d879de2

Please sign in to comment.