apply: rename wb to kv_wb (#4721)
Signed-off-by: Neil Shen <overvenus@gmail.com>
overvenus authored May 20, 2019
1 parent 16b2427 commit b780a80
Showing 1 changed file with 49 additions and 49 deletions.

src/raftstore/store/fsm/apply.rs
@@ -286,9 +286,9 @@ struct ApplyContext {
     apply_res: Vec<ApplyRes>,
     exec_ctx: Option<ExecContext>,

-    wb: Option<WriteBatch>,
-    wb_last_bytes: u64,
-    wb_last_keys: u64,
+    kv_wb: Option<WriteBatch>,
+    kv_wb_last_bytes: u64,
+    kv_wb_last_keys: u64,

     last_applied_index: u64,
     committed_count: usize,
@@ -321,11 +321,11 @@ impl ApplyContext {
             engines,
             router,
             notifier,
-            wb: None,
+            kv_wb: None,
             cbs: MustConsumeVec::new("callback of apply context"),
             apply_res: vec![],
-            wb_last_bytes: 0,
-            wb_last_keys: 0,
+            kv_wb_last_bytes: 0,
+            kv_wb_last_keys: 0,
             last_applied_index: 0,
             committed_count: 0,
             enable_sync_log: cfg.sync_log,
@@ -341,10 +341,10 @@ impl ApplyContext {
     /// `prepare_for` -> `commit` [-> `commit` ...] -> `finish_for`.
     /// After all delegates are handled, the `write_to_db` method should be called.
     pub fn prepare_for(&mut self, delegate: &ApplyDelegate) {
-        if self.wb.is_none() {
-            self.wb = Some(WriteBatch::with_capacity(DEFAULT_APPLY_WB_SIZE));
-            self.wb_last_bytes = 0;
-            self.wb_last_keys = 0;
+        if self.kv_wb.is_none() {
+            self.kv_wb = Some(WriteBatch::with_capacity(DEFAULT_APPLY_WB_SIZE));
+            self.kv_wb_last_bytes = 0;
+            self.kv_wb_last_keys = 0;
         }
         self.cbs.push(ApplyCallback::new(delegate.region.clone()));
         self.last_applied_index = delegate.apply_state.get_applied_index();
@@ -356,7 +356,7 @@ impl ApplyContext {
     /// This call is valid only when it's between a `prepare_for` and `finish_for`.
     pub fn commit(&mut self, delegate: &mut ApplyDelegate) {
         if self.last_applied_index < delegate.apply_state.get_applied_index() {
-            delegate.write_apply_state(&self.engines, self.wb.as_mut().unwrap());
+            delegate.write_apply_state(&self.engines, self.kv_wb.as_mut().unwrap());
         }
         // `last_applied_index` doesn't need to be updated; setting `persistent` to
         // true will force a call to `prepare_for` automatically.
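
As a side note, the batch lifecycle these doc comments describe (`prepare_for` -> `commit` [-> `commit` ...] -> `finish_for`, then a single `write_to_db`) can be sketched standalone. This is a minimal illustration only; `Ctx` and `Delegate` below are hypothetical stand-ins, with a `Vec<u8>` playing the role of the kv `WriteBatch`:

```rust
// Hypothetical stand-ins for ApplyContext/ApplyDelegate; not TiKV's types.
struct Delegate;

struct Ctx {
    kv_wb: Option<Vec<u8>>, // stand-in for the lazily created kv WriteBatch
    kv_wb_last_bytes: u64,
}

impl Ctx {
    fn prepare_for(&mut self, _d: &Delegate) {
        // Lazily create the batch, mirroring the `kv_wb.is_none()` check above.
        if self.kv_wb.is_none() {
            self.kv_wb = Some(Vec::with_capacity(4 * 1024));
            self.kv_wb_last_bytes = 0;
        }
    }
    fn commit(&mut self, _d: &mut Delegate) {
        // Record how much has been buffered so far for this delegate.
        self.kv_wb_last_bytes = self.kv_wb.as_ref().unwrap().len() as u64;
    }
    fn finish_for(&mut self, d: &mut Delegate) {
        self.commit(d);
    }
    fn write_to_db(&mut self) {
        // Pretend to flush the buffered writes to the kv engine.
        if let Some(wb) = self.kv_wb.as_mut() {
            wb.clear();
        }
    }
}

fn main() {
    let mut ctx = Ctx { kv_wb: None, kv_wb_last_bytes: 0 };
    let mut d = Delegate;
    ctx.prepare_for(&d); // -> commit [-> commit ...] -> finish_for
    ctx.commit(&mut d);
    ctx.finish_for(&mut d);
    ctx.write_to_db(); // one engine write after all delegates are handled
}
```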
@@ -369,32 +369,32 @@ impl ApplyContext {
             self.write_to_db();
             self.prepare_for(delegate);
         }
-        self.wb_last_bytes = self.wb().data_size() as u64;
-        self.wb_last_keys = self.wb().count() as u64;
+        self.kv_wb_last_bytes = self.kv_wb().data_size() as u64;
+        self.kv_wb_last_keys = self.kv_wb().count() as u64;
     }

     /// Writes all the changes into RocksDB.
     pub fn write_to_db(&mut self) {
-        if self.wb.as_ref().map_or(false, |wb| !wb.is_empty()) {
+        if self.kv_wb.as_ref().map_or(false, |wb| !wb.is_empty()) {
             let mut write_opts = WriteOptions::new();
             write_opts.set_sync(self.enable_sync_log && self.sync_log_hint);
             self.engines
                 .kv
-                .write_opt(self.wb(), &write_opts)
+                .write_opt(self.kv_wb(), &write_opts)
                 .unwrap_or_else(|e| {
                     panic!("failed to write to engine: {:?}", e);
                 });
             self.sync_log_hint = false;
-            let data_size = self.wb().data_size();
+            let data_size = self.kv_wb().data_size();
             if data_size > APPLY_WB_SHRINK_SIZE {
                 // Control the memory usage for the WriteBatch.
-                self.wb = Some(WriteBatch::with_capacity(DEFAULT_APPLY_WB_SIZE));
+                self.kv_wb = Some(WriteBatch::with_capacity(DEFAULT_APPLY_WB_SIZE));
             } else {
                 // Clear data and reuse the WriteBatch; this reduces memory
                 // allocations and deallocations.
-                self.wb().clear();
+                self.kv_wb().clear();
             }
-            self.wb_last_bytes = 0;
-            self.wb_last_keys = 0;
+            self.kv_wb_last_bytes = 0;
+            self.kv_wb_last_keys = 0;
         }
         for cbs in self.cbs.drain(..) {
             cbs.invoke_all(&self.host);
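
The shrink-or-reuse branch above is a general allocation-control pattern worth calling out. Below is a self-contained sketch under the assumption that a plain `Vec<u8>` stands in for the RocksDB `WriteBatch`; the constants are illustrative, not TiKV's tuned values:

```rust
// Illustrative thresholds; TiKV's real values are constants in apply.rs.
const DEFAULT_APPLY_WB_SIZE: usize = 4 * 1024;
const APPLY_WB_SHRINK_SIZE: usize = 1024 * 1024;

/// Recycle the batch after a flush: shrink it if it grew too large,
/// otherwise keep the allocation and only clear the contents.
fn recycle(wb: &mut Vec<u8>) {
    if wb.len() > APPLY_WB_SHRINK_SIZE {
        // One oversized batch shouldn't pin a large buffer forever.
        *wb = Vec::with_capacity(DEFAULT_APPLY_WB_SIZE);
    } else {
        // Reuse the buffer to avoid a free/alloc pair on the hot path.
        wb.clear();
    }
}

fn main() {
    let mut wb: Vec<u8> = Vec::with_capacity(DEFAULT_APPLY_WB_SIZE);
    wb.extend_from_slice(&[0u8; 16]); // some buffered writes
    recycle(&mut wb);
    assert!(wb.is_empty() && wb.capacity() >= DEFAULT_APPLY_WB_SIZE);
}
```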
@@ -404,7 +404,7 @@ impl ApplyContext {
     /// Finishes `Apply`s for the delegate.
     pub fn finish_for(&mut self, delegate: &mut ApplyDelegate, results: VecDeque<ExecResult>) {
         if !delegate.pending_remove {
-            delegate.write_apply_state(&self.engines, self.wb.as_mut().unwrap());
+            delegate.write_apply_state(&self.engines, self.kv_wb.as_mut().unwrap());
         }
         self.commit_opt(delegate, false);
         self.apply_res.push(ApplyRes {
@@ -418,21 +418,21 @@
     }

     pub fn delta_bytes(&self) -> u64 {
-        self.wb().data_size() as u64 - self.wb_last_bytes
+        self.kv_wb().data_size() as u64 - self.kv_wb_last_bytes
     }

     pub fn delta_keys(&self) -> u64 {
-        self.wb().count() as u64 - self.wb_last_keys
+        self.kv_wb().count() as u64 - self.kv_wb_last_keys
     }

     #[inline]
-    pub fn wb(&self) -> &WriteBatch {
-        self.wb.as_ref().unwrap()
+    pub fn kv_wb(&self) -> &WriteBatch {
+        self.kv_wb.as_ref().unwrap()
     }

     #[inline]
-    pub fn wb_mut(&mut self) -> &mut WriteBatch {
-        self.wb.as_mut().unwrap()
+    pub fn kv_wb_mut(&mut self) -> &mut WriteBatch {
+        self.kv_wb.as_mut().unwrap()
     }

     pub fn flush(&mut self) {
@@ -508,7 +508,7 @@ pub fn notify_stale_req(term: u64, cb: Callback) {
 }

 /// Checks whether a write needs to be issued before handling the command.
-fn should_write_to_engine(cmd: &RaftCmdRequest, wb_keys: usize) -> bool {
+fn should_write_to_engine(cmd: &RaftCmdRequest, kv_wb_keys: usize) -> bool {
     if cmd.has_admin_request() {
         match cmd.get_admin_request().get_cmd_type() {
             // ComputeHash requires an up-to-date snapshot.
@@ -522,7 +522,7 @@ fn should_write_to_engine(cmd: &RaftCmdRequest, wb_keys: usize) -> bool {

     // When the write batch contains more than the recommended number of keys,
     // write the batch to the engine.
-    if wb_keys >= WRITE_BATCH_MAX_KEYS {
+    if kv_wb_keys >= WRITE_BATCH_MAX_KEYS {
         return true;
     }
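
To make the batching rule concrete, here is a toy driver that applies the same early-flush policy: certain commands force a flush, and so does exceeding the key threshold. Everything here is a hypothetical sketch; only the threshold idea comes from the code above:

```rust
const WRITE_BATCH_MAX_KEYS: usize = 128;

// Stand-in for should_write_to_engine: flush for "admin-like" commands that
// need an up-to-date view, or once the pending batch holds too many keys.
fn should_flush(is_admin: bool, pending_keys: usize) -> bool {
    is_admin || pending_keys >= WRITE_BATCH_MAX_KEYS
}

fn main() {
    let mut pending = 0usize;
    for i in 0..300 {
        let is_admin = i % 250 == 0;
        if should_flush(is_admin, pending) {
            println!("flushing {} keys", pending);
            pending = 0;
        }
        pending += 1; // buffer this command's key
    }
}
```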

@@ -777,7 +777,7 @@ impl ApplyDelegate {
         if !data.is_empty() {
             let cmd = util::parse_data_at(data, index, &self.tag);

-            if should_write_to_engine(&cmd, apply_ctx.wb().count()) {
+            if should_write_to_engine(&cmd, apply_ctx.kv_wb().count()) {
                 apply_ctx.commit(self);
             }

@@ -913,15 +913,15 @@ impl ApplyDelegate {
         assert!(!self.pending_remove);

         ctx.exec_ctx = Some(self.new_ctx(index, term));
-        ctx.wb_mut().set_save_point();
+        ctx.kv_wb_mut().set_save_point();
         let (resp, exec_result) = match self.exec_raft_cmd(ctx, req) {
             Ok(a) => {
-                ctx.wb_mut().pop_save_point().unwrap();
+                ctx.kv_wb_mut().pop_save_point().unwrap();
                 a
             }
             Err(e) => {
                 // Clear dirty values.
-                ctx.wb_mut().rollback_to_save_point().unwrap();
+                ctx.kv_wb_mut().rollback_to_save_point().unwrap();
                 match e {
                     Error::EpochNotMatch(..) => debug!(
                         "epoch not match";
@@ -1151,7 +1151,7 @@ impl ApplyDelegate {
             }
             // TODO: check whether cf exists or not.
             rocks::util::get_cf_handle(&ctx.engines.kv, cf)
-                .and_then(|handle| ctx.wb().put_cf(handle, &key, value).map_err(Into::into))
+                .and_then(|handle| ctx.kv_wb().put_cf(handle, &key, value).map_err(Into::into))
                 .unwrap_or_else(|e| {
                     panic!(
                         "{} failed to write ({}, {}) to cf {}: {:?}",
@@ -1163,7 +1163,7 @@ impl ApplyDelegate {
                     )
                 });
         } else {
-            ctx.wb().put(&key, value).unwrap_or_else(|e| {
+            ctx.kv_wb().put(&key, value).unwrap_or_else(|e| {
                 panic!(
                     "{} failed to write ({}, {}): {:?}",
                     self.tag,
@@ -1189,7 +1189,7 @@ impl ApplyDelegate {
             let cf = req.get_delete().get_cf();
             // TODO: check whether cf exists or not.
             rocks::util::get_cf_handle(&ctx.engines.kv, cf)
-                .and_then(|handle| ctx.wb().delete_cf(handle, &key).map_err(Into::into))
+                .and_then(|handle| ctx.kv_wb().delete_cf(handle, &key).map_err(Into::into))
                 .unwrap_or_else(|e| {
                     panic!("{} failed to delete {}: {:?}", self.tag, escape(&key), e)
                 });
@@ -1201,7 +1201,7 @@ impl ApplyDelegate {
                 self.metrics.delete_keys_hint += 1;
             }
         } else {
-            ctx.wb().delete(&key).unwrap_or_else(|e| {
+            ctx.kv_wb().delete(&key).unwrap_or_else(|e| {
                 panic!("{} failed to delete {}: {:?}", self.tag, escape(&key), e)
             });
             self.metrics.delete_keys_hint += 1;
@@ -1497,8 +1497,8 @@ impl ApplyDelegate {
         } else {
             PeerState::Normal
         };
-        let wb_mut = ctx.wb.as_mut().unwrap();
-        if let Err(e) = write_peer_state(&ctx.engines.kv, wb_mut, &region, state, None) {
+        let kv_wb_mut = ctx.kv_wb.as_mut().unwrap();
+        if let Err(e) = write_peer_state(&ctx.engines.kv, kv_wb_mut, &region, state, None) {
             panic!("{} failed to update region state: {:?}", self.tag, e);
         }

@@ -1609,7 +1609,7 @@ impl ApplyDelegate {
             regions.push(derived.clone());
         }
         let kv = &ctx.engines.kv;
-        let wb_mut = ctx.wb.as_mut().unwrap();
+        let kv_wb_mut = ctx.kv_wb.as_mut().unwrap();
         for req in split_reqs.get_requests() {
             let mut new_region = Region::new();
             // TODO: check new region id validation.
@@ -1625,8 +1625,8 @@ impl ApplyDelegate {
             {
                 peer.set_id(*peer_id);
             }
-            write_peer_state(kv, wb_mut, &new_region, PeerState::Normal, None)
-                .and_then(|_| write_initial_apply_state(kv, wb_mut, new_region.get_id()))
+            write_peer_state(kv, kv_wb_mut, &new_region, PeerState::Normal, None)
+                .and_then(|_| write_initial_apply_state(kv, kv_wb_mut, new_region.get_id()))
                 .unwrap_or_else(|e| {
                     panic!(
                         "{} fails to save split region {:?}: {:?}",
@@ -1639,7 +1639,7 @@ impl ApplyDelegate {
             derived.set_start_key(keys.pop_front().unwrap());
             regions.push(derived.clone());
         }
-        write_peer_state(kv, wb_mut, &derived, PeerState::Normal, None).unwrap_or_else(|e| {
+        write_peer_state(kv, kv_wb_mut, &derived, PeerState::Normal, None).unwrap_or_else(|e| {
             panic!("{} fails to update region {:?}: {:?}", self.tag, derived, e)
         });
         let mut resp = AdminResponse::new();
@@ -1692,7 +1692,7 @@ impl ApplyDelegate {
         merging_state.set_commit(exec_ctx.index);
         write_peer_state(
             &ctx.engines.kv,
-            ctx.wb.as_mut().unwrap(),
+            ctx.kv_wb.as_mut().unwrap(),
             &region,
             PeerState::Merging,
             Some(merging_state.clone()),
@@ -1863,15 +1863,15 @@ impl ApplyDelegate {
             region.set_start_key(source_region.get_start_key().to_vec());
         }
         let kv = &ctx.engines.kv;
-        let wb_mut = ctx.wb.as_mut().unwrap();
-        write_peer_state(kv, wb_mut, &region, PeerState::Normal, None)
+        let kv_wb_mut = ctx.kv_wb.as_mut().unwrap();
+        write_peer_state(kv, kv_wb_mut, &region, PeerState::Normal, None)
             .and_then(|_| {
                 // TODO: maybe all information needs to be filled?
                 let mut merging_state = MergeState::new();
                 merging_state.set_target(self.region.clone());
                 write_peer_state(
                     kv,
-                    wb_mut,
+                    kv_wb_mut,
                     source_region,
                     PeerState::Tombstone,
                     Some(merging_state),
@@ -1924,8 +1924,8 @@ impl ApplyDelegate {
         // Update version to avoid duplicated rollback requests.
         region.mut_region_epoch().set_version(version + 1);
         let kv = &ctx.engines.kv;
-        let wb_mut = ctx.wb.as_mut().unwrap();
-        write_peer_state(kv, wb_mut, &region, PeerState::Normal, None).unwrap_or_else(|e| {
+        let kv_wb_mut = ctx.kv_wb.as_mut().unwrap();
+        write_peer_state(kv, kv_wb_mut, &region, PeerState::Normal, None).unwrap_or_else(|e| {
             panic!(
                 "{} failed to rollback merge {:?}: {:?}",
                 self.tag, rollback, e
