Skip to content

Commit

Permalink
redis protocol: tokenizer, hash and zset (#146)
Browse files Browse the repository at this point in the history
* update osx entries in travis

* compose/parse commands

* Squashed 'deps/ccommon/' changes from bb298bc..ce0b9ea

ce0b9ea allow printing negative integers in cc_print (#141)
ab0edc8 add metrics to track buf_sock objects (#138)
ae02038 add travis ci (copied from pelikan) (#139)
964645a Merge pull request #135 from paegun/fix_cmake_install
70710c2 fixed and re-added cmake install instructions, w/ following notes: include directory made proper relative; opened pattern match b/c include directory should only contain files meant for inclusion.
5b095bc Merge pull request #126 from kevyang/kevyang/120
426d56a return NULL when cc_alloc/cc_realloc is called with size == 0
ad271d4 Merge pull request #133 from kevyang/132
47dbdba suppress unused parameter warning in debug_log_flush
648d19e Merge pull request #127 from kevyang/56
780941a Merge pull request #130 from kevyang/129
b8af6c0 Merge pull request #131 from kevyang/128
6ecc318 fix duplicate symbols in cc_signal
080c41d cc_array - stop doing arithmetic on void *
d526f7a add debug oriented memory management
a4fb927 Update bool member rules in style guide
05c6e1e explicitly make ccommon a C project to avoid checking for CXX related variables

git-subtree-dir: deps/ccommon
git-subtree-split: ce0b9ea

* use new macro imported from ccommon

* more tests

* modify response for redis

* add response check

* add response parse/compose

* Squashed 'deps/ccommon/' changes from ce0b9ea..b1babb2

b1babb2 change wheel's sleep timer to make it less flaky (#143)

git-subtree-dir: deps/ccommon
git-subtree-split: b1babb2

* grep for error in unittest logs

* change nelem type in token_array_nelem

* changes reflected Kevin's review on Mar 22

* address most comments

* Squashed 'deps/ccommon/' changes from b1babb2..9264bbb

9264bbb Zero byte (#147) (emergency fix needed for pelikan)
d4002d7 simplify cc_print_int64 (#146)
b164fcf Clean-up hash functions and introduce MurmurHash3 (#145)

git-subtree-dir: deps/ccommon
git-subtree-split: 9264bbb

* changes needed to use new hash function names

* add more tests and fix corresponding bugs

* temporarily disable flaky build

* feedback from seppo0010

* adding comment to token.h
Yao Yue authored Apr 4, 2017
1 parent 524922d commit 8a7e319
Showing 39 changed files with 2,717 additions and 557 deletions.
8 changes: 7 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -72,9 +72,14 @@ matrix:

# clang 3.7 on osx
- os: osx
osx_image: xcode7.1
osx_image: xcode7.3
env: C_COMPILER=clang

# # clang 4.2 on osx
# - os: osx
# osx_image: xcode8.2
# env: C_COMPILER=clang


before_install:
# for osx: 0. update brew; 1. install cmake if missing; 2. (gcc) unlink pre-installed gcc; 3. (gcc) install desired version of gcc
@@ -93,4 +98,5 @@ script:
- cmake ..
- make -j
- make check
- egrep -r ":F:|:E:" . || true
- cd ../test/integration && python test_twemcache.py
6 changes: 6 additions & 0 deletions deps/ccommon/.travis.yml
Original file line number Diff line number Diff line change
@@ -75,6 +75,11 @@ matrix:
osx_image: xcode7.1
env: C_COMPILER=clang

# # clang 4.2 on osx
# - os: osx
# osx_image: xcode8.2
# env: C_COMPILER=clang


before_install:
# for osx: 0. update brew; 1. install cmake if missing; 2. (gcc) unlink pre-installed gcc; 3. (gcc) install desired version of gcc
@@ -93,3 +98,4 @@ script:
- cmake ..
- make -j
- make check
- egrep -r ":F:|:E:" . || true
2 changes: 1 addition & 1 deletion deps/ccommon/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ endif()

# version info
set(${PROJECT_NAME}_VERSION_MAJOR 1)
set(${PROJECT_NAME}_VERSION_MINOR 1)
set(${PROJECT_NAME}_VERSION_MINOR 2)
set(${PROJECT_NAME}_VERSION_PATCH 0)
set(${PROJECT_NAME}_VERSION
${${PROJECT_NAME}_VERSION_MAJOR}.${${PROJECT_NAME}_VERSION_MINOR}.${${PROJECT_NAME}_VERSION_PATCH}
4 changes: 4 additions & 0 deletions deps/ccommon/include/buffer/cc_buf.h
Original file line number Diff line number Diff line change
@@ -169,6 +169,10 @@ buf_read(char *dst, struct buf *src, uint32_t count)
static inline uint32_t
buf_write(struct buf *dst, char *src, uint32_t count)
{
if (count == 0) {
return 0;
}

ASSERT(dst != NULL && src != NULL);

uint32_t len = MIN(buf_wsize(dst), count);
38 changes: 0 additions & 38 deletions deps/ccommon/include/cc_lookup3.h

This file was deleted.

3 changes: 3 additions & 0 deletions deps/ccommon/include/cc_print.h
Original file line number Diff line number Diff line change
@@ -44,7 +44,10 @@ extern "C" {

/* behavior undefined if there isn't enough space in buf */
size_t cc_print_uint64_unsafe(char *buf, uint64_t n);
size_t cc_print_int64_unsafe(char *buf, int64_t n);

size_t cc_print_uint64(char *buf, size_t size, uint64_t n);
size_t cc_print_int64(char *buf, size_t size, int64_t n);

size_t _scnprintf(char *buf, size_t size, const char *fmt, ...);
size_t _vscnprintf(char *buf, size_t size, const char *fmt, va_list args);
4 changes: 4 additions & 0 deletions deps/ccommon/include/cc_util.h
Original file line number Diff line number Diff line change
@@ -57,13 +57,17 @@ extern "C" {
* # define UINT16_MAX (65535)
* # define UINT32_MAX (4294967295U)
* # define UINT64_MAX (__UINT64_C(18446744073709551615))
*
* # define INT64_MIN -9223372036854775808LL
*/
#define CC_UINT8_MAXLEN (3 + 1)
#define CC_UINT16_MAXLEN (5 + 1)
#define CC_UINT32_MAXLEN (10 + 1)
#define CC_UINT64_MAXLEN (20 + 1)
#define CC_UINTMAX_MAXLEN CC_UINT64_MAXLEN

#define CC_INT64_MAXLEN (1 + 19 + 1)

/* alignment */
/* Make data 'd' or pointer 'p', n-byte aligned, where n is a power of 2 */
#define CC_ALIGNMENT sizeof(unsigned long) /* platform word */
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ extern "C" {
#include <stdint.h>
#include <stdlib.h>

uint32_t hash(const void *key, size_t length, const uint32_t initval);
uint32_t hash_lookup3(const void *key, size_t length, const uint32_t initval);

#ifdef __cplusplus
}
39 changes: 39 additions & 0 deletions deps/ccommon/include/hash/cc_murmur3.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* ccommon - a cache common library.
* Copyright (C) 2013 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* The cc_murmur3.[ch] files are adapted from the canonical implementation of
* MurmurHash3 by Austin Appleby, released as part of SMHasher:
* https://github.com/aappleby/smhasher
*
* Changes include renaming functions, removing MSVC-related code, and adding
* the "static" keyword to local-scope functions according to the C language
* spec (original code is in C++), to better fit them into the scope and style
* of ccommon.
*
* The actual implementation is untouched.
*/

#pragma once

#include <stdint.h>


/*
 * 32-bit MurmurHash3 of the len bytes at key, starting from seed.
 * The result is stored through out as a single uint32_t (4 bytes).
 */
void hash_murmur3_32 ( const void * key, int len, uint32_t seed, void * out );

/*
 * 128-bit MurmurHash3 built from 32-bit operations.
 * The result is stored through out as four consecutive uint32_t (16 bytes).
 */
void hash_murmur3_128_x86 ( const void * key, int len, uint32_t seed, void * out );

/*
 * 128-bit MurmurHash3 built from 64-bit operations.
 * The result is stored through out as two consecutive uint64_t (16 bytes).
 */
void hash_murmur3_128_x64 ( const void * key, int len, uint32_t seed, void * out );
39 changes: 39 additions & 0 deletions deps/ccommon/src/cc_print.c
Original file line number Diff line number Diff line change
@@ -22,6 +22,9 @@
* implementation as a reference (folly/Conv.h)
*/

/*
 * Magnitude of a signed 64-bit value, as a uint64_t.
 *
 * We use our own macro instead of llabs() to make sure it works with
 * INT64_MIN: the negation is carried out in unsigned arithmetic, where
 * wrap-around is well defined, instead of negating the signed value (which
 * overflows for INT64_MIN). Note that _x is evaluated more than once.
 */
#define abs_int64(_x)                                                       \
    ((_x) >= 0 ? (uint64_t)(_x) : (uint64_t)0 - (uint64_t)(_x))

static inline void
_print_uint64(char *buf, size_t d, uint64_t n)
{
@@ -46,6 +49,22 @@ cc_print_uint64_unsafe(char *buf, uint64_t n)
return d;
}

/*
 * Print a signed 64-bit integer into buf without any bounds checking: a
 * leading '-' is emitted for negative values, followed by the digits of the
 * magnitude.
 *
 * Returns the number of digits, plus one when n is negative.
 */
size_t
cc_print_int64_unsafe(char *buf, int64_t n)
{
    uint64_t magnitude = abs_int64(n);
    size_t ndigit = digits(magnitude);
    size_t nsign = 0;

    if (n < 0) {
        buf[0] = '-';
        nsign = 1;
    }

    /* digits go right after the (optional) sign */
    _print_uint64(buf + nsign, ndigit, magnitude);

    return nsign + ndigit;
}

size_t
cc_print_uint64(char *buf, size_t size, uint64_t n)
{
@@ -61,6 +80,26 @@ cc_print_uint64(char *buf, size_t size, uint64_t n)
return d;
}

/*
 * Print a signed 64-bit integer into buf, which has room for size bytes.
 *
 * Returns the number of digits plus one for the '-' sign of a negative value,
 * or 0 (without touching buf) when size is too small to hold them.
 */
size_t
cc_print_int64(char *buf, size_t size, int64_t n)
{
    size_t d;
    uint64_t ab = abs_int64(n);

    d = digits(ab);
    if (size < d + (n < 0)) {
        return 0;
    }

    if (n < 0) {
        *buf++ = '-';
    }

    /*
     * Print the magnitude, not n itself: a negative n converted to uint64_t
     * becomes a huge unrelated value and would yield the wrong digits.
     */
    _print_uint64(buf, d, ab);

    return d + (n < 0);
}

size_t
_vscnprintf(char *buf, size_t size, const char *fmt, va_list args)
{
2 changes: 1 addition & 1 deletion deps/ccommon/src/hash/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
set(SOURCE
${SOURCE}
hash/cc_hash.c
hash/cc_lookup3.c
hash/cc_murmur3.c
PARENT_SCOPE)
446 changes: 0 additions & 446 deletions deps/ccommon/src/hash/cc_hash.c

This file was deleted.

272 changes: 212 additions & 60 deletions deps/ccommon/src/hash/cc_lookup3.c
Original file line number Diff line number Diff line change
@@ -1,36 +1,50 @@
/*
Excerpt and modified from lookup3.c (http://burtleburtle.net/bob/c/lookup3.c),
originally by Bob Jenkins, May 2006, Public Domain.
*/

#include <cc_lookup3.h>

#include <cc_define.h>
* ccommon - a cache common library.
* Copyright (C) 2013 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <stdint.h> /* defines uint32_t etc */
#include <sys/param.h> /* attempt to define endianness */
/*
* Hash table
*
* The hash function used here is by Bob Jenkins, 1996:
* <http://burtleburtle.net/bob/hash/doobs.html>
* "By Bob Jenkins, 1996. bob_jenkins@burtleburtle.net.
* You may use this code any way you wish, private, educational,
* or commercial. It's free."
*
*/

/*
* My best guess at if you are big-endian or little-endian. This may
* need adjustment.
* Since the hash function does bit manipulation, it needs to know
* whether it's big or little-endian. HAVE_LITTLE_ENDIAN and HAVE_BIG_ENDIAN
* are set in the configure script.
*/
#if (defined(CC_LITTLE_ENDIAN)) || \
(defined(i386) || defined(__i386__) || defined(__i486__) || \
defined(__i586__) || defined(__i686__) || defined(vax) || defined(MIPSEL))
# define HASH_LITTLE_ENDIAN 1
# define HASH_BIG_ENDIAN 0
#elif (defined(CC_BIG_ENDIAN)) || \
(defined(sparc) || defined(POWERPC) || defined(mc68000) || defined(sel))
#include <hash/cc_lookup3.h>

#if defined CC_BIG_ENDIAN && CC_BIG_ENDIAN == 1
# define HASH_LITTLE_ENDIAN 0
# define HASH_BIG_ENDIAN 1
# define HASH_BIG_ENDIAN 1
#elif defined CC_LITTLE_ENDIAN && CC_LITTLE_ENDIAN == 1
# define HASH_LITTLE_ENDIAN 1
# define HASH_BIG_ENDIAN 0
#else
# define HASH_LITTLE_ENDIAN 0
# define HASH_BIG_ENDIAN 0
# define HASH_BIG_ENDIAN 0
#endif

#define hashsize(n) ((uint32_t)1<<(n))
#define hashmask(n) (hashsize(n)-1)
#define rot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
#define rot(x,k) (((x)<<(k)) ^ ((x)>>(32-(k))))

/*
-------------------------------------------------------------------------------
@@ -122,35 +136,11 @@ and these came close:
c ^= b; c -= rot(b,24); \
}

/*
-------------------------------------------------------------------------------
hashlittle() -- hash a variable-length key into a 32-bit value
k : the key (the unaligned variable-length array of bytes)
length : the length of the key, counting by bytes
initval : can be any 4-byte value
Returns a 32-bit value. Every bit of the key affects every bit of
the return value. Two keys differing by one or two bits will have
totally different hash values.
The best hash table sizes are powers of 2. There is no need to do
mod a prime (mod is sooo slow!). If you need less than 32 bits,
use a bitmask. For example, if you need only 10 bits, do
h = (h & hashmask(10));
In which case, the hash table should have hashsize(10) elements.
If you are hashing n strings (uint8_t **)k, do it like this:
for (i=0, h=0; i<n; ++i) h = hashlittle( k[i], len[i], h);
By Bob Jenkins, 2006. bob_jenkins@burtleburtle.net. You may use this
code any way you wish, private, educational, or commercial. It's free.
Use for hash table lookup, or anything where one collision in 2^^32 is
acceptable. Do NOT use for cryptographic purposes.
-------------------------------------------------------------------------------
*/

uint32_t
hashlittle( const void *key, size_t length, uint32_t initval)
#if HASH_LITTLE_ENDIAN == 1
uint32_t hash_lookup3(
const void *key, /* the key to hash */
size_t length, /* length of the key */
const uint32_t initval) /* initval */
{
uint32_t a,b,c; /* internal state */
union { const void *ptr; size_t i; } u; /* needed for Mac Powerbook G4 */
@@ -161,6 +151,9 @@ hashlittle( const void *key, size_t length, uint32_t initval)
u.ptr = key;
if (HASH_LITTLE_ENDIAN && ((u.i & 0x3) == 0)) {
const uint32_t *k = (const uint32_t *)key; /* read 32-bit chunks */
#ifdef VALGRIND
const uint8_t *k8;
#endif /* ifdef VALGRIND */

/*------ all but last block: aligned reads and affect 32 bits of (a,b,c) */
while (length > 12)
@@ -173,6 +166,7 @@ hashlittle( const void *key, size_t length, uint32_t initval)
k += 3;
}

/*----------------------------- handle the last (probably partial) block */
/*
* "k[2]&0xffffff" actually reads beyond the end of the string, but
* then masks off the part it's not allowed to read. Because the
@@ -182,6 +176,8 @@ hashlittle( const void *key, size_t length, uint32_t initval)
* still catch it and complain. The masking trick does make the hash
* noticeably faster for short strings (like English words).
*/
#ifndef VALGRIND

switch(length)
{
case 12: c+=k[2]; b+=k[1]; a+=k[0]; break;
@@ -196,9 +192,31 @@ hashlittle( const void *key, size_t length, uint32_t initval)
case 3 : a+=k[0]&0xffffff; break;
case 2 : a+=k[0]&0xffff; break;
case 1 : a+=k[0]&0xff; break;
case 0 : return c; /* zero length strings require no mixing */
case 0 : return c; /* zero length strings require no mixing */
}

#else /* make valgrind happy */

k8 = (const uint8_t *)k;
switch(length)
{
case 12: c+=k[2]; b+=k[1]; a+=k[0]; break;
case 11: c+=((uint32_t)k8[10])<<16; /* fall through */
case 10: c+=((uint32_t)k8[9])<<8; /* fall through */
case 9 : c+=k8[8]; /* fall through */
case 8 : b+=k[1]; a+=k[0]; break;
case 7 : b+=((uint32_t)k8[6])<<16; /* fall through */
case 6 : b+=((uint32_t)k8[5])<<8; /* fall through */
case 5 : b+=k8[4]; /* fall through */
case 4 : a+=k[0]; break;
case 3 : a+=((uint32_t)k8[2])<<16; /* fall through */
case 2 : a+=((uint32_t)k8[1])<<8; /* fall through */
case 1 : a+=k8[0]; break;
case 0 : return c; /* zero length strings require no mixing */
}

#endif /* !valgrind */

} else if (HASH_LITTLE_ENDIAN && ((u.i & 0x1) == 0)) {
const uint16_t *k = (const uint16_t *)key; /* read 16-bit chunks */
const uint8_t *k8;
@@ -222,28 +240,28 @@ hashlittle( const void *key, size_t length, uint32_t initval)
b+=k[2]+(((uint32_t)k[3])<<16);
a+=k[0]+(((uint32_t)k[1])<<16);
break;
case 11: c+=((uint32_t)k8[10])<<16; /* fall through */
case 10: c+=k[4];
case 11: c+=((uint32_t)k8[10])<<16; /* @fallthrough */
case 10: c+=k[4]; /* @fallthrough@ */
b+=k[2]+(((uint32_t)k[3])<<16);
a+=k[0]+(((uint32_t)k[1])<<16);
break;
case 9 : c+=k8[8]; /* fall through */
case 9 : c+=k8[8]; /* @fallthrough */
case 8 : b+=k[2]+(((uint32_t)k[3])<<16);
a+=k[0]+(((uint32_t)k[1])<<16);
break;
case 7 : b+=((uint32_t)k8[6])<<16; /* fall through */
case 7 : b+=((uint32_t)k8[6])<<16; /* @fallthrough */
case 6 : b+=k[2];
a+=k[0]+(((uint32_t)k[1])<<16);
break;
case 5 : b+=k8[4]; /* fall through */
case 5 : b+=k8[4]; /* @fallthrough */
case 4 : a+=k[0]+(((uint32_t)k[1])<<16);
break;
case 3 : a+=((uint32_t)k8[2])<<16; /* fall through */
case 3 : a+=((uint32_t)k8[2])<<16; /* @fallthrough */
case 2 : a+=k[0];
break;
case 1 : a+=k8[0];
break;
case 0 : return c; /* zero length requires no mixing */
case 0 : return c; /* zero length strings require no mixing */
}

} else { /* need to read the key one byte at a time */
@@ -285,10 +303,144 @@ hashlittle( const void *key, size_t length, uint32_t initval)
case 2 : a+=((uint32_t)k[1])<<8;
case 1 : a+=k[0];
break;
case 0 : return c; /* zero length strings require no mixing */
}
}

final(a,b,c);
return c; /* zero length strings require no mixing */
}

#elif HASH_BIG_ENDIAN == 1
/*
 * hash_lookup3(), big-endian build (lookup3's hashbig()):
 * This is the same as hashword() on big-endian machines. It is different
 * from the little-endian build (lookup3's hashlittle()) on all machines.
 * It takes advantage of big-endian byte ordering.
 *
 * key     : the bytes to hash
 * length  : number of bytes at key
 * initval : folded into the initial value of the internal state
 *
 * Returns the 32-bit word c of the internal state. A key whose remaining
 * length is zero returns c without running the final() mixing step.
 */
uint32_t hash_lookup3( const void *key, size_t length, const uint32_t initval)
{
uint32_t a,b,c;
union { const void *ptr; size_t i; } u; /* to cast key to (size_t) happily */

/* Set up the internal state */
a = b = c = 0xdeadbeef + ((uint32_t)length) + initval;

u.ptr = key;
/* fast path only when the key address is a multiple of 4 */
if (HASH_BIG_ENDIAN && ((u.i & 0x3) == 0)) {
const uint32_t *k = key; /* read 32-bit chunks */
#ifdef VALGRIND
const uint8_t *k8;
#endif /* ifdef VALGRIND */

/*------ all but last block: aligned reads and affect 32 bits of (a,b,c) */
while (length > 12)
{
a += k[0];
b += k[1];
c += k[2];
mix(a,b,c);
length -= 12;
k += 3;
}

/*----------------------------- handle the last (probably partial) block */
/*
* "k[2]<<8" actually reads beyond the end of the string, but
* then shifts out the part it's not allowed to read. Because the
* string is aligned, the illegal read is in the same word as the
* rest of the string. Every machine with memory protection I've seen
* does it on word boundaries, so is OK with this. But VALGRIND will
* still catch it and complain. The masking trick does make the hash
* noticeably faster for short strings (like English words).
*/
#ifndef VALGRIND

switch(length)
{
case 12: c+=k[2]; b+=k[1]; a+=k[0]; break;
case 11: c+=k[2]&0xffffff00; b+=k[1]; a+=k[0]; break;
case 10: c+=k[2]&0xffff0000; b+=k[1]; a+=k[0]; break;
case 9 : c+=k[2]&0xff000000; b+=k[1]; a+=k[0]; break;
case 8 : b+=k[1]; a+=k[0]; break;
case 7 : b+=k[1]&0xffffff00; a+=k[0]; break;
case 6 : b+=k[1]&0xffff0000; a+=k[0]; break;
case 5 : b+=k[1]&0xff000000; a+=k[0]; break;
case 4 : a+=k[0]; break;
case 3 : a+=k[0]&0xffffff00; break;
case 2 : a+=k[0]&0xffff0000; break;
case 1 : a+=k[0]&0xff000000; break;
case 0 : return c; /* zero length strings require no mixing */
}

#else /* make valgrind happy */

k8 = (const uint8_t *)k;
switch(length) /* partial words fall through; cases 12, 8 and 4 break */
{
case 12: c+=k[2]; b+=k[1]; a+=k[0]; break;
case 11: c+=((uint32_t)k8[10])<<8; /* fall through */
case 10: c+=((uint32_t)k8[9])<<16; /* fall through */
case 9 : c+=((uint32_t)k8[8])<<24; /* fall through */
case 8 : b+=k[1]; a+=k[0]; break;
case 7 : b+=((uint32_t)k8[6])<<8; /* fall through */
case 6 : b+=((uint32_t)k8[5])<<16; /* fall through */
case 5 : b+=((uint32_t)k8[4])<<24; /* fall through */
case 4 : a+=k[0]; break;
case 3 : a+=((uint32_t)k8[2])<<8; /* fall through */
case 2 : a+=((uint32_t)k8[1])<<16; /* fall through */
case 1 : a+=((uint32_t)k8[0])<<24; break;
case 0 : return c; /* zero length strings require no mixing */
}

#endif /* !VALGRIND */

} else { /* need to read the key one byte at a time */
const uint8_t *k = key;

/*--------------- all but the last block: affect some 32 bits of (a,b,c) */
while (length > 12)
{
a += ((uint32_t)k[0])<<24;
a += ((uint32_t)k[1])<<16;
a += ((uint32_t)k[2])<<8;
a += ((uint32_t)k[3]);
b += ((uint32_t)k[4])<<24;
b += ((uint32_t)k[5])<<16;
b += ((uint32_t)k[6])<<8;
b += ((uint32_t)k[7]);
c += ((uint32_t)k[8])<<24;
c += ((uint32_t)k[9])<<16;
c += ((uint32_t)k[10])<<8;
c += ((uint32_t)k[11]);
mix(a,b,c);
length -= 12;
k += 12;
}

/*-------------------------------- last block: affect all 32 bits of (c) */
switch(length) /* all the case statements fall through */
{
case 12: c+=k[11]; /* fall through */
case 11: c+=((uint32_t)k[10])<<8; /* fall through */
case 10: c+=((uint32_t)k[9])<<16; /* fall through */
case 9 : c+=((uint32_t)k[8])<<24; /* fall through */
case 8 : b+=k[7]; /* fall through */
case 7 : b+=((uint32_t)k[6])<<8; /* fall through */
case 6 : b+=((uint32_t)k[5])<<16; /* fall through */
case 5 : b+=((uint32_t)k[4])<<24; /* fall through */
case 4 : a+=k[3]; /* fall through */
case 3 : a+=((uint32_t)k[2])<<8; /* fall through */
case 2 : a+=((uint32_t)k[1])<<16; /* fall through */
case 1 : a+=((uint32_t)k[0])<<24;
break;
case 0 : return c; /* zero length strings require no mixing */
}
}

/* final mixing of the three state words; c carries the result */
final(a,b,c);
return c;
}
#else /* HASH_XXX_ENDIAN == 1 */
#error Must define HASH_BIG_ENDIAN or HASH_LITTLE_ENDIAN
#endif /* HASH_XXX_ENDIAN == 1 */
328 changes: 328 additions & 0 deletions deps/ccommon/src/hash/cc_murmur3.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,328 @@
/*
* ccommon - a cache common library.
* Copyright (C) 2013 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


// Note - The x86 and x64 versions do _not_ produce the same results, as the
// algorithms are optimized for their respective platforms. You can still
// compile and run any of them on any platform, but your performance with the
// non-native version will be less than optimal.

#include "hash/cc_murmur3.h"

/* Ask the compiler to always inline, via the GNU always_inline attribute. */
#define FORCE_INLINE inline __attribute__((always_inline))

/*
 * Rotate the 32-bit value x left by r bits.
 *
 * The count is reduced modulo 32, so a count of 0 (or a multiple of 32)
 * returns x unchanged instead of shifting by the full width of the type,
 * which is undefined in C. Counts in 1..31 behave exactly as before.
 */
static inline uint32_t rotl32 ( uint32_t x, int8_t r )
{
    const unsigned int s = (unsigned int)r & 31u;

    return (x << s) | (x >> ((32u - s) & 31u));
}

/*
 * Rotate the 64-bit value x left by r bits.
 *
 * The count is reduced modulo 64, so a count of 0 (or a multiple of 64)
 * returns x unchanged instead of shifting by the full width of the type,
 * which is undefined in C. Counts in 1..63 behave exactly as before.
 */
static inline uint64_t rotl64 ( uint64_t x, int8_t r )
{
    const unsigned int s = (unsigned int)r & 63u;

    return (x << s) | (x >> ((64u - s) & 63u));
}

/* Rotation wrappers: each expands to a call of the matching static helper. */
#define ROTL32(x,y) rotl32(x,y)
#define ROTL64(x,y) rotl64(x,y)

/* Append the unsigned long long suffix (LLU) to an integer literal. */
#define BIG_CONSTANT(x) (x##LLU)

//-----------------------------------------------------------------------------
// Block read - if your platform needs to do endian-swapping or can only
// handle aligned reads, do the conversion here

/*
 * Fetch the 32-bit block at index i relative to p. The value is returned
 * exactly as stored: no byte swapping and no alignment fix-up is done here.
 */
static FORCE_INLINE uint32_t getblock32 ( const uint32_t * p, int i )
{
    const uint32_t * block = p + i;

    return *block;
}

/*
 * Fetch the 64-bit block at index i relative to p. The value is returned
 * exactly as stored: no byte swapping and no alignment fix-up is done here.
 */
static FORCE_INLINE uint64_t getblock64 ( const uint64_t * p, int i )
{
    const uint64_t * block = p + i;

    return *block;
}

//-----------------------------------------------------------------------------
// Finalization mix - force all bits of a hash block to avalanche

/*
 * 32-bit finalization mix: three xor-shift steps (by 16, 13 and 16 bits)
 * interleaved with two multiplications.
 */
static FORCE_INLINE uint32_t fmix32 ( uint32_t h )
{
    h = (h ^ (h >> 16)) * 0x85ebca6b;
    h = (h ^ (h >> 13)) * 0xc2b2ae35;

    return h ^ (h >> 16);
}

//----------

/*
 * 64-bit finalization mix: three xor-shift steps (by 33 bits each)
 * interleaved with two multiplications.
 */
static FORCE_INLINE uint64_t fmix64 ( uint64_t k )
{
    k = (k ^ (k >> 33)) * BIG_CONSTANT(0xff51afd7ed558ccd);
    k = (k ^ (k >> 33)) * BIG_CONSTANT(0xc4ceb9fe1a85ec53);

    return k ^ (k >> 33);
}

//-----------------------------------------------------------------------------

/*
 * 32-bit MurmurHash3.
 *
 * key  : the bytes to hash
 * len  : number of bytes at key
 * seed : initial value of the hash state
 * out  : receives the result as a single uint32_t
 *
 * Whole 4-byte blocks are read through getblock32(); the remaining 0..3
 * bytes are folded in byte by byte, then the length is mixed in and the
 * state is finalized with fmix32().
 */
void hash_murmur3_32 ( const void * key, int len,
                       uint32_t seed, void * out )
{
    const uint32_t c1 = 0xcc9e2d51;
    const uint32_t c2 = 0x1b873593;

    const uint8_t * bytes = (const uint8_t*)key;
    const int nblocks = len / 4;

    /* end of the whole blocks, which is also where the tail bytes start */
    const uint8_t * tail = bytes + nblocks*4;
    const uint32_t * blocks_end = (const uint32_t *)tail;

    uint32_t hash = seed;
    uint32_t k;
    int rem;
    int i;

    /* body: blocks are addressed with negative indices from their end */
    i = -nblocks;
    while (i != 0) {
        k = getblock32(blocks_end, i);

        k *= c1;
        k = ROTL32(k,15);
        k *= c2;

        hash ^= k;
        hash = ROTL32(hash,13);
        hash = hash*5+0xe6546b64;

        i++;
    }

    /* tail: gather the last 1..3 bytes, lowest address in the low byte */
    k = 0;
    rem = len & 3;
    if (rem == 3) {
        k ^= tail[2] << 16;
    }
    if (rem >= 2) {
        k ^= tail[1] << 8;
    }
    if (rem >= 1) {
        k ^= tail[0];
        k *= c1;
        k = ROTL32(k,15);
        k *= c2;
        hash ^= k;
    }

    /* finalization */
    hash ^= len;
    hash = fmix32(hash);

    *(uint32_t*)out = hash;
}

//-----------------------------------------------------------------------------

/*
 * 128-bit MurmurHash3 built from 32-bit operations.
 *
 * key  : the bytes to hash
 * len  : number of bytes at key
 * seed : initial value of all four 32-bit state words
 * out  : receives the result as four consecutive uint32_t (16 bytes)
 *
 * Whole 16-byte blocks are read through getblock32(); the remaining 0..15
 * bytes are folded in byte by byte, then the length is mixed in and each
 * state word is finalized with fmix32().
 *
 * NOTE(review): a negative len is not handled here - confirm callers never
 * pass one.
 */
void hash_murmur3_128_x86 ( const void * key, const int len,
                            uint32_t seed, void * out )
{
    const uint8_t * data = (const uint8_t*)key;
    const int nblocks = len / 16;

    uint32_t h1 = seed;
    uint32_t h2 = seed;
    uint32_t h3 = seed;
    uint32_t h4 = seed;

    const uint32_t c1 = 0x239b961b;
    const uint32_t c2 = 0xab0e9789;
    const uint32_t c3 = 0x38b34ae5;
    const uint32_t c4 = 0xa1e38b93;

    //----------
    // body

    /* blocks are addressed with negative indices from their end */
    const uint32_t * blocks = (const uint32_t *)(data + nblocks*16);

    for(int i = -nblocks; i; i++)
    {
        uint32_t k1 = getblock32(blocks,i*4+0);
        uint32_t k2 = getblock32(blocks,i*4+1);
        uint32_t k3 = getblock32(blocks,i*4+2);
        uint32_t k4 = getblock32(blocks,i*4+3);

        k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;

        h1 = ROTL32(h1,19); h1 += h2; h1 = h1*5+0x561ccd1b;

        k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2;

        h2 = ROTL32(h2,17); h2 += h3; h2 = h2*5+0x0bcaa747;

        k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3;

        h3 = ROTL32(h3,15); h3 += h4; h3 = h3*5+0x96cd1c35;

        k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4;

        h4 = ROTL32(h4,13); h4 += h1; h4 = h4*5+0x32ac3b17;
    }

    //----------
    // tail

    const uint8_t * tail = (const uint8_t*)(data + nblocks*16);

    uint32_t k1 = 0;
    uint32_t k2 = 0;
    uint32_t k3 = 0;
    uint32_t k4 = 0;

    /*
     * Each byte is widened to uint32_t before shifting: a uint8_t promotes
     * to (signed) int, and shifting a value >= 0x80 left by 24 would move a
     * bit into the sign position, which is undefined behavior.
     */
    switch(len & 15)
    {
    case 15: k4 ^= (uint32_t)tail[14] << 16;    /* fall through */
    case 14: k4 ^= (uint32_t)tail[13] << 8;     /* fall through */
    case 13: k4 ^= (uint32_t)tail[12] << 0;
             k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4;
             /* fall through */

    case 12: k3 ^= (uint32_t)tail[11] << 24;    /* fall through */
    case 11: k3 ^= (uint32_t)tail[10] << 16;    /* fall through */
    case 10: k3 ^= (uint32_t)tail[ 9] << 8;     /* fall through */
    case  9: k3 ^= (uint32_t)tail[ 8] << 0;
             k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3;
             /* fall through */

    case  8: k2 ^= (uint32_t)tail[ 7] << 24;    /* fall through */
    case  7: k2 ^= (uint32_t)tail[ 6] << 16;    /* fall through */
    case  6: k2 ^= (uint32_t)tail[ 5] << 8;     /* fall through */
    case  5: k2 ^= (uint32_t)tail[ 4] << 0;
             k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2;
             /* fall through */

    case  4: k1 ^= (uint32_t)tail[ 3] << 24;    /* fall through */
    case  3: k1 ^= (uint32_t)tail[ 2] << 16;    /* fall through */
    case  2: k1 ^= (uint32_t)tail[ 1] << 8;     /* fall through */
    case  1: k1 ^= (uint32_t)tail[ 0] << 0;
             k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
    }

    //----------
    // finalization

    h1 ^= len; h2 ^= len; h3 ^= len; h4 ^= len;

    h1 += h2; h1 += h3; h1 += h4;
    h2 += h1; h3 += h1; h4 += h1;

    h1 = fmix32(h1);
    h2 = fmix32(h2);
    h3 = fmix32(h3);
    h4 = fmix32(h4);

    h1 += h2; h1 += h3; h1 += h4;
    h2 += h1; h3 += h1; h4 += h1;

    ((uint32_t*)out)[0] = h1;
    ((uint32_t*)out)[1] = h2;
    ((uint32_t*)out)[2] = h3;
    ((uint32_t*)out)[3] = h4;
}

//-----------------------------------------------------------------------------

/*
 * 128-bit MurmurHash3 built from 64-bit operations.
 *
 * key  : the bytes to hash
 * len  : number of bytes at key
 * seed : initial value of both 64-bit state words
 * out  : receives the result as two consecutive uint64_t (16 bytes)
 *
 * Whole 16-byte blocks are read through getblock64(); the remaining 0..15
 * bytes are folded in byte by byte, then the length is mixed in and each
 * state word is finalized with fmix64().
 */
void hash_murmur3_128_x64 ( const void * key, const int len,
const uint32_t seed, void * out )
{
const uint8_t * data = (const uint8_t*)key;
const int nblocks = len / 16;

uint64_t h1 = seed;
uint64_t h2 = seed;

const uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5);
const uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f);

//----------
// body

/* unlike the other two variants, blocks are indexed forward from the start */
const uint64_t * blocks = (const uint64_t *)(data);

for(int i = 0; i < nblocks; i++)
{
uint64_t k1 = getblock64(blocks,i*2+0);
uint64_t k2 = getblock64(blocks,i*2+1);

k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1;

h1 = ROTL64(h1,27); h1 += h2; h1 = h1*5+0x52dce729;

k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2;

h2 = ROTL64(h2,31); h2 += h1; h2 = h2*5+0x38495ab5;
}

//----------
// tail

const uint8_t * tail = (const uint8_t*)(data + nblocks*16);

uint64_t k1 = 0;
uint64_t k2 = 0;

/* every case falls through to the ones below it; bytes 8..14 go into k2, bytes 0..7 into k1 */
switch(len & 15)
{
case 15: k2 ^= ((uint64_t)tail[14]) << 48; /* fall through */
case 14: k2 ^= ((uint64_t)tail[13]) << 40; /* fall through */
case 13: k2 ^= ((uint64_t)tail[12]) << 32; /* fall through */
case 12: k2 ^= ((uint64_t)tail[11]) << 24; /* fall through */
case 11: k2 ^= ((uint64_t)tail[10]) << 16; /* fall through */
case 10: k2 ^= ((uint64_t)tail[ 9]) << 8; /* fall through */
case 9: k2 ^= ((uint64_t)tail[ 8]) << 0;
k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2;
/* fall through */

case 8: k1 ^= ((uint64_t)tail[ 7]) << 56; /* fall through */
case 7: k1 ^= ((uint64_t)tail[ 6]) << 48; /* fall through */
case 6: k1 ^= ((uint64_t)tail[ 5]) << 40; /* fall through */
case 5: k1 ^= ((uint64_t)tail[ 4]) << 32; /* fall through */
case 4: k1 ^= ((uint64_t)tail[ 3]) << 24; /* fall through */
case 3: k1 ^= ((uint64_t)tail[ 2]) << 16; /* fall through */
case 2: k1 ^= ((uint64_t)tail[ 1]) << 8; /* fall through */
case 1: k1 ^= ((uint64_t)tail[ 0]) << 0;
k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1;
};

//----------
// finalization

h1 ^= len; h2 ^= len;

h1 += h2;
h2 += h1;

h1 = fmix64(h1);
h2 = fmix64(h2);

h1 += h2;
h2 += h1;

((uint64_t*)out)[0] = h1;
((uint64_t*)out)[1] = h2;
}

//-----------------------------------------------------------------------------


7 changes: 5 additions & 2 deletions deps/ccommon/test/time/wheel/check_wheel.c
Original file line number Diff line number Diff line change
@@ -116,7 +116,7 @@ END_TEST

START_TEST(test_timing_wheel_recur)
{
#define TICK_NS 10000000
#define TICK_NS 50000000
#define NSLOT 3
#define NTICK 2

@@ -136,11 +136,13 @@ START_TEST(test_timing_wheel_recur)

timing_wheel_insert(tw, &delay, true, _incr_cb, &i);

nanosleep(&ts, NULL);
/* tick unchanged */
timing_wheel_execute(tw);
ck_assert_int_eq(tw->nprocess, 0);
ck_assert_int_eq(tw->nevent, 1);

/* next 2 tick */
nanosleep(&ts, NULL);
nanosleep(&ts, NULL);
timing_wheel_execute(tw);
ck_assert_int_eq(tw->nevent, 1);
@@ -152,6 +154,7 @@ START_TEST(test_timing_wheel_recur)
ck_assert_int_eq(tw->nprocess, 2);
ck_assert_int_eq(i, 2);

/* flush events */
timing_wheel_stop(tw);
timing_wheel_flush(tw);
ck_assert_int_eq(tw->nevent, 0);
4 changes: 2 additions & 2 deletions src/hotkey/kc_map.c
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@

#include <cc_bstring.h>
#include <cc_debug.h>
#include <cc_hash.h>
#include <hash/cc_lookup3.h>
#include <cc_mm.h>
#include <cc_pool.h>

@@ -180,7 +180,7 @@ kc_map_teardown(void)
static inline struct kcme_slh *
_get_bucket(const struct bstring *key)
{
return &(table[hash(key->data, key->len, 0) % table_size]);
return &(table[hash_lookup3(key->data, key->len, 0) % table_size]);
}

uint32_t
1 change: 1 addition & 0 deletions src/protocol/data/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
add_subdirectory(memcache)
add_subdirectory(ping)
add_subdirectory(redis)
2 changes: 1 addition & 1 deletion src/protocol/data/memcache/compose.c
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ _write_uint64(struct buf **buf, uint64_t val)
struct buf *b;


/* NOTE(yao): here we are being conservative on how many bytes wee need
/* NOTE(yao): here we are being conservative on how many bytes we need
* to print a (64-bit) integer. The actual number might be smaller.
* But since it is 21 bytes at most (including '\0') while buffers usually
* are KBs in size, it is unlikely to cause many extra expansions.
8 changes: 8 additions & 0 deletions src/protocol/data/redis/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
set(SOURCE
compose.c
parse.c
request.c
response.c
token.c)

add_library(protocol_redis ${SOURCE})
28 changes: 28 additions & 0 deletions src/protocol/data/redis/cmd_hash.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

/*
* Note: negative # of arguments means variable number of arguments:
* e.g. `-2' means at least two arguments. This notation is inherited from
* the original Redis server implementation.
*/

/*
 * Table of hash commands. Each row invokes the caller-supplied ACTION macro
 * with three arguments, in the column order shown below.
 */
/* type string # of args */
#define REQ_HASH(ACTION) \
ACTION( REQ_HDEL, "hdel", -3 )\
ACTION( REQ_HDELALL, "hdelall", 2 )\
ACTION( REQ_HEXISTS, "hexists", 3 )\
ACTION( REQ_HGET, "hget", 3 )\
ACTION( REQ_HGETALL, "hgetall", 2 )\
ACTION( REQ_HINCRBY, "hincrby", 4 )\
ACTION( REQ_HINCRBYFLOAT, "hincrbyfloat", 4 )\
ACTION( REQ_HKEYS, "hkeys", 2 )\
ACTION( REQ_HLEN, "hlen", 2 )\
ACTION( REQ_HMGET, "hmget", -3 )\
ACTION( REQ_HMSET, "hmset", -4 )\
ACTION( REQ_HSET, "hset", 4 )\
ACTION( REQ_HSETNX, "hsetnx", 4 )\
ACTION( REQ_HSTRLEN, "hstrlen", 3 )\
ACTION( REQ_HVALS, "hvals", 2 )\
ACTION( REQ_HSCAN, "hscan", -3 )

/* "hlen KEY" == "*2\r\n$4\r\nhlen\r\n$3\r\nKEY\r\n" */
6 changes: 6 additions & 0 deletions src/protocol/data/redis/cmd_misc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#pragma once

/*
 * Table of miscellaneous commands. Each row invokes the caller-supplied
 * ACTION macro with three arguments, in the column order shown below.
 * NOTE(review): a negative argument count presumably means "at least that
 * many", and the count appears to include the command name itself (quit is
 * 1) - confirm against the convention used for the other command tables.
 */
/* type string # of args */
#define REQ_MISC(ACTION) \
ACTION( REQ_PING, "ping", -1 )\
ACTION( REQ_QUIT, "quit", 1 )
25 changes: 25 additions & 0 deletions src/protocol/data/redis/cmd_zset.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

/*
 * Sorted set commands. The # of args counts every token of the request,
 * including the command itself; a negative value -N means "at least N".
 */
/* type string # of args */
#define REQ_ZSET(ACTION) \
ACTION( REQ_ZADD, "zadd", -4 )\
ACTION( REQ_ZINCRBY, "zincrby", 4 )\
ACTION( REQ_ZREM, "zrem", -3 )\
ACTION( REQ_ZREMRANGEBYSCORE, "zremrangebyscore", 4 )\
ACTION( REQ_ZREMRANGEBYRANK, "zremrangebyrank", 4 )\
ACTION( REQ_ZREMRANGEBYLEX, "zremrangebylex", 4 )\
ACTION( REQ_ZUNIONSTORE, "zunionstore", -4 )\
ACTION( REQ_ZINTERSTORE, "zinterstore", -4 )\
ACTION( REQ_ZRANGE, "zrange", -4 )\
ACTION( REQ_ZRANGEBYSCORE, "zrangebyscore", -4 )\
ACTION( REQ_ZREVRANGEBYSCORE, "zrevrangebyscore", -4 )\
ACTION( REQ_ZRANGEBYLEX, "zrangebylex", -4 )\
ACTION( REQ_ZREVRANGEBYLEX, "zrevrangebylex", -4 )\
ACTION( REQ_ZCOUNT, "zcount", 4 )\
ACTION( REQ_ZLEXCOUNT, "zlexcount", 4 )\
ACTION( REQ_ZREVRANGE, "zrevrange", -4 )\
ACTION( REQ_ZCARD, "zcard", 2 )\
ACTION( REQ_ZSCORE, "zscore", 3 )\
ACTION( REQ_ZRANK, "zrank", 3 )\
ACTION( REQ_ZREVRANK, "zrevrank", 3 )\
ACTION( REQ_ZSCAN, "zscan", -3 )
92 changes: 92 additions & 0 deletions src/protocol/data/redis/compose.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#include "compose.h"

#include "request.h"
#include "response.h"
#include "token.h"

#include <cc_debug.h>
#include <cc_print.h>

#define COMPOSE_MODULE_NAME "protocol::redis::compose"

static bool compose_init = false;
static compose_req_metrics_st *compose_req_metrics = NULL;
static compose_rsp_metrics_st *compose_rsp_metrics = NULL;

/*
 * Set up the compose module: store the metric structs supplied by the caller
 * and mark the module as initialized. A repeated call only logs a warning and
 * overwrites the previously stored pointers.
 */
void
compose_setup(compose_req_metrics_st *req, compose_rsp_metrics_st *rsp)
{
    const bool repeated = compose_init;

    log_info("set up the %s module", COMPOSE_MODULE_NAME);
    if (repeated) {
        log_warn("%s has already been setup, overwrite", COMPOSE_MODULE_NAME);
    }

    compose_init = true;
    compose_rsp_metrics = rsp;
    compose_req_metrics = req;
}

/*
 * Tear down the compose module: forget the metric pointers and mark the
 * module as uninitialized. Logs a warning if setup was never called.
 */
void
compose_teardown(void)
{
    const bool was_setup = compose_init;

    log_info("tear down the %s module", COMPOSE_MODULE_NAME);
    if (!was_setup) {
        log_warn("%s has never been setup", COMPOSE_MODULE_NAME);
    }

    compose_init = false;
    compose_rsp_metrics = NULL;
    compose_req_metrics = NULL;
}

/*
 * Serialize a request into *buf as an array header followed by each token.
 *
 * Returns the total number of bytes written; a negative return value is the
 * compose_rstatus of the step that failed. Bytes written before a failure are
 * not rolled back here.
 */
int
compose_req(struct buf **buf, struct request *req)
{
    int total = compose_array_header(buf, req->token->nelem);

    if (total < 0) {
        return total;
    }

    for (int idx = 0; idx < req->token->nelem; idx++) {
        int written = compose_element(buf, array_get(req->token, idx));

        if (written < 0) {
            return written;
        }
        total += written;
    }

    return total;
}

/*
 * Serialize a response into *buf.
 *
 * Array responses get an array header first; a nil array (rsp->nil set, as
 * produced by parse_rsp for a negative element count) is written with a
 * length of -1 rather than as an empty array. All other responses are written
 * as their tokens back to back.
 *
 * Returns the total number of bytes written; a negative return value is the
 * compose_rstatus of the step that failed.
 */
int
compose_rsp(struct buf **buf, struct response *rsp)
{
    int n = 0;

    if (rsp->type == ELEM_ARRAY) {
        n = compose_array_header(buf, rsp->nil ? -1 : (int)rsp->token->nelem);
        if (n < 0) {
            return n;
        }
        if (rsp->nil) {
            /* a null array carries no elements */
            return n;
        }
    }

    for (int i = 0; i < rsp->token->nelem; i++) {
        int ret;

        ret = compose_element(buf, array_get(rsp->token, i));
        if (ret < 0) {
            return ret;
        }
        n += ret;
    }

    return n;
}
44 changes: 44 additions & 0 deletions src/protocol/data/redis/compose.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#pragma once

#include <buffer/cc_dbuf.h>
#include <cc_define.h>
#include <cc_metric.h>

#include <stdint.h>

/* name Type description */
#define COMPOSE_REQ_METRIC(ACTION) \
ACTION( request_compose, METRIC_COUNTER, "# requests composed" )\
ACTION( request_compose_ex, METRIC_COUNTER, "# composing error" )

/* name Type description */
#define COMPOSE_RSP_METRIC(ACTION) \
ACTION( response_compose, METRIC_COUNTER, "# responses composed" )\
ACTION( response_compose_ex, METRIC_COUNTER, "# rsp composing error")

typedef struct {
COMPOSE_REQ_METRIC(METRIC_DECLARE)
} compose_req_metrics_st;

typedef struct {
COMPOSE_RSP_METRIC(METRIC_DECLARE)
} compose_rsp_metrics_st;

typedef enum compose_rstatus {
COMPOSE_OK = 0,
COMPOSE_EUNFIN = -1,
COMPOSE_ENOMEM = -2,
COMPOSE_EINVALID = -3,
COMPOSE_EOTHER = -4,
} compose_rstatus_t;

struct request;
struct response;

void compose_setup(compose_req_metrics_st *req, compose_rsp_metrics_st *rsp);
void compose_teardown(void);

/* if the return value is negative, it can be interpreted as compose_rstatus */
int compose_req(struct buf **buf, struct request *req);

int compose_rsp(struct buf **buf, struct response *rsp);
180 changes: 180 additions & 0 deletions src/protocol/data/redis/parse.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
#include "parse.h"

#include "request.h"
#include "response.h"
#include "token.h"

#include <cc_array.h>
#include <cc_debug.h>
#include <cc_print.h>
#include <cc_util.h>

#include <ctype.h>

#define PARSE_MODULE_NAME "protocol::redis::parse"

static bool parse_init = false;
static parse_req_metrics_st *parse_req_metrics = NULL;
static parse_rsp_metrics_st *parse_rsp_metrics = NULL;

/*
 * Set up the parse module: store the metric structs supplied by the caller
 * and mark the module as initialized. A repeated call only logs a warning and
 * overwrites the previously stored pointers.
 */
void
parse_setup(parse_req_metrics_st *req, parse_rsp_metrics_st *rsp)
{
    const bool repeated = parse_init;

    log_info("set up the %s module", PARSE_MODULE_NAME);
    if (repeated) {
        log_warn("%s has already been setup, overwrite", PARSE_MODULE_NAME);
    }

    parse_init = true;
    parse_rsp_metrics = rsp;
    parse_req_metrics = req;
}

/*
 * Tear down the parse module: forget the metric pointers and mark the module
 * as uninitialized. Logs a warning if setup was never called.
 */
void
parse_teardown(void)
{
    const bool was_setup = parse_init;

    log_info("tear down the %s module", PARSE_MODULE_NAME);
    if (!was_setup) {
        log_warn("%s has never been setup", PARSE_MODULE_NAME);
    }

    parse_init = false;
    parse_rsp_metrics = NULL;
    parse_req_metrics = NULL;
}

static parse_rstatus_t
_parse_cmd(struct request *req)
{
cmd_type_e type;
struct command cmd;
struct element *el;
int narg;

ASSERT(req != NULL);

/* check verb */
type = REQ_UNKNOWN;
el = array_first(req->token);

ASSERT (el->type == ELEM_BULK);
while (++type < REQ_SENTINEL &&
bstring_compare(&command_table[type].bstr, &el->bstr) != 0) {}
if (type == REQ_SENTINEL) {
log_warn("unrecognized command detected: %.*s", el->bstr.len,
el->bstr.data);
return PARSE_EINVALID;
}
req->type = type;

/* check narg */
cmd = command_table[type];
narg = req->token->nelem;
if ((cmd.narg >= 0 && cmd.narg != narg) || narg + cmd.narg < 0) {
log_warn("wrong number of arguments for '%.*s': %d expected, %d given",
cmd.bstr.len, cmd.bstr.data, cmd.narg, narg);
return PARSE_EINVALID;
}

return PARSE_OK;
}


parse_rstatus_t
parse_req(struct request *req, struct buf *buf)
{
parse_rstatus_t status = PARSE_OK;
char *old_rpos = buf->rpos;
int64_t nelem;
struct element *el;

log_verb("parsing buf %p into req %p", buf, req);

/* get number of elements in the array */
if (!token_is_array(buf)) {
log_debug("parse req failed: not an array");
return PARSE_EINVALID;
}
status = token_array_nelem(&nelem, buf);
if (status != PARSE_OK) {
buf->rpos = old_rpos;
return status;
}
if (nelem < 1 || nelem > req->token->nalloc) {
log_debug("parse req: invalid array size, %d not in [1, %"PRIu32"]",
nelem, req->token->nalloc);
return PARSE_EINVALID;
}

/* parse elements */
while (nelem > 0) {
el = array_push(req->token);
status = parse_element(el, buf);
if (status != PARSE_OK) {
log_verb("parse element returned status %d", status);
request_reset(req);
buf->rpos = old_rpos;
return status;
}
nelem--;
}

status = _parse_cmd(req);
if (status != PARSE_OK) {
buf->rpos = old_rpos;
return status;
}

return PARSE_OK;
}

parse_rstatus_t
parse_rsp(struct response *rsp, struct buf *buf)
{
parse_rstatus_t status = PARSE_OK;
char *old_rpos = buf->rpos;
int64_t nelem = 1;
struct element *el;

ASSERT(rsp->type == ELEM_UNKNOWN);

log_verb("parsing buf %p into rsp %p", buf, rsp);

if (token_is_array(buf)) {
status = token_array_nelem(&nelem, buf);
if (status != PARSE_OK) {
buf->rpos = old_rpos;
return status;
}
rsp->type = ELEM_ARRAY;
if (nelem > rsp->token->nalloc) {
log_debug("parse rsp: invalid # of elements, %d > %"PRIu32, nelem,
rsp->token->nalloc);
return PARSE_EOVERSIZE;
}
if (nelem < 0) {
rsp->nil = true;
return PARSE_OK;
}
}

/* parse elements */
while (nelem > 0) {
el = array_push(rsp->token);
status = parse_element(el, buf);
if (status != PARSE_OK) {
log_verb("parse element returned status %d", status);
response_reset(rsp);
buf->rpos = old_rpos;
return status;
}
if (rsp->type == ELEM_UNKNOWN) {
rsp->type = el->type;
}
nelem--;
}

return PARSE_OK;
}
56 changes: 56 additions & 0 deletions src/protocol/data/redis/parse.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

#include "request.h"
#include "response.h"

#include <buffer/cc_buf.h>
#include <cc_define.h>
#include <cc_metric.h>

#include <stdint.h>

/* Note(yao): the prefix cmd_ is mostly to be compatible with Twemcache metric
* names.
* On the other hand, the choice of putting request in front of parse instead of
* the other way around in `request_parse' is to allow users to easily query all
* metrics related to requests, and similarly for responses.
*/
/* name type description */
#define PARSE_REQ_METRIC(ACTION) \
ACTION( request_parse, METRIC_COUNTER, "# requests parsed" )\
ACTION( request_parse_ex, METRIC_COUNTER, "# parsing error" )

/*          name                type            description */
#define PARSE_RSP_METRIC(ACTION)                                        \
    ACTION( response_parse,     METRIC_COUNTER, "# responses parsed"   )\
    ACTION( response_parse_ex,  METRIC_COUNTER, "# rsp parsing error"  )

typedef struct {
PARSE_REQ_METRIC(METRIC_DECLARE)
} parse_req_metrics_st;

typedef struct {
PARSE_RSP_METRIC(METRIC_DECLARE)
} parse_rsp_metrics_st;

typedef enum parse_rstatus {
PARSE_OK = 0,
PARSE_EUNFIN = -1,
PARSE_EEMPTY = -2,
PARSE_EOVERSIZE = -3,
PARSE_EINVALID = -4,
PARSE_EOTHER = -5,
} parse_rstatus_t;

void parse_setup(parse_req_metrics_st *req, parse_rsp_metrics_st *rsp);
void parse_teardown(void);

/* A key is valid when it is non-empty and at most KEY_MAXLEN bytes long. */
static inline bool
key_valid(struct bstring *key)
{
    if (key->len > KEY_MAXLEN) {
        return false;
    }

    return key->len > 0;
}

parse_rstatus_t parse_req(struct request *req, struct buf *buf);

parse_rstatus_t parse_rsp(struct response *rsp, struct buf *buf);
17 changes: 17 additions & 0 deletions src/protocol/data/redis/process.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once

struct request;
struct response;

/**
* Responses can be chained, using the same field that supports pooling. It is
* the responsibility of the caller to provide enough response structs if more
* than one response is necessary- e.g. get/gets commands with batching, or
* the stats command.
*
* Since response pool is not thread-safe, it is very important not trying to
* use the same response pool from more than one thread, including the helper
* thread(s). When the need arises for that, we will need to support resource
* pool(s) that are either thread-local or identifiable instead of static ones.
*/
void process_request(struct response *rsp, struct request *req);
214 changes: 214 additions & 0 deletions src/protocol/data/redis/request.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
#include "request.h"

#include "token.h"

#include <cc_debug.h>
#include <cc_mm.h>
#include <cc_pool.h>

#define REQUEST_MODULE_NAME "protocol::redis::request"

static bool request_init = false;
static request_metrics_st *request_metrics = NULL;

/*
 * Table of all known commands, indexed by cmd_type_e: entry 0 is the
 * REQ_UNKNOWN placeholder, followed by the hash, zset and misc commands in
 * the same order in which the enum is generated.
 */
#define CMD_INIT(_type, _str, _narg)                                        \
    { .type = _type, .bstr = { sizeof(_str) - 1, (_str) }, .narg = _narg },
struct command command_table[REQ_SENTINEL] = {
    { .type = REQ_UNKNOWN, .bstr = { 0, NULL }, .narg = 0 },
    REQ_HASH(CMD_INIT)
    REQ_ZSET(CMD_INIT)
    REQ_MISC(CMD_INIT)
};
#undef CMD_INIT

static size_t ntoken = REQ_NTOKEN;
FREEPOOL(req_pool, reqq, request);
static struct req_pool reqp;
static bool reqp_init = false;

/*
 * Put a request back into its pristine state: no tokens, unknown command
 * type, all status flags cleared, and not linked into any queue. The token
 * array itself is kept and only emptied.
 */
void
request_reset(struct request *req)
{
    ASSERT(req != NULL);

    /* forget the parsed command */
    req->token->nelem = 0;
    req->type = REQ_UNKNOWN;

    /* clear status flags */
    req->cerror = 0;
    req->serror = 0;
    req->noreply = 0;

    /* detach from pool/chain */
    req->free = false;
    STAILQ_NEXT(req, next) = NULL;
}

/*
 * Allocate a request together with its token array (ntoken elements) and
 * return it in reset state. Returns NULL if either allocation fails.
 */
struct request *
request_create(void)
{
    struct request *req;

    req = cc_alloc(sizeof(struct request));
    if (req == NULL) {
        return NULL;
    }

    if (array_create(&req->token, ntoken, sizeof(struct element)) != CC_OK) {
        cc_free(req);
        return NULL;
    }
    request_reset(req);

    INCR(request_metrics, request_create);
    INCR(request_metrics, request_curr);

    return req;
}

/*
 * Creation callback handed to the free pool macros: creates a request and,
 * on success, counts it as a free pool object.
 */
static struct request *
_request_create(void)
{
    struct request *req = request_create();

    if (req == NULL) {
        return NULL;
    }

    INCR(request_metrics, request_free);
    return req;
}

/*
 * Free a request and its token array, update the metrics, and set the
 * caller's pointer to NULL.
 */
void
request_destroy(struct request **request)
{
    struct request *victim = *request;

    ASSERT(victim != NULL);

    INCR(request_metrics, request_destroy);
    DECR(request_metrics, request_curr);

    array_destroy(&victim->token);
    cc_free(victim);

    *request = NULL;
}

/*
 * Destruction callback handed to the free pool macros: destroys a request
 * that was sitting in the pool, so the free gauge is decremented as well.
 */
static void
_request_destroy(struct request **request)
{
    DECR(request_metrics, request_free);
    request_destroy(request);
}

/*
 * Destroy every request held in the pool. If the pool was never created this
 * only logs a warning and returns, matching response_pool_destroy.
 */
static void
request_pool_destroy(void)
{
    struct request *req, *treq;

    if (!reqp_init) {
        log_warn("request pool was never created, ignore");
        return;
    }

    log_info("destroying request pool: free %"PRIu32, reqp.nfree);

    FREEPOOL_DESTROY(req, treq, &reqp, next, _request_destroy);
    reqp_init = false;
}

/*
 * Create the request pool and preallocate `max` requests. An existing pool is
 * destroyed first. The process exits if preallocation falls short.
 */
static void
request_pool_create(uint32_t max)
{
    struct request *req;

    if (reqp_init) {
        log_warn("request pool has already been created, re-creating");

        request_pool_destroy();
    }

    log_info("creating request pool: max %"PRIu32, max);

    FREEPOOL_CREATE(&reqp, max);
    reqp_init = true;

    FREEPOOL_PREALLOC(req, &reqp, max, next, _request_create);
    if (reqp.nfree >= max) {
        return;
    }

    log_crit("cannot preallocate request pool, OOM. abort");
    exit(EXIT_FAILURE);
}

/*
 * Borrow a request from the pool; the pool macro is given _request_create to
 * create new objects. The returned request is in reset state. Returns NULL
 * when no request can be obtained.
 */
struct request *
request_borrow(void)
{
    struct request *req;

    FREEPOOL_BORROW(req, &reqp, next, _request_create);
    if (req == NULL) {
        /* the message takes no arguments, so it must not contain a
         * conversion specifier */
        log_debug("borrow req failed: OOM");

        return NULL;
    }
    request_reset(req);

    DECR(request_metrics, request_free);
    INCR(request_metrics, request_borrow);
    log_vverb("borrowing req %p", req);

    return req;
}

/*
 * Hand a request back to the pool and set the caller's pointer to NULL.
 * Does nothing if *request is already NULL.
 */
void
request_return(struct request **request)
{
    struct request *req = *request;

    if (req != NULL) {
        INCR(request_metrics, request_free);
        INCR(request_metrics, request_return);
        log_vverb("return req %p", req);

        req->free = true;
        FREEPOOL_RETURN(req, &reqp, next);

        *request = NULL;
    }
}

/*
 * Set up the request module: remember the metrics struct, read the token
 * count and pool size from `options` when provided (the pool size defaults to
 * REQ_POOLSIZE otherwise), and create the request pool.
 */
void
request_setup(request_options_st *options, request_metrics_st *metrics)
{
    uint32_t max;

    log_info("set up the %s module", REQUEST_MODULE_NAME);

    if (request_init) {
        log_warn("%s has already been setup, overwrite", REQUEST_MODULE_NAME);
    }

    request_metrics = metrics;

    if (options == NULL) {
        max = REQ_POOLSIZE;
    } else {
        ntoken = option_uint(&options->request_ntoken);
        max = option_uint(&options->request_poolsize);
    }
    request_pool_create(max);

    request_init = true;
}

/*
 * Tear down the request module: destroy the pool, restore the default token
 * count, and drop the metrics pointer. Logs a warning if setup never ran.
 */
void
request_teardown(void)
{
    const bool was_setup = request_init;

    log_info("tear down the %s module", REQUEST_MODULE_NAME);
    if (!was_setup) {
        log_warn("%s has never been setup", REQUEST_MODULE_NAME);
    }

    ntoken = REQ_NTOKEN;
    /* the pool must go before the metrics pointer: destroying pooled
     * requests still updates the metrics */
    request_pool_destroy();
    request_metrics = NULL;

    request_init = false;
}
79 changes: 79 additions & 0 deletions src/protocol/data/redis/request.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#pragma once

#include "cmd_hash.h"
#include "cmd_misc.h"
#include "cmd_zset.h"

#include <cc_array.h>
#include <cc_define.h>
#include <cc_metric.h>
#include <cc_option.h>
#include <cc_queue.h>

#include <inttypes.h>

#define REQ_NTOKEN 127 /* # tokens in a command */
#define KEY_MAXLEN 255
#define REQ_POOLSIZE 0

/* name type default description */
#define REQUEST_OPTION(ACTION) \
ACTION( request_ntoken, OPTION_TYPE_UINT, REQ_NTOKEN, "# tokens in request")\
ACTION( request_poolsize, OPTION_TYPE_UINT, REQ_POOLSIZE, "request pool size")

typedef struct {
REQUEST_OPTION(OPTION_DECLARE)
} request_options_st;

/* name type description */
#define REQUEST_METRIC(ACTION) \
ACTION( request_curr, METRIC_GAUGE, "# req created" )\
ACTION( request_free, METRIC_GAUGE, "# free req in pool" )\
ACTION( request_borrow, METRIC_COUNTER, "# reqs borrowed" )\
ACTION( request_return, METRIC_COUNTER, "# reqs returned" )\
ACTION( request_create, METRIC_COUNTER, "# reqs created" )\
ACTION( request_destroy, METRIC_COUNTER, "# reqs destroyed" )

typedef struct {
REQUEST_METRIC(METRIC_DECLARE)
} request_metrics_st;

#define GET_TYPE(_type, _str, narg) _type,
typedef enum cmd_type {
REQ_UNKNOWN,
REQ_HASH(GET_TYPE)
REQ_ZSET(GET_TYPE)
REQ_MISC(GET_TYPE)
REQ_SENTINEL
} cmd_type_e;
#undef GET_TYPE

struct command {
cmd_type_e type;
struct bstring bstr;
int32_t narg;
};

extern struct command command_table[REQ_SENTINEL];

struct request {
STAILQ_ENTRY(request) next; /* allow request pooling/chaining */
bool free;

bool noreply; /* skip response */
bool serror; /* server error */
bool cerror; /* client error */

cmd_type_e type;
struct array *token; /* array elements are tokens */
};

void request_setup(request_options_st *options, request_metrics_st *metrics);
void request_teardown(void);

struct request *request_create(void);
void request_destroy(struct request **req);
void request_reset(struct request *req);

struct request *request_borrow(void);
void request_return(struct request **req);
206 changes: 206 additions & 0 deletions src/protocol/data/redis/response.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
#include "response.h"

#include "token.h"

#include <cc_debug.h>
#include <cc_mm.h>
#include <cc_pool.h>

#define RESPONSE_MODULE_NAME "protocol::redis::response"

static bool response_init = false;
static response_metrics_st *response_metrics = NULL;

static size_t ntoken = RSP_NTOKEN;
FREEPOOL(rsp_pool, rspq, response);
static struct rsp_pool rspp;
static bool rspp_init = false;

/*
 * Put a response back into its pristine state: no tokens, unknown type, not
 * nil, and not linked into any queue. The token array itself is kept and only
 * emptied.
 */
void
response_reset(struct response *rsp)
{
    ASSERT(rsp != NULL);

    /* forget the parsed content */
    rsp->token->nelem = 0;
    rsp->nil = false;
    rsp->type = ELEM_UNKNOWN;

    /* detach from pool/chain */
    rsp->free = false;
    STAILQ_NEXT(rsp, next) = NULL;
}

/*
 * Allocate a response together with its token array (ntoken elements) and
 * return it in reset state. Returns NULL if either allocation fails.
 */
struct response *
response_create(void)
{
    struct response *rsp;

    rsp = cc_alloc(sizeof(struct response));
    if (rsp == NULL) {
        return NULL;
    }

    if (array_create(&rsp->token, ntoken, sizeof(struct element)) != CC_OK) {
        cc_free(rsp);
        return NULL;
    }
    response_reset(rsp);

    INCR(response_metrics, response_create);
    INCR(response_metrics, response_curr);

    return rsp;
}

/*
 * Creation callback handed to the free pool macros: creates a response and,
 * on success, counts it as a free pool object.
 */
static struct response *
_response_create(void)
{
    struct response *rsp = response_create();

    if (rsp == NULL) {
        return NULL;
    }

    INCR(response_metrics, response_free);
    return rsp;
}

/*
 * Free a response and its token array, update the metrics, and set the
 * caller's pointer to NULL.
 */
void
response_destroy(struct response **response)
{
    struct response *victim = *response;

    ASSERT(victim != NULL);

    INCR(response_metrics, response_destroy);
    DECR(response_metrics, response_curr);

    array_destroy(&victim->token);
    cc_free(victim);

    *response = NULL;
}

/*
 * Destruction callback handed to the free pool macros: destroys a response
 * that was sitting in the pool, so the free gauge is decremented as well.
 */
static void
_response_destroy(struct response **response)
{
    DECR(response_metrics, response_free);
    response_destroy(response);
}

/*
 * Destroy every response held in the pool. If the pool was never created
 * this only logs a warning.
 */
static void
response_pool_destroy(void)
{
    struct response *rsp, *trsp;

    if (!rspp_init) {
        log_warn("response pool was never created, ignore");
        return;
    }

    log_info("destroying response pool: free %"PRIu32, rspp.nfree);

    FREEPOOL_DESTROY(rsp, trsp, &rspp, next, _response_destroy);
    rspp_init = false;
}

/*
 * Create the response pool and preallocate `max` responses. An existing pool
 * is destroyed first. The process exits if preallocation falls short.
 */
static void
response_pool_create(uint32_t max)
{
    struct response *rsp;

    if (rspp_init) {
        log_warn("response pool has already been created, re-creating");

        response_pool_destroy();
    }

    log_info("creating response pool: max %"PRIu32, max);

    FREEPOOL_CREATE(&rspp, max);
    rspp_init = true;

    FREEPOOL_PREALLOC(rsp, &rspp, max, next, _response_create);
    if (rspp.nfree >= max) {
        return;
    }

    log_crit("cannot preallocate response pool, OOM. abort");
    exit(EXIT_FAILURE);
}

/*
 * Borrow a response from the pool; the pool macro is given _response_create
 * to create new objects. The returned response is in reset state. Returns
 * NULL when no response can be obtained.
 */
struct response *
response_borrow(void)
{
    struct response *rsp;

    FREEPOOL_BORROW(rsp, &rspp, next, _response_create);
    if (rsp == NULL) {
        /* the message takes no arguments, so it must not contain a
         * conversion specifier */
        log_debug("borrow rsp failed: OOM");

        return NULL;
    }
    response_reset(rsp);

    DECR(response_metrics, response_free);
    INCR(response_metrics, response_borrow);
    log_vverb("borrowing rsp %p", rsp);

    return rsp;
}

/*
 * Return a single response object to the pool and set the caller's pointer
 * to NULL. Does nothing if *response is already NULL.
 */
void
response_return(struct response **response)
{
    ASSERT(response != NULL);

    struct response *rsp = *response;

    if (rsp != NULL) {
        INCR(response_metrics, response_free);
        INCR(response_metrics, response_return);
        log_vverb("return rsp %p", rsp);

        rsp->free = true;
        FREEPOOL_RETURN(rsp, &rspp, next);

        *response = NULL;
    }
}

/*
 * Set up the response module: remember the metrics struct, read the token
 * count and pool size from `options` when provided (the pool size defaults to
 * RSP_POOLSIZE otherwise), and create the response pool.
 */
void
response_setup(response_options_st *options, response_metrics_st *metrics)
{
    uint32_t max;

    log_info("set up the %s module", RESPONSE_MODULE_NAME);

    if (response_init) {
        log_warn("%s has already been setup, overwrite", RESPONSE_MODULE_NAME);
    }

    response_metrics = metrics;

    if (options == NULL) {
        max = RSP_POOLSIZE;
    } else {
        ntoken = option_uint(&options->response_ntoken);
        max = option_uint(&options->response_poolsize);
    }

    response_pool_create(max);

    response_init = true;
}

/*
 * Tear down the response module: destroy the pool, restore the default token
 * count, and drop the metrics pointer. Logs a warning if setup never ran.
 */
void
response_teardown(void)
{
    const bool was_setup = response_init;

    log_info("tear down the %s module", RESPONSE_MODULE_NAME);
    if (!was_setup) {
        log_warn("%s has never been setup", RESPONSE_MODULE_NAME);
    }

    ntoken = RSP_NTOKEN;
    /* the pool must go before the metrics pointer: destroying pooled
     * responses still updates the metrics */
    response_pool_destroy();
    response_metrics = NULL;

    response_init = false;
}
66 changes: 66 additions & 0 deletions src/protocol/data/redis/response.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#pragma once

#include <cc_array.h>
#include <cc_bstring.h>
#include <cc_define.h>
#include <cc_metric.h>
#include <cc_option.h>
#include <cc_queue.h>
#include <cc_util.h>

#define RSP_NTOKEN 255 /* # tokens in a command */
#define RSP_POOLSIZE 0

/* name type default description */
#define RESPONSE_OPTION(ACTION) \
ACTION( response_ntoken, OPTION_TYPE_UINT, RSP_NTOKEN, "# tokens in response" )\
ACTION( response_poolsize, OPTION_TYPE_UINT, RSP_POOLSIZE, "response pool size" )

typedef struct {
RESPONSE_OPTION(OPTION_DECLARE)
} response_options_st;

/* name type description */
#define RESPONSE_METRIC(ACTION) \
ACTION( response_curr, METRIC_GAUGE, "# rsp created" )\
ACTION( response_free, METRIC_GAUGE, "# free rsp in pool" )\
ACTION( response_borrow, METRIC_COUNTER, "# rsps borrowed" )\
ACTION( response_return, METRIC_COUNTER, "# rsps returned" )\
ACTION( response_create, METRIC_COUNTER, "# rsps created" )\
ACTION( response_destroy, METRIC_COUNTER, "# rsps destroyed" )

typedef struct {
RESPONSE_METRIC(METRIC_DECLARE)
} response_metrics_st;

/**
* Note: there are some semi special values here:
* - a dummy entry RSP_UNKNOWN so we can use it as the initial type value;
* - a RSP_NUMERIC type that doesn't have a corresponding message body.
*/
#define RSP_STR_OK "+OK\r\n"

/*
* NOTE(yao): we store fields as location in rbuf, this assumes the data will
* not be overwritten prematurely.
* Whether this is a reasonable design decision eventually remains to be seen.
*/

struct response {
STAILQ_ENTRY(response) next; /* allow response pooling/chaining */
bool free;

int type;
bool nil;
struct array *token; /* array elements are tokens */
};

void response_setup(response_options_st *options, response_metrics_st *metrics);
void response_teardown(void);

struct response *response_create(void);
void response_destroy(struct response **rsp);
void response_reset(struct response *rsp);

struct response *response_borrow(void);
void response_return(struct response **rsp);
340 changes: 340 additions & 0 deletions src/protocol/data/redis/token.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,340 @@
#include "token.h"

#include "request.h"
#include "response.h"

#include <cc_define.h>
#include <cc_print.h>
#include <cc_util.h>

#include <ctype.h>

#define STR_MAXLEN 255 /* max length for simple string or error */
#define BULK_MAXLEN (512 * MiB)
#define ARRAY_MAXLEN (64 * MiB)

#define NIL_STR "$-1\r\n"


/*
 * Make sure *buf has room for at least n more bytes, doubling the buffer as
 * often as needed. Returns COMPOSE_OK once there is enough room, or
 * COMPOSE_ENOMEM as soon as a doubling attempt fails.
 */
static inline compose_rstatus_t
_check_buf_size(struct buf **buf, uint32_t n)
{
    for (;;) {
        if (n <= buf_wsize(*buf)) {
            return COMPOSE_OK;
        }

        if (dbuf_double(buf) != CC_OK) {
            log_debug("failed to write %u bytes to buf %p: insufficient "
                    "buffer space", n, *buf);

            return COMPOSE_ENOMEM;
        }
    }
}


/*
 * Read a simple string (or error) body: everything up to the next CRLF.
 *
 * On PARSE_OK, str points into buf (no copy) and buf->rpos is past the CRLF.
 * Returns PARSE_EOVERSIZE if more than STR_MAXLEN bytes precede the CRLF and
 * PARSE_EUNFIN if the buffer ends before a CRLF is seen.
 */
static parse_rstatus_t
_read_str(struct bstring *str, struct buf *buf)
{
    /*
     * Note: buf->rpos is updated in this function, the caller is responsible
     * for resetting the pointer if necessary.
     */

    str->len = 0;
    str->data = buf->rpos;
    /*
     * Note: according to @antirez, simple strings are not supposed to be empty.
     * However, there's no particular harm allowing a null simple string, so
     * we allow it in this function
     */
    for (; buf->rpos < buf->wpos; buf->rpos++) {
        if (line_end(buf)) {
            buf->rpos += CRLF_LEN;
            log_vverb("simple string detected at %p, length %"PRIu32,
                    (void *)str->data, str->len);

            return PARSE_OK;
        }
        if (++str->len > STR_MAXLEN) {
            log_warn("simple string max length (%d) exceeded", STR_MAXLEN);

            return PARSE_EOVERSIZE;
        }
    }

    return PARSE_EUNFIN;
}


/*
 * Read a CRLF-terminated decimal integer, with optional leading '-', and
 * store it in *num. The value must lie within [min, max]; callers pass
 * min <= 0 <= max.
 *
 * Returns PARSE_OK with buf->rpos past the CRLF, PARSE_EOVERSIZE if the value
 * falls outside [min, max], PARSE_EINVALID on an unexpected character or a
 * missing digit, and PARSE_EUNFIN if the buffer ends too early.
 */
static parse_rstatus_t
_read_int(int64_t *num, struct buf *buf, int64_t min, int64_t max)
{
    /*
     * Note: buf->rpos is updated in this function, the caller is responsible
     * for resetting the pointer if necessary.
     */
    size_t len = 0;
    bool negative = false;

    ASSERT(min <= 0 && max >= 0);

    /* only look at the first byte if there is one */
    if (buf_rsize(buf) > 0 && *buf->rpos == '-') {
        negative = true;
        buf->rpos++;
    }

    *num = 0;
    for (; buf_rsize(buf) > 0; buf->rpos++) {
        if (isdigit((unsigned char)*buf->rpos)) {
            int64_t digit = *buf->rpos - '0';
            bool out_of_bounds;

            /*
             * Exact bounds check performed before the multiplication, so the
             * accumulator can neither leave [min, max] nor overflow int64_t:
             *   negative: num * 10 - digit >= min
             *   positive: num * 10 + digit <= max
             * Integer division truncates toward zero, which is the rounding
             * needed on both sides given min <= 0 <= max.
             */
            if (negative) {
                int64_t room = min + digit;

                out_of_bounds = (room > 0 || *num < room / 10);
            } else {
                int64_t room = max - digit;

                out_of_bounds = (room < 0 || *num > room / 10);
            }
            if (out_of_bounds) {
                log_warn("ill formatted token: integer out of bounds");

                return PARSE_EOVERSIZE;
            }

            len++;
            *num = *num * 10 + (negative ? -digit : digit);
        } else {
            if (len == 0 || *buf->rpos != CR) {
                log_warn("invalid character encountered: %c", *buf->rpos);

                return PARSE_EINVALID;
            }
            if (line_end(buf)) {
                buf->rpos += CRLF_LEN;
                log_vverb("parsed integer, value %"PRIi64, *num);

                return PARSE_OK;
            } else {
                return PARSE_EUNFIN;
            }
        }
    }

    return PARSE_EUNFIN;
}

/*
 * Read a bulk string: a length line followed by that many payload bytes and
 * a terminating CRLF.
 *
 * On PARSE_OK, str points at the payload inside buf (no copy) and buf->rpos
 * is past the trailing CRLF. Returns PARSE_EEMPTY for a negative length (null
 * bulk string), PARSE_EUNFIN if payload + CRLF have not fully arrived, and
 * PARSE_EINVALID if the payload is not followed by CRLF. buf->rpos is left
 * advanced on failure; the caller rewinds.
 */
static parse_rstatus_t
_read_bulk(struct bstring *str, struct buf *buf)
{
    parse_rstatus_t status;
    int64_t len;

    bstring_init(str);
    status = _read_int(&len, buf, -1, BULK_MAXLEN);
    if (status != PARSE_OK) {
        return status;
    }
    if (len < 0) {
        log_vverb("null bulk string detected at %p", buf->rpos);

        return PARSE_EEMPTY;
    }

    if (buf_rsize(buf) < len + CRLF_LEN) {
        /* payload and/or terminator not fully received yet */
        return PARSE_EUNFIN;
    }

    /* have enough bytes for the whole payload plus CRLF */
    str->len = len;
    str->data = buf->rpos;
    buf->rpos += str->len;

    if (line_end(buf)) {
        buf->rpos += CRLF_LEN;
        log_vverb("bulk string detected at %p, length %"PRIi64,
                (void *)buf->rpos, len);

        return PARSE_OK;
    }

    /*
     * Both terminator bytes are already in the buffer (checked above), so a
     * mismatch can never be fixed by waiting for more data: it is malformed
     * input, even when the first byte happens to be CR.
     */
    log_warn("invalid character encountered, expecting CRLF: %c%c",
            *buf->rpos, *(buf->rpos + 1));

    return PARSE_EINVALID;
}

/*
 * Append the decimal form of val followed by CRLF to buf and return the
 * number of bytes added. No space check is done here; the callers in this
 * file reserve room via _check_buf_size beforehand.
 */
static inline int
_write_int(struct buf *buf, int64_t val)
{
    size_t ndigit = cc_print_int64_unsafe(buf->wpos, val);

    buf->wpos += ndigit;
    buf_write(buf, CRLF, CRLF_LEN);

    return (ndigit + CRLF_LEN);
}

/*
 * Append the bytes of bstr followed by CRLF to buf and return the number of
 * bytes this accounts for (string length plus CRLF).
 */
static inline int
_write_bstr(struct buf *buf, struct bstring *bstr)
{
    int total = bstr->len + CRLF_LEN;

    buf_write(buf, bstr->data, bstr->len);
    buf_write(buf, CRLF, CRLF_LEN);

    return total;
}


/*
 * Tell whether the byte at the read position is the array marker '*'.
 * The buffer is not checked for being non-empty here.
 */
bool
token_is_array(struct buf *buf)
{
    ASSERT(buf != NULL);

    if (*(buf->rpos) != '*') {
        return false;
    }

    return true;
}

/*
 * Read the element count of an array header. The read position must be at
 * the '*' marker, which is skipped before the integer in [-1, ARRAY_MAXLEN]
 * is read. buf->rpos is advanced even on failure; the caller rewinds.
 */
parse_rstatus_t
token_array_nelem(int64_t *nelem, struct buf *buf)
{
    parse_rstatus_t status;

    ASSERT(nelem != NULL && buf != NULL);
    ASSERT(token_is_array(buf));

    buf->rpos += 1; /* skip '*' */
    status = _read_int(nelem, buf, -1, ARRAY_MAXLEN);

    return status;
}


/*
 * Parse a single non-array element at the read position of buf into el.
 * This function does not handle array, which is a composite type.
 *
 * The first byte selects the type: '+' simple string, '-' error, ':' integer,
 * '$' bulk string (a null bulk string is reported as ELEM_NIL with PARSE_OK).
 * Any other first byte yields PARSE_EINVALID.
 *
 * On every status other than PARSE_OK buf->rpos is rewound to where it was
 * on entry.
 */
parse_rstatus_t
parse_element(struct element *el, struct buf *buf)
{
    char *p;
    parse_rstatus_t status;

    ASSERT(buf_rsize(buf) > 0);

    log_verb("detecting the next element %p in buf %p", el, buf);

    p = buf->rpos++;
    switch (*p) {
    case '+':
        /* simple string */
        el->type = ELEM_STR;
        status = _read_str(&el->bstr, buf);
        break;

    case '-':
        /* error */
        el->type = ELEM_ERR;
        status = _read_str(&el->bstr, buf);
        break;

    case ':':
        /* integer */
        el->type = ELEM_INT;
        status = _read_int(&el->num, buf, INT64_MIN, INT64_MAX);
        break;

    case '$':
        /* bulk string */
        el->type = ELEM_BULK;
        status = _read_bulk(&el->bstr, buf);
        if (status == PARSE_EEMPTY) {
            status = PARSE_OK;
            el->type = ELEM_NIL;
        }
        break;

    default:
        /* unknown marker: go through the common rewind below as well */
        status = PARSE_EINVALID;
        break;
    }

    if (status != PARSE_OK) { /* rewind */
        buf->rpos = p;
    }

    return status;
}


/*
 * Write an array header ('*', element count, CRLF) to *buf, growing the
 * buffer if needed. Returns the number of bytes written, or COMPOSE_ENOMEM
 * if the buffer cannot be grown.
 */
int
compose_array_header(struct buf **buf, int nelem)
{
    /* marker + conservative integer width + CRLF */
    const size_t need = 1 + CC_INT64_MAXLEN + CRLF_LEN;
    struct buf *b;

    if (_check_buf_size(buf, need) != COMPOSE_OK) {
        return COMPOSE_ENOMEM;
    }

    b = *buf;
    *b->wpos = '*';
    b->wpos++;

    return (1 + _write_int(b, nelem));
}

/*
 * Write a single non-array element to *buf, growing the buffer if needed.
 * This function does not handle array, which is a composite type.
 *
 * Returns the number of bytes written, COMPOSE_EINVALID for an element type
 * that cannot be composed here, or COMPOSE_ENOMEM if the buffer cannot be
 * grown.
 */
int
compose_element(struct buf **buf, struct element *el)
{
    size_t n = 1 + CRLF_LEN; /* type marker + trailing CRLF */
    struct buf *b;

    ASSERT(el->type > 0);

    /* estimate size (overestimate space needed for integers (int, bulk)) */
    if (el->type == ELEM_STR || el->type == ELEM_ERR) {
        n += el->bstr.len;
    } else if (el->type == ELEM_INT) {
        n += CC_INT64_MAXLEN;
    } else if (el->type == ELEM_BULK) {
        /* length line (integer + CRLF) precedes the payload */
        n += el->bstr.len + CC_INT64_MAXLEN + CRLF_LEN;
    } else if (el->type == ELEM_NIL) {
        n += 2; /* "-1" */
    } else {
        return COMPOSE_EINVALID;
    }

    if (_check_buf_size(buf, n) != COMPOSE_OK) {
        return COMPOSE_ENOMEM;
    }

    /* the buffer may have been replaced while growing, so fetch it now */
    b = *buf;
    log_verb("write element %p in buf %p", el, b);

    switch (el->type) {
    case ELEM_STR:
    case ELEM_ERR:
        n = buf_write(b, el->type == ELEM_STR ? "+" : "-", 1);
        n += _write_bstr(b, &el->bstr);
        break;

    case ELEM_INT:
        n = buf_write(b, ":", 1);
        n += _write_int(b, el->num);
        break;

    case ELEM_BULK:
        n = buf_write(b, "$", 1);
        n += _write_int(b, el->bstr.len);
        n += _write_bstr(b, &el->bstr);
        break;

    case ELEM_NIL:
        n = sizeof(NIL_STR) - 1;
        buf_write(b, NIL_STR, n);
        break;

    default:
        NOT_REACHED();
    }

    return n;
}
96 changes: 96 additions & 0 deletions src/protocol/data/redis/token.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#pragma once

/*
* this file handles the serialization / deserialization formats used by Redis:
* - RESP (REdis Serialization Protocol)
* - Simple format (not implemented yet)
* - Cap'n'proto (not implemented yet)
*/

/**
* functions that deal with tokens in RESP (REdis Serialization Protocol).
* RESP is text-based protocol that uses special characters and prefixed-length
* to achieve high-performance parsing.
*
* RESP has the following guidelines for requests/responses:
* - Clients send commands to a Redis server as a RESP Array of Bulk Strings.
* - The server replies with one of the RESP types according to the command
* implementation.
*
* Different types have different leading character
* - For Simple Strings the first byte of the reply is "+"
* - For Errors the first byte of the reply is "-"
* - For Integers the first byte of the reply is ":"
* - For Bulk Strings the first byte of the reply is "$"
* - For Arrays the first byte of the reply is "*"
*
* Note:
* - In RESP, tokens of each type are always terminated with "\r\n" (CRLF).
* - There are multiple ways of representing Null values:
* + Null Bulk String: "$-1\r\n"
* + Null Array: "*-1\r\n"
*/

/**
* It makes sense to always parse Simple Strings, Errors, and Integers in
* full. However, for Bulk Strings and Arrays, it is possible that they
* will be big enough that we cannot always expect the full content to be
* received at once, and hence it makes sense to allow partial parsing.
*
* For Bulk Strings, there are always two tokens, 1) the length; and 2) the
* string content. Since the content can be quite large, we should remember
* how many bytes have been received and how many more to expect.
*
* Array is a composite type, where individual elements can be any of the other
* type, and different types can mix in a single array. So to parse an array,
* we need to handle both a subset of all elements and incompleteness of the
* last element.
*/

#include "parse.h"
#include "compose.h"

#include <buffer/cc_buf.h>
#include <cc_bstring.h>
#include <cc_util.h>

/* array is not a basic element type */
typedef enum element_type {
ELEM_UNKNOWN = 0,
ELEM_STR = 1,
ELEM_ERR = 2,
ELEM_INT = 3,
ELEM_BULK = 4,
ELEM_ARRAY = 5,
ELEM_NIL = 6,
} element_type_e;

struct element {
element_type_e type;
union {
struct bstring bstr;
int64_t num;
};
};

/*
 * Tell whether the next two readable bytes are CR LF. The caller must make
 * sure at least CRLF_LEN bytes are readable.
 */
static inline bool
is_crlf(struct buf *buf)
{
    ASSERT(buf_rsize(buf) >= CRLF_LEN);

    if (buf->rpos[0] != CR) {
        return false;
    }

    return buf->rpos[1] == LF;
}


/*
 * Tell whether the read position is at a complete CRLF; false when fewer
 * than CRLF_LEN bytes are readable.
 */
static inline bool
line_end(struct buf *buf)
{
    if (buf_rsize(buf) < CRLF_LEN) {
        return false;
    }

    return is_crlf(buf);
}

bool token_is_array(struct buf *buf);
parse_rstatus_t token_array_nelem(int64_t *nelem, struct buf *buf);
parse_rstatus_t parse_element(struct element *el, struct buf *buf);

int compose_array_header(struct buf **buf, int nelem);
int compose_element(struct buf **buf, struct element *el);
6 changes: 6 additions & 0 deletions src/protocol/data/redis_include.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#include "redis/compose.h"
#include "redis/token.h"
#include "redis/parse.h"
#include "redis/process.h"
#include "redis/request.h"
#include "redis/response.h"
6 changes: 4 additions & 2 deletions src/storage/cuckoo/cuckoo.c
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@

#include <cc_debug.h>
#include <cc_define.h>
#include <cc_lookup3.h>
#include <hash/cc_murmur3.h>
#include <cc_mm.h>

/* TODO(yao): make D and iv[] configurable */
@@ -82,9 +82,11 @@ static void
cuckoo_hash(uint32_t offset[], struct bstring *key)
{
int i;
uint32_t hv;

for (i = 0; i < D; ++i) {
offset[i] = hashlittle(key->data, key->len, iv[i]) % max_nitem;
hash_murmur3_32(key->data, key->len, iv[i], &hv);
offset[i] = hv % max_nitem;
}

return;
4 changes: 2 additions & 2 deletions src/storage/slab/hashtable.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "hashtable.h"

#include <cc_hash.h>
#include <hash/cc_lookup3.h>
#include <cc_mm.h>

/*
@@ -65,7 +65,7 @@ hashtable_destroy(struct hash_table *ht)
static struct item_slh *
_get_bucket(const char *key, size_t klen, struct hash_table *ht)
{
return &(ht->table[hash(key, klen, 0) & HASHMASK(ht->hash_power)]);
return &(ht->table[hash_lookup3(key, klen, 0) & HASHMASK(ht->hash_power)]);
}

void
1 change: 1 addition & 0 deletions test/protocol/data/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
add_subdirectory(memcache)
add_subdirectory(redis)
11 changes: 11 additions & 0 deletions test/protocol/data/redis/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Unit-test target for the redis protocol module.
# Target and source names are derived from the suite name below:
# executable check_redis built from check_redis.c.
set(suite redis)
set(test_name check_${suite})

set(source check_${suite}.c)

# Build the test binary and link it against the protocol library under test
# (protocol_redis), the static ccommon library, and the libraries listed in
# CHECK_LIBRARIES (defined elsewhere in the build).
add_executable(${test_name} ${source})
target_link_libraries(${test_name} protocol_${suite})
target_link_libraries(${test_name} ccommon-static ${CHECK_LIBRARIES})

# Make the `check` target depend on this binary so it is built with it, and
# register the binary as a CTest test.
add_dependencies(check ${test_name})
add_test(${test_name} ${test_name})
582 changes: 582 additions & 0 deletions test/protocol/data/redis/check_redis.c

Large diffs are not rendered by default.

0 comments on commit 8a7e319

Please sign in to comment.