Skip to content

Commit

Permalink
tests: use channel in block_engine (tikv#1724)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreMouche authored and siddontang committed Apr 7, 2017
1 parent 965dab2 commit ce55342
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 17 deletions.
2 changes: 1 addition & 1 deletion tests/pd/mock/mocker/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@ impl Mocker for AlreadyBootstrap {

Some(Ok(resp))
}
}
}
14 changes: 5 additions & 9 deletions tests/storage/test_raft_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

use std::sync::mpsc::channel;
use std::time::Duration;
use std::thread;
use tikv::util::HandyRwLock;
use tikv::storage::{self, Storage, Mutation, make_key, ALL_CFS, Options, Engine};
use tikv::storage::{txn, engine, mvcc};
Expand Down Expand Up @@ -146,24 +145,21 @@ fn test_engine_leader_change_twice() {
fn test_scheduler_leader_change_twice() {
let mut cluster = new_server_cluster_with_cfs(0, 2, ALL_CFS);
cluster.run();

let region = cluster.get_region(b"");
let peers = region.get_peers();

cluster.must_transfer_leader(region.get_id(), peers[0].clone());
let engine = cluster.sim.rl().storages[&peers[0].get_id()].clone();
let engine = util::BlockEngine::new(engine);
let mut engine = util::BlockEngine::new(engine);
let config = Config::default();
let mut storage = Storage::from_engine(engine.clone(), &config).unwrap();
storage.start(&config).unwrap();

let mut ctx = Context::new();
ctx.set_region_id(region.get_id());
ctx.set_region_epoch(region.get_region_epoch().clone());
ctx.set_peer(peers[0].clone());

let (tx, rx) = channel();
engine.block_snapshot();
let (stx, srx) = channel();
engine.block_snapshot(stx.clone());
storage.async_prewrite(ctx.clone(),
vec![Mutation::Put((make_key(b"k"), b"v".to_vec()))],
b"k".to_vec(),
Expand All @@ -180,8 +176,8 @@ fn test_scheduler_leader_change_twice() {
tx.send(1).unwrap();
})
.unwrap();
// Sleep a while, the prewrite should be blocked at snapshot stage.
thread::sleep(Duration::from_millis(200));
// wait for the message, the prewrite should be blocked at snapshot stage.
srx.recv_timeout(Duration::from_secs(2)).unwrap();
// Transfer leader twice, then unblock snapshot.
cluster.must_transfer_leader(region.get_id(), peers[1].clone());
cluster.must_transfer_leader(region.get_id(), peers[0].clone());
Expand Down
7 changes: 4 additions & 3 deletions tests/storage/test_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,12 +675,12 @@ fn bench_txn_store_rocksdb_put_x100(b: &mut Bencher) {
}

fn test_storage_1gc_with_engine(engine: Box<Engine>, ctx: Context) {
let engine = util::BlockEngine::new(engine);
let mut engine = util::BlockEngine::new(engine);
let config = Config::default();
let mut storage = Storage::from_engine(engine.clone(), &config).unwrap();
storage.start(&config).unwrap();

engine.block_snapshot();
let (stx, srx) = channel();
engine.block_snapshot(stx);
let (tx1, rx1) = channel();
storage.async_gc(ctx.clone(),
1,
Expand All @@ -703,6 +703,7 @@ fn test_storage_1gc_with_engine(engine: Box<Engine>, ctx: Context) {
})
.unwrap();

srx.recv_timeout(Duration::from_secs(2)).unwrap();
rx2.recv().unwrap();
engine.unblock_snapshot();
rx1.recv().unwrap();
Expand Down
37 changes: 33 additions & 4 deletions tests/storage/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use std::time::Duration;
use std::thread;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::mpsc::Sender;
use std::sync::Mutex;
use tikv::storage::{Engine, Snapshot, Modify, ALL_CFS};
use tikv::storage::engine::{Callback, Result};
use tikv::storage::config::Config;
Expand All @@ -30,6 +32,7 @@ pub struct BlockEngine {
engine: Box<Engine>,
block_write: Arc<AtomicBool>,
block_snapshot: Arc<AtomicBool>,
sender: Arc<Mutex<Option<Sender<bool>>>>,
}

impl BlockEngine {
Expand All @@ -38,35 +41,58 @@ impl BlockEngine {
engine: engine,
block_write: Arc::new(AtomicBool::new(false)),
block_snapshot: Arc::new(AtomicBool::new(false)),
sender: Arc::new(Mutex::new(None)),
}
}

#[allow(dead_code)]
pub fn block_write(&self) {
fn set_sender(&mut self, sender: Option<Sender<bool>>) {
let mut data = self.sender.lock().unwrap();
*data = sender;
}

#[allow(dead_code)]
pub fn block_write(&mut self, sender: Sender<bool>) {
self.block_write.store(true, Ordering::SeqCst);
self.set_sender(Some(sender));
}

#[allow(dead_code)]
pub fn unblock_write(&self) {
pub fn unblock_write(&mut self) {
self.block_write.store(false, Ordering::SeqCst);
self.set_sender(None);
}

pub fn block_snapshot(&self) {
pub fn block_snapshot(&mut self, sender: Sender<bool>) {
self.block_snapshot.store(true, Ordering::SeqCst);
self.set_sender(Some(sender));
}

pub fn unblock_snapshot(&self) {
pub fn unblock_snapshot(&mut self) {
self.block_snapshot.store(false, Ordering::SeqCst);
self.set_sender(None);
}
}

// try_notify tries to send message when block status = true && sender is not none.
fn try_notify(block: Arc<AtomicBool>, sender: Arc<Mutex<Option<Sender<bool>>>>) {
if !block.load(Ordering::SeqCst) {
return;
}
if let Some(s) = sender.lock().unwrap().as_ref() {
s.send(true).unwrap();
}
}

impl Engine for BlockEngine {
fn async_write(&self, ctx: &Context, batch: Vec<Modify>, callback: Callback<()>) -> Result<()> {
let block_write = self.block_write.clone();
let sender = self.sender.clone();
self.engine.async_write(ctx,
batch,
box move |res| {
thread::spawn(move || {
try_notify(block_write.clone(), sender);
while block_write.load(Ordering::SeqCst) {
thread::sleep(Duration::from_millis(50));
}
Expand All @@ -77,9 +103,11 @@ impl Engine for BlockEngine {

fn async_snapshot(&self, ctx: &Context, callback: Callback<Box<Snapshot>>) -> Result<()> {
let block_snapshot = self.block_snapshot.clone();
let sender = self.sender.clone();
self.engine.async_snapshot(ctx,
box move |res| {
thread::spawn(move || {
try_notify(block_snapshot.clone(), sender);
while block_snapshot.load(Ordering::SeqCst) {
thread::sleep(Duration::from_millis(50));
}
Expand All @@ -93,6 +121,7 @@ impl Engine for BlockEngine {
engine: self.engine.clone(),
block_write: self.block_write.clone(),
block_snapshot: self.block_snapshot.clone(),
sender: self.sender.clone(),
}
}
}
Expand Down

0 comments on commit ce55342

Please sign in to comment.