Use hardware accelerated CRC32C function if available #1389

Merged: 2 commits merged on Feb 21, 2018

13 changes: 13 additions & 0 deletions docs/install.rst
@@ -70,3 +70,16 @@ Install the `python-snappy` module
 .. code:: bash
 
     pip install python-snappy
+
+
+Optional crc32c install
+***********************

Review comment (collaborator, PR author): @jeffwidman Could you read this through, good enough?

Reply (collaborator): Looks great

+Highly recommended if you are using Kafka 0.11+ brokers. For those brokers,
+`kafka-python` uses a newer message format whose checksum is CRC-32C (`crc32c`),
+which uses a different polynomial than `zlib.crc32`. By default `kafka-python`
+computes it in pure Python, which is quite slow. To speed it up, we optionally
+use the https://pypi.python.org/pypi/crc32c package if it is installed.
+
+.. code:: bash
+
+    pip install crc32c
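
To make the difference between CRC-32C and `zlib.crc32` concrete, here is a minimal table-driven CRC-32C sketch in pure Python. It is not `kafka-python`'s own `kafka.record._crc32c` module, only an illustration of the same checksum using the reflected Castagnoli polynomial `0x82F63B78`; the names `_make_table` and `crc32c_sketch` are hypothetical. The expected values come from the test vectors in this PR (`b""` and `b"a"`) and the standard check inputs for CRC-32C and CRC-32.

```python
import zlib

# Reflected form of the Castagnoli polynomial 0x1EDC6F41 used by CRC-32C.
_CASTAGNOLI_REFLECTED = 0x82F63B78


def _make_table(poly=_CASTAGNOLI_REFLECTED):
    """Precompute the 256-entry lookup table for a reflected CRC-32."""
    table = []
    for byte in range(256):
        crc = byte
        for _ in range(8):
            # Shift right; XOR in the polynomial when the low bit falls off.
            crc = (crc >> 1) ^ poly if crc & 1 else crc >> 1
        table.append(crc)
    return table


_TABLE = _make_table()


def crc32c_sketch(data, crc=0):
    """Hypothetical pure-Python CRC-32C over bytes, bytearray or memoryview.

    Like zlib.crc32, a previous result can be passed as `crc` to continue
    the checksum over more data.
    """
    crc ^= 0xFFFFFFFF
    for b in bytearray(data):
        # Process one byte per table lookup.
        crc = _TABLE[(crc ^ b) & 0xFF] ^ (crc >> 8)
    return crc ^ 0xFFFFFFFF


if __name__ == "__main__":
    data = b"123456789"
    print(hex(crc32c_sketch(data)))            # 0xe3069283 (CRC-32C)
    print(hex(zlib.crc32(data) & 0xFFFFFFFF))  # 0xcbf43926 (zlib CRC-32)
```

The per-byte Python loop is exactly the kind of work that makes a pure-Python checksum slow on large record batches, which is why delegating to a C (and possibly SSE4.2-accelerated) implementation pays off.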

14 changes: 11 additions & 3 deletions kafka/record/util.py
@@ -1,6 +1,10 @@
 import binascii
 
 from kafka.record._crc32c import crc as crc32c_py
+try:
+    from crc32c import crc32 as crc32c_c
+except ImportError:
+    crc32c_c = None
 
 
 def encode_varint(value, write):
@@ -113,11 +117,15 @@ def decode_varint(buffer, pos=0):
             raise ValueError("Out of int64 range")
 
 
-def calc_crc32c(memview):
+_crc32c = crc32c_py
+if crc32c_c is not None:
+    _crc32c = crc32c_c
+
+
+def calc_crc32c(memview, _crc32c=_crc32c):
     """ Calculate CRC-32C (Castagnoli) checksum over a memoryview of data
     """
-    crc = crc32c_py(memview)
-    return crc
+    return _crc32c(memview)
 
 
 def calc_crc32(memview):

5 changes: 3 additions & 2 deletions test/record/test_util.py
@@ -68,9 +68,10 @@ def test_size_of_varint(encoded, decoded):
     assert util.size_of_varint(decoded) == len(encoded)
 
 
-def test_crc32c():
+@pytest.mark.parametrize("crc32_func", [util.crc32c_c, util.crc32c_py])
+def test_crc32c(crc32_func):
     def make_crc(data):
-        crc = util.calc_crc32c(data)
+        crc = crc32_func(data)
         return struct.pack(">I", crc)
     assert make_crc(b"") == b"\x00\x00\x00\x00"
     assert make_crc(b"a") == b"\xc1\xd0\x43\x30"
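
As written, the parametrized test passes `util.crc32c_c` unconditionally, so in an environment without the optional package `crc32c_c` is `None` and calling it raises `TypeError`; the `tox.ini` change in this PR installs `crc32c`, so the tox runs are unaffected. A plain-Python sketch that runs the PR's vectors only against implementations that could actually be imported might look like this; `crc32c_py`, `available_impls` and `check_crc32c` are hypothetical names. Under pytest the same idea is usually expressed with `pytest.param(..., marks=pytest.mark.skipif(...))`.

```python
import struct

try:
    # Same import as the PR; a missing package or name means "not available".
    from crc32c import crc32 as crc32c_c
except ImportError:
    crc32c_c = None


def crc32c_py(data):
    """Hypothetical bitwise pure-Python CRC-32C (Castagnoli) stand-in."""
    crc = 0xFFFFFFFF
    for b in bytearray(data):
        crc ^= b
        for _ in range(8):
            crc = (crc >> 1) ^ 0x82F63B78 if crc & 1 else crc >> 1
    return crc ^ 0xFFFFFFFF


# Test vectors taken from test_crc32c in this PR (big-endian packed).
VECTORS = [
    (b"", b"\x00\x00\x00\x00"),
    (b"a", b"\xc1\xd0\x43\x30"),
]


def available_impls():
    """Return only the implementations that were successfully imported."""
    return [f for f in (crc32c_c, crc32c_py) if f is not None]


def check_crc32c(crc32_func):
    """Assert that crc32_func matches every test vector."""
    for data, expected in VECTORS:
        assert struct.pack(">I", crc32_func(data)) == expected, data


if __name__ == "__main__":
    for func in available_impls():
        check_crc32c(func)
        print("ok:", func.__name__)
```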

1 change: 1 addition & 0 deletions tox.ini
@@ -18,6 +18,7 @@ deps =
     python-snappy
     lz4
     xxhash
+    crc32c
     py26: unittest2
 commands =
     py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}