Skip to content

Commit

Permalink
Optimize varint decoding, increasing consume performance by ~15%
Browse files Browse the repository at this point in the history
Directly access the slice's segments, rather than using the
more costly abstraction.

This also fixes a bug where varint decoding would truncate/wrap numbers
that do not fit in a size_t, which is only 32 bits wide on 32-bit platforms.
  • Loading branch information
edenhill committed Aug 19, 2019
1 parent e8b1c06 commit faf9226
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 51 deletions.
53 changes: 50 additions & 3 deletions src/rdbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -761,9 +761,6 @@ size_t rd_slice_reader0 (rd_slice_t *slice, const void **p, int update_pos) {
if (unlikely(!seg || seg->seg_absof+rof >= slice->end))
return 0;

rd_assert(seg->seg_absof+rof <= slice->end);


*p = (const void *)(seg->seg_p + rof);
rlen = RD_MIN(seg->seg_of - rof, rd_slice_remains(slice));

Expand Down Expand Up @@ -870,6 +867,56 @@ size_t rd_slice_peek (const rd_slice_t *slice, size_t offset,
}


/**
 * @brief Read a varint-encoded (zig-zag) signed integer from \p slice,
 *        storing the decoded number in \p nump on success (return value > 0).
 *
 * The slice's segments are traversed directly, one byte at a time.
 * The slice's read position (segment and offset) and \p nump are only
 * written when a complete varint has been decoded; on failure both are
 * left untouched.
 *
 * @param slice Slice to read from, starting at its current read position.
 * @param nump  Receives the decoded value on success.
 *
 * @returns the number of bytes read on success, or 0 on buffer underflow
 *          or when the encoding does not terminate within the 10 bytes
 *          a 64-bit varint can occupy (malformed input).
 */
size_t rd_slice_read_varint (rd_slice_t *slice, int64_t *nump) {
        uint64_t num = 0;
        int shift = 0;
        size_t rof = slice->rof;
        const rd_segment_t *seg;

        /* Traverse segments, byte for byte, until varint is decoded
         * or no more segments available (underflow). */
        for (seg = slice->seg ; seg ; seg = TAILQ_NEXT(seg, seg_link)) {
                for ( ; rof < seg->seg_of ; rof++) {
                        unsigned char oct;

                        if (unlikely(seg->seg_absof+rof >= slice->end))
                                return 0; /* Underflow */

                        /* Shifting a 64-bit value by 64 or more bits is
                         * undefined behaviour: a varint that still has
                         * continuation bytes at this point cannot be
                         * represented, so treat it as a decode failure
                         * rather than trusting the input. */
                        if (unlikely(shift >= 64))
                                return 0; /* Over-long varint */

                        oct = *(const unsigned char *)(seg->seg_p + rof);

                        /* Low 7 bits are payload, least significant
                         * group first. */
                        num |= (uint64_t)(oct & 0x7f) << shift;
                        shift += 7;

                        if (!(oct & 0x80)) {
                                /* Done: no more bytes expected */

                                /* Zig-zag decoding */
                                *nump = (int64_t)((num >> 1) ^
                                                  -(int64_t)(num & 1));

                                /* Update slice's read pointer and offset */
                                if (slice->seg != seg)
                                        slice->seg = seg;
                                slice->rof = rof + 1; /* including the +1 byte
                                                       * that was just read */

                                /* One byte was consumed per 7 bits
                                 * of shift. */
                                return shift / 7;
                        }
                }

                /* Continue from the start of the next segment. */
                rof = 0;
        }

        return 0; /* Underflow */
}


/**
* @returns a pointer to \p size contiguous bytes at the current read offset.
Expand Down
2 changes: 1 addition & 1 deletion src/rdbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ size_t rd_slice_read (rd_slice_t *slice, void *dst, size_t size);
size_t rd_slice_peek (const rd_slice_t *slice, size_t offset,
void *dst, size_t size);

size_t rd_slice_read_varint (rd_slice_t *slice, size_t *nump);
size_t rd_slice_read_varint (rd_slice_t *slice, int64_t *nump);

const void *rd_slice_ensure_contig (rd_slice_t *slice, size_t size);

Expand Down
6 changes: 3 additions & 3 deletions src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ rd_tmpabuf_write_str0 (const char *func, int line,
*/
#define rd_kafka_buf_read_varint(rkbuf,dst) do { \
int64_t _v; \
size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, &_v); \
size_t _r = rd_slice_read_varint(&(rkbuf)->rkbuf_reader, &_v);\
if (unlikely(RD_UVARINT_UNDERFLOW(_r))) \
rd_kafka_buf_underflow_fail(rkbuf, (size_t)0, \
"varint parsing failed");\
Expand Down Expand Up @@ -413,8 +413,8 @@ rd_tmpabuf_write_str0 (const char *func, int line,
*/
#define rd_kafka_buf_read_bytes_varint(rkbuf,kbytes) do { \
int64_t _len2; \
size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, \
&_len2); \
size_t _r = rd_slice_read_varint(&(rkbuf)->rkbuf_reader, \
&_len2); \
if (unlikely(RD_UVARINT_UNDERFLOW(_r))) \
rd_kafka_buf_underflow_fail(rkbuf, (size_t)0, \
"varint parsing failed"); \
Expand Down
84 changes: 51 additions & 33 deletions src/rdvarint.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,6 @@
#include "rdunittest.h"


/**
* @brief Read a varint-encoded signed integer from \p slice.
*
* Bytes are fetched one at a time through rd_slice_read() until a byte
* without the continuation bit (0x80) is seen. The accumulated value is
* then zig-zag decoded and stored in \p nump.
*
* @returns the number of bytes consumed (one per 7 bits of shift), or 0 if
*          rd_slice_read() returned 0 before the varint was complete.
*
* @remark The accumulator \c num is a size_t, so on platforms where size_t
*         is narrower than 64 bits the higher bits of the decoded value are
*         lost when each 7-bit group is OR:ed in.
* @remark NOTE(review): rd_slice_read() presumably advances the slice's read
*         position, in which case a failed decode leaves the slice partially
*         consumed - confirm against rd_slice_read().
*/
size_t rd_varint_dec_slice (rd_slice_t *slice, int64_t *nump) {
size_t num = 0;
int shift = 0;
unsigned char oct;

/* FIXME: Optimize to use something better than read() */
do {
size_t r = rd_slice_read(slice, &oct, sizeof(oct));
if (unlikely(r == 0))
return 0; /* Underflow */
/* Low 7 bits are payload, least significant group first. */
num |= (uint64_t)(oct & 0x7f) << shift;
shift += 7;
} while (oct & 0x80);

/* Zig-zag decoding */
*nump = (int64_t)((num >> 1) ^ -(int64_t)(num & 1));

return shift / 7;
}





static int do_test_rd_uvarint_enc_i64 (const char *file, int line,
int64_t num, const char *exp,
size_t exp_size) {
Expand Down Expand Up @@ -86,27 +60,43 @@ static int do_test_rd_uvarint_enc_i64 (const char *file, int line,

/* Verify with slice decoder */
rd_buf_init(&b, 1, 0);
rd_buf_push(&b, buf, sz, NULL);
rd_buf_push(&b, buf, sizeof(buf), NULL); /* including trailing 0xff
* garbage which should be
* ignored by decoder */
rd_slice_init_full(&slice, &b);

/* Should fail for incomplete reads */
ir = rd_slice_narrow_copy(&slice, &bad_slice,
rd_slice_remains(&slice)-1);
ir = rd_slice_narrow_copy(&slice, &bad_slice, sz-1);
RD_UT_ASSERT(ir, "narrow_copy failed");
ret_num = -1;
r = rd_varint_dec_slice(&bad_slice, &ret_num);
r = rd_slice_read_varint(&bad_slice, &ret_num);
RD_UT_ASSERT(RD_UVARINT_DEC_FAILED(r),
"varint decode failed should have failed, returned %"PRIusz,
"varint decode failed should have failed, "
"returned %"PRIusz,
r);
r = rd_slice_offset(&bad_slice);
RD_UT_ASSERT(r == 0,
"expected slice position to not change, but got %"PRIusz,
r);

/* Verify proper slice */
ret_num = -1;
r = rd_varint_dec_slice(&slice, &ret_num);
r = rd_slice_read_varint(&slice, &ret_num);
RD_UT_ASSERT(!RD_UVARINT_DEC_FAILED(r),
"varint decode failed: %"PRIusz, r);
RD_UT_ASSERT(ret_num == num,
"varint decode returned wrong number: "
"%"PRId64" != %"PRId64, ret_num, num);
RD_UT_ASSERT(r == sz,
"expected varint decoder to read %"PRIusz" bytes, "
"not %"PRIusz,
sz, r);
r = rd_slice_offset(&slice);
RD_UT_ASSERT(r == sz,
"expected slice position to change to %"PRIusz
", but got %"PRIusz,
sz, r);


rd_buf_destroy(&b);

Expand All @@ -117,10 +107,38 @@ static int do_test_rd_uvarint_enc_i64 (const char *file, int line,
/**
 * @brief Varint unit tests: runs do_test_rd_uvarint_enc_i64() on a set of
 *        numbers together with their expected encoded bytes and
 *        expected encoded size.
 *
 * Covers zero, small positive and negative values, a two-byte value
 * and a positive/negative pair of eight-byte values.
 *
 * @returns the accumulated return values of do_test_rd_uvarint_enc_i64()
 *          (presumably non-zero for each failed case, so 0 means all
 *          cases passed).
 */
int unittest_rdvarint (void) {
        int fails = 0;

        fails += do_test_rd_uvarint_enc_i64(__FILE__, __LINE__, 0,
                                            (const char[]){ 0 }, 1);
        fails += do_test_rd_uvarint_enc_i64(__FILE__, __LINE__, 1,
                                            (const char[]){ 0x2 }, 1);
        fails += do_test_rd_uvarint_enc_i64(__FILE__, __LINE__, -1,
                                            (const char[]){ 0x1 }, 1);
        /* 0x2e == 23<<1 */
        fails += do_test_rd_uvarint_enc_i64(__FILE__, __LINE__, 23,
                                            (const char[]){ 0x2e }, 1);
        fails += do_test_rd_uvarint_enc_i64(__FILE__, __LINE__, -23,
                                            (const char[]){ 0x2d }, 1);
        fails += do_test_rd_uvarint_enc_i64(__FILE__, __LINE__, 253,
                                            (const char[]){ 0xfa, 3 }, 2);
        fails += do_test_rd_uvarint_enc_i64(__FILE__, __LINE__,
                                            1234567890101112,
                                            (const char[]){ 0xf0,
                                                            0x8d,
                                                            0xd3,
                                                            0xc8,
                                                            0xa7,
                                                            0xb5,
                                                            0xb1,
                                                            0x04 }, 8);
        fails += do_test_rd_uvarint_enc_i64(__FILE__, __LINE__,
                                            -1234567890101112,
                                            (const char[]){ 0xef,
                                                            0x8d,
                                                            0xd3,
                                                            0xc8,
                                                            0xa7,
                                                            0xb5,
                                                            0xb1,
                                                            0x04 }, 8);

        return fails;
}
14 changes: 3 additions & 11 deletions src/rdvarint.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ size_t rd_uvarint_enc_i32 (char *dst, size_t dstsize, int32_t num) {
* @returns the number of bytes read from \p src.
*/
static RD_INLINE RD_UNUSED
size_t rd_uvarint_dec (const char *src, size_t srcsize, size_t *nump) {
size_t rd_uvarint_dec (const char *src, size_t srcsize, uint64_t *nump) {
size_t of = 0;
size_t num = 0;
uint64_t num = 0;
int shift = 0;

do {
Expand All @@ -130,7 +130,7 @@ size_t rd_uvarint_dec (const char *src, size_t srcsize, size_t *nump) {

static RD_INLINE RD_UNUSED
size_t rd_varint_dec_i64 (const char *src, size_t srcsize, int64_t *nump) {
size_t n;
uint64_t n;
size_t r;

r = rd_uvarint_dec(src, srcsize, &n);
Expand All @@ -141,14 +141,6 @@ size_t rd_varint_dec_i64 (const char *src, size_t srcsize, int64_t *nump) {
}


/**
* @brief Read a varint-encoded signed integer from \p slice.
*
* @sa rd_uvarint_dec()
*/
size_t rd_varint_dec_slice (rd_slice_t *slice, int64_t *nump);


/**
* @returns the maximum encoded size for a type
*/
Expand Down

0 comments on commit faf9226

Please sign in to comment.