Skip to content

Commit

Permalink
fix: improve error propagation with RESTORE commands (#4428)
Browse files Browse the repository at this point in the history
* fix: improve error propagation with RESTORE commands

Also, provide better logs if AddOrNew function fails adding a new entry
  • Loading branch information
romange authored Jan 9, 2025
1 parent f3426bd commit c77e7cc
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 50 deletions.
17 changes: 13 additions & 4 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,20 @@ bool PrimeEvictionPolicy::CanGrow(const PrimeTable& tbl) const {
// we estimate how much memory we will take with the current capacity
// even though we may currently use less memory.
// see https://github.com/dragonflydb/dragonfly/issues/256#issuecomment-1227095503
size_t table_free_items = (tbl.capacity() - tbl.size()) + PrimeTable::kSegCapacity;
size_t obj_bytes_estimation =
db_slice_->bytes_per_object() * table_free_items * GetFlag(FLAGS_table_growth_margin);
size_t table_free_items = ((tbl.capacity() - tbl.size()) + PrimeTable::kSegCapacity) *
GetFlag(FLAGS_table_growth_margin);

size_t obj_bytes_estimation = db_slice_->bytes_per_object() * table_free_items;
bool res = mem_available > int64_t(PrimeTable::kSegBytes + obj_bytes_estimation);
VLOG(2) << "available: " << table_free_items << ", res: " << res;
if (res) {
VLOG(1) << "free_items: " << table_free_items
<< ", obj_bytes: " << db_slice_->bytes_per_object() << " "
<< " mem_available: " << mem_available;
} else {
LOG_EVERY_T(INFO, 1) << "Can't grow, free_items " << table_free_items
<< ", obj_bytes: " << db_slice_->bytes_per_object() << " "
<< " mem_available: " << mem_available;
}

return res;
}
Expand Down
89 changes: 43 additions & 46 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ std::optional<RdbVersion> GetRdbVersion(std::string_view msg) {
return std::nullopt;
}

// The footer looks like this: version (2 bytes) | crc64 (8 bytes)
const std::uint8_t* footer =
reinterpret_cast<const std::uint8_t*>(msg.data()) + (msg.size() - DUMP_FOOTER_SIZE);
const RdbVersion version = (*(footer + 1) << 8 | (*footer));
Expand All @@ -63,9 +64,10 @@ std::optional<RdbVersion> GetRdbVersion(std::string_view msg) {
return std::nullopt;
}

uint64_t expected_cs =
// Compute the expected crc64 based on the actual data up to the crc64 footer field.
uint64_t actual_cs =
crc64(0, reinterpret_cast<const uint8_t*>(msg.data()), msg.size() - sizeof(uint64_t));
uint64_t actual_cs = absl::little_endian::Load64(footer + sizeof(version));
uint64_t expected_cs = absl::little_endian::Load64(footer + 2); // skip the version

if (actual_cs != expected_cs) {
LOG(WARNING) << "CRC check failed for restore command, expecting: " << expected_cs << " got "
Expand Down Expand Up @@ -155,12 +157,9 @@ class RdbRestoreValue : protected RdbLoaderBase {
rdb_version_ = rdb_version;
}

// Returns default ItAndUpdater if Add failed.
// In case a valid ItAndUpdater is returned, then second is true in case a new key is added,
// false if the existing key is updated (should not happen unless we have a bug).
pair<DbSlice::ItAndUpdater, bool> Add(string_view key, string_view payload, const DbContext& cntx,
const RestoreArgs& args, DbSlice* db_slice,
EngineShard* shard);
OpResult<DbSlice::AddOrFindResult> Add(string_view key, string_view payload,
const DbContext& cntx, const RestoreArgs& args,
DbSlice* db_slice);

private:
std::optional<OpaqueObj> Parse(io::Source* source);
Expand Down Expand Up @@ -191,17 +190,17 @@ std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(io::Source* sourc
return std::optional<OpaqueObj>(std::move(obj));
}

pair<DbSlice::ItAndUpdater, bool> RdbRestoreValue::Add(string_view key, string_view data,
const DbContext& cntx,
const RestoreArgs& args, DbSlice* db_slice,
EngineShard* shard) {
OpResult<DbSlice::AddOrFindResult> RdbRestoreValue::Add(string_view key, string_view data,
const DbContext& cntx,
const RestoreArgs& args,
DbSlice* db_slice) {
InMemSource data_src(data);
PrimeValue pv;
bool first_parse = true;
do {
auto opaque_res = Parse(&data_src);
if (!opaque_res) {
return {};
return OpStatus::INVALID_VALUE;
}

LoadConfig config;
Expand All @@ -217,18 +216,16 @@ pair<DbSlice::ItAndUpdater, bool> RdbRestoreValue::Add(string_view key, string_v
if (auto ec = FromOpaque(*opaque_res, config, &pv); ec) {
// we failed - report and exit
LOG(WARNING) << "error while trying to read data: " << ec;
return {};
return OpStatus::INVALID_VALUE;
}
} while (pending_read_.remaining > 0);

if (auto res = db_slice->AddOrUpdate(cntx, key, std::move(pv), args.ExpirationTime()); res) {
auto res = db_slice->AddOrUpdate(cntx, key, std::move(pv), args.ExpirationTime());
if (res) {
res->it->first.SetSticky(args.Sticky());
shard->search_indices()->AddDoc(key, cntx, res->it->second);
return {DbSlice::ItAndUpdater{
.it = res->it, .exp_it = res->exp_it, .post_updater = std::move(res->post_updater)},
res->is_new};
db_slice->shard_owner()->search_indices()->AddDoc(key, cntx, res->it->second);
}
return {};
return res;
}

[[nodiscard]] bool RestoreArgs::UpdateExpiration(int64_t now_msec) {
Expand Down Expand Up @@ -477,15 +474,17 @@ OpStatus Renamer::DeserializeDest(Transaction* t, EngineShard* shard) {
restore_args.SetSticky(serialized_value_.sticky);

RdbRestoreValue loader(serialized_value_.version.value());
auto [restored_dest_it, is_new] = loader.Add(dest_key_, serialized_value_.value, op_args.db_cntx,
restore_args, &db_slice, shard);

if (restored_dest_it.IsValid()) {
LOG_IF(DFATAL, !is_new) << "Unexpected override for key " << dest_key_ << " " << dest_found_;
auto bc = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
if (bc) {
bc->AwakeWatched(t->GetDbIndex(), dest_key_);
}
auto add_res =
loader.Add(dest_key_, serialized_value_.value, op_args.db_cntx, restore_args, &db_slice);

if (!add_res)
return add_res.status();

LOG_IF(DFATAL, !add_res->is_new)
<< "Unexpected override for key " << dest_key_ << " " << dest_found_;
auto bc = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
if (bc) {
bc->AwakeWatched(t->GetDbIndex(), dest_key_);
}

if (shard->journal()) {
Expand Down Expand Up @@ -534,8 +533,8 @@ OpResult<std::string> OpDump(const OpArgs& op_args, string_view key) {
return OpStatus::KEY_NOTFOUND;
}

OpResult<bool> OpRestore(const OpArgs& op_args, std::string_view key, std::string_view payload,
RestoreArgs restore_args, RdbVersion rdb_version) {
OpStatus OpRestore(const OpArgs& op_args, std::string_view key, std::string_view payload,
RestoreArgs restore_args, RdbVersion rdb_version) {
if (!restore_args.UpdateExpiration(op_args.db_cntx.time_now_ms)) {
return OpStatus::OUT_OF_RANGE;
}
Expand Down Expand Up @@ -563,18 +562,17 @@ OpResult<bool> OpRestore(const OpArgs& op_args, std::string_view key, std::strin

if (restore_args.Expired()) {
VLOG(1) << "the new key '" << key << "' already expired, will not save the value";
return true;
return OpStatus::OK;
}

RdbRestoreValue loader(rdb_version);
auto [res_it, is_new] =
loader.Add(key, payload, op_args.db_cntx, restore_args, &db_slice, op_args.shard);
LOG_IF(DFATAL, res_it.IsValid() && !is_new)
auto add_res = loader.Add(key, payload, op_args.db_cntx, restore_args, &db_slice);
LOG_IF(DFATAL, add_res && !add_res->is_new)
<< "Unexpected override for key " << key << ", found previous " << found_prev
<< " override: " << restore_args.Replace()
<< ", type: " << ObjTypeToString(res_it.it->second.ObjType());
<< ", type: " << ObjTypeToString(add_res->it->second.ObjType());

return res_it.IsValid();
return add_res.status();
}

bool ScanCb(const OpArgs& op_args, PrimeIterator prime_it, const ScanOpts& opts, string* scratch,
Expand Down Expand Up @@ -1505,18 +1503,17 @@ void GenericFamily::Restore(CmdArgList args, const CommandContext& cmd_cntx) {
rdb_version.value());
};

OpResult<bool> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
OpStatus result = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));

if (result) {
if (result.value()) {
switch (result) {
case OpStatus::OK:
return builder->SendOk();
} else {
case OpStatus::KEY_EXISTS:
return builder->SendError("BUSYKEY Target key name already exists.");
case OpStatus::INVALID_VALUE:
return builder->SendError("Bad data format");
}
} else if (result.status() == OpStatus::KEY_EXISTS) {
return builder->SendError("BUSYKEY: key name already exists.");
} else {
return builder->SendError(result.status());
default:
return builder->SendError(result);
}
}

Expand Down
27 changes: 27 additions & 0 deletions src/server/generic_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,13 @@ TEST_F(GenericFamilyTest, Restore) {
EXPECT_EQ(resp.GetString(), "OK");
resp = Run({"zrange", "my-zset", "0", "-1"});
EXPECT_EQ("elon", resp.GetString());

// corrupt the dump file but keep the crc correct.
ZSET_LISTPACK_DUMP[0] = 0x12;
uint8_t crc64[8] = {0x4e, 0xa3, 0x4c, 0x89, 0xc4, 0x8b, 0xd9, 0xe4};
memcpy(ZSET_LISTPACK_DUMP + 19, crc64, 8);
resp = Run({"restore", "invalid", "0", ToSV(ZSET_LISTPACK_DUMP)});
EXPECT_THAT(resp, ErrArg("ERR Bad data format"));
}

TEST_F(GenericFamilyTest, Info) {
Expand Down Expand Up @@ -845,4 +852,24 @@ TEST_F(GenericFamilyTest, ExpireTime) {
EXPECT_EQ(expire_time_in_ms, CheckedInt({"PEXPIRETIME", "foo"}));
}

TEST_F(GenericFamilyTest, RestoreOOM) {
  max_memory_limit = 20000000;
  Run({"set", "src", string(5000, 'x')});
  auto resp = Run({"dump", "src"});

  string dump = resp.GetString();

  // Give Dragonfly time to propagate max_memory_limit to the shards. The delay does not
  // have to be precise; the restore loop below leaves plenty of time for the internal
  // processes to progress.
  usleep(10000);

  // Keep restoring fresh keys until the server runs out of memory.
  constexpr unsigned kMaxAttempts = 10000;
  unsigned attempt = 0;
  while (attempt < kMaxAttempts) {
    resp = Run({"restore", absl::StrCat("dst", attempt), "0", dump});
    if (resp != "OK")
      break;
    ++attempt;
  }

  // We must have hit the memory limit before exhausting all attempts.
  ASSERT_LT(attempt, kMaxAttempts);
  EXPECT_THAT(resp, ErrArg("Out of memory"));
}

} // namespace dfly

0 comments on commit c77e7cc

Please sign in to comment.