Skip to content

Commit a72cada

Browse files
authored
Merge branch 'unstable' into feat/command_exists_optimization
2 parents 1a6d994 + 2a0eb9e commit a72cada

26 files changed

+1201
-54
lines changed

cmake/fmt.cmake

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ include_guard()
2020
include(cmake/utils.cmake)
2121

2222
FetchContent_DeclareGitHubWithMirror(fmt
23-
fmtlib/fmt 11.2.0
24-
MD5=feeba3828e393f7dec473052bf0eef97
23+
fmtlib/fmt 12.0.0
24+
MD5=3fa90363bce77d4ab9c229d4f6757fdf
2525
)
2626

2727
FetchContent_MakeAvailableWithArgs(fmt)

cmake/jsoncons.cmake

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ include_guard()
2020
include(cmake/utils.cmake)
2121

2222
FetchContent_DeclareGitHubWithMirror(jsoncons
23-
danielaparker/jsoncons v1.4.0
24-
MD5=9288495ca2798082e1e96c8b824e9331
23+
danielaparker/jsoncons v1.4.1
24+
MD5=6e860305aa0aaa1ca0066e23e42acda4
2525
)
2626

2727
FetchContent_MakeAvailableWithArgs(jsoncons

kvrocks.conf

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,5 +1124,56 @@ rocksdb.max_compaction_bytes 0
11241124
# Default: 0
11251125
rocksdb.sst_file_delete_rate_bytes_per_sec 0
11261126

1127+
# Enable RocksDB periodic compaction to force full compaction of SST files older than the specified time (in seconds).
1128+
# If a compaction filter is registered, it will be applied during these compactions.
1129+
# Set to 0 to disable this feature.
1130+
#
1131+
# Default: 18446744073709551614 (0xFFFFFFFFFFFFFFFE, UINT64_MAX - 1), a special value indicating RocksDB-controlled behavior.
1132+
# Currently, RocksDB interprets this default as 30 days (2592000 seconds).
1133+
#
1134+
# Typical use cases:
1135+
# - Enforcing data cleanup via compaction filters (e.g., TTL expiration)
1136+
# - Automatically refreshing data encoding/compression formats without manual intervention
1137+
#
1138+
# Reference: https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#periodic-compaction
1139+
#
1140+
# rocksdb.periodic_compaction_seconds 2592000
1141+
1142+
# Enable RocksDB Time-to-Live (TTL) to automatically schedule compaction for SST files containing expired data.
1143+
# - Files containing data older than the TTL (in seconds) will be prioritized for background compaction.
1144+
# - Requires a registered compaction filter (e.g., TTL filter) to identify and remove expired entries.
1145+
# - Set to 0 to disable TTL-based compaction.
1146+
#
1147+
# Default: 18446744073709551614 (0xFFFFFFFFFFFFFFFE, UINT64_MAX - 1), delegating control to RocksDB.
1148+
# Current RocksDB behavior interprets this default as 30 days (2592000 seconds).
1149+
#
1150+
# Use cases:
1151+
# - Automatic expiration of ephemeral data (e.g., session tokens, temporary logs)
1152+
# - Lifecycle management for time-series datasets
1153+
#
1154+
# Reference: https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#ttl
1155+
#
1156+
# rocksdb.ttl 2592000
1157+
1158+
# Schedule RocksDB periodic compactions during daily off-peak windows to reduce operational impact.
1159+
#
1160+
# Requirements:
1161+
# - Periodic compaction must be enabled (`periodic-compaction-seconds > 0`)
1162+
# - Time format: "HH:MM-HH:MM" in UTC (e.g., "02:00-04:30" for a 2.5-hour window)
1163+
# - Empty string disables off-peak scheduling
1164+
#
1165+
# Behavior:
1166+
# - RocksDB proactively triggers periodic compactions during the specified off-peak window
1167+
# - Compactions are optimized to complete before the next peak period begins
1168+
#
1169+
# Default: "" (disabled)
1170+
#
1171+
# Typical use cases:
1172+
# - Minimize compaction I/O during business hours for latency-sensitive workloads
1173+
# - Align resource-heavy operations with maintenance windows
1174+
#
1175+
# Reference: https://github.com/facebook/rocksdb/wiki/Daily-Off%E2%80%90peak-Time-Option
1176+
rocksdb.daily_offpeak_time_utc ""
1177+
11271178
################################ NAMESPACE #####################################
11281179
# namespace.test change.me

src/commands/cmd_server.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1586,7 +1586,7 @@ REDIS_REGISTER_COMMANDS(
15861586
MakeCmdAttr<CommandSlaveOf>("replicaof", 3, "read-only exclusive no-script admin", NO_KEY),
15871587
MakeCmdAttr<CommandStats>("stats", 1, "read-only", NO_KEY),
15881588
MakeCmdAttr<CommandRdb>("rdb", -3, "write exclusive admin", NO_KEY),
1589-
MakeCmdAttr<CommandReset>("reset", 1, "ok-loading bypass-multi no-script", NO_KEY),
1589+
MakeCmdAttr<CommandReset>("reset", 1, "ok-loading bypass-multi no-script admin", NO_KEY),
15901590
MakeCmdAttr<CommandApplyBatch>("applybatch", -2, "write no-multi", NO_KEY),
15911591
MakeCmdAttr<CommandDump>("dump", 2, "read-only", 1, 1, 1),
15921592
MakeCmdAttr<CommandPollUpdates>("pollupdates", -2, "read-only admin", NO_KEY),

src/commands/cmd_stream.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -932,12 +932,13 @@ class CommandXPending : public Commander {
932932

933933
static Status SendExtResults([[maybe_unused]] Connection *conn, std::string *output,
934934
std::vector<StreamNACK> &ext_results) {
935+
auto now = util::GetTimeStampMS();
935936
output->append(redis::MultiLen(ext_results.size()));
936937
for (const auto &entry : ext_results) {
937938
output->append(redis::MultiLen(4));
938939
output->append(redis::BulkString(entry.id.ToString()));
939940
output->append(redis::BulkString(entry.pel_entry.consumer_name));
940-
output->append(redis::Integer(entry.pel_entry.last_delivery_time_ms));
941+
output->append(redis::Integer(now - entry.pel_entry.last_delivery_time_ms));
941942
output->append(redis::Integer(entry.pel_entry.last_delivery_count));
942943
}
943944

src/commands/cmd_timeseries.cc

Lines changed: 136 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,15 @@ class KeywordCommandBase : public Commander {
217217
};
218218

219219
class CommandTSCreateBase : public KeywordCommandBase {
220+
public:
221+
Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, [[maybe_unused]] Connection *conn,
222+
[[maybe_unused]] std::string *output) override {
223+
if (srv->GetConfig()->cluster_enabled && getCreateOption().labels.size()) {
224+
return {Status::RedisExecErr, "Specifying LABELS is not supported in cluster mode"};
225+
}
226+
return Status::OK();
227+
}
228+
220229
protected:
221230
const TSCreateOption &getCreateOption() const { return create_option_; }
222231

@@ -308,6 +317,9 @@ class CommandTSCreate : public CommandTSCreateBase {
308317
return CommandTSCreateBase::Parse(args);
309318
}
310319
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
320+
auto sc = CommandTSCreateBase::Execute(ctx, srv, conn, output);
321+
if (!sc.IsOK()) return sc;
322+
311323
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
312324
auto s = timeseries_db.Create(ctx, args_[1], getCreateOption());
313325
if (!s.ok() && s.IsInvalidArgument()) return {Status::RedisExecErr, errKeyAlreadyExists};
@@ -387,6 +399,9 @@ class CommandTSAdd : public CommandTSCreateBase {
387399
return CommandTSCreateBase::Parse(args);
388400
}
389401
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
402+
auto sc = CommandTSCreateBase::Execute(ctx, srv, conn, output);
403+
if (!sc.IsOK()) return sc;
404+
390405
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
391406
const auto &option = getCreateOption();
392407

@@ -851,6 +866,9 @@ class CommandTSMGet : public CommandTSMGetBase {
851866
return CommandTSMGetBase::Parse(args);
852867
}
853868
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
869+
if (srv->GetConfig()->cluster_enabled) {
870+
return {Status::RedisExecErr, "TS.MGet is not supported in cluster mode"};
871+
}
854872
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
855873
std::vector<TSMGetResult> results;
856874
auto s = timeseries_db.MGet(ctx, getMGetOption(), is_return_latest_, &results);
@@ -898,6 +916,9 @@ class CommandTSMRange : public CommandTSRangeBase, public CommandTSMGetBase {
898916
return Status::OK();
899917
}
900918
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
919+
if (srv->GetConfig()->cluster_enabled) {
920+
return {Status::RedisExecErr, "TS.MRANGE is not supported in cluster mode"};
921+
}
901922
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
902923
std::vector<TSMRangeResult> results;
903924
auto s = timeseries_db.MRange(ctx, option_, &results);
@@ -978,6 +999,117 @@ class CommandTSMRange : public CommandTSRangeBase, public CommandTSMGetBase {
978999
TSMRangeOption option_;
9791000
};
9801001

1002+
class CommandTSIncrByDecrBy : public CommandTSCreateBase {
1003+
public:
1004+
CommandTSIncrByDecrBy() { registerDefaultHandlers(); }
1005+
Status Parse(const std::vector<std::string> &args) override {
1006+
CommandParser parser(args, 2);
1007+
auto value_parse = parser.TakeFloat<double>();
1008+
if (!value_parse.IsOK()) {
1009+
return {Status::RedisParseErr, errInvalidValue};
1010+
}
1011+
value_ = value_parse.GetValue();
1012+
if (util::ToUpper(args[0]) == "TS.DECRBY") {
1013+
value_ = -value_;
1014+
}
1015+
CommandTSCreateBase::setSkipNum(3);
1016+
return CommandTSCreateBase::Parse(args);
1017+
}
1018+
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
1019+
auto sc = CommandTSCreateBase::Execute(ctx, srv, conn, output);
1020+
if (!sc.IsOK()) return sc;
1021+
1022+
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
1023+
const auto &option = getCreateOption();
1024+
1025+
if (!is_ts_set_) {
1026+
// TODO: Should modify function `Add` and `IncrBy` to add a sample with current time
1027+
}
1028+
TSChunk::AddResult res;
1029+
auto s = timeseries_db.IncrBy(ctx, args_[1], {ts_, value_}, option, &res);
1030+
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
1031+
1032+
if (res.type == TSChunk::AddResultType::kOld) {
1033+
*output +=
1034+
redis::Error({Status::NotOK, "timestamp must be equal to or higher than the maximum existing timestamp"});
1035+
} else {
1036+
*output += FormatAddResultAsRedisReply(res);
1037+
}
1038+
return Status::OK();
1039+
}
1040+
1041+
protected:
1042+
void registerDefaultHandlers() override {
1043+
CommandTSCreateBase::registerDefaultHandlers();
1044+
registerHandler("TIMESTAMP", [this](TSOptionsParser &parser) {
1045+
auto s = handleTimeStamp(parser, ts_);
1046+
if (!s.IsOK()) return s;
1047+
is_ts_set_ = true;
1048+
return Status::OK();
1049+
});
1050+
}
1051+
static Status handleTimeStamp(TSOptionsParser &parser, uint64_t &ts) {
1052+
auto parse_timestamp = parser.TakeInt<uint64_t>();
1053+
if (!parse_timestamp.IsOK()) {
1054+
return {Status::RedisParseErr, errInvalidTimestamp};
1055+
}
1056+
ts = parse_timestamp.GetValue();
1057+
return Status::OK();
1058+
}
1059+
1060+
private:
1061+
bool is_ts_set_ = false;
1062+
uint64_t ts_ = 0;
1063+
double value_ = 0;
1064+
};
1065+
1066+
class CommandTSDel : public Commander {
1067+
public:
1068+
Status Parse(const std::vector<std::string> &args) override {
1069+
if (args.size() < 4) {
1070+
return {Status::RedisParseErr, "wrong number of arguments for 'ts.del' command"};
1071+
}
1072+
CommandParser parser(args, 2);
1073+
// Parse start timestamp
1074+
auto start_parse = parser.TakeInt<uint64_t>();
1075+
if (!start_parse.IsOK()) {
1076+
auto start_ts_str = parser.TakeStr();
1077+
if (!start_ts_str.IsOK() || start_ts_str.GetValue() != "-") {
1078+
return {Status::RedisParseErr, "wrong fromTimestamp"};
1079+
}
1080+
// "-" means use default start timestamp: 0
1081+
} else {
1082+
start_ts_ = start_parse.GetValue();
1083+
}
1084+
// Parse end timestamp
1085+
auto end_parse = parser.TakeInt<uint64_t>();
1086+
if (!end_parse.IsOK()) {
1087+
auto end_ts_str = parser.TakeStr();
1088+
if (!end_ts_str.IsOK() || end_ts_str.GetValue() != "+") {
1089+
return {Status::RedisParseErr, "wrong toTimestamp"};
1090+
}
1091+
// "+" means use default end timestamp: MAX_TIMESTAMP
1092+
} else {
1093+
end_ts_ = end_parse.GetValue();
1094+
}
1095+
return Commander::Parse(args);
1096+
}
1097+
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
1098+
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
1099+
uint64_t deleted_count = 0;
1100+
auto s = timeseries_db.Del(ctx, args_[1], start_ts_, end_ts_, &deleted_count);
1101+
if (!s.ok()) {
1102+
return {Status::RedisExecErr, s.ToString()};
1103+
}
1104+
*output = redis::Integer(deleted_count);
1105+
return Status::OK();
1106+
}
1107+
1108+
private:
1109+
uint64_t start_ts_ = 0;
1110+
uint64_t end_ts_ = TSSample::MAX_TIMESTAMP;
1111+
};
1112+
9811113
REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create", -2, "write", 1, 1, 1),
9821114
MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1, 1),
9831115
MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1, -3, 1),
@@ -986,6 +1118,9 @@ REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create", -2
9861118
MakeCmdAttr<CommandTSGet>("ts.get", -2, "read-only", 1, 1, 1),
9871119
MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6, "write", 1, 2, 1),
9881120
MakeCmdAttr<CommandTSMGet>("ts.mget", -3, "read-only", NO_KEY),
989-
MakeCmdAttr<CommandTSMRange>("ts.mrange", -5, "read-only", NO_KEY), );
1121+
MakeCmdAttr<CommandTSMRange>("ts.mrange", -5, "read-only", NO_KEY),
1122+
MakeCmdAttr<CommandTSIncrByDecrBy>("ts.incrby", -3, "write", 1, 1, 1),
1123+
MakeCmdAttr<CommandTSIncrByDecrBy>("ts.decrby", -3, "write", 1, 1, 1),
1124+
MakeCmdAttr<CommandTSDel>("ts.del", -4, "write", 1, 1, 1), );
9901125

9911126
} // namespace redis

src/config/config.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,10 @@ Config::Config() {
307307
{"rocksdb.max_compaction_bytes", false, new Int64Field(&rocks_db.max_compaction_bytes, 0, 0, INT64_MAX)},
308308
{"rocksdb.sst_file_delete_rate_bytes_per_sec", false,
309309
new Int64Field(&rocks_db.sst_file_delete_rate_bytes_per_sec, 0, 0, INT64_MAX)},
310+
{"rocksdb.periodic_compaction_seconds", false,
311+
new UInt64Field(&rocks_db.periodic_compaction_seconds, kDefaultRocksdbPeriodicCompactionSeconds, 0, UINT64_MAX)},
312+
{"rocksdb.ttl", false, new UInt64Field(&rocks_db.ttl, kDefaultRocksdbTTL, 0, UINT64_MAX)},
313+
{"rocksdb.daily_offpeak_time_utc", false, new StringField(&rocks_db.daily_offpeak_time_utc, "")},
310314

311315
/* rocksdb write options */
312316
{"rocksdb.write_options.sync", true, new YesNoField(&rocks_db.write_options.sync, false)},
@@ -732,6 +736,9 @@ void Config::initFieldCallback() {
732736
srv->storage->SetSstFileDeleteRateBytesPerSecond(rocks_db.sst_file_delete_rate_bytes_per_sec);
733737
return Status::OK();
734738
}},
739+
{"rocksdb.periodic_compaction_seconds", set_cf_option_cb},
740+
{"rocksdb.ttl", set_cf_option_cb},
741+
{"rocksdb.daily_offpeak_time_utc", set_db_option_cb},
735742
{"rocksdb.level0_slowdown_writes_trigger",
736743
[this, &set_cf_option_cb](Server *srv, const std::string &k,
737744
[[maybe_unused]] const std::string &v) -> Status {

src/config/config.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ constexpr const uint32_t kDefaultPort = 6666;
5757
constexpr const char *kDefaultNamespace = "__namespace";
5858
constexpr int KVROCKS_MAX_LSM_LEVEL = 7;
5959

60+
constexpr const uint64_t kDefaultRocksdbTTL = UINT64_MAX - 1;
61+
constexpr const uint64_t kDefaultRocksdbPeriodicCompactionSeconds = UINT64_MAX - 1;
62+
6063
const std::vector<ConfigEnum<spdlog::level::level_enum>> log_levels{
6164
{"debug", spdlog::level::debug}, {"info", spdlog::level::info}, {"warning", spdlog::level::warn},
6265
{"error", spdlog::level::err}, {"fatal", spdlog::level::critical},
@@ -243,6 +246,9 @@ struct Config {
243246
bool partition_filters;
244247
int64_t max_compaction_bytes;
245248
int64_t sst_file_delete_rate_bytes_per_sec = 0;
249+
uint64_t periodic_compaction_seconds = kDefaultRocksdbPeriodicCompactionSeconds;
250+
uint64_t ttl = kDefaultRocksdbTTL;
251+
std::string daily_offpeak_time_utc;
246252

247253
struct WriteOptions {
248254
bool sync;

src/server/server.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,6 +1024,26 @@ Server::InfoEntries Server::GetRocksDBInfo() {
10241024
cf_stats_map["memtable-limit-delays"]);
10251025
entries.emplace_back("memtable_count_limit_stop[" + cf_handle->GetName() + "]",
10261026
cf_stats_map["memtable-limit-stops"]);
1027+
1028+
// Get the SST file count in all levels
1029+
std::string sst_file_at_level = "[";
1030+
for (int level = 0; level < KVROCKS_MAX_LSM_LEVEL; level++) {
1031+
std::string sst_file_count;
1032+
db->GetProperty(cf_handle, rocksdb::DB::Properties::kNumFilesAtLevelPrefix + std::to_string(level),
1033+
&sst_file_count);
1034+
if (level != 0) {
1035+
sst_file_at_level += ",";
1036+
}
1037+
sst_file_at_level += sst_file_count;
1038+
}
1039+
entries.emplace_back("num_files_at_level[" + cf_handle->GetName() + "]", sst_file_at_level + "]");
1040+
1041+
// Get the estimate pending compaction bytes for the current column family
1042+
std::string estimate_pending_compaction_bytes;
1043+
db->GetProperty(cf_handle, rocksdb::DB::Properties::kEstimatePendingCompactionBytes,
1044+
&estimate_pending_compaction_bytes);
1045+
entries.emplace_back("estimate_pending_compaction_bytes[" + cf_handle->GetName() + "]",
1046+
estimate_pending_compaction_bytes);
10271047
}
10281048

10291049
auto rocksdb_stats = storage->GetDB()->GetDBOptions().statistics;

0 commit comments

Comments
 (0)