Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
03b69e1
feat: impl tdigest.revrank
donghao526 Aug 17, 2025
97df4a1
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
70a39d3
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
dde8410
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
0d3e9cc
feat: impl tdigest.revrank
donghao526 Aug 19, 2025
bb172a8
test: add unit test for tdigest.revrank
donghao526 Aug 19, 2025
a64add4
test: add unit test for tdigest.revrank
donghao526 Aug 19, 2025
3954b1f
Merge branch 'unstable' into feature/tdigest-revrank
PragmaTwice Aug 19, 2025
05d1202
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
f688e14
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
8bcad0f
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
46ac984
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
495e072
add golang test cases for tdigest.revrank
donghao526 Aug 20, 2025
2b6785d
add golang test cases for tdigest.revrank
invalid-email-address Aug 20, 2025
3af3b54
feat: impl tdigest.revrank
invalid-email-address Aug 20, 2025
f3d85d3
Merge branch 'feature/tmp' into feature/tdigest-revrank
invalid-email-address Aug 20, 2025
e68689d
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Aug 21, 2025
eb8674f
feat: impl tdigest.revrank
invalid-email-address Aug 27, 2025
c70f410
Merge branch 'feature/tmp' into feature/tdigest-revrank
invalid-email-address Aug 27, 2025
b991d0d
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Aug 27, 2025
4c9a41d
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
invalid-email-address Aug 28, 2025
543fda0
fix(replication): Fix Seg Fault On Signal When Replication is Enabled…
zhixinwen Aug 20, 2025
e0d39a7
chore(.asf.yaml): make 2.13 a protected branches (#3129)
PragmaTwice Aug 20, 2025
a4ed14c
feat(ts): Add support for data writing and `TS.CREATE`, `TS.ADD/MADD`…
yezhizi Aug 20, 2025
9d6c532
feat(ts): Add `TS.INFO` command (#3133)
yezhizi Aug 22, 2025
ff658f8
chore(.asf.yaml): enable auto merge and disable wiki (#3137)
PragmaTwice Aug 22, 2025
53e82f8
chore: remove unused `autoResizeBlockAndSST` method and config (#3136)
jonahgao Aug 22, 2025
6df3309
feat(scripting): support strict key-accessing mode for lua scripting …
PragmaTwice Aug 23, 2025
201afed
feat(Dockerfile): add a UID for the user in the container (#3138)
SpecLad Aug 24, 2025
0851c22
feat(ts): Add data query support and `TS.RANGE` command (#3140)
yezhizi Aug 25, 2025
c7ed36f
feat(ts): Add `TS.GET` command (#3142)
yezhizi Aug 26, 2025
3a898fe
chore(config): enable `level_compaction_dynamic_level_bytes` by defau…
jonahgao Aug 26, 2025
3711578
perf(storage): eliminate unnecessary `rocksdb::DB::ListColumnFamilies…
jonahgao Aug 27, 2025
4b4f684
fix(scan): pattern-based SCAN iterations may skip remaining keys (#3146)
sryanyuan Aug 27, 2025
bd268b4
style: add some comments on TDigestRank
donghao526 Aug 28, 2025
8e6a7f9
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
donghao526 Aug 28, 2025
6662240
refactor: remove commented code
donghao526 Aug 28, 2025
e7f06a2
style: format code
donghao526 Aug 28, 2025
367981c
Merge branch 'unstable' into feature/tdigest-revrank
LindaSummer Aug 29, 2025
4b8cd6a
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Sep 18, 2025
07836fd
feat: sort the input using map in revrank
donghao526 Sep 19, 2025
f44bc56
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
donghao526 Sep 19, 2025
2aded75
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Oct 14, 2025
f4a9c53
feat: add the support of TDIGEST.REVRANK command
donghao526 Oct 25, 2025
5023de8
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Oct 25, 2025
e3629d9
feat: add the support of TDIGEST.REVRANK command
donghao526 Oct 25, 2025
ae05623
fix: fix format
donghao526 Oct 25, 2025
0cf8c8a
fix: fix clang-tidy
donghao526 Oct 26, 2025
f855895
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Oct 26, 2025
c6d5fc7
feat: add the support of TDIGEST.REVRANK command
donghao526 Oct 27, 2025
fbeb111
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
donghao526 Oct 27, 2025
f7d4b43
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Oct 27, 2025
62dfbb5
feat: add the support of TDIGEST.REVRANK command
donghao526 Oct 27, 2025
3471dac
fix: remove unnecessary empty check for tdigest.revrank
donghao526 Oct 28, 2025
8e700fc
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Oct 28, 2025
e943ab8
feat: use a stable way to compare two double values
donghao526 Oct 29, 2025
95516b1
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
donghao526 Oct 29, 2025
85e4c44
fix: fix the comments for clarity
donghao526 Oct 30, 2025
ac32080
fix: fix the comments for clarity
donghao526 Oct 30, 2025
79d2e33
fix: use DoubleEqual to ensure the two centorid.mean comparison stable
donghao526 Oct 30, 2025
625510d
fix: Corrected phrasing for proper naming convention
donghao526 Oct 30, 2025
cc01373
fix: Corrected phrasing for proper naming convention
donghao526 Oct 30, 2025
07b1a09
feat: add a guard to validate the revrank result
donghao526 Oct 31, 2025
b14dee6
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
donghao526 Oct 31, 2025
d385597
Merge branch 'unstable' into feature/tdigest-revrank
aleksraiden Oct 31, 2025
222dec1
feat: unique the inputs before call tdigest.RevRank
donghao526 Nov 1, 2025
97426ac
Merge branch 'feature/tdigest-revrank' of github.com:donghao526/kvroc…
donghao526 Nov 1, 2025
baeafed
fix: fix typo
donghao526 Nov 1, 2025
9e97b6e
Merge branch 'unstable' into feature/tdigest-revrank
donghao526 Nov 2, 2025
8a8592a
chore: update comments for clarity
donghao526 Nov 3, 2025
cc4f736
style: use a new line for the increment to make the code better for r…
donghao526 Nov 3, 2025
17d9d48
fix: fix the range of the check for ranks
donghao526 Nov 3, 2025
83fae63
chore: resolve the grammatical error in the comments
donghao526 Nov 3, 2025
54ad714
Merge branch 'unstable' into feature/tdigest-revrank
PragmaTwice Nov 3, 2025
e3d85b4
refactor: replace the unnecessary std::unordered_set with a std::set
donghao526 Nov 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions src/commands/cmd_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,54 @@ class CommandTDigestAdd : public Commander {
std::vector<double> values_;
};

class CommandTDigestRevRank : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
key_name_ = args[1];

std::set<std::string> unique_inputs_set(args.begin() + 2, args.end());
origin_inputs_.assign(args.begin() + 2, args.end());

unique_inputs_.reserve(unique_inputs_set.size());
size_t i = 0;
for (const auto &input : unique_inputs_set) {
auto value = ParseFloat(input);
if (!value) {
return {Status::RedisParseErr, errValueIsNotFloat};
}
unique_inputs_.push_back(*value);
unique_inputs_order_[input] = i;
++i;
}
return Status::OK();
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
TDigest tdigest(srv->storage, conn->GetNamespace());
std::vector<int> result;
result.reserve(origin_inputs_.size());
if (const auto s = tdigest.RevRank(ctx, key_name_, unique_inputs_, result); !s.ok()) {
if (s.IsNotFound()) {
return {Status::RedisExecErr, errKeyNotFound};
}
return {Status::RedisExecErr, s.ToString()};
}

std::vector<std::string> rev_ranks;
rev_ranks.reserve(origin_inputs_.size());
for (const auto &v : origin_inputs_) {
rev_ranks.push_back(redis::Integer(result[unique_inputs_order_[v]]));
}
*output = redis::Array(rev_ranks);
return Status::OK();
}

private:
std::string key_name_;
std::vector<double> unique_inputs_;
std::map<std::string, size_t> unique_inputs_order_;
std::vector<std::string> origin_inputs_;
};

class CommandTDigestMinMax : public Commander {
public:
explicit CommandTDigestMinMax(bool is_min) : is_min_(is_min) {}
Expand Down Expand Up @@ -369,6 +417,7 @@ REDIS_REGISTER_COMMANDS(TDigest, MakeCmdAttr<CommandTDigestCreate>("tdigest.crea
MakeCmdAttr<CommandTDigestAdd>("tdigest.add", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMax>("tdigest.max", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestRevRank>("tdigest.revrank", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestQuantile>("tdigest.quantile", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4, "write", GetMergeKeyRange));
Expand Down
85 changes: 65 additions & 20 deletions src/types/redis_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class DummyCentroids {
return iter_ != centroids_.cend();
}

bool IsBegin() { return iter_ == centroids_.cbegin(); }

// The Prev function can only be called for item is not cend,
// because we must guarantee the iterator to be inside the valid range before iteration.
bool Prev() {
Expand Down Expand Up @@ -186,8 +188,37 @@ rocksdb::Status TDigest::Add(engine::Context& ctx, const Slice& digest_name, con
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& qs,
TDigestQuantitleResult* result) {
rocksdb::Status TDigest::mergeNodes(engine::Context& ctx, const std::string& ns_key, TDigestMetadata* metadata) {
if (metadata->unmerged_nodes == 0) {
return rocksdb::Status::OK();
}

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisTDigest);
if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) {
return status;
}

if (auto status = mergeCurrentBuffer(ctx, ns_key, batch, metadata); !status.ok()) {
return status;
}

std::string metadata_bytes;
metadata->Encode(&metadata_bytes);
if (auto status = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); !status.ok()) {
return status;
}

if (auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); !status.ok()) {
return status;
}

ctx.RefreshLatestSnapshot();
return rocksdb::Status::OK();
}

rocksdb::Status TDigest::RevRank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>& result) {
auto ns_key = AppendNamespacePrefix(digest_name);
TDigestMetadata metadata;
{
Expand All @@ -198,31 +229,45 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name
}

if (metadata.total_observations == 0) {
result.resize(inputs.size(), -2);
return rocksdb::Status::OK();
}

if (metadata.unmerged_nodes > 0) {
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisTDigest);
if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) {
return status;
}
if (auto status = mergeNodes(ctx, ns_key, &metadata); !status.ok()) {
return status;
}
}

if (auto status = mergeCurrentBuffer(ctx, ns_key, batch, &metadata); !status.ok()) {
return status;
}
std::vector<Centroid> centroids;
if (auto status = dumpCentroids(ctx, ns_key, metadata, &centroids); !status.ok()) {
return status;
}

std::string metadata_bytes;
metadata.Encode(&metadata_bytes);
if (auto status = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); !status.ok()) {
return status;
}
auto dump_centroids = DummyCentroids(metadata, centroids);
auto status = TDigestRevRank(dump_centroids, inputs, result);
if (!status) {
return rocksdb::Status::InvalidArgument(status.Msg());
}
return rocksdb::Status::OK();
}

if (auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); !status.ok()) {
return status;
}
rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& qs,
TDigestQuantitleResult* result) {
auto ns_key = AppendNamespacePrefix(digest_name);
TDigestMetadata metadata;
{
LockGuard guard(storage_->GetLockManager(), ns_key);

ctx.RefreshLatestSnapshot();
if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); !status.ok()) {
return status;
}

if (metadata.total_observations == 0) {
return rocksdb::Status::OK();
}

if (auto status = mergeNodes(ctx, ns_key, &metadata); !status.ok()) {
return status;
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/types/redis_tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class TDigest : public SubKeyScanner {

rocksdb::Status Merge(engine::Context& ctx, const Slice& dest_digest, const std::vector<std::string>& source_digests,
const TDigestMergeOptions& options);

rocksdb::Status RevRank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>& result);
rocksdb::Status GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata);

private:
Expand Down Expand Up @@ -117,6 +118,8 @@ class TDigest : public SubKeyScanner {
std::string internalSegmentGuardPrefixKey(const TDigestMetadata& metadata, const std::string& ns_key,
SegmentType seg) const;

rocksdb::Status mergeNodes(engine::Context& ctx, const std::string& ns_key, TDigestMetadata* metadata);

rocksdb::Status mergeCurrentBuffer(engine::Context& ctx, const std::string& ns_key,
ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, TDigestMetadata* metadata,
const std::vector<double>* additional_buffer = nullptr,
Expand Down
91 changes: 91 additions & 0 deletions src/types/tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include <fmt/format.h>

#include <map>
#include <numeric>
#include <vector>

#include "common/status.h"
Expand Down Expand Up @@ -150,3 +152,92 @@ inline StatusOr<double> TDigestQuantile(TD&& td, double q) {
diff /= (lc.weight / 2 + rc.weight / 2);
return Lerp(lc.mean, rc.mean, diff);
}

inline int DoubleCompare(double a, double b, double rel_eps = 1e-12, double abs_eps = 1e-9) {
double diff = a - b;
double adiff = std::abs(diff);
if (adiff <= abs_eps) return 0;
double maxab = std::max(std::abs(a), std::abs(b));
if (adiff <= maxab * rel_eps) return 0;
return (diff < 0) ? -1 : 1;
}

inline bool DoubleEqual(double a, double b, double rel_eps = 1e-12, double abs_eps = 1e-9) {
return DoubleCompare(a, b, rel_eps, abs_eps) == 0;
}

struct DoubleComparator {
bool operator()(const double& a, const double& b) const { return DoubleCompare(a, b) == -1; }
};

template <typename TD>
inline Status TDigestRevRank(TD&& td, const std::vector<double>& inputs, std::vector<int>& result) {
std::map<double, size_t, DoubleComparator> value_to_indices;
for (size_t i = 0; i < inputs.size(); ++i) {
value_to_indices[inputs[i]] = i;
}
Comment on lines +175 to +178
Copy link

Copilot AI Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When duplicate input values exist, this map will only store the index of the last occurrence, causing incorrect results to be returned. The value_to_indices map should use a multimap or map to vector to store all indices for duplicate values, not just the last one.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inputs have been deduplicated before calling this function, and there are no duplicate input values


result.clear();
result.resize(inputs.size(), -2);
auto it = value_to_indices.rbegin();

// handle inputs larger than maximum
while (it != value_to_indices.rend() && it->first > td.Max()) {
result[it->second] = -1;
++it;
}

auto iter = td.End();
double cumulative_weight = 0;
while (iter->Valid() && it != value_to_indices.rend()) {
auto centroid = GET_OR_RET(iter->GetCentroid());
auto input_value = it->first;
if (DoubleEqual(centroid.mean, input_value)) {
auto current_mean = centroid.mean;
auto current_mean_cumulative_weight = cumulative_weight + centroid.weight / 2;
cumulative_weight += centroid.weight;

// handle all the previous centroids which has the same mean
while (!iter->IsBegin() && iter->Prev()) {
auto next_centroid = GET_OR_RET(iter->GetCentroid());
if (!DoubleEqual(current_mean, next_centroid.mean)) {
// move back to the last equal centroid, because we will process it in the next loop
iter->Next();
break;
}
current_mean_cumulative_weight += next_centroid.weight / 2;
cumulative_weight += next_centroid.weight;
}

// handle the prev inputs which have the same value
result[it->second] = static_cast<int>(current_mean_cumulative_weight);
++it;
if (iter->IsBegin()) {
break;
}
iter->Prev();
} else if (DoubleCompare(centroid.mean, input_value) > 0) {
cumulative_weight += centroid.weight;
if (iter->IsBegin()) {
break;
}
iter->Prev();
} else {
result[it->second] = static_cast<int>(cumulative_weight);
++it;
}
}

// handle inputs less than minimum
while (it != value_to_indices.rend()) {
result[it->second] = static_cast<int>(td.TotalWeight());
++it;
}

for (auto r : result) {
if (r <= -2) {
return Status{Status::InvalidArgument, "invalid result when computing revrank"};
}
}
return Status::OK();
}
76 changes: 76 additions & 0 deletions tests/cppunit/types/tdigest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,79 @@ TEST_F(RedisTDigestTest, Quantile_returns_nan_on_empty_tdigest) {
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_FALSE(result.quantiles) << "should not have quantiles with empty tdigest";
}

TEST_F(RedisTDigestTest, RevRank_on_the_set_containing_different_elements) {
std::string test_digest_name = "test_digest_revrank" + std::to_string(util::GetTimeStampMS());
bool exists = false;
auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
ASSERT_FALSE(exists);
ASSERT_TRUE(status.ok());
std::vector<double> input{10, 20, 30, 40, 50, 60};
status = tdigest_->Add(*ctx_, test_digest_name, input);
ASSERT_TRUE(status.ok()) << status.ToString();

std::vector<int> result;
result.reserve(input.size());
const std::vector<double> value = {0, 10, 20, 30, 40, 50, 60, 70};
status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
const auto expect_result = std::vector<double>{6, 5, 4, 3, 2, 1, 0, -1};

for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
EXPECT_EQ(got, expect_result[i]);
}
ASSERT_TRUE(status.ok()) << status.ToString();
}

TEST_F(RedisTDigestTest, RevRank_on_the_set_containing_several_identical_elements) {
std::string test_digest_name = "test_digest_revrank" + std::to_string(util::GetTimeStampMS());
bool exists = false;
auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
ASSERT_FALSE(exists);
ASSERT_TRUE(status.ok());
std::vector<double> input{10, 10, 10, 20, 20};
status = tdigest_->Add(*ctx_, test_digest_name, input);
ASSERT_TRUE(status.ok()) << status.ToString();

std::vector<int> result;
result.reserve(input.size());
const std::vector<double> value = {10, 20};
status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
const auto expect_result = std::vector<double>{3, 1};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
EXPECT_EQ(got, expect_result[i]);
}
ASSERT_TRUE(status.ok()) << status.ToString();

status = tdigest_->Add(*ctx_, test_digest_name, std::vector<double>{10});
ASSERT_TRUE(status.ok()) << status.ToString();

result.clear();
status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
const auto expect_result_new = std::vector<double>{4, 1};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
EXPECT_EQ(got, expect_result_new[i]);
}
ASSERT_TRUE(status.ok()) << status.ToString();
}

TEST_F(RedisTDigestTest, RevRank_on_empty_tdigest) {
std::string test_digest_name = "test_digest_revrank" + std::to_string(util::GetTimeStampMS());
bool exists = false;
auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
ASSERT_FALSE(exists);
ASSERT_TRUE(status.ok());

std::vector<int> result;
result.reserve(2);
const std::vector<double> value = {10, 20};
status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
const auto expect_result = std::vector<double>{-2, -2};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
EXPECT_EQ(got, expect_result[i]);
}
ASSERT_TRUE(status.ok()) << status.ToString();
}
Loading