Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions src/commands/cmd_cuckoo_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include "command_parser.h"
#include "commander.h"
#include "error_constants.h"
#include "server/server.h"
#include "types/redis_cuckoo_chain.h"

namespace redis {

// CF.RESERVE key capacity [BUCKETSIZE bs] [MAXITERATIONS mi] [EXPANSION ex]
//
// Creates an empty cuckoo filter at `key` with the given capacity and
// optional tuning parameters. Replies +OK on success.
class CommandCFReserve : public Commander {
 public:
  Status Parse(const std::vector<std::string> &args) override {
    if (args.size() < 3) {
      return {Status::RedisParseErr, "wrong number of arguments"};
    }

    // capacity (required): must be a positive integer.
    auto parse_capacity = ParseInt<uint64_t>(args[2], 10);
    if (!parse_capacity) {
      return {Status::RedisParseErr, "invalid capacity"};
    }
    capacity_ = *parse_capacity;
    // capacity_ is unsigned, so `<= 0` would be a tautological comparison;
    // test for zero explicitly.
    if (capacity_ == 0) {
      return {Status::RedisParseErr, "capacity must be larger than 0"};
    }

    // Optional keyword parameters may appear in any order after capacity.
    CommandParser parser(args, 3);
    while (parser.Good()) {
      if (parser.EatEqICase("BUCKETSIZE")) {
        auto parse_bucket_size = parser.TakeInt<uint8_t>();
        if (!parse_bucket_size.IsOK()) {
          return {Status::RedisParseErr, "invalid bucket size"};
        }
        bucket_size_ = parse_bucket_size.GetValue();
        // TakeInt<uint8_t> already rejects values above 255, so only the
        // zero case needs an explicit check (a `> 255` test on a uint8_t
        // is always false).
        if (bucket_size_ == 0) {
          return {Status::RedisParseErr, "bucket size must be between 1 and 255"};
        }
      } else if (parser.EatEqICase("MAXITERATIONS")) {
        auto parse_max_iterations = parser.TakeInt<uint16_t>();
        if (!parse_max_iterations.IsOK()) {
          return {Status::RedisParseErr, "invalid max iterations"};
        }
        max_iterations_ = parse_max_iterations.GetValue();
        if (max_iterations_ == 0) {
          return {Status::RedisParseErr, "max iterations must be larger than 0"};
        }
      } else if (parser.EatEqICase("EXPANSION")) {
        auto parse_expansion = parser.TakeInt<uint8_t>();
        if (!parse_expansion.IsOK()) {
          return {Status::RedisParseErr, "invalid expansion factor"};
        }
        // NOTE(review): expansion 0 is accepted here and presumably means
        // "non-scaling" — confirm against CuckooChain::Reserve semantics.
        expansion_ = parse_expansion.GetValue();
      } else {
        return {Status::RedisParseErr, errInvalidSyntax};
      }
    }

    return Commander::Parse(args);
  }

  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
    redis::CuckooChain cuckoo_db(srv->storage, conn->GetNamespace());
    auto s = cuckoo_db.Reserve(ctx, args_[1], capacity_, bucket_size_, max_iterations_, expansion_);

    if (!s.ok()) {
      if (s.IsInvalidArgument()) {
        // Propagate the storage layer's message (e.g. "item exists") to the
        // client instead of the generic failure string.
        return {Status::RedisExecErr, s.ToString()};
      }
      return {Status::RedisExecErr, "failed to create cuckoo filter"};
    }

    *output = redis::SimpleString("OK");
    return Status::OK();
  }

 private:
  uint64_t capacity_ = kCFDefaultCapacity;
  uint8_t bucket_size_ = kCFDefaultBucketSize;
  uint16_t max_iterations_ = kCFDefaultMaxIterations;
  uint8_t expansion_ = kCFDefaultExpansion;
};

// Register the CF.RESERVE command under the CuckooFilter category.
// Arity -3: variadic with at least 3 arguments (CF.RESERVE key capacity ...);
// flagged "write"; key spec (1, 1, 1) marks args_[1] as the single key.
REDIS_REGISTER_COMMANDS(CuckooFilter,
MakeCmdAttr<CommandCFReserve>("cf.reserve", -3, "write", 1, 1, 1))

} // namespace redis
1 change: 1 addition & 0 deletions src/commands/commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ enum class CommandCategory : uint8_t {
Bit,
BloomFilter,
Cluster,
CuckooFilter,
Function,
Geo,
Hash,
Expand Down
44 changes: 44 additions & 0 deletions src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -569,3 +569,47 @@ rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {

return rocksdb::Status::OK();
}


// Serialize the chain metadata: common Metadata header first, then the
// fixed-width payload. Layout (must match Decode):
//   2B n_filters | 2B expansion | 8B base_capacity | 1B bucket_size |
//   2B max_iterations | 8B num_deleted_items   => 23 payload bytes total.
void CuckooChainMetadata::Encode(std::string *dst) const {
Metadata::Encode(dst);

PutFixed16(dst, n_filters);
PutFixed16(dst, expansion);
PutFixed64(dst, base_capacity);
PutFixed8(dst, bucket_size);
PutFixed16(dst, max_iterations);
PutFixed64(dst, num_deleted_items);
}

// Deserialize the chain metadata previously written by Encode.
// Returns InvalidArgument when the remaining payload is shorter than the
// fixed-width field layout.
rocksdb::Status CuckooChainMetadata::Decode(Slice *input) {
  if (auto s = Metadata::Decode(input); !s.ok()) {
    return s;
  }

  // Fixed-width payload written by Encode:
  //   2 (n_filters) + 2 (expansion) + 8 (base_capacity) + 1 (bucket_size)
  //   + 2 (max_iterations) + 8 (num_deleted_items) = 23 bytes.
  // The previous guard checked `< 21`, which let a truncated 21/22-byte
  // payload through and the trailing GetFixed* reads would fail.
  constexpr size_t kFixedPayloadSize = 2 + 2 + 8 + 1 + 2 + 8;
  if (input->size() < kFixedPayloadSize) {
    return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
  }

  GetFixed16(input, &n_filters);
  GetFixed16(input, &expansion);
  GetFixed64(input, &base_capacity);
  GetFixed8(input, &bucket_size);
  GetFixed16(input, &max_iterations);
  GetFixed64(input, &num_deleted_items);

  return rocksdb::Status::OK();
}

uint64_t CuckooChainMetadata::GetTotalCapacity() const {
if (expansion == 0 || n_filters == 1) {
return base_capacity;
}

// Calculate total capacity across all filters
uint64_t total = 0;
for (uint16_t i = 0; i < n_filters; i++) {
total += base_capacity * std::pow(expansion, i);
}
return total;
}
37 changes: 36 additions & 1 deletion src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ enum RedisType : uint8_t {
kRedisHyperLogLog = 11,
kRedisTDigest = 12,
kRedisTimeSeries = 13,
kRedisCuckooFilter = 14,
kRedisTypeMax
};

inline constexpr const std::array<std::string_view, kRedisTypeMax> RedisTypeNames = {
"none", "string", "hash", "list", "set", "zset", "bitmap",
"sortedint", "stream", "MBbloom--", "ReJSON-RL", "hyperloglog", "TDIS-TYPE", "timeseries"};
"sortedint", "stream", "MBbloom--", "ReJSON-RL", "hyperloglog", "TDIS-TYPE", "timeseries", "cuckoofilter"};

struct RedisTypes {
RedisTypes(std::initializer_list<RedisType> list) {
Expand Down Expand Up @@ -306,6 +307,40 @@ class BloomChainMetadata : public Metadata {
bool IsScaling() const { return expansion != 0; };
};

/// Metadata for a chain of cuckoo filters (a scalable cuckoo filter).
/// Field order is the on-disk encode order — do not reorder.
class CuckooChainMetadata : public Metadata {
 public:
  /// Number of sub-filters currently in the chain.
  uint16_t n_filters = 0;

  /// Growth factor: when a filter fills up, the next one is created with
  /// capacity = base_capacity * expansion^n. Zero disables scaling.
  uint16_t expansion = 0;

  /// Capacity of the first (oldest) filter in the chain.
  uint64_t base_capacity = 0;

  /// Number of fingerprint slots per bucket.
  uint8_t bucket_size = 0;

  /// Upper bound on cuckoo kicks before a filter is considered full.
  uint16_t max_iterations = 0;

  /// Count of deleted items, tracked for maintenance purposes.
  uint64_t num_deleted_items = 0;

  explicit CuckooChainMetadata(bool generate_version = true)
      : Metadata(kRedisCuckooFilter, generate_version) {}

  void Encode(std::string *dst) const override;
  using Metadata::Decode;
  rocksdb::Status Decode(Slice *input) override;

  /// Sum of the capacities of every filter in the chain.
  uint64_t GetTotalCapacity() const;

  /// The chain grows by appending filters iff expansion is non-zero.
  bool IsScaling() const { return expansion != 0; }
};

enum class JsonStorageFormat : uint8_t {
JSON = 0,
CBOR = 1,
Expand Down
88 changes: 88 additions & 0 deletions src/types/cuckoo_filter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#pragma once

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

#include "vendor/murmurhash2.h"

namespace redis {

// Cuckoo filter implementation from the paper:
// "Cuckoo Filter: Practically Better Than Bloom" by Fan et al.
// This is a bucket-based storage implementation where each bucket is stored
// as an independent key-value pair in RocksDB
//
// Hash calculation follows RedisBloom's design:
// - fp = hash % 255 + 1 (fingerprint, non-zero, range: 1-255)
// - h1 = hash (primary hash)
// - h2 = h1 ^ (fp * 0x5bd1e995) (alternate hash via XOR)
// - bucket_index = hash % num_buckets (only apply modulo when indexing)
class CuckooFilter {
 public:
  // Calculate the number of buckets needed to hold `capacity` items at the
  // target load factor, rounded up to a power of two.
  static uint32_t OptimalNumBuckets(uint64_t capacity, uint8_t bucket_size) {
    // A load factor of 95.5% is chosen for the cuckoo filter.
    uint64_t num_buckets = static_cast<uint64_t>(capacity / bucket_size / 0.955);
    if (num_buckets == 0) num_buckets = 1;
    // Round up to the next power of 2 for better hash distribution.
    // Compute in 64 bits and clamp at 2^31: the previous version kept
    // `power` in 32 bits, so for num_buckets > 2^31 the shift wrapped to 0
    // and the loop never terminated.
    constexpr uint64_t kMaxBuckets = 1ULL << 31;
    uint64_t power = 1;
    while (power < num_buckets && power < kMaxBuckets) power <<= 1;
    return static_cast<uint32_t>(power);
  }

  // Generate fingerprint from hash (8-bit fingerprint, non-zero, range: 1-255)
  // Following RedisBloom: fp = hash % 255 + 1
  static uint8_t GenerateFingerprint(uint64_t hash) {
    return static_cast<uint8_t>(hash % 255 + 1);
  }

  // Calculate alternate hash using XOR (following RedisBloom)
  // h2 = h1 ^ (fp * 0x5bd1e995)
  // This preserves symmetry: GetAltHash(fp, GetAltHash(fp, h)) == h
  static uint64_t GetAltHash(uint8_t fingerprint, uint64_t hash) {
    return hash ^ (static_cast<uint64_t>(fingerprint) * 0x5bd1e995);
  }

  // Legacy function for backward compatibility with tests.
  // Treats the bucket index as a hash, applies GetAltHash, and maps back to
  // a bucket index. NOTE: the symmetry of GetAltHash does NOT survive the
  // final modulo unless num_buckets is a power of two (which
  // OptimalNumBuckets guarantees).
  static uint32_t GetAltBucketIndex(uint32_t bucket_idx, uint8_t fingerprint, uint32_t num_buckets) {
    uint64_t hash = bucket_idx;
    uint64_t alt_hash = GetAltHash(fingerprint, hash);
    return static_cast<uint32_t>(alt_hash % num_buckets);
  }

  // Compute hash for a given item using MurmurHash2 (compatible with RedisBloom)
  // This is the entry point for hashing items before they are inserted/checked in the filter
  static uint64_t Hash(const char* data, size_t length) {
    return HllMurMurHash64A(data, static_cast<int>(length), 0);
  }

  // Convenience overload for std::string
  static uint64_t Hash(const std::string& item) { return Hash(item.data(), item.size()); }
};

} // namespace redis
Loading
Loading