raftstore: block in-memory pessimistic locks during the flashback #14859

Merged: 3 commits, Jun 1, 2023
23 changes: 21 additions & 2 deletions components/raftstore-v2/src/operation/command/admin/flashback.rs
@@ -9,7 +9,10 @@ use kvproto::{
use protobuf::Message;
use raftstore::{
coprocessor::RegionChangeReason,
-    store::metrics::{PEER_ADMIN_CMD_COUNTER, PEER_IN_FLASHBACK_STATE},
+    store::{
+        metrics::{PEER_ADMIN_CMD_COUNTER, PEER_IN_FLASHBACK_STATE},
+        LocksStatus,
+    },
Result,
};

@@ -85,7 +88,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
pub fn on_apply_res_flashback<T>(
&mut self,
store_ctx: &mut StoreContext<EK, ER, T>,
-        mut res: FlashbackResult,
+        #[allow(unused_mut)] mut res: FlashbackResult,
) {
(|| {
fail_point!("keep_peer_fsm_flashback_state_false", |_| {
@@ -114,6 +117,22 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
.unwrap();
self.set_has_extra_write();

let mut pessimistic_locks = self.txn_context().ext().pessimistic_locks.write();
pessimistic_locks.status = if res.region_state.get_region().is_in_flashback {
// To prevent the insertion of any new pessimistic locks, set the lock status
// to `LocksStatus::IsInFlashback` and clear all the existing locks.
pessimistic_locks.clear();
LocksStatus::IsInFlashback
} else if self.is_leader() {
// If the region is not in flashback, the leader can continue to insert
// pessimistic locks.
LocksStatus::Normal
} else {
// If the region is not in flashback and the peer is not the leader, it
// cannot insert pessimistic locks.
LocksStatus::NotLeader
};

// Compared to v1, v2 does not expire the remote lease, because only
// the local reader can serve read requests.
}
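Note on the change above: once `status` leaves `LocksStatus::Normal`, the in-memory lock table stops accepting new entries for the whole flashback window, and `FinishFlashback` later switches it back (to `Normal` on the leader, `NotLeader` on followers). A minimal sketch of that gating, using the `pessimistic_locks` API this PR already exercises; the `try_buffer_lock` helper itself is hypothetical and not part of the diff:

```rust
use raftstore::store::TxnExt;
use txn_types::{Key, PessimisticLock};

// Hypothetical helper: how a write path sees the table after PrepareFlashback.
fn try_buffer_lock(txn_ext: &TxnExt, key: Key, lock: PessimisticLock) -> Result<(), String> {
    let mut locks = txn_ext.pessimistic_locks.write();
    // `is_writable()` only holds while the status is `LocksStatus::Normal`, so a
    // region that has just entered flashback rejects the insertion here.
    if !locks.is_writable() {
        return Err(format!("in-memory lock table is not writable: {:?}", locks.status));
    }
    // `insert` hands the entries back on a memory-limit failure; fold that into
    // an error as well.
    locks
        .insert(vec![(key, lock)])
        .map_err(|_| "in-memory lock table is full".to_string())
}
```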
15 changes: 15 additions & 0 deletions components/raftstore/src/store/fsm/peer.rs
@@ -6489,6 +6489,21 @@ where
})());
// Set the leader lease to None to ensure that local reads are not executed.
self.fsm.peer.leader_lease_mut().expire_remote_lease();
let mut pessimistic_locks = self.fsm.peer.txn_ext.pessimistic_locks.write();
pessimistic_locks.status = if self.region().is_in_flashback {
// To prevent the insertion of any new pessimistic locks, set the lock status
// to `LocksStatus::IsInFlashback` and clear all the existing locks.
pessimistic_locks.clear();
LocksStatus::IsInFlashback
} else if self.fsm.peer.is_leader() {
// If the region is not in flashback, the leader can continue to insert
// pessimistic locks.
LocksStatus::Normal
} else {
// If the region is not in flashback and the peer is not the leader, it
// cannot insert pessimistic locks.
LocksStatus::NotLeader
};
}

fn on_ready_batch_switch_witness(&mut self, sw: SwitchWitness) {
1 change: 1 addition & 0 deletions components/raftstore/src/store/txn_ext.rs
@@ -125,6 +125,7 @@ pub enum LocksStatus {
TransferringLeader,
MergingRegion,
NotLeader,
IsInFlashback,
}

impl fmt::Debug for PeerPessimisticLocks {
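Adding a dedicated variant, rather than reusing `NotLeader`, lets callers report the precise reason a lock operation was refused. A hedged illustration of that mapping; the `status_to_error` helper is hypothetical, and only the reader change later in this diff does something equivalent:

```rust
use kvproto::errorpb::{self, FlashbackInProgress, NotLeader};
use raftstore::store::LocksStatus;

// Hypothetical mapping from a non-writable lock-table status to the errorpb
// error a client would see.
fn status_to_error(status: LocksStatus) -> Option<errorpb::Error> {
    let mut err = errorpb::Error::default();
    match status {
        LocksStatus::Normal => return None,
        // The new variant surfaces a dedicated `FlashbackInProgress` error.
        LocksStatus::IsInFlashback => {
            err.set_flashback_in_progress(FlashbackInProgress::default())
        }
        // The remaining statuses keep the generic "retry against the leader" signal.
        _ => err.set_not_leader(NotLeader::default()),
    }
    Some(err)
}
```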
10 changes: 9 additions & 1 deletion components/test_raftstore-v2/src/server.rs
@@ -885,6 +885,14 @@ impl<EK: KvEngine> Simulator<EK> for ServerCluster<EK> {

impl<EK: KvEngine> Cluster<ServerCluster<EK>, EK> {
pub fn must_get_snapshot_of_region(&mut self, region_id: u64) -> RegionSnapshot<EK::Snapshot> {
self.must_get_snapshot_of_region_with_ctx(region_id, SnapContext::default())
}

pub fn must_get_snapshot_of_region_with_ctx(
&mut self,
region_id: u64,
snap_ctx: SnapContext<'_>,
) -> RegionSnapshot<EK::Snapshot> {
let mut try_snapshot = || -> Option<RegionSnapshot<EK::Snapshot>> {
let leader = self.leader_of_region(region_id)?;
let store_id = leader.store_id;
@@ -897,7 +905,7 @@ impl<EK: KvEngine> Cluster<ServerCluster<EK>, EK> {
let mut storage = self.sim.rl().storages.get(&store_id).unwrap().clone();
let snap_ctx = SnapContext {
pb_ctx: &ctx,
-            ..Default::default()
+            ..snap_ctx.clone()
};
storage.snapshot(snap_ctx).ok()
};
10 changes: 9 additions & 1 deletion components/test_raftstore/src/server.rs
@@ -788,6 +788,14 @@ impl Simulator for ServerCluster {

impl Cluster<ServerCluster> {
pub fn must_get_snapshot_of_region(&mut self, region_id: u64) -> RegionSnapshot<RocksSnapshot> {
self.must_get_snapshot_of_region_with_ctx(region_id, Default::default())
}

pub fn must_get_snapshot_of_region_with_ctx(
&mut self,
region_id: u64,
snap_ctx: SnapContext<'_>,
) -> RegionSnapshot<RocksSnapshot> {
let mut try_snapshot = || -> Option<RegionSnapshot<RocksSnapshot>> {
let leader = self.leader_of_region(region_id)?;
let store_id = leader.store_id;
@@ -800,7 +808,7 @@ impl Cluster<ServerCluster> {
let mut storage = self.sim.rl().storages.get(&store_id).unwrap().clone();
let snap_ctx = SnapContext {
pb_ctx: &ctx,
-            ..Default::default()
+            ..snap_ctx.clone()
};
storage.snapshot(snap_ctx).ok()
};
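Both cluster helpers now accept a caller-supplied `SnapContext`, which is what lets the new integration test at the end of this PR grab a region snapshot while the region is still in flashback. A usage sketch for the v1 cluster, assuming a running `Cluster<ServerCluster>`; the function name is illustrative only:

```rust
use test_raftstore::{Cluster, ServerCluster};
use tikv::storage::kv::SnapContext;

// Illustrative only: fetch a snapshot of an in-flashback region by pre-setting
// `allowed_in_flashback`; the helper still fills in `pb_ctx` from the leader.
fn snapshot_during_flashback(cluster: &mut Cluster<ServerCluster>, region_id: u64) {
    let snap_ctx = SnapContext {
        allowed_in_flashback: true,
        ..Default::default()
    };
    let snapshot = cluster.must_get_snapshot_of_region_with_ctx(region_id, snap_ctx);
    // The snapshot carries the region's transaction extension, including the
    // in-memory pessimistic lock table inspected by the integration test below.
    assert!(snapshot.txn_ext.is_some());
}
```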
18 changes: 17 additions & 1 deletion src/storage/mvcc/reader/reader.rs
@@ -5,9 +5,10 @@ use std::ops::Bound;

use engine_traits::{CF_DEFAULT, CF_LOCK, CF_WRITE};
use kvproto::{
-    errorpb::{self, EpochNotMatch, StaleCommand},
+    errorpb::{self, EpochNotMatch, FlashbackInProgress, StaleCommand},
kvrpcpb::Context,
};
use raftstore::store::LocksStatus;
use tikv_kv::{SnapshotExt, SEEK_BOUND};
use txn_types::{Key, Lock, OldValue, TimeStamp, Value, Write, WriteRef, WriteType};

@@ -146,6 +147,8 @@ pub struct MvccReader<S: EngineSnapshot> {
term: u64,
#[allow(dead_code)]
version: u64,

allow_in_flashback: bool,
}

impl<S: EngineSnapshot> MvccReader<S> {
@@ -164,6 +167,7 @@ impl<S: EngineSnapshot> MvccReader<S> {
fill_cache,
term: 0,
version: 0,
allow_in_flashback: false,
}
}

@@ -182,6 +186,7 @@ impl<S: EngineSnapshot> MvccReader<S> {
fill_cache: !ctx.get_not_fill_cache(),
term: ctx.get_term(),
version: ctx.get_region_epoch().get_version(),
allow_in_flashback: false,
}
}

@@ -267,6 +272,13 @@ impl<S: EngineSnapshot> MvccReader<S> {
err.set_epoch_not_match(EpochNotMatch::default());
return Some(Err(KvError::from(err).into()));
}
// If the region is in the flashback state, reading the in-memory locks is
// not allowed.
if locks.status == LocksStatus::IsInFlashback && !self.allow_in_flashback {
let mut err = errorpb::Error::default();
err.set_flashback_in_progress(FlashbackInProgress::default());
return Some(Err(KvError::from(err).into()));
}

locks.get(key).map(|(lock, _)| {
// For write commands that are executed in serial, it should be impossible
@@ -768,6 +780,10 @@ impl<S: EngineSnapshot> MvccReader<S> {
pub fn snapshot(&self) -> &S {
&self.snapshot
}

pub fn set_allow_in_flashback(&mut self, set_allow_in_flashback: bool) {
self.allow_in_flashback = set_allow_in_flashback;
}
}

#[cfg(test)]
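The guard only applies to callers that have not opted in: ordinary transactional reads now fail fast with `FlashbackInProgress`, while the two flashback commands below call `set_allow_in_flashback(true)` and read through. A self-contained model of that decision, using simplified stand-in types rather than TiKV's own:

```rust
// Simplified stand-ins for the real types; only the decision logic is modeled.
#[derive(Clone, Copy, PartialEq, Debug)]
enum LockTableStatus {
    Normal,
    IsInFlashback,
}

struct InMemoryLocks {
    status: LockTableStatus,
}

// Mirrors the new check in `MvccReader`: reject lock reads during flashback
// unless the reader was explicitly marked with `allow_in_flashback`.
fn check_lock_read(locks: &InMemoryLocks, allow_in_flashback: bool) -> Result<(), &'static str> {
    if locks.status == LockTableStatus::IsInFlashback && !allow_in_flashback {
        return Err("FlashbackInProgress");
    }
    Ok(())
}

fn main() {
    let in_flashback = InMemoryLocks {
        status: LockTableStatus::IsInFlashback,
    };
    // An ordinary transactional read is rejected while flashback is running...
    assert!(check_lock_read(&in_flashback, false).is_err());
    // ...but the flashback commands, which opt in, still read the lock table.
    assert!(check_lock_read(&in_flashback, true).is_ok());

    // Outside flashback, reads behave as before.
    let normal = InMemoryLocks {
        status: LockTableStatus::Normal,
    };
    assert!(check_lock_read(&normal, false).is_ok());
}
```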
1 change: 1 addition & 0 deletions src/storage/txn/commands/flashback_to_version.rs
@@ -95,6 +95,7 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for FlashbackToVersion {
fn process_write(mut self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
let mut reader =
MvccReader::new_with_ctx(snapshot.clone(), Some(ScanMode::Forward), &self.ctx);
reader.set_allow_in_flashback(true);
let mut txn = MvccTxn::new(TimeStamp::zero(), context.concurrency_manager);
match self.state {
FlashbackToVersionState::RollbackLock {
src/storage/txn/commands/flashback_to_version_read_phase.rs
@@ -153,6 +153,7 @@ impl<S: Snapshot> ReadCommand<S> for FlashbackToVersionReadPhase {
fn process_read(self, snapshot: S, statistics: &mut Statistics) -> Result<ProcessResult> {
let tag = self.tag().get_str();
let mut reader = MvccReader::new_with_ctx(snapshot, Some(ScanMode::Forward), &self.ctx);
reader.set_allow_in_flashback(true);
// Filter out the SST that does not have a newer version than `self.version` in
// `CF_WRITE`, i.e., whose latest `commit_ts` <= `self.version` in the later
// scan. By doing this, we can only flashback those keys that have version
66 changes: 64 additions & 2 deletions tests/integrations/raftstore/test_flashback.rs
@@ -13,14 +13,76 @@ use kvproto::{
raft_cmdpb::{AdminCmdType, CmdType, RaftCmdRequest, RaftCmdResponse, Request},
raft_serverpb::RegionLocalState,
};
-use raftstore::store::Callback;
+use raftstore::store::{Callback, LocksStatus};
use test_raftstore::*;
use test_raftstore_macro::test_case;
-use txn_types::WriteBatchFlags;
+use tikv::storage::kv::SnapContext;
+use txn_types::{Key, PessimisticLock, WriteBatchFlags};

const TEST_KEY: &[u8] = b"k1";
const TEST_VALUE: &[u8] = b"v1";

#[test_case(test_raftstore::new_server_cluster)]
#[test_case(test_raftstore_v2::new_server_cluster)]
fn test_flashback_with_in_memory_pessimistic_locks() {
let mut cluster = new_cluster(0, 3);
cluster.cfg.raft_store.raft_heartbeat_ticks = 20;
cluster.run();
cluster.must_transfer_leader(1, new_peer(1, 1));

let region = cluster.get_region(TEST_KEY);
// Write a pessimistic lock to the in-memory pessimistic lock table.
{
let snapshot = cluster.must_get_snapshot_of_region(region.get_id());
let txn_ext = snapshot.txn_ext.unwrap();
let mut pessimistic_locks = txn_ext.pessimistic_locks.write();
assert!(pessimistic_locks.is_writable());
pessimistic_locks
.insert(vec![(
Key::from_raw(TEST_KEY),
PessimisticLock {
primary: TEST_KEY.to_vec().into_boxed_slice(),
start_ts: 10.into(),
ttl: 3000,
for_update_ts: 20.into(),
min_commit_ts: 30.into(),
last_change_ts: 5.into(),
versions_to_last_change: 3,
is_locked_with_conflict: false,
},
)])
.unwrap();
assert_eq!(pessimistic_locks.len(), 1);
}
// Prepare flashback.
cluster.must_send_wait_flashback_msg(region.get_id(), AdminCmdType::PrepareFlashback);
// Check the in-memory pessimistic lock table.
{
let snapshot = cluster.must_get_snapshot_of_region_with_ctx(
region.get_id(),
SnapContext {
allowed_in_flashback: true,
..Default::default()
},
);
let txn_ext = snapshot.txn_ext.unwrap();
let pessimistic_locks = txn_ext.pessimistic_locks.read();
assert!(!pessimistic_locks.is_writable());
assert_eq!(pessimistic_locks.status, LocksStatus::IsInFlashback);
assert_eq!(pessimistic_locks.len(), 0);
}
// Finish flashback.
cluster.must_send_wait_flashback_msg(region.get_id(), AdminCmdType::FinishFlashback);
// Check the in-memory pessimistic lock table.
{
let snapshot = cluster.must_get_snapshot_of_region(region.get_id());
let txn_ext = snapshot.txn_ext.unwrap();
let pessimistic_locks = txn_ext.pessimistic_locks.read();
assert!(pessimistic_locks.is_writable());
assert_eq!(pessimistic_locks.len(), 0);
}
}

#[test_case(test_raftstore::new_node_cluster)]
#[test_case(test_raftstore_v2::new_node_cluster)]
fn test_allow_read_only_request() {