Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,11 @@ json-storage-format json
# Default: no
txn-context-enabled no

# Whether to enable hash field expiration feature.
# NOTE: This option only affects newly hash object
# Default: no
hash-field-expiration no

################################## TLS ###################################

# By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0.
Expand Down
244 changes: 243 additions & 1 deletion src/commands/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "error_constants.h"
#include "scan_base.h"
#include "server/server.h"
#include "time_util.h"
#include "types/redis_hash.h"

namespace redis {
Expand Down Expand Up @@ -444,6 +445,238 @@ class CommandHRandField : public Commander {
bool no_parameters_ = true;
};

class CommandFieldExpireBase : public Commander {
protected:
Status commonParse(const std::vector<std::string> &args, int start_idx) {
CommandParser parser(args, start_idx);
std::string_view expire_flag, num_flag;
uint64_t fields_num = 0;
while (parser.Good()) {
if (parser.EatEqICaseFlag("FIELDS", num_flag)) {
fields_num = GET_OR_RET(parser.template TakeInt<uint64_t>());
break;
} else if (parser.EatEqICaseFlag("NX", expire_flag)) {
field_expire_type_ = HashFieldExpireType::NX;
} else if (parser.EatEqICaseFlag("XX", expire_flag)) {
field_expire_type_ = HashFieldExpireType::XX;
} else if (parser.EatEqICaseFlag("GT", expire_flag)) {
field_expire_type_ = HashFieldExpireType::GT;
} else if (parser.EatEqICaseFlag("LT", expire_flag)) {
field_expire_type_ = HashFieldExpireType::LT;
} else {
return parser.InvalidSyntax();
}
}

auto remains = parser.Remains();
auto size = args.size();
if (remains != fields_num) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

for (size_t i = size - remains; i < size; i++) {
fields_.emplace_back(args_[i]);
}

return Status::OK();
}

Status expireFieldExecute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) {
if (!srv->storage->GetConfig()->hash_field_expiration) {
return {Status::RedisExecErr, "field expiration feature is disabled"};
}

std::vector<int8_t> ret;
redis::Hash hash_db(srv->storage, conn->GetNamespace());
auto s = hash_db.ExpireFields(ctx, args_[1], expire_, fields_, field_expire_type_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::MultiLen(ret.size());
for (const auto &i : ret) {
output->append(redis::Integer(i));
}

return Status::OK();
}

Status ttlExpireExecute(engine::Context &ctx, Server *srv, Connection *conn, std::vector<int64_t> &ret) {
redis::Hash hash_db(srv->storage, conn->GetNamespace());
auto s = hash_db.TTLFields(ctx, args_[1], fields_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
return Status::OK();
}

uint64_t expire_ = 0;
HashFieldExpireType field_expire_type_ = HashFieldExpireType::None;
std::vector<Slice> fields_;
};

class CommandHExpire : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result * 1000 + util::GetTimeStampMS();
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(ctx, srv, conn, output);
}
};

class CommandHExpireAt : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result * 1000;
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(ctx, srv, conn, output);
}
};

class CommandHPExpire : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result + util::GetTimeStampMS();
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(ctx, srv, conn, output);
}
};

class CommandHPExpireAt : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result;
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(ctx, srv, conn, output);
}
};

class CommandHExpireTime : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(ctx, srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
auto now = util::GetTimeStampMS();
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
if (ttl > 0) {
output->append(redis::Integer((now + ttl) / 1000));
} else {
output->append(redis::Integer(ttl));
}
}
return Status::OK();
}
};

class CommandHPExpireTime : public CommandFieldExpireBase {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The command HExpireTime/HTTL/HPTTL doesn't support NX/XX/LT/GT flags. By the way, it should be good to remove the ttlExpireExecute/expireFieldExecute and only add a reply helper function like redis::IntArray. @torwig @PragmaTwice What do you think?

public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(ctx, srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
auto now = util::GetTimeStampMS();
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
if (ttl > 0) {
output->append(redis::Integer(now + ttl));
} else {
output->append(redis::Integer(ttl));
}
}
return Status::OK();
}
};

class CommandHTTL : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(ctx, srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
output->append(redis::Integer(ttl > 0 ? ttl / 1000 : ttl));
}
return Status::OK();
}
};

class CommandHPTTL : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(ctx, srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
output->append(redis::Integer(ttl));
}
return Status::OK();
}
};

class CommandHPersist : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int8_t> ret;
redis::Hash hash_db(srv->storage, conn->GetNamespace());
auto s = hash_db.PersistFields(ctx, args_[1], fields_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::MultiLen(ret.size());
for (const auto &i : ret) {
output->append(redis::Integer(i));
}
return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHIncrBy>("hincrby", 4, "write", 1, 1, 1),
MakeCmdAttr<CommandHIncrByFloat>("hincrbyfloat", 4, "write", 1, 1, 1),
Expand All @@ -460,6 +693,15 @@ REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1
MakeCmdAttr<CommandHGetAll>("hgetall", 2, "read-only slow", 1, 1, 1),
MakeCmdAttr<CommandHScan>("hscan", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHRangeByLex>("hrangebylex", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHRandField>("hrandfield", -2, "read-only slow", 1, 1, 1), )
MakeCmdAttr<CommandHRandField>("hrandfield", -2, "read-only slow", 1, 1, 1),
MakeCmdAttr<CommandHExpire>("hexpire", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHExpireAt>("hexpireat", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHExpireTime>("hexpiretime", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPExpire>("hpexpire", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHPExpireAt>("hpexpireat", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHPExpireTime>("hpexpiretime", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPersist>("hpersist", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandHTTL>("httl", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPTTL>("hpttl", -5, "read-only", 1, 1, 1), )

} // namespace redis
1 change: 1 addition & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ Config::Config() {
new EnumField<JsonStorageFormat>(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)},
{"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)},
{"skip-block-cache-deallocation-on-close", false, new YesNoField(&skip_block_cache_deallocation_on_close, false)},
{"hash-field-expiration", false, new YesNoField(&hash_field_expiration, false)},

/* rocksdb options */
{"rocksdb.compression", false,
Expand Down
3 changes: 3 additions & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ struct Config {

bool skip_block_cache_deallocation_on_close = false;

// whether to enable hash field expiration feature
bool hash_field_expiration = false;

struct RocksDB {
int block_size;
bool cache_index_and_filter_blocks;
Expand Down
5 changes: 4 additions & 1 deletion src/storage/compact_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "db_util.h"
#include "time_util.h"
#include "types/redis_bitmap.h"
#include "types/redis_hash.h"

namespace engine {

Expand Down Expand Up @@ -132,7 +133,9 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level, const Slice &key, const Sl
return false;
}

return IsMetadataExpired(ikey, metadata) || (metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value));
return IsMetadataExpired(ikey, metadata) ||
(metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value)) ||
(metadata.Type() == kRedisHash && redis::Hash::IsFieldExpired(cached_metadata_, value));
}

} // namespace engine
Loading
Loading