Skip to content

Commit

Permalink
fix(server): debug populate consume less memory (dragonflydb#4384)
Browse files Browse the repository at this point in the history
* fix server: debug populate consume less memory

Signed-off-by: adi_holden <adi@dragonflydb.io>
  • Loading branch information
adiholden authored Jan 1, 2025
1 parent 2abe2c0 commit 810af83
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 31 deletions.
48 changes: 28 additions & 20 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,13 @@ tuple<const CommandId*, absl::InlinedVector<string, 5>> GeneratePopulateCommand(
args.push_back(GenerateValue(val_size, random_value, gen));
}
} else if (type == "JSON") {
cid = registry.Find("JSON.SET");
cid = registry.Find("JSON.MERGE");
args.push_back("$");

string json = "{";
for (size_t i = 0; i < elements; ++i) {
absl::StrAppend(&json, "\"", i, "\":\"", GenerateValue(val_size, random_value, gen), "\",");
absl::StrAppend(&json, "\"", GenerateValue(val_size / 2, random_value, gen), "\":\"",
GenerateValue(val_size / 2, random_value, gen), "\",");
}
json[json.size() - 1] = '}'; // Replace last ',' with '}'
args.push_back(json);
Expand Down Expand Up @@ -157,30 +158,37 @@ void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool
absl::InlinedVector<string_view, 5> args_view;
facade::CapturingReplyBuilder crb;
ConnectionContext local_cntx{cntx, stub_tx.get()};

absl::InsecureBitGen gen;
for (unsigned i = 0; i < batch.sz; ++i) {
string key = absl::StrCat(prefix, ":", batch.index[i]);
uint32_t elements_left = elements;

while (elements_left) {
// limit rss grow by 32K by limiting the element count in each command.
uint32_t max_batch_elements = std::max(32_KB / val_size, 1ULL);
uint32_t populate_elements = std::min(max_batch_elements, elements_left);
elements_left -= populate_elements;
auto [cid, args] =
GeneratePopulateCommand(type, key, val_size, random_value, populate_elements,
*sf->service().mutable_registry(), &gen);
if (!cid) {
LOG_EVERY_N(WARNING, 10'000) << "Unable to find command, was it renamed?";
break;
}

auto [cid, args] = GeneratePopulateCommand(type, std::move(key), val_size, random_value,
elements, *sf->service().mutable_registry(), &gen);
if (!cid) {
LOG_EVERY_N(WARNING, 10'000) << "Unable to find command, was it renamed?";
break;
}

args_view.clear();
for (auto& arg : args) {
args_view.push_back(arg);
}
auto args_span = absl::MakeSpan(args_view);
args_view.clear();
for (auto& arg : args) {
args_view.push_back(arg);
}
auto args_span = absl::MakeSpan(args_view);

stub_tx->MultiSwitchCmd(cid);
local_cntx.cid = cid;
crb.SetReplyMode(ReplyMode::NONE);
stub_tx->InitByArgs(cntx->ns, local_cntx.conn_state.db_index, args_span);
stub_tx->MultiSwitchCmd(cid);
local_cntx.cid = cid;
crb.SetReplyMode(ReplyMode::NONE);
stub_tx->InitByArgs(cntx->ns, local_cntx.conn_state.db_index, args_span);

sf->service().InvokeCmd(cid, args_span, &crb, &local_cntx);
sf->service().InvokeCmd(cid, args_span, &crb, &local_cntx);
}
}

local_tx->UnlockMulti();
Expand Down
3 changes: 2 additions & 1 deletion tests/dragonfly/memory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
("ZSET", 250_000, 100, 100),
("LIST", 300_000, 100, 100),
("STRING", 3_500_000, 1000, 1),
("STREAM", 260_000, 100, 100),
("STREAM", 280_000, 100, 100),
],
)
# We limit to 5gb just in case to sanity check the gh runner. Otherwise, if we ask for too much
Expand Down Expand Up @@ -69,6 +69,7 @@ async def check_memory():
await client.execute_command("DFLY", "LOAD", f"{dbfilename}-summary.dfs")

await check_memory()
await client.execute_command("FLUSHALL")


@pytest.mark.asyncio
Expand Down
14 changes: 4 additions & 10 deletions tests/dragonfly/snapshot_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,12 +569,7 @@ async def test_tiered_entries_throttle(async_client: aioredis.Redis):
@dfly_args({"serialization_max_chunk_size": 4096, "proactor_threads": 1})
@pytest.mark.parametrize(
"cont_type",
[
("HASH"),
("SET"),
("ZSET"),
("LIST"),
],
[("HASH"), ("SET"), ("ZSET"), ("LIST"), ("STREAM")],
)
@pytest.mark.slow
async def test_big_value_serialization_memory_limit(df_factory, cont_type):
Expand All @@ -590,17 +585,16 @@ async def test_big_value_serialization_memory_limit(df_factory, cont_type):
await client.execute_command(
f"debug populate 1 prefix {element_size} TYPE {cont_type} RAND ELEMENTS {elements}"
)
await asyncio.sleep(1)

info = await client.info("ALL")
# rss double's because of DEBUG POPULATE
assert info["used_memory_peak_rss"] > (one_gb * 2)
assert info["used_memory_peak_rss"] < (one_gb * 1.2)
# if we execute SAVE below without big value serialization we trigger the assertion below.
# note the peak would reach (one_gb * 3) without it.
await client.execute_command("SAVE")
info = await client.info("ALL")

upper_limit = 2_250_000_000 # 2.25 GB
assert info["used_memory_peak_rss"] < upper_limit
assert info["used_memory_peak_rss"] < (one_gb * 1.3)

await client.execute_command("FLUSHALL")
await client.close()
Expand Down

0 comments on commit 810af83

Please sign in to comment.