raftstore: block in-memory pessimistic locks during the flashback #14859

Merged: 3 commits, Jun 1, 2023
26 changes: 24 additions & 2 deletions components/raftstore-v2/src/operation/command/admin/flashback.rs
@@ -1,6 +1,7 @@
// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.

use engine_traits::{KvEngine, RaftEngine, RaftLogBatch};
#[cfg(feature = "failpoints")]
use fail::fail_point;
use kvproto::{
raft_cmdpb::{AdminCmdType, AdminRequest, AdminResponse, RaftCmdRequest},
@@ -9,7 +10,10 @@ use kvproto::{
use protobuf::Message;
use raftstore::{
coprocessor::RegionChangeReason,
store::metrics::{PEER_ADMIN_CMD_COUNTER, PEER_IN_FLASHBACK_STATE},
store::{
metrics::{PEER_ADMIN_CMD_COUNTER, PEER_IN_FLASHBACK_STATE},
LocksStatus,
},
Result,
};

@@ -85,8 +89,10 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
pub fn on_apply_res_flashback<T>(
&mut self,
store_ctx: &mut StoreContext<EK, ER, T>,
mut res: FlashbackResult,
#[cfg(not(feature = "failpoints"))] res: FlashbackResult,
#[cfg(feature = "failpoints")] mut res: FlashbackResult,
) {
#[cfg(feature = "failpoints")]
(|| {
fail_point!("keep_peer_fsm_flashback_state_false", |_| {
res.region_state.mut_region().set_is_in_flashback(false);
Expand Down Expand Up @@ -114,6 +120,22 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
.unwrap();
self.set_has_extra_write();

let mut pessimistic_locks = self.txn_context().ext().pessimistic_locks.write();
pessimistic_locks.status = if res.region_state.get_region().is_in_flashback {
// To prevent the insertion of any new pessimistic locks, set the lock status
// to `LocksStatus::IsInFlashback` and clear all the existing locks.
pessimistic_locks.clear();
LocksStatus::IsInFlashback
} else if self.is_leader() {
// If the region is not in flashback, the leader can continue to insert
// pessimistic locks.
LocksStatus::Normal
} else {
// If the region is not in flashback and the peer is not the leader, it
// cannot insert pessimistic locks.
LocksStatus::NotLeader
};

// Compares to v1, v2 does not expire remote lease, because only
// local reader can serve read requests.
}
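The hunk above relies on two Rust idioms worth noting: a function parameter that is `mut` only when the `failpoints` feature is enabled, and a `fail_point!` call wrapped in an immediately-invoked closure so the early return produced by the macro leaves the closure rather than the surrounding method. Below is a self-contained sketch of the same pattern, assuming a `failpoints` cargo feature backed by the `fail` crate; the `State` type, function name, and failpoint name are hypothetical and not taken from the PR.

#[cfg(feature = "failpoints")]
use fail::fail_point;

pub struct State {
    pub in_flashback: bool,
}

pub fn apply_flashback_result(
    // Only the failpoint build needs to mutate the argument, so `mut` is
    // added conditionally via an attribute on the parameter.
    #[cfg(not(feature = "failpoints"))] state: State,
    #[cfg(feature = "failpoints")] mut state: State,
) -> bool {
    #[cfg(feature = "failpoints")]
    (|| {
        // Simulate a peer that failed to persist the flashback flag. The
        // macro's early return exits this closure, not the whole function.
        fail_point!("keep_flashback_state_false", |_| {
            state.in_flashback = false;
        });
    })();
    state.in_flashback
}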
15 changes: 15 additions & 0 deletions components/raftstore/src/store/fsm/peer.rs
@@ -6489,6 +6489,21 @@ where
})());
// Let the leader lease to None to ensure that local reads are not executed.
self.fsm.peer.leader_lease_mut().expire_remote_lease();
let mut pessimistic_locks = self.fsm.peer.txn_ext.pessimistic_locks.write();
pessimistic_locks.status = if self.region().is_in_flashback {
// To prevent the insertion of any new pessimistic locks, set the lock status
// to `LocksStatus::IsInFlashback` and clear all the existing locks.
pessimistic_locks.clear();
LocksStatus::IsInFlashback
} else if self.fsm.peer.is_leader() {
// If the region is not in flashback, the leader can continue to insert
// pessimistic locks.
LocksStatus::Normal
} else {
// If the region is not in flashback and the peer is not the leader, it
// cannot insert pessimistic locks.
LocksStatus::NotLeader
};
}

fn on_ready_batch_switch_witness(&mut self, sw: SwitchWitness) {
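Both hunks above (raftstore-v2's flashback.rs and raftstore v1's peer.rs) apply the same three-way status transition when a flashback admin result is applied. The following is a minimal restatement of that decision as a standalone function, not the PR code; the real code additionally clears every existing in-memory lock in the flashback branch.

#[derive(Debug, PartialEq)]
pub enum LocksStatus {
    Normal,
    NotLeader,
    IsInFlashback,
}

pub fn next_locks_status(is_in_flashback: bool, is_leader: bool) -> LocksStatus {
    if is_in_flashback {
        // Reject any new in-memory pessimistic locks while flashback runs;
        // the real code also calls `pessimistic_locks.clear()` here.
        LocksStatus::IsInFlashback
    } else if is_leader {
        // Flashback finished: the leader may accept in-memory locks again.
        LocksStatus::Normal
    } else {
        // A non-leader peer never accepts in-memory pessimistic locks.
        LocksStatus::NotLeader
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn flashback_transitions() {
        assert_eq!(next_locks_status(true, true), LocksStatus::IsInFlashback);
        assert_eq!(next_locks_status(true, false), LocksStatus::IsInFlashback);
        assert_eq!(next_locks_status(false, true), LocksStatus::Normal);
        assert_eq!(next_locks_status(false, false), LocksStatus::NotLeader);
    }
}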
1 change: 1 addition & 0 deletions components/raftstore/src/store/txn_ext.rs
@@ -125,6 +125,7 @@ pub enum LocksStatus {
TransferringLeader,
MergingRegion,
NotLeader,
IsInFlashback,
}

impl fmt::Debug for PeerPessimisticLocks {
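A hypothetical sketch of why adding the single `IsInFlashback` variant is enough to block new locks: the integration test at the end of this diff asserts that the lock table is not writable while the status is `IsInFlashback` and is writable again once flashback finishes, which is consistent with a writability check that accepts only `LocksStatus::Normal`. That check is an assumption here and is not part of this diff.

#[derive(PartialEq)]
pub enum LocksStatus {
    Normal,
    TransferringLeader,
    MergingRegion,
    NotLeader,
    IsInFlashback,
}

pub struct PeerPessimisticLocks {
    pub status: LocksStatus,
}

impl PeerPessimisticLocks {
    // Assumed semantics: any non-normal status rejects new in-memory locks,
    // so the new `IsInFlashback` variant blocks insertion with no extra code.
    pub fn is_writable(&self) -> bool {
        self.status == LocksStatus::Normal
    }
}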
10 changes: 9 additions & 1 deletion components/test_raftstore-v2/src/server.rs
@@ -885,6 +885,14 @@ impl<EK: KvEngine> Simulator<EK> for ServerCluster<EK> {

impl<EK: KvEngine> Cluster<ServerCluster<EK>, EK> {
pub fn must_get_snapshot_of_region(&mut self, region_id: u64) -> RegionSnapshot<EK::Snapshot> {
self.must_get_snapshot_of_region_with_ctx(region_id, SnapContext::default())
}

pub fn must_get_snapshot_of_region_with_ctx(
&mut self,
region_id: u64,
snap_ctx: SnapContext,
) -> RegionSnapshot<EK::Snapshot> {
let mut try_snapshot = || -> Option<RegionSnapshot<EK::Snapshot>> {
let leader = self.leader_of_region(region_id)?;
let store_id = leader.store_id;
@@ -897,7 +905,7 @@ impl<EK: KvEngine> Cluster<ServerCluster<EK>, EK> {
let mut storage = self.sim.rl().storages.get(&store_id).unwrap().clone();
let snap_ctx = SnapContext {
pb_ctx: &ctx,
..Default::default()
..snap_ctx.clone()
};
storage.snapshot(snap_ctx).ok()
};
10 changes: 9 additions & 1 deletion components/test_raftstore/src/server.rs
@@ -788,6 +788,14 @@ impl Simulator for ServerCluster {

impl Cluster<ServerCluster> {
pub fn must_get_snapshot_of_region(&mut self, region_id: u64) -> RegionSnapshot<RocksSnapshot> {
self.must_get_snapshot_of_region_with_ctx(region_id, Default::default())
}

pub fn must_get_snapshot_of_region_with_ctx(
&mut self,
region_id: u64,
snap_ctx: SnapContext,
) -> RegionSnapshot<RocksSnapshot> {
let mut try_snapshot = || -> Option<RegionSnapshot<RocksSnapshot>> {
let leader = self.leader_of_region(region_id)?;
let store_id = leader.store_id;
@@ -800,7 +808,7 @@ impl Cluster<ServerCluster> {
let mut storage = self.sim.rl().storages.get(&store_id).unwrap().clone();
let snap_ctx = SnapContext {
pb_ctx: &ctx,
..Default::default()
..snap_ctx.clone()
};
storage.snapshot(snap_ctx).ok()
};
10 changes: 9 additions & 1 deletion src/storage/mvcc/reader/reader.rs
@@ -5,9 +5,10 @@ use std::ops::Bound;

use engine_traits::{CF_DEFAULT, CF_LOCK, CF_WRITE};
use kvproto::{
errorpb::{self, EpochNotMatch, StaleCommand},
errorpb::{self, EpochNotMatch, FlashbackInProgress, StaleCommand},
kvrpcpb::Context,
};
use raftstore::store::LocksStatus;
use tikv_kv::{SnapshotExt, SEEK_BOUND};
use txn_types::{Key, Lock, OldValue, TimeStamp, Value, Write, WriteRef, WriteType};

@@ -267,6 +268,13 @@ impl<S: EngineSnapshot> MvccReader<S> {
err.set_epoch_not_match(EpochNotMatch::default());
return Some(Err(KvError::from(err).into()));
}
// If the region is in the flashback state, it should not be allowed to read the
// locks.
if locks.status == LocksStatus::IsInFlashback {
let mut err = errorpb::Error::default();
err.set_flashback_in_progress(FlashbackInProgress::default());
return Some(Err(KvError::from(err).into()));
}

locks.get(key).map(|(lock, _)| {
// For write commands that are executed in serial, it should be impossible
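For reference, a simplified, self-contained restatement of the reader-side guard added above, using only the kvproto calls that already appear in the hunk and a plain bool standing in for the lock-table status.

use kvproto::errorpb::{self, FlashbackInProgress};

// While flashback is in progress, readers receive a region-level
// `FlashbackInProgress` error instead of an in-memory lock lookup result.
pub fn check_in_memory_locks_readable(in_flashback: bool) -> Result<(), errorpb::Error> {
    if in_flashback {
        let mut err = errorpb::Error::default();
        err.set_flashback_in_progress(FlashbackInProgress::default());
        return Err(err);
    }
    Ok(())
}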
64 changes: 62 additions & 2 deletions tests/integrations/raftstore/test_flashback.rs
@@ -13,14 +13,74 @@ use kvproto::{
raft_cmdpb::{AdminCmdType, CmdType, RaftCmdRequest, RaftCmdResponse, Request},
raft_serverpb::RegionLocalState,
};
use raftstore::store::Callback;
use raftstore::store::{Callback, LocksStatus};
use test_raftstore::*;
use test_raftstore_macro::test_case;
use txn_types::WriteBatchFlags;
use tikv::storage::kv::SnapContext;
use txn_types::{Key, PessimisticLock, WriteBatchFlags};

const TEST_KEY: &[u8] = b"k1";
const TEST_VALUE: &[u8] = b"v1";

#[test_case(test_raftstore::new_server_cluster)]
#[test_case(test_raftstore_v2::new_server_cluster)]
fn test_flashback_with_in_memory_pessimistic_locks() {
let mut cluster = new_cluster(0, 3);
cluster.cfg.raft_store.raft_heartbeat_ticks = 20;
cluster.run();
cluster.must_transfer_leader(1, new_peer(1, 1));

let region = cluster.get_region(TEST_KEY);
// Write a pessimistic lock to the in-memory pessimistic lock table.
{
let snapshot = cluster.must_get_snapshot_of_region(region.get_id());
let txn_ext = snapshot.txn_ext.unwrap();
let lock = PessimisticLock {
primary: TEST_KEY.to_vec().into_boxed_slice(),
start_ts: 10.into(),
ttl: 3000,
for_update_ts: 20.into(),
min_commit_ts: 30.into(),
last_change_ts: 5.into(),
versions_to_last_change: 3,
is_locked_with_conflict: false,
};
let mut pessimistic_locks = txn_ext.pessimistic_locks.write();
assert!(pessimistic_locks.is_writable());
pessimistic_locks
.insert(vec![(Key::from_raw(TEST_KEY), lock.clone())])
.unwrap();
assert_eq!(pessimistic_locks.len(), 1);
}
// Prepare flashback.
cluster.must_send_wait_flashback_msg(region.get_id(), AdminCmdType::PrepareFlashback);
// Check the in-memory pessimistic lock table.
{
let snapshot = cluster.must_get_snapshot_of_region_with_ctx(
region.get_id(),
SnapContext {
allowed_in_flashback: true,
..Default::default()
},
);
let txn_ext = snapshot.txn_ext.unwrap();
let pessimistic_locks = txn_ext.pessimistic_locks.read();
assert!(!pessimistic_locks.is_writable());
assert_eq!(pessimistic_locks.status, LocksStatus::IsInFlashback);
assert_eq!(pessimistic_locks.len(), 0);
}
// Finish flashback.
cluster.must_send_wait_flashback_msg(region.get_id(), AdminCmdType::FinishFlashback);
// Check the in-memory pessimistic lock table.
{
let snapshot = cluster.must_get_snapshot_of_region(region.get_id());
let txn_ext = snapshot.txn_ext.unwrap();
let pessimistic_locks = txn_ext.pessimistic_locks.read();
assert!(pessimistic_locks.is_writable());
assert_eq!(pessimistic_locks.len(), 0);
}
}

#[test_case(test_raftstore::new_node_cluster)]
#[test_case(test_raftstore_v2::new_node_cluster)]
fn test_allow_read_only_request() {