diff --git a/Cargo.lock b/Cargo.lock
index 555af682cf4..2d33f8b5930 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -328,7 +328,7 @@ dependencies = [
  "crossbeam-channel 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "crossbeam-deque 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "crossbeam-epoch 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "crossbeam-utils 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -340,7 +340,7 @@ version = "0.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
  "crossbeam-epoch 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "crossbeam-utils 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "smallvec 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -361,7 +361,16 @@ version = "0.6.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
  "crossbeam-epoch 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "crossbeam-utils 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "crossbeam-deque"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "crossbeam-epoch 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
@@ -385,12 +394,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
  "arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
- "crossbeam-utils 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "crossbeam-epoch"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
+ "cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "crossbeam-queue"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "crossbeam-utils"
 version = "0.2.2"
@@ -401,10 +431,11 @@ dependencies = [
 
 [[package]]
 name = "crossbeam-utils"
-version = "0.6.1"
+version = "0.6.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
  "cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
@@ -794,11 +825,11 @@ dependencies = [
  "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
  "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-io 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-reactor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-tcp 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-threadpool 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-threadpool 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-timer 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "want 0.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
@@ -2052,7 +2083,7 @@ dependencies = [
  "tempdir 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "tikv 3.0.0-beta.1",
  "tikv_util 0.1.0",
- "tokio-threadpool 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-threadpool 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-timer 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
@@ -2164,8 +2195,8 @@ dependencies = [
  "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)",
  "tipb 0.0.1 (git+https://github.com/pingcap/tipb.git)",
  "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-threadpool 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-threadpool 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-timer 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "toml 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "twoway 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2228,7 +2259,8 @@ dependencies = [
  "tikv_alloc 0.1.0",
  "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-threadpool 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-timer 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "toml 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "url 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2274,12 +2306,12 @@ dependencies = [
  "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-current-thread 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-fs 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-io 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-reactor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-tcp 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-threadpool 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-threadpool 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-timer 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-udp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-uds 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2307,7 +2339,7 @@ dependencies = [
  "mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)",
  "scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-io 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-reactor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-timer 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2319,14 +2351,15 @@ version = "0.1.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
  "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
 name = "tokio-executor"
-version = "0.1.5"
+version = "0.1.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
+ "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
@@ -2337,7 +2370,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
  "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-io 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-threadpool 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-threadpool 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
@@ -2355,7 +2388,7 @@ name = "tokio-reactor"
 version = "0.1.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
- "crossbeam-utils 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
  "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2363,7 +2396,7 @@ dependencies = [
  "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-io 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
@@ -2382,16 +2415,18 @@ dependencies = [
 
 [[package]]
 name = "tokio-threadpool"
-version = "0.1.9"
+version = "0.1.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
- "crossbeam-deque 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "crossbeam-utils 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-queue 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
@@ -2399,10 +2434,10 @@ name = "tokio-timer"
 version = "0.2.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
- "crossbeam-utils 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
  "slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
@@ -2714,10 +2749,13 @@ dependencies = [
 "checksum crossbeam-channel 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0ac88e108fa40799b39c08eb2a93bedf4cc99a9e5577f08ddf6dd6134ae65bf0"
 "checksum crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f739f8c5363aca78cfb059edf753d8f0d36908c348f3d8d1503f03d8b75d9cf3"
 "checksum crossbeam-deque 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4fe1b6f945f824c7a25afe44f62e25d714c0cc523f8e99d8db5cd1026e1269d3"
+"checksum crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b18cd2e169ad86297e6bc0ad9aa679aee9daa4f19e8163860faf7c164e4f5a71"
 "checksum crossbeam-epoch 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "927121f5407de9956180ff5e936fe3cf4324279280001cd56b669d28ee7e9150"
 "checksum crossbeam-epoch 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2449aaa4ec7ef96e5fb24db16024b935df718e9ae1cec0a1e68feeca2efca7b8"
+"checksum crossbeam-epoch 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "04c9e3102cc2d69cd681412141b390abd55a362afc1540965dad0ad4d34280b4"
+"checksum crossbeam-queue 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b"
 "checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9"
-"checksum crossbeam-utils 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c55913cc2799171a550e307918c0a360e8c16004820291bf3b638969b4a01816"
+"checksum crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f8306fcef4a7b563b76b7dd949ca48f52bc1141aa067d2ea09565f3e2652aa5c"
 "checksum csv 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "71903184af9960c555e7f3b32ff17390d20ecaaf17d4f18c4a0993f2df8a49e3"
 "checksum csv-core 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4dd8e6d86f7ba48b4276ef1317edc8cc36167546d8972feb4a2b5fec0b374105"
 "checksum dbghelp-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "97590ba53bcb8ac28279161ca943a924d1fd4a8fb3fa63302591647c4fc5b850"
@@ -2905,12 +2943,12 @@ dependencies = [
 "checksum tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f"
 "checksum tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "aeeffbbb94209023feaef3c196a41cbcdafa06b4a6f893f68779bb5e53796f71"
 "checksum tokio-current-thread 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "331c8acc267855ec06eb0c94618dcbbfea45bed2d20b77252940095273fb58f6"
-"checksum tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "c117b6cf86bb730aab4834f10df96e4dd586eff2c3c27d3781348da49e255bde"
+"checksum tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "83ea44c6c0773cc034771693711c35c677b4b5a4b21b9e7071704c54de7d555e"
 "checksum tokio-fs 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "60ae25f6b17d25116d2cba342083abe5255d3c2c79cb21ea11aa049c53bf7c75"
 "checksum tokio-io 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "7392fe0a70d5ce0c882c4778116c519bd5dbaa8a7c3ae3d04578b3afafdcda21"
 "checksum tokio-reactor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "502b625acb4ee13cbb3b90b8ca80e0addd263ddacf6931666ef751e610b07fb5"
 "checksum tokio-tcp 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7ad235e9dadd126b2d47f6736f65aa1fdcd6420e66ca63f44177bc78df89f912"
-"checksum tokio-threadpool 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "56c5556262383032878afad66943926a1d1f0967f17e94bd7764ceceb3b70e7f"
+"checksum tokio-threadpool 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "ec5759cf26cf9659555f36c431b515e3d05f66831741c85b4b5d5dfb9cf1323c"
 "checksum tokio-timer 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "4f37f0111d76cc5da132fe9bc0590b9b9cfd079bc7e75ac3846278430a299ff8"
 "checksum tokio-udp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "137bda266504893ac4774e0ec4c2108f7ccdbcb7ac8dced6305fe9e4e0b5041a"
 "checksum tokio-uds 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "99ce87382f6c1a24b513a72c048b2c8efe66cb5161c9061d00bee510f08dc168"
diff --git a/Cargo.toml b/Cargo.toml
index 13c551fc872..aac35e12afc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -112,7 +112,7 @@ smallvec = { version = "0.6", features = ["union"] }
 flate2 = { version = "1.0", features = ["zlib"], default-features = false }
 more-asserts = "0.1"
 hyper = "0.12"
-tokio-threadpool = "0.1"
+tokio-threadpool = "0.1.13"
 vlog = "0.1.4"
 twoway = "0.2.0"
 cop_datatype = { path = "components/cop_datatype" }
diff --git a/components/test_coprocessor/src/fixture.rs b/components/test_coprocessor/src/fixture.rs
index 024a51b776d..c3cb1137414 100644
--- a/components/test_coprocessor/src/fixture.rs
+++ b/components/test_coprocessor/src/fixture.rs
@@ -5,12 +5,11 @@ use super::*;
 use kvproto::kvrpcpb::Context;
 use tikv::coprocessor::codec::Datum;
-use tikv::coprocessor::{Endpoint, ReadPoolContext};
-use tikv::server::readpool::{self, ReadPool};
+use tikv::coprocessor::Endpoint;
+use tikv::server::readpool;
 use tikv::server::Config;
 use tikv::storage::kv::RocksEngine;
 use tikv::storage::{Engine, TestEngineBuilder};
-use tikv_util::worker::FutureWorker;
 
 #[derive(Clone)]
 pub struct ProductTable(Table);
@@ -54,15 +53,7 @@ pub fn init_data_with_engine_and_commit<E: Engine>(
     vals: &[(i64, Option<&str>, i64)],
     commit: bool,
 ) -> (Store<RocksEngine>, Endpoint<RocksEngine>) {
-    init_data_with_details(
-        ctx,
-        engine,
-        tbl,
-        vals,
-        commit,
-        &Config::default(),
-        &readpool::Config::default_for_test(),
-    )
+    init_data_with_details(ctx, engine, tbl, vals, commit, &Config::default())
 }
 
 pub fn init_data_with_details(
@@ -72,7 +63,6 @@ pub fn init_data_with_details(
     vals: &[(i64, Option<&str>, i64)],
     commit: bool,
     cfg: &Config,
-    read_pool_cfg: &readpool::Config,
 ) -> (Store<RocksEngine>, Endpoint<RocksEngine>) {
     let mut store = Store::from_engine(engine);
 
@@ -88,10 +78,8 @@ pub fn init_data_with_details(
     if commit {
         store.commit_with_ctx(ctx);
     }
-    let pd_worker = FutureWorker::new("test-pd-worker");
-    let pool = ReadPool::new("readpool", read_pool_cfg, || {
-        ReadPoolContext::new(pd_worker.scheduler())
-    });
+
+    let pool = readpool::Builder::build_for_test();
     let cop = Endpoint::new(cfg, store.get_engine(), pool);
     (store, cop)
 }
diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs
index 533194fb39c..d689a5ec4f2 100644
--- a/components/test_raftstore/src/server.rs
+++ b/components/test_raftstore/src/server.rs
@@ -19,7 +19,7 @@ use tikv::raftstore::store::fsm::{RaftBatchSystem, RaftRouter};
 use tikv::raftstore::store::{Callback, SnapManager};
 use tikv::raftstore::Result;
 use tikv::server::load_statistics::ThreadLoad;
-use tikv::server::readpool::ReadPool;
+use tikv::server::readpool;
 use tikv::server::resolve::{self, Task as ResolveTask};
 use tikv::server::transport::RaftStoreRouter;
 use tikv::server::transport::ServerRaftStoreRouter;
@@ -28,7 +28,8 @@ use tikv::server::{
     create_raft_storage, Config, Error, Node, PdStoreAddrResolver, RaftClient, Server,
     ServerTransport,
 };
-use tikv::storage::{self, RaftKv};
+
+use tikv::storage::RaftKv;
 use tikv_util::collections::{HashMap, HashSet};
 use tikv_util::security::SecurityManager;
 use tikv_util::worker::{FutureWorker, Worker};
@@ -126,11 +127,8 @@ impl Simulator for ServerCluster {
         let sim_router = SimulateTransport::new(raft_router);
 
         // Create storage.
-        let pd_worker = FutureWorker::new("test-future-worker");
-        let storage_read_pool =
-            ReadPool::new("store-read", &cfg.readpool.storage.build_config(), || {
-                storage::ReadPoolContext::new(pd_worker.scheduler())
-            });
+        let pd_worker = FutureWorker::new("test-pd-worker");
+        let storage_read_pool = readpool::Builder::build_for_test();
         let store = create_raft_storage(
             sim_router.clone(),
             &cfg.storage,
@@ -155,12 +153,9 @@ impl Simulator for ServerCluster {
         // Create pd client, snapshot manager, server.
         let (worker, resolver) = resolve::new_resolver(Arc::clone(&self.pd_client)).unwrap();
         let snap_mgr = SnapManager::new(tmp_str, Some(router.clone()));
-        let pd_worker = FutureWorker::new("test-pd-worker");
         let server_cfg = Arc::new(cfg.server.clone());
         let security_mgr = Arc::new(SecurityManager::new(&cfg.security).unwrap());
-        let cop_read_pool = ReadPool::new("cop", &cfg.readpool.coprocessor.build_config(), || {
-            coprocessor::ReadPoolContext::new(pd_worker.scheduler())
-        });
+        let cop_read_pool = readpool::Builder::build_for_test();
         let cop = coprocessor::Endpoint::new(&server_cfg, store.get_engine(), cop_read_pool);
         let mut server = None;
         for _ in 0..100 {
diff --git a/components/tikv_util/Cargo.toml b/components/tikv_util/Cargo.toml
index b75964414d7..04bf52537ef 100644
--- a/components/tikv_util/Cargo.toml
+++ b/components/tikv_util/Cargo.toml
@@ -33,6 +33,7 @@ tikv_alloc = { path = "../tikv_alloc" }
 tokio-core = "0.1"
 tokio-timer = "0.2"
 tokio-executor = "0.1"
+tokio-threadpool = "0.1.13"
 serde = "1.0"
 serde_json = "1.0"
 serde_derive = "1.0"
diff --git a/components/tikv_util/src/future_pool/builder.rs b/components/tikv_util/src/future_pool/builder.rs
new file mode 100644
index 00000000000..6bda9c4f8b7
--- /dev/null
+++ b/components/tikv_util/src/future_pool/builder.rs
@@ -0,0 +1,79 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::sync::Arc;
+
+use tokio_threadpool::Builder as TokioBuilder;
+
+use super::metrics::*;
+
+pub struct Builder {
+    inner_builder: TokioBuilder,
+    name_prefix: Option<String>,
+    on_tick: Option<Box<dyn Fn() + Send + Sync>>,
+}
+
+impl Builder {
+    pub fn new() -> Self {
+        Self {
+            inner_builder: TokioBuilder::new(),
+            name_prefix: None,
+            on_tick: None,
+        }
+    }
+
+    pub fn pool_size(&mut self, val: usize) -> &mut Self {
+        self.inner_builder.pool_size(val);
+        self
+    }
+
+    pub fn stack_size(&mut self, val: usize) -> &mut Self {
+        self.inner_builder.stack_size(val);
+        self
+    }
+
+    pub fn name_prefix(&mut self, val: impl Into<String>) -> &mut Self {
+        let name = val.into();
+        self.name_prefix = Some(name.clone());
+        self.inner_builder.name_prefix(name);
+        self
+    }
+
+    pub fn on_tick<F>(&mut self, f: F) -> &mut Self
+    where
+        F: Fn() + Send + Sync + 'static,
+    {
+        self.on_tick = Some(Box::new(f));
+        self
+    }
+
+    pub fn before_stop<F>(&mut self, f: F) -> &mut Self
+    where
+        F: Fn() + Send + Sync + 'static,
+    {
+        self.inner_builder.before_stop(f);
+        self
+    }
+
+    pub fn after_start<F>(&mut self, f: F) -> &mut Self
+    where
+        F: Fn() + Send + Sync + 'static,
+    {
+        self.inner_builder.after_start(f);
+        self
+    }
+
+    pub fn build(&mut self) -> super::FuturePool {
+        let name = if let Some(name) = &self.name_prefix {
+            name.as_str()
+        } else {
+            "future_pool"
+        };
+        let env = Arc::new(super::Env {
+            on_tick: self.on_tick.take(),
+            metrics_running_task_count: FUTUREPOOL_RUNNING_TASK_VEC.with_label_values(&[name]),
+            metrics_handled_task_count: FUTUREPOOL_HANDLED_TASK_VEC.with_label_values(&[name]),
+        });
+        let pool = Arc::new(self.inner_builder.build());
+        super::FuturePool { pool, env }
+    }
+}
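For orientation, here is a minimal usage sketch of the builder added above. The pool size, stack size, names, and tick closure are illustrative values, not part of this diff:

```rust
use tikv_util::future_pool::{Builder, FuturePool};

fn build_example_pool() -> FuturePool {
    Builder::new()
        .pool_size(4)                 // worker threads (illustrative)
        .stack_size(10 * 1024 * 1024) // per-thread stack size (illustrative)
        .name_prefix("example-pool")  // also used as the metrics "name" label in build()
        .on_tick(|| {
            // Runs on worker threads, at most once per tick interval,
            // and only when a task has just completed.
        })
        .build()
}
```

Note the design choice visible in `build()`: the name prefix doubles as the Prometheus label, which is why distinct pools need distinct prefixes for `get_running_task_count()` to be meaningful.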
diff --git a/components/tikv_util/src/future_pool/metrics.rs b/components/tikv_util/src/future_pool/metrics.rs
new file mode 100644
index 00000000000..0a454477c53
--- /dev/null
+++ b/components/tikv_util/src/future_pool/metrics.rs
@@ -0,0 +1,18 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use prometheus::*;
+
+lazy_static! {
+    pub static ref FUTUREPOOL_RUNNING_TASK_VEC: IntGaugeVec = register_int_gauge_vec!(
+        "tikv_futurepool_pending_task_total",
+        "Current future_pool pending + running tasks.",
+        &["name"]
+    )
+    .unwrap();
+    pub static ref FUTUREPOOL_HANDLED_TASK_VEC: IntCounterVec = register_int_counter_vec!(
+        "tikv_futurepool_handled_task_total",
+        "Total number of future_pool handled tasks.",
+        &["name"]
+    )
+    .unwrap();
+}
diff --git a/components/tikv_util/src/future_pool/mod.rs b/components/tikv_util/src/future_pool/mod.rs
new file mode 100644
index 00000000000..71241507f28
--- /dev/null
+++ b/components/tikv_util/src/future_pool/mod.rs
@@ -0,0 +1,310 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+//! This mod implements a wrapped future pool that supports `on_tick()`, which
+//! is invoked no more often than once per the specified tick interval.
+
+mod builder;
+mod metrics;
+
+pub use self::builder::Builder;
+
+use std::cell::Cell;
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::{lazy, Future};
+use prometheus::*;
+use tokio_threadpool::{SpawnHandle, ThreadPool};
+
+use crate::time::Instant;
+
+const TICK_INTERVAL: Duration = Duration::from_secs(1);
+
+thread_local! {
+    static THREAD_LAST_TICK_TIME: Cell<Instant> = Cell::new(Instant::now_coarse());
+}
+
+struct Env {
+    on_tick: Option<Box<dyn Fn() + Send + Sync>>,
+    metrics_running_task_count: IntGauge,
+    metrics_handled_task_count: IntCounter,
+}
+
+#[derive(Clone)]
+pub struct FuturePool {
+    pool: Arc<ThreadPool>,
+    env: Arc<Env>,
+}
+
+impl std::fmt::Debug for FuturePool {
+    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        fmt.debug_struct("FuturePool").finish()
+    }
+}
+
+impl crate::AssertSend for FuturePool {}
+impl crate::AssertSync for FuturePool {}
+
+impl FuturePool {
+    /// Gets current running task count.
+    #[inline]
+    pub fn get_running_task_count(&self) -> usize {
+        // As long as different future pool has different name prefix, we can safely use the value
+        // in metrics.
+        self.env.metrics_running_task_count.get() as usize
+    }
+
+    /// Wraps a user provided future to support features of the `FuturePool`.
+    /// The wrapped future will be spawned in the future thread pool.
+    fn wrap_user_future<F, R>(&self, future_fn: F) -> impl Future<Item = R::Item, Error = R::Error>
+    where
+        F: FnOnce() -> R + Send + 'static,
+        R: Future + Send + 'static,
+        R::Item: Send + 'static,
+        R::Error: Send + 'static,
+    {
+        let env = self.env.clone();
+        env.metrics_running_task_count.inc();
+
+        let func = move || {
+            future_fn().then(move |r| {
+                env.metrics_handled_task_count.inc();
+                env.metrics_running_task_count.dec();
+                try_tick_thread(&env);
+                r
+            })
+        };
+        lazy(func)
+    }
+
+    /// Spawns a future in the pool.
+    pub fn spawn<F, R>(&self, future_fn: F)
+    where
+        F: FnOnce() -> R + Send + 'static,
+        R: Future + Send + 'static,
+        R::Item: Send + 'static,
+        R::Error: Send + 'static,
+    {
+        let future = self.wrap_user_future(future_fn);
+        self.pool.spawn(future.then(|_| Ok(())));
+    }
+
+    /// Spawns a future in the pool and returns a handle to the result of the future.
+    ///
+    /// The future will not be executed if the handle is not polled.
+    #[must_use]
+    pub fn spawn_handle<F, R>(&self, future_fn: F) -> SpawnHandle<R::Item, R::Error>
+    where
+        F: FnOnce() -> R + Send + 'static,
+        R: Future + Send + 'static,
+        R::Item: Send + 'static,
+        R::Error: Send + 'static,
+    {
+        let future = self.wrap_user_future(future_fn);
+        self.pool.spawn_handle(future)
+    }
+}
+
+/// Tries to trigger a tick in current thread.
+///
+/// This function is effective only when it is called in thread pool worker
+/// thread.
+#[inline]
+fn try_tick_thread(env: &Env) {
+    THREAD_LAST_TICK_TIME.with(|tls_last_tick| {
+        let now = Instant::now_coarse();
+        let last_tick = tls_last_tick.get();
+        if now.duration_since(last_tick) < TICK_INTERVAL {
+            return;
+        }
+        tls_last_tick.set(now);
+        if let Some(f) = &env.on_tick {
+            f();
+        }
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use std::sync::mpsc;
+    use std::thread;
+
+    use futures::future;
+
+    fn spawn_future_and_wait(pool: &FuturePool, duration: Duration) {
+        pool.spawn_handle(move || {
+            thread::sleep(duration);
+            future::ok::<_, ()>(())
+        })
+        .wait()
+        .unwrap();
+    }
+
+    fn spawn_future_without_wait(pool: &FuturePool, duration: Duration) {
+        pool.spawn(move || {
+            thread::sleep(duration);
+            future::ok::<_, ()>(())
+        });
+    }
+
+    #[test]
+    fn test_tick() {
+        let tick_sequence = Arc::new(AtomicUsize::new(0));
+
+        let tick_sequence2 = tick_sequence.clone();
+        let (tx, rx) = mpsc::sync_channel(1000);
+
+        let pool = Builder::new()
+            .pool_size(1)
+            .on_tick(move || {
+                let seq = tick_sequence2.fetch_add(1, Ordering::SeqCst);
+                tx.send(seq).unwrap();
+            })
+            .build();
+
+        assert!(rx.try_recv().is_err());
+
+        // Tick is not emitted since there is no task
+        thread::sleep(TICK_INTERVAL * 2);
+        assert!(rx.try_recv().is_err());
+
+        // Tick is not emitted for the first finished task: the thread-local
+        // last-tick time is only initialized when the first task completes.
+        spawn_future_and_wait(&pool, TICK_INTERVAL / 20);
+        assert!(rx.try_recv().is_err());
+
+        spawn_future_and_wait(&pool, TICK_INTERVAL / 20);
+        spawn_future_and_wait(&pool, TICK_INTERVAL / 20);
+        spawn_future_and_wait(&pool, TICK_INTERVAL / 20);
+        spawn_future_and_wait(&pool, TICK_INTERVAL / 20);
+
+        // So far we have only elapsed TICK_INTERVAL * 0.2, so no ticks so far.
+        assert!(rx.try_recv().is_err());
+
+        // Even if long enough time has elapsed, tick is not emitted until next task arrives
+        thread::sleep(TICK_INTERVAL * 2);
+        assert!(rx.try_recv().is_err());
+
+        spawn_future_and_wait(&pool, TICK_INTERVAL / 20);
+        assert_eq!(rx.try_recv().unwrap(), 0);
+        assert!(rx.try_recv().is_err());
+
+        // Tick is not emitted if there is no task
+        thread::sleep(TICK_INTERVAL * 2);
+        assert!(rx.try_recv().is_err());
+
+        // Tick is emitted since long enough time has passed
+        spawn_future_and_wait(&pool, TICK_INTERVAL / 20);
+        assert_eq!(rx.try_recv().unwrap(), 1);
+        assert!(rx.try_recv().is_err());
+
+        // Tick is emitted immediately after a long task
+        spawn_future_and_wait(&pool, TICK_INTERVAL * 2);
+        assert_eq!(rx.try_recv().unwrap(), 2);
+        assert!(rx.try_recv().is_err());
+    }
+
+    #[test]
+    fn test_tick_multi_thread() {
+        let tick_sequence = Arc::new(AtomicUsize::new(0));
+
+        let tick_sequence2 = tick_sequence.clone();
+        let (tx, rx) = mpsc::sync_channel(1000);
+
+        let pool = Builder::new()
+            .pool_size(2)
+            .on_tick(move || {
+                let seq = tick_sequence2.fetch_add(1, Ordering::SeqCst);
+                tx.send(seq).unwrap();
+            })
+            .build();
+
+        assert!(rx.try_recv().is_err());
+
+        // Spawn two tasks, each will be processed in one worker thread.
+        spawn_future_without_wait(&pool, TICK_INTERVAL / 2);
+        spawn_future_without_wait(&pool, TICK_INTERVAL / 2);
+
+        assert!(rx.try_recv().is_err());
+
+        // Wait long enough time to trigger a tick.
+        thread::sleep(TICK_INTERVAL * 2);
+
+        assert!(rx.try_recv().is_err());
+
+        // These two tasks should both trigger a tick.
+        spawn_future_without_wait(&pool, TICK_INTERVAL);
+        spawn_future_without_wait(&pool, TICK_INTERVAL / 2);
+
+        // Wait until these tasks are finished.
+        thread::sleep(TICK_INTERVAL * 2);
+
+        assert_eq!(rx.try_recv().unwrap(), 0);
+        assert_eq!(rx.try_recv().unwrap(), 1);
+        assert!(rx.try_recv().is_err());
+    }
+
+    #[test]
+    fn test_handle_drop() {
+        let pool = Builder::new().pool_size(1).build();
+
+        let (tx, rx) = mpsc::sync_channel(10);
+
+        let tx2 = tx.clone();
+        pool.spawn(move || {
+            thread::sleep(Duration::from_millis(200));
+            tx2.send(11).unwrap();
+            future::ok::<_, ()>(())
+        });
+
+        let tx2 = tx.clone();
+        drop(pool.spawn_handle(move || {
+            tx2.send(7).unwrap();
+            future::ok::<_, ()>(())
+        }));
+
+        thread::sleep(Duration::from_millis(500));
+
+        assert_eq!(rx.try_recv().unwrap(), 11);
+        assert!(rx.try_recv().is_err());
+    }
+
+    #[test]
+    fn test_handle_result() {
+        let pool = Builder::new().pool_size(1).build();
+
+        let handle = pool.spawn_handle(move || future::ok::<_, ()>(42));
+
+        assert_eq!(handle.wait().unwrap(), 42);
+    }
+
+    #[test]
+    fn test_running_task_count() {
+        let pool = Builder::new()
+            .name_prefix("future_pool_for_running_task_test") // The name is important
+            .pool_size(2)
+            .build();
+
+        assert_eq!(pool.get_running_task_count(), 0);
+
+        spawn_future_without_wait(&pool, Duration::from_millis(500)); // f1
+        assert_eq!(pool.get_running_task_count(), 1);
+
+        spawn_future_without_wait(&pool, Duration::from_millis(1000)); // f2
+        assert_eq!(pool.get_running_task_count(), 2);
+
+        spawn_future_without_wait(&pool, Duration::from_millis(1500));
+        assert_eq!(pool.get_running_task_count(), 3);
+
+        thread::sleep(Duration::from_millis(700)); // f1 completed, f2 elapsed 700
+        assert_eq!(pool.get_running_task_count(), 2);
+
+        spawn_future_without_wait(&pool, Duration::from_millis(1500));
+        assert_eq!(pool.get_running_task_count(), 3);
+
+        thread::sleep(Duration::from_millis(2700));
+        assert_eq!(pool.get_running_task_count(), 0);
+    }
+}
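The two spawn paths in the new module differ in who drives the future. A short sketch in the futures 0.1 style used by this file (the pool value is assumed to come from the builder example shown earlier):

```rust
use futures::{future, Future};
use tikv_util::future_pool::FuturePool;

fn demo(pool: &FuturePool) {
    // Fire-and-forget: metrics accounting and ticking still run, because
    // spawn() goes through wrap_user_future() internally.
    pool.spawn(|| future::ok::<_, ()>(()));

    // spawn_handle() returns a SpawnHandle, which is itself a future; per the
    // doc comment above, the task makes no progress unless the handle is polled.
    let handle = pool.spawn_handle(|| future::ok::<u64, ()>(42));
    assert_eq!(handle.wait().unwrap(), 42);
}
```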
diff --git a/components/tikv_util/src/futurepool.rs b/components/tikv_util/src/futurepool.rs
deleted file mode 100644
index 578f8d4714b..00000000000
--- a/components/tikv_util/src/futurepool.rs
+++ /dev/null
@@ -1,382 +0,0 @@
-// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
-
-//! This mod implemented a wrapped future pool that supports `on_tick()` which
-//! is invoked no less than the specific interval.
-
-use std::cell::{Cell, RefCell, RefMut};
-use std::fmt;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{mpsc, Arc};
-use std::thread;
-use std::time::Duration;
-
-use futures::Future;
-use futures_cpupool::{self as cpupool, CpuFuture, CpuPool};
-use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec};
-
-use crate::collections::HashMap;
-use crate::time::Instant;
-
-lazy_static! {
-    pub static ref FUTUREPOOL_PENDING_TASK_VEC: IntGaugeVec = register_int_gauge_vec!(
-        "tikv_futurepool_pending_task_total",
-        "Current future_pool pending + running tasks.",
-        &["name"]
-    )
-    .unwrap();
-    pub static ref FUTUREPOOL_HANDLED_TASK_VEC: IntCounterVec = register_int_counter_vec!(
-        "tikv_futurepool_handled_task_total",
-        "Total number of future_pool handled tasks.",
-        &["name"]
-    )
-    .unwrap();
-}
-
-pub trait Context: fmt::Debug + Send {
-    /// Will be invoked periodically (no less than specified interval).
-    /// When there is no task, it will NOT be invoked.
-    fn on_tick(&mut self) {}
-}
-
-/// A delegator to wrap `Context` to provide `on_tick` feature.
-#[derive(Debug)]
-struct ContextDelegator<T> {
-    tick_interval: Duration,
-    inner: RefCell<T>,
-    last_tick: Cell<Option<Instant>>,
-}
-
-impl<T: Context> ContextDelegator<T> {
-    fn new(context: T, tick_interval: Duration) -> ContextDelegator<T> {
-        ContextDelegator {
-            tick_interval,
-            inner: RefCell::new(context),
-            last_tick: Cell::new(None),
-        }
-    }
-
-    fn context_mut(&self) -> RefMut<'_, T> {
-        self.inner.borrow_mut()
-    }
-
-    fn on_task_finish(&self) {
-        let now = Instant::now_coarse();
-        let last_tick = self.last_tick.get();
-        if last_tick.is_none() {
-            // set last_tick when the first future is resolved
-            self.last_tick.set(Some(now));
-            return;
-        }
-        if now.duration_since(last_tick.unwrap()) < self.tick_interval {
-            return;
-        }
-        self.last_tick.set(Some(now));
-        self.context_mut().on_tick();
-    }
-}
-
-#[derive(Debug)]
-pub struct ContextDelegators<T> {
-    delegators: Arc<HashMap<thread::ThreadId, ContextDelegator<T>>>,
-}
-
-/// Users can only retrive a Context for the current thread so that `HashMap<..>` is `Sync`.
-/// Thus `ContextDelegators` is `Send` & `Sync`.
-unsafe impl<T> Send for ContextDelegators<T> {}
-unsafe impl<T> Sync for ContextDelegators<T> {}
-
-impl<T> Clone for ContextDelegators<T> {
-    fn clone(&self) -> Self {
-        ContextDelegators {
-            delegators: Arc::clone(&self.delegators),
-        }
-    }
-}
-
-impl<T: Context> ContextDelegators<T> {
-    fn new(delegators: HashMap<thread::ThreadId, ContextDelegator<T>>) -> Self {
-        ContextDelegators {
-            delegators: Arc::new(delegators),
-        }
-    }
-
-    /// This function should be called in the future pool thread. Otherwise it will panic.
-    pub fn current_thread_context_mut(&self) -> RefMut<'_, T> {
-        let delegator = self.get_current_thread_delegator();
-        delegator.context_mut()
-    }
-
-    fn get_current_thread_delegator(&self) -> &ContextDelegator<T> {
-        let thread_id = thread::current().id();
-        if let Some(delegator) = self.delegators.get(&thread_id) {
-            delegator
-        } else {
-            panic!("Called from threads out of the future thread pool");
-        }
-    }
-}
-
-/// A future thread pool that supports `on_tick` for each thread.
-pub struct FuturePool<T: Context + 'static> {
-    pool: CpuPool,
-    context_delegators: ContextDelegators<T>,
-    running_task_count: Arc<AtomicUsize>,
-    metrics_pending_task_count: IntGauge,
-    metrics_handled_task_count: IntCounter,
-}
-
-impl<T: Context + 'static> fmt::Debug for FuturePool<T> {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
-        f.debug_struct("FuturePool")
-            .field("pool", &self.pool)
-            .field("context_delegators", &self.context_delegators)
-            .finish()
-    }
-}
-
-impl<T: Context + 'static> Clone for FuturePool<T> {
-    fn clone(&self) -> FuturePool<T> {
-        FuturePool {
-            pool: self.pool.clone(),
-            context_delegators: self.context_delegators.clone(),
-            running_task_count: Arc::clone(&self.running_task_count),
-            metrics_pending_task_count: self.metrics_pending_task_count.clone(),
-            metrics_handled_task_count: self.metrics_handled_task_count.clone(),
-        }
-    }
-}
-
-impl<T: Context + 'static> crate::AssertSend for FuturePool<T> {}
-impl<T: Context + 'static> crate::AssertSync for FuturePool<T> {}
-
-impl<T: Context + 'static> FuturePool<T> {
-    pub fn new<F>(
-        pool_size: usize,
-        stack_size: usize,
-        name_prefix: &str,
-        tick_interval: Duration,
-        context_factory: F,
-    ) -> FuturePool<T>
-    where
-        F: Fn() -> T,
-    {
-        let (tx, rx) = mpsc::sync_channel(pool_size);
-        let pool = cpupool::Builder::new()
-            .pool_size(pool_size)
-            .stack_size(stack_size)
-            .name_prefix(name_prefix)
-            .after_start(move || {
-                // We only need to know each thread's id and we can build context later
-                // by invoking `context_factory` in a non-concurrent way.
-                let thread_id = thread::current().id();
-                tx.send(thread_id).unwrap();
-            })
-            .create();
-        let contexts = (0..pool_size)
-            .map(|_| {
-                let thread_id = rx.recv().unwrap();
-                let context_delegator = ContextDelegator::new(context_factory(), tick_interval);
-                (thread_id, context_delegator)
-            })
-            .collect();
-        FuturePool {
-            pool,
-            context_delegators: ContextDelegators::new(contexts),
-            running_task_count: Arc::new(AtomicUsize::new(0)),
-            metrics_pending_task_count: FUTUREPOOL_PENDING_TASK_VEC
-                .with_label_values(&[name_prefix]),
-            metrics_handled_task_count: FUTUREPOOL_HANDLED_TASK_VEC
-                .with_label_values(&[name_prefix]),
-        }
-    }
-
-    /// Gets current running task count
-    #[inline]
-    pub fn get_running_task_count(&self) -> usize {
-        self.running_task_count.load(Ordering::Acquire)
-    }
-
-    pub fn spawn<R, F>(&self, future_factory: R) -> CpuFuture<F::Item, F::Error>
-    where
-        R: FnOnce(ContextDelegators<T>) -> F + Send + 'static,
-        F: Future + Send + 'static,
-        F::Item: Send + 'static,
-        F::Error: Send + 'static,
-    {
-        let running_task_count = Arc::clone(&self.running_task_count);
-        let metrics_pending_task_count = self.metrics_pending_task_count.clone();
-        let metrics_handled_task_count = self.metrics_handled_task_count.clone();
-        let delegators = self.context_delegators.clone();
-        let func = move || {
-            future_factory(delegators.clone()).then(move |r| {
-                let delegator = delegators.get_current_thread_delegator();
-                delegator.on_task_finish();
-                running_task_count.fetch_sub(1, Ordering::Release);
-                metrics_pending_task_count.dec();
-                metrics_handled_task_count.inc();
-                r
-            })
-        };
-
-        self.running_task_count.fetch_add(1, Ordering::Release);
-        self.metrics_pending_task_count.inc();
-        self.pool.spawn_fn(func)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use futures::future;
-    use std::sync::mpsc::{channel, Sender};
-    use std::thread;
-    use std::time::Duration;
-
-    pub use super::*;
-
-    fn spawn_long_time_future<T: Context>(
-        pool: &FuturePool<T>,
-        future_duration_ms: u64,
-    ) -> CpuFuture<(), ()> {
-        pool.spawn(move |_| {
-            thread::sleep(Duration::from_millis(future_duration_ms));
-            future::ok::<(), ()>(())
-        })
-    }
-
-    fn spawn_long_time_future_and_wait<T: Context>(pool: &FuturePool<T>, future_duration_ms: u64) {
-        spawn_long_time_future(pool, future_duration_ms)
-            .wait()
-            .unwrap();
-    }
-
-    #[test]
-    fn test_context() {
-        #[derive(Debug)]
-        struct MyContext {
-            ctx_thread_id: thread::ThreadId,
-        }
-        impl Context for MyContext {}
-
-        let pool = FuturePool::new(
-            1,
-            1024000,
-            "test-pool",
-            Duration::from_millis(50),
-            move || MyContext {
-                ctx_thread_id: thread::current().id(),
-            },
-        );
-
-        let main_thread_id = thread::current().id();
-
-        pool.spawn(move |ctxd| {
-            // future_factory is executed in future pool
-            let current_thread_id = thread::current().id();
-            assert_ne!(main_thread_id, current_thread_id);
-
-            // Context is created in main thread
-            let ctx = ctxd.current_thread_context_mut();
-            assert_eq!(ctx.ctx_thread_id, main_thread_id);
-            future::ok::<(), ()>(())
-        })
-        .wait()
-        .unwrap();
-    }
-
-    #[test]
-    fn test_tick() {
-        struct MyContext {
-            tx: Sender<i32>,
-            sn: i32,
-        }
-        impl fmt::Debug for MyContext {
-            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-                write!(f, "MyContext")
-            }
-        }
-        impl Context for MyContext {
-            fn on_tick(&mut self) {
-                self.tx.send(self.sn).unwrap();
-                self.sn += 1;
-            }
-        }
-
-        let (tx, rx) = channel();
-
-        let pool = FuturePool::new(
-            1,
-            1024000,
-            "test-pool",
-            Duration::from_millis(200),
-            move || MyContext {
-                tx: tx.clone(),
-                sn: 0,
-            },
-        );
-        assert!(rx.try_recv().is_err());
-
-        // Tick is not emitted since there is no task
-        thread::sleep(Duration::from_millis(400));
-        assert!(rx.try_recv().is_err());
-
-        // Tick is not emitted immediately for the first future
-        spawn_long_time_future_and_wait(&pool, 10);
-        assert!(rx.try_recv().is_err());
-
-        spawn_long_time_future_and_wait(&pool, 1);
-        spawn_long_time_future_and_wait(&pool, 1);
-        spawn_long_time_future_and_wait(&pool, 1);
-        spawn_long_time_future_and_wait(&pool, 1);
-        assert!(rx.try_recv().is_err());
-
-        // Even if there are tasks previously, tick is not emitted until next task arrives
-        thread::sleep(Duration::from_millis(400));
-        assert!(rx.try_recv().is_err());
-
-        // Next task arrives && long time passed, tick is emitted
-        spawn_long_time_future_and_wait(&pool, 400);
-        assert_eq!(rx.try_recv().unwrap(), 0);
-        assert!(rx.try_recv().is_err());
-
-        // Tick is not emitted if there is no task
-        thread::sleep(Duration::from_millis(400));
-        assert!(rx.try_recv().is_err());
-
-        // Tick is emitted since long enough time has passed
-        spawn_long_time_future_and_wait(&pool, 1);
-        assert_eq!(rx.try_recv().unwrap(), 1);
-        assert!(rx.try_recv().is_err());
-    }
-
-    #[test]
-    fn test_task_count() {
-        #[derive(Debug)]
-        struct MyContext;
-        impl Context for MyContext {}
-
-        let pool = FuturePool::new(
-            2,
-            1024000,
-            "test-pool",
-            Duration::from_millis(50),
-            move || MyContext {},
-        );
-
-        assert_eq!(pool.get_running_task_count(), 0);
-        let f1 = spawn_long_time_future(&pool, 100);
-        assert_eq!(pool.get_running_task_count(), 1);
-        let f2 = spawn_long_time_future(&pool, 200);
-        assert_eq!(pool.get_running_task_count(), 2);
-        let f3 = spawn_long_time_future(&pool, 300);
-        assert_eq!(pool.get_running_task_count(), 3);
-        f1.wait().unwrap();
-        assert_eq!(pool.get_running_task_count(), 2);
-        let f4 = spawn_long_time_future(&pool, 300);
-        let f5 = spawn_long_time_future(&pool, 300);
-        assert_eq!(pool.get_running_task_count(), 4);
-        f2.join(f3).wait().unwrap();
-        assert_eq!(pool.get_running_task_count(), 2);
-        f4.join(f5).wait().unwrap();
-        assert_eq!(pool.get_running_task_count(), 0);
-    }
-}
diff --git a/components/tikv_util/src/lib.rs b/components/tikv_util/src/lib.rs
index 7baef68e726..7824d5738ee 100644
--- a/components/tikv_util/src/lib.rs
+++ b/components/tikv_util/src/lib.rs
@@ -8,8 +8,6 @@ extern crate futures;
 #[macro_use]
 extern crate lazy_static;
 #[macro_use]
-extern crate prometheus;
-#[macro_use]
 extern crate quick_error;
 #[macro_use]
 extern crate serde_derive;
@@ -51,7 +49,7 @@ pub mod collections;
 pub mod config;
 pub mod file;
 pub mod future;
-pub mod futurepool;
+pub mod future_pool;
 pub mod io_limiter;
 #[macro_use]
 pub mod macros;
diff --git a/src/bin/tikv-server.rs b/src/bin/tikv-server.rs
index f5e4c132305..7c34d9aff29 100644
--- a/src/bin/tikv-server.rs
+++ b/src/bin/tikv-server.rs
@@ -47,7 +47,6 @@ use tikv::pd::{PdClient, RpcClient};
 use tikv::raftstore::coprocessor::{CoprocessorHost, RegionInfoAccessor};
 use tikv::raftstore::store::fsm;
 use tikv::raftstore::store::{new_compaction_listener, SnapManagerBuilder};
-use tikv::server::readpool::ReadPool;
 use tikv::server::resolve;
 use tikv::server::status_server::StatusServer;
 use tikv::server::transport::ServerRaftStoreRouter;
@@ -195,10 +194,12 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<SecurityManager
diff --git a/src/coprocessor/endpoint.rs b/src/coprocessor/endpoint.rs
--- a/src/coprocessor/endpoint.rs
+++ b/src/coprocessor/endpoint.rs
@@ ... @@
 pub struct Endpoint<E: Engine> {
     engine: E,
     /// The thread pool to run Coprocessor requests.
-    read_pool: ReadPool<ReadPoolContext>,
+    read_pool: ReadPool,
     /// The recursion limit when parsing Coprocessor Protobuf requests.
     recursion_limit: u32,
 
@@ -58,7 +58,7 @@ impl<E: Engine> Clone for Endpoint<E> {
 
 impl<E: Engine> tikv_util::AssertSend for Endpoint<E> {}
 
 impl<E: Engine> Endpoint<E> {
-    pub fn new(cfg: &Config, engine: E, read_pool: ReadPool<ReadPoolContext>) -> Self {
+    pub fn new(cfg: &Config, engine: E, read_pool: ReadPool) -> Self {
         Self {
             engine,
             read_pool,
@@ -272,11 +272,10 @@ impl<E: Engine> Endpoint<E> {
         let engine = self.engine.clone();
         let priority = readpool::Priority::from(req_ctx.context.get_priority());
         // box the tracker so that moving it is cheap.
-        let mut tracker = Box::new(Tracker::new(req_ctx));
+        let tracker = Box::new(Tracker::new(req_ctx));
 
         self.read_pool
-            .future_execute(priority, move |ctxd| {
-                tracker.attach_ctxd(ctxd);
+            .spawn_handle(priority, move || {
                 Self::handle_unary_request_impl(engine, tracker, handler_builder)
             })
             .map_err(|_| Error::Full)
@@ -408,25 +407,16 @@ impl<E: Engine> Endpoint<E> {
         let (tx, rx) = mpsc::channel::<Result<coppb::Response>>(self.stream_channel_size);
         let engine = self.engine.clone();
         let priority = readpool::Priority::from(req_ctx.context.get_priority());
-        // Must be created befure `future_execute`, otherwise wait time is not tracked.
-        let mut tracker = Box::new(Tracker::new(req_ctx));
+        let tracker = Box::new(Tracker::new(req_ctx));
 
         self.read_pool
-            .future_execute(priority, move |ctxd| {
-                tracker.attach_ctxd(ctxd);
-
+            .spawn(priority, move || {
                 Self::handle_stream_request_impl(engine, tracker, handler_builder) // Stream<Resp, Error>
                     .then(Ok::<_, mpsc::SendError<_>>) // Stream<Result<Resp, Error>, MpscError>
                     .forward(tx)
             })
-            .map_err(|_| Error::Full)
-            .and_then(move |cpu_future| {
-                // Keep running stream producer
-                cpu_future.forget();
-
-                // Returns the stream instead of a future
-                Ok(rx.then(|r| r.unwrap()))
-            })
+            .map_err(|_| Error::Full)?;
+        Ok(rx.then(|r| r.unwrap()))
     }
 
     /// Parses and handles a stream request. Returns a stream that produce each result in a
@@ -511,7 +501,6 @@ mod tests {
     use tipb::expression::Expr;
 
     use crate::storage::TestEngineBuilder;
-    use tikv_util::worker::FutureWorker;
 
     /// A unary `RequestHandler` that always produces a fixture.
     struct UnaryFixture {
@@ -633,11 +622,8 @@ mod tests {
 
     #[test]
     fn test_outdated_request() {
-        let pd_worker = FutureWorker::new("test-pd-worker");
         let engine = TestEngineBuilder::new().build().unwrap();
-        let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
-            ReadPoolContext::new(pd_worker.scheduler())
-        });
+        let read_pool = readpool::Builder::build_for_test();
         let cop = Endpoint::new(&Config::default(), engine, read_pool);
 
         // a normal request
@@ -671,11 +657,8 @@ mod tests {
 
     #[test]
     fn test_stack_guard() {
-        let pd_worker = FutureWorker::new("test-pd-worker");
         let engine = TestEngineBuilder::new().build().unwrap();
-        let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
-            ReadPoolContext::new(pd_worker.scheduler())
-        });
+        let read_pool = readpool::Builder::build_for_test();
         let cop = Endpoint::new(
             &Config {
                 end_point_recursion_limit: 5,
@@ -711,11 +694,8 @@ mod tests {
 
     #[test]
     fn test_invalid_req_type() {
-        let pd_worker = FutureWorker::new("test-pd-worker");
         let engine = TestEngineBuilder::new().build().unwrap();
-        let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
-            ReadPoolContext::new(pd_worker.scheduler())
-        });
+        let read_pool = readpool::Builder::build_for_test();
         let cop = Endpoint::new(&Config::default(), engine, read_pool);
 
         let mut req = coppb::Request::new();
@@ -730,11 +710,8 @@ mod tests {
 
     #[test]
     fn test_invalid_req_body() {
-        let pd_worker = FutureWorker::new("test-pd-worker");
         let engine = TestEngineBuilder::new().build().unwrap();
-        let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
-            ReadPoolContext::new(pd_worker.scheduler())
-        });
+        let read_pool = readpool::Builder::build_for_test();
        let cop = Endpoint::new(&Config::default(), engine, read_pool);
 
         let mut req = coppb::Request::new();
@@ -750,17 +727,16 @@ mod tests {
 
     #[test]
     fn test_full() {
-        let pd_worker = FutureWorker::new("test-pd-worker");
+        let read_pool = readpool::Builder::from_config(&readpool::Config {
+            normal_concurrency: 1,
+            max_tasks_per_worker_normal: 2,
+            ..readpool::Config::default_for_test()
+        })
+        .name_prefix("cop-test-full")
+        .build();
+
         let engine = TestEngineBuilder::new().build().unwrap();
-        let read_pool = ReadPool::new(
-            "readpool",
-            &readpool::Config {
-                normal_concurrency: 1,
-                max_tasks_per_worker_normal: 2,
-                ..readpool::Config::default_for_test()
-            },
-            || ReadPoolContext::new(pd_worker.scheduler()),
-        );
+
         let cop = Endpoint::new(&Config::default(), engine, read_pool);
 
         let (tx, rx) = mpsc::channel();
@@ -805,11 +781,8 @@ mod tests {
 
     #[test]
     fn test_error_unary_response() {
-        let pd_worker = FutureWorker::new("test-pd-worker");
         let engine = TestEngineBuilder::new().build().unwrap();
-        let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
-            ReadPoolContext::new(pd_worker.scheduler())
-        });
+        let read_pool = readpool::Builder::build_for_test();
         let cop = Endpoint::new(&Config::default(), engine, read_pool);
 
         let handler_builder = Box::new(|_, _: &_| {
@@ -826,11 +799,8 @@ mod tests {
 
     #[test]
     fn test_error_streaming_response() {
-        let pd_worker = FutureWorker::new("test-pd-worker");
         let engine = TestEngineBuilder::new().build().unwrap();
-        let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
-            ReadPoolContext::new(pd_worker.scheduler())
-        });
+        let read_pool = readpool::Builder::build_for_test();
         let cop = Endpoint::new(&Config::default(), engine, read_pool);
 
         // Fail immediately
@@ -873,11 +843,8 @@ mod tests {
 
     #[test]
     fn test_empty_streaming_response() {
-        let pd_worker = FutureWorker::new("test-pd-worker");
         let engine = TestEngineBuilder::new().build().unwrap();
-        let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
-            ReadPoolContext::new(pd_worker.scheduler())
-        });
+        let read_pool = readpool::Builder::build_for_test();
         let cop = Endpoint::new(&Config::default(), engine, read_pool);
 
         let handler_builder = Box::new(|_, _: &_| Ok(StreamFixture::new(vec![]).into_boxed()));
@@ -894,11 +861,8 @@ mod tests {
 
     #[test]
     fn test_special_streaming_handlers() {
-        let pd_worker = FutureWorker::new("test-pd-worker");
         let engine = TestEngineBuilder::new().build().unwrap();
-        let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
-            ReadPoolContext::new(pd_worker.scheduler())
-        });
+        let read_pool = readpool::Builder::build_for_test();
         let cop = Endpoint::new(&Config::default(), engine, read_pool);
 
         // handler returns `finished == true` should not be called again.
@@ -983,11 +947,8 @@ mod tests {
 
     #[test]
     fn test_channel_size() {
-        let pd_worker = FutureWorker::new("test-pd-worker");
         let engine = TestEngineBuilder::new().build().unwrap();
-        let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
-            ReadPoolContext::new(pd_worker.scheduler())
-        });
+        let read_pool = readpool::Builder::build_for_test();
         let cop = Endpoint::new(
             &Config {
                 end_point_stream_channel_size: 3,
@@ -1018,6 +979,7 @@ mod tests {
         assert!(counter.load(atomic::Ordering::SeqCst) < 14);
     }
 
+    //
     #[test]
     fn test_handle_time() {
         use tikv_util::config::ReadableDuration;
@@ -1037,13 +999,11 @@ mod tests {
         const PAYLOAD_SMALL: i64 = 3000;
         const PAYLOAD_LARGE: i64 = 6000;
 
-        let pd_worker = FutureWorker::new("test-pd-worker");
+        let read_pool =
+            readpool::Builder::from_config(&readpool::Config::default_with_concurrency(1)).build();
+
         let engine = TestEngineBuilder::new().build().unwrap();
-        let read_pool = ReadPool::new(
-            "readpool",
-            &readpool::Config::default_with_concurrency(1),
-            || ReadPoolContext::new(pd_worker.scheduler()),
-        );
+
         let mut config = Config::default();
         config.end_point_request_max_handle_duration =
             ReadableDuration::millis((PAYLOAD_SMALL + PAYLOAD_LARGE) as u64 * 2);
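The streaming change above replaces `future_execute` plus `cpu_future.forget()` with a producer that is detached into the pool while the caller keeps only the receiver. A condensed, self-contained sketch of that pattern, with types simplified and the plain `FuturePool` standing in for the priority-aware `ReadPool` used in the diff:

```rust
use futures::{stream, sync::mpsc, Future, Sink, Stream};
use tikv_util::future_pool::FuturePool;

// Hypothetical helper mirroring handle_stream_request above.
fn stream_via_pool(pool: &FuturePool) -> impl Stream<Item = u64, Error = ()> {
    let (tx, rx) = mpsc::channel::<Result<u64, ()>>(4);
    pool.spawn(move || {
        stream::iter_ok::<_, ()>(1u64..=3)     // Stream<u64, ()>
            .then(Ok::<_, mpsc::SendError<_>>) // errors become items; only a
            .forward(tx)                       // closed channel aborts the forward
            .map(|_| ())
            .map_err(|_| ())
    });
    // The caller only sees the receiver, like `rx.then(|r| r.unwrap())` above.
    rx.then(|r| r.unwrap())
}
```

This is why the `?` on `map_err(|_| Error::Full)` suffices: once the producer is spawned, no handle needs to be kept alive for the stream to keep flowing.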
diff --git a/src/coprocessor/mod.rs b/src/coprocessor/mod.rs
index cc033d12c15..07f5d5efaa1 100644
--- a/src/coprocessor/mod.rs
+++ b/src/coprocessor/mod.rs
@@ -24,14 +24,13 @@
 mod endpoint;
 mod error;
 pub mod local_metrics;
 mod metrics;
-mod readpool_context;
+pub mod readpool_impl;
 mod statistics;
 mod tracker;
 pub mod util;
 
 pub use self::endpoint::Endpoint;
 pub use self::error::{Error, Result};
-pub use self::readpool_context::Context as ReadPoolContext;
 
 use std::boxed::FnBox;
diff --git a/src/coprocessor/readpool_context.rs b/src/coprocessor/readpool_context.rs
deleted file mode 100644
index e22759c5c83..00000000000
--- a/src/coprocessor/readpool_context.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
-
-use std::fmt;
-
-use crate::pd;
-use tikv_util::futurepool;
-use tikv_util::worker;
-
-use super::dag::executor::ExecutorMetrics;
-use super::local_metrics::{BasicLocalMetrics, ExecLocalMetrics};
-
-pub struct Context {
-    // TODO: ExecLocalMetrics can be merged into this file.
-    pub exec_local_metrics: ExecLocalMetrics,
-    pub basic_local_metrics: BasicLocalMetrics,
-}
-
-impl fmt::Debug for Context {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("coprocessor::ReadPoolContext").finish()
-    }
-}
-
-impl Context {
-    pub fn new(pd_sender: worker::FutureScheduler<pd::PdTask>) -> Self {
-        Context {
-            exec_local_metrics: ExecLocalMetrics::new(pd_sender),
-            basic_local_metrics: BasicLocalMetrics::default(),
-        }
-    }
-
-    pub fn collect(&mut self, region_id: u64, scan_tag: &str, metrics: ExecutorMetrics) {
-        self.exec_local_metrics
-            .collect(scan_tag, region_id, metrics);
-    }
-}
-
-impl futurepool::Context for Context {
-    fn on_tick(&mut self) {
-        self.exec_local_metrics.flush();
-        self.basic_local_metrics.flush();
-    }
-}
diff --git a/src/coprocessor/readpool_impl.rs b/src/coprocessor/readpool_impl.rs
new file mode 100644
index 00000000000..d9b7a8f5a66
--- /dev/null
+++ b/src/coprocessor/readpool_impl.rs
@@ -0,0 +1,138 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::cell::RefCell;
+
+use crate::pd::PdTask;
+use crate::server::readpool::{self, Builder, ReadPool};
+use tikv_util::collections::HashMap;
+use tikv_util::worker::FutureScheduler;
+
+use super::metrics::*;
+use prometheus::local::*;
+
+use crate::coprocessor::dag::executor::ExecutorMetrics;
+
+pub struct CopLocalMetrics {
+    pub local_copr_req_histogram_vec: LocalHistogramVec,
+    pub local_outdated_req_wait_time: LocalHistogramVec,
+    pub local_copr_req_handle_time: LocalHistogramVec,
+    pub local_copr_req_wait_time: LocalHistogramVec,
+    pub local_copr_req_error: LocalIntCounterVec,
+    pub local_copr_scan_keys: LocalHistogramVec,
+    pub local_copr_scan_details: LocalIntCounterVec,
+    pub local_copr_rocksdb_perf_counter: LocalIntCounterVec,
+    local_copr_executor_count: LocalIntCounterVec,
+    local_copr_get_or_scan_count: LocalIntCounterVec,
+    local_cop_flow_stats: HashMap<u64, crate::storage::FlowStatistics>,
+}
+
+thread_local! {
+    pub static TLS_COP_METRICS: RefCell<CopLocalMetrics> = RefCell::new(
+        CopLocalMetrics {
+            local_copr_req_histogram_vec:
+                COPR_REQ_HISTOGRAM_VEC.local(),
+            local_outdated_req_wait_time:
+                OUTDATED_REQ_WAIT_TIME.local(),
+            local_copr_req_handle_time:
+                COPR_REQ_HANDLE_TIME.local(),
+            local_copr_req_wait_time:
+                COPR_REQ_WAIT_TIME.local(),
+            local_copr_req_error:
+                COPR_REQ_ERROR.local(),
+            local_copr_scan_keys:
+                COPR_SCAN_KEYS.local(),
+            local_copr_scan_details:
+                COPR_SCAN_DETAILS.local(),
+            local_copr_rocksdb_perf_counter:
+                COPR_ROCKSDB_PERF_COUNTER.local(),
+            local_copr_executor_count:
+                COPR_EXECUTOR_COUNT.local(),
+            local_copr_get_or_scan_count:
+                COPR_GET_OR_SCAN_COUNT.local(),
+            local_cop_flow_stats:
+                HashMap::default(),
+        }
+    );
+}
+
+pub fn build_read_pool(
+    config: &readpool::Config,
+    pd_sender: FutureScheduler<PdTask>,
+    name_prefix: &str,
+) -> ReadPool {
+    let pd_sender2 = pd_sender.clone();
+
+    Builder::from_config(config)
+        .name_prefix(name_prefix)
+        .on_tick(move || tls_flush(&pd_sender))
+        .before_stop(move || tls_flush(&pd_sender2))
+        .build()
+}
+
+#[inline]
+fn tls_flush(pd_sender: &FutureScheduler<PdTask>) {
+    TLS_COP_METRICS.with(|m| {
+        // Flush Prometheus metrics
+        let mut cop_metrics = m.borrow_mut();
+        cop_metrics.local_copr_req_histogram_vec.flush();
+        cop_metrics.local_copr_req_handle_time.flush();
+        cop_metrics.local_copr_req_wait_time.flush();
+        cop_metrics.local_copr_scan_keys.flush();
+        cop_metrics.local_copr_rocksdb_perf_counter.flush();
+        cop_metrics.local_copr_scan_details.flush();
+        cop_metrics.local_copr_get_or_scan_count.flush();
+        cop_metrics.local_copr_executor_count.flush();
+
+        // Report PD metrics
+        if cop_metrics.local_cop_flow_stats.is_empty() {
+            // Stats to report to PD is empty, ignore.
+            return;
+        }
+
+        let read_stats = cop_metrics.local_cop_flow_stats.clone();
+        cop_metrics.local_cop_flow_stats = HashMap::default();
+
+        let result = pd_sender.schedule(PdTask::ReadStats { read_stats });
+        if let Err(e) = result {
+            error!("Failed to send cop pool read flow statistics"; "err" => ?e);
+        }
+    });
+}
+
+pub fn tls_collect_executor_metrics(region_id: u64, type_str: &str, metrics: ExecutorMetrics) {
+    let stats = &metrics.cf_stats;
+    // cf statistics group by type
+    for (cf, details) in stats.details() {
+        for (tag, count) in details {
+            TLS_COP_METRICS.with(|m| {
+                m.borrow_mut()
+                    .local_copr_scan_details
+                    .with_label_values(&[type_str, cf, tag])
+                    .inc_by(count as i64);
+            });
+        }
+    }
+    // flow statistics group by region
+    tls_collect_read_flow(region_id, stats);
+
+    // scan count
+    let scan_counter = metrics.scan_counter;
+    // exec count
+    let executor_count = metrics.executor_count;
+    TLS_COP_METRICS.with(|m| {
+        scan_counter.consume(&mut m.borrow_mut().local_copr_get_or_scan_count);
+        executor_count.consume(&mut m.borrow_mut().local_copr_executor_count);
+    });
+}
+
+#[inline]
+pub fn tls_collect_read_flow(region_id: u64, statistics: &crate::storage::Statistics) {
+    TLS_COP_METRICS.with(|m| {
+        let map = &mut m.borrow_mut().local_cop_flow_stats;
+        let flow_stats = map
+            .entry(region_id)
+            .or_insert_with(crate::storage::FlowStatistics::default);
+        flow_stats.add(&statistics.write.flow_stats);
+        flow_stats.add(&statistics.data.flow_stats);
+    });
+}
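The file above replaces per-pool `Context` objects with plain thread-local state: worker code records without locks, and the pool's `on_tick`/`before_stop` hooks call `tls_flush` to publish. A usage sketch; the label value and the way a `Statistics` reference is obtained here are illustrative:

```rust
use tikv::coprocessor::readpool_impl::{tls_collect_read_flow, TLS_COP_METRICS};
use tikv::storage::Statistics;

// Must run on a read-pool worker thread for the flush hooks to pick it up.
fn record_on_worker(region_id: u64, stats: &Statistics) {
    // Flow statistics are buffered per thread and sent to PD by tls_flush.
    tls_collect_read_flow(region_id, stats);

    // Local Prometheus vectors live behind a RefCell; no atomics on the hot path.
    TLS_COP_METRICS.with(|m| {
        m.borrow_mut()
            .local_copr_req_error
            .with_label_values(&["example"]) // hypothetical label
            .inc();
    });
}
```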
tikv_util::time::{self, Duration, Instant}; use crate::coprocessor::dag::executor::ExecutorMetrics; +use crate::coprocessor::readpool_impl::*; use crate::coprocessor::*; // If handle time is larger than the lower bound, the query is considered as slow query. @@ -14,10 +15,7 @@ const SLOW_QUERY_LOWER_BOUND: f64 = 1.0; // 1 second. #[derive(Debug, Clone, Copy, PartialEq)] enum TrackerState { - /// The tracker is just created and not initialized. Initialize means `ctxd` is attached. - NotInitialized, - - /// The tracker is initialized with a `ctxd` instance. + /// The tracker is initialized. Initialized, /// The tracker is notified that all items just began. @@ -54,9 +52,6 @@ pub struct Tracker { // Request info, used to print slow log. pub req_ctx: ReqContext, - - // Metrics collect target - ctxd: Option>, } impl Tracker { @@ -69,7 +64,7 @@ impl Tracker { item_begin_at: Instant::now_coarse(), perf_statistics_start: None, - current_stage: TrackerState::NotInitialized, + current_stage: TrackerState::Initialized, wait_time: Duration::default(), req_time: Duration::default(), item_process_time: Duration::default(), @@ -78,20 +73,11 @@ impl Tracker { total_perf_statistics: PerfStatisticsDelta::default(), req_ctx, - - ctxd: None, } } - /// Attach future pool's context delegators. - pub fn attach_ctxd(&mut self, ctxd: futurepool::ContextDelegators) { - assert!(self.current_stage == TrackerState::NotInitialized); - self.ctxd = Some(ctxd); - self.current_stage = TrackerState::Initialized; - } - pub fn on_begin_all_items(&mut self) { - assert!(self.current_stage == TrackerState::Initialized); + assert_eq!(self.current_stage, TrackerState::Initialized); self.wait_time = Instant::now_coarse() - self.request_begin_at; self.current_stage = TrackerState::AllItemsBegan; } @@ -107,7 +93,7 @@ impl Tracker { } pub fn on_finish_item(&mut self, some_exec_metrics: Option) { - assert!(self.current_stage == TrackerState::ItemBegan); + assert_eq!(self.current_stage, TrackerState::ItemBegan); self.item_process_time = Instant::now_coarse() - self.item_begin_at; self.total_process_time += self.item_process_time; if let Some(mut exec_metrics) = some_exec_metrics { @@ -124,7 +110,7 @@ impl Tracker { /// Get current item's ExecDetail according to previous collected metrics. /// TiDB asks for ExecDetail to be printed in its log. 
pub fn get_item_exec_details(&self) -> kvrpcpb::ExecDetails { - assert!(self.current_stage == TrackerState::ItemFinished); + assert_eq!(self.current_stage, TrackerState::ItemFinished); let is_slow_query = time::duration_to_sec(self.item_process_time) > SLOW_QUERY_LOWER_BOUND; let mut exec_details = kvrpcpb::ExecDetails::new(); if self.req_ctx.context.get_handle_time() || is_slow_query { @@ -180,57 +166,67 @@ impl Tracker { let total_exec_metrics = std::mem::replace(&mut self.total_exec_metrics, ExecutorMetrics::default()); - let mut thread_ctx = self.ctxd.as_ref().unwrap().current_thread_context_mut(); - thread_ctx - .basic_local_metrics - .req_time - .with_label_values(&[self.req_ctx.tag]) - .observe(time::duration_to_sec(self.req_time)); - thread_ctx - .basic_local_metrics - .wait_time - .with_label_values(&[self.req_ctx.tag]) - .observe(time::duration_to_sec(self.wait_time)); - thread_ctx - .basic_local_metrics - .handle_time - .with_label_values(&[self.req_ctx.tag]) - .observe(time::duration_to_sec(self.total_process_time)); - thread_ctx - .basic_local_metrics - .scan_keys - .with_label_values(&[self.req_ctx.tag]) - .observe(total_exec_metrics.cf_stats.total_op_count() as f64); - thread_ctx.collect( + + TLS_COP_METRICS.with(|m| { + let mut cop_metrics = m.borrow_mut(); + + // req time + cop_metrics + .local_copr_req_histogram_vec + .with_label_values(&[self.req_ctx.tag]) + .observe(time::duration_to_sec(self.req_time)); + + // wait time + cop_metrics + .local_copr_req_wait_time + .with_label_values(&[self.req_ctx.tag]) + .observe(time::duration_to_sec(self.wait_time)); + + // handle time + cop_metrics + .local_copr_req_handle_time + .with_label_values(&[self.req_ctx.tag]) + .observe(time::duration_to_sec(self.total_process_time)); + + // scan keys + cop_metrics + .local_copr_scan_keys + .with_label_values(&[self.req_ctx.tag]) + .observe(total_exec_metrics.cf_stats.total_op_count() as f64); + + // rocksdb perf stats + cop_metrics + .local_copr_rocksdb_perf_counter + .with_label_values(&[self.req_ctx.tag, "internal_key_skipped_count"]) + .inc_by(self.total_perf_statistics.internal_key_skipped_count as i64); + + cop_metrics + .local_copr_rocksdb_perf_counter + .with_label_values(&[self.req_ctx.tag, "internal_delete_skipped_count"]) + .inc_by(self.total_perf_statistics.internal_delete_skipped_count as i64); + + cop_metrics + .local_copr_rocksdb_perf_counter + .with_label_values(&[self.req_ctx.tag, "block_cache_hit_count"]) + .inc_by(self.total_perf_statistics.block_cache_hit_count as i64); + + cop_metrics + .local_copr_rocksdb_perf_counter + .with_label_values(&[self.req_ctx.tag, "block_read_count"]) + .inc_by(self.total_perf_statistics.block_read_count as i64); + + cop_metrics + .local_copr_rocksdb_perf_counter + .with_label_values(&[self.req_ctx.tag, "block_read_byte"]) + .inc_by(self.total_perf_statistics.block_read_byte as i64); + }); + + tls_collect_executor_metrics( self.req_ctx.context.get_region_id(), self.req_ctx.tag, total_exec_metrics, ); - thread_ctx - .basic_local_metrics - .rocksdb_perf_stats - .with_label_values(&[self.req_ctx.tag, "internal_key_skipped_count"]) - .inc_by(self.total_perf_statistics.internal_key_skipped_count as i64); - thread_ctx - .basic_local_metrics - .rocksdb_perf_stats - .with_label_values(&[self.req_ctx.tag, "internal_delete_skipped_count"]) - .inc_by(self.total_perf_statistics.internal_delete_skipped_count as i64); - thread_ctx - .basic_local_metrics - .rocksdb_perf_stats - .with_label_values(&[self.req_ctx.tag, "block_cache_hit_count"]) - 
.inc_by(self.total_perf_statistics.block_cache_hit_count as i64); - thread_ctx - .basic_local_metrics - .rocksdb_perf_stats - .with_label_values(&[self.req_ctx.tag, "block_read_count"]) - .inc_by(self.total_perf_statistics.block_read_count as i64); - thread_ctx - .basic_local_metrics - .rocksdb_perf_stats - .with_label_values(&[self.req_ctx.tag, "block_read_byte"]) - .inc_by(self.total_perf_statistics.block_read_byte as i64); + self.current_stage = TrackerState::Tracked; } } diff --git a/src/server/node.rs b/src/server/node.rs index 31c1df04d93..7c3389ae495 100644 --- a/src/server/node.rs +++ b/src/server/node.rs @@ -16,7 +16,7 @@ use crate::raftstore::store::{ use crate::server::readpool::ReadPool; use crate::server::Config as ServerConfig; use crate::server::ServerRaftStoreRouter; -use crate::storage::{self, Config as StorageConfig, RaftKv, Storage}; +use crate::storage::{Config as StorageConfig, RaftKv, Storage}; use engine::rocks::DB; use engine::Engines; use engine::Peekable; @@ -33,7 +33,7 @@ const CHECK_CLUSTER_BOOTSTRAPPED_RETRY_SECONDS: u64 = 3; pub fn create_raft_storage( router: S, cfg: &StorageConfig, - read_pool: ReadPool, + read_pool: ReadPool, local_storage: Option>, raft_store_router: Option, ) -> Result>> diff --git a/src/server/readpool/builder.rs b/src/server/readpool/builder.rs new file mode 100644 index 00000000000..e6298edc32d --- /dev/null +++ b/src/server/readpool/builder.rs @@ -0,0 +1,89 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use tikv_util::future_pool::Builder as FuturePoolBuilder; + +use super::config::Config; + +pub struct Builder<'a> { + config: &'a Config, + builder_low: FuturePoolBuilder, + builder_normal: FuturePoolBuilder, + builder_high: FuturePoolBuilder, +} + +impl<'a> Builder<'a> { + pub fn from_config(config: &'a Config) -> Self { + let mut builder_low = FuturePoolBuilder::new(); + builder_low.pool_size(config.low_concurrency); + builder_low.stack_size(config.stack_size.0 as usize); + + let mut builder_normal = FuturePoolBuilder::new(); + builder_normal.pool_size(config.normal_concurrency); + builder_normal.stack_size(config.stack_size.0 as usize); + + let mut builder_high = FuturePoolBuilder::new(); + builder_high.pool_size(config.high_concurrency); + builder_high.stack_size(config.stack_size.0 as usize); + + Builder { + config, + builder_low, + builder_normal, + builder_high, + } + } + + pub fn name_prefix(&mut self, name: impl AsRef) -> &mut Self { + let name = name.as_ref(); + self.builder_low.name_prefix(format!("{}-low", name)); + self.builder_normal.name_prefix(format!("{}-normal", name)); + self.builder_high.name_prefix(format!("{}-high", name)); + self + } + + pub fn on_tick(&mut self, f: F) -> &mut Self + where + F: Fn() + Clone + Send + Sync + 'static, + { + self.builder_low.on_tick(f.clone()); + self.builder_normal.on_tick(f.clone()); + self.builder_high.on_tick(f); + self + } + + pub fn before_stop(&mut self, f: F) -> &mut Self + where + F: Fn() + Clone + Send + Sync + 'static, + { + self.builder_low.before_stop(f.clone()); + self.builder_normal.before_stop(f.clone()); + self.builder_high.before_stop(f); + self + } + + pub fn after_start(&mut self, f: F) -> &mut Self + where + F: Fn() + Clone + Send + Sync + 'static, + { + self.builder_low.after_start(f.clone()); + self.builder_normal.after_start(f.clone()); + self.builder_high.after_start(f); + self + } + + pub fn build(&mut self) -> super::ReadPool { + super::ReadPool { + pool_low: self.builder_low.build(), + pool_normal: self.builder_normal.build(), + 
pool_high: self.builder_high.build(), + max_tasks_low: self.config.max_tasks_per_worker_low * self.config.low_concurrency, + max_tasks_normal: self.config.max_tasks_per_worker_normal + * self.config.normal_concurrency, + max_tasks_high: self.config.max_tasks_per_worker_high * self.config.high_concurrency, + } + } + + pub fn build_for_test() -> super::ReadPool { + Builder::from_config(&Config::default_for_test()).build() + } +} diff --git a/src/server/readpool/mod.rs b/src/server/readpool/mod.rs index dcdef4c538c..6c781f48235 100644 --- a/src/server/readpool/mod.rs +++ b/src/server/readpool/mod.rs @@ -1,87 +1,39 @@ // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. +mod builder; pub mod config; mod priority; -use std::error::Error; -use std::fmt; -use std::time::Duration; - -use futures::Future; -use futures_cpupool::CpuFuture; - -use tikv_util::futurepool::{self, FuturePool}; - +pub use self::builder::Builder; pub use self::config::Config; pub use self::priority::Priority; -const TICK_INTERVAL_SEC: u64 = 1; +use futures::Future; +use tikv_util::future_pool::FuturePool; +use tokio_threadpool::SpawnHandle; + +type Result = std::result::Result; /// A priority-aware thread pool for executing futures. /// /// It is specifically used for all sorts of read operations like KV Get, /// KV Scan and Coprocessor Read to improve performance. -pub struct ReadPool { - pool_high: FuturePool, - pool_normal: FuturePool, - pool_low: FuturePool, +#[derive(Clone)] +pub struct ReadPool { + pool_high: FuturePool, + pool_normal: FuturePool, + pool_low: FuturePool, max_tasks_high: usize, max_tasks_normal: usize, max_tasks_low: usize, } -impl tikv_util::AssertSend for ReadPool {} -impl tikv_util::AssertSync for ReadPool {} - -impl Clone for ReadPool { - fn clone(&self) -> Self { - ReadPool { - pool_high: self.pool_high.clone(), - pool_normal: self.pool_normal.clone(), - pool_low: self.pool_low.clone(), - ..*self - } - } -} - -impl ReadPool { - /// Creates a new thread pool. - pub fn new(name_prefix: &str, config: &Config, context_factory: F) -> Self - where - F: Fn() -> T, - { - let tick_interval = Duration::from_secs(TICK_INTERVAL_SEC); - - ReadPool { - pool_high: FuturePool::new( - config.high_concurrency, - config.stack_size.0 as usize, - &format!("{}-high", name_prefix), - tick_interval, - &context_factory, - ), - pool_normal: FuturePool::new( - config.normal_concurrency, - config.stack_size.0 as usize, - &format!("{}-normal", name_prefix), - tick_interval, - &context_factory, - ), - pool_low: FuturePool::new( - config.low_concurrency, - config.stack_size.0 as usize, - &format!("{}-low", name_prefix), - tick_interval, - &context_factory, - ), - max_tasks_high: config.max_tasks_per_worker_high * config.high_concurrency, - max_tasks_normal: config.max_tasks_per_worker_normal * config.normal_concurrency, - max_tasks_low: config.max_tasks_per_worker_low * config.low_concurrency, - } - } +impl tikv_util::AssertSend for ReadPool {} +impl tikv_util::AssertSync for ReadPool {} +impl ReadPool { #[inline] - fn get_pool_by_priority(&self, priority: Priority) -> &FuturePool { + fn get_pool_by_priority(&self, priority: Priority) -> &FuturePool { match priority { Priority::High => &self.pool_high, Priority::Normal => &self.pool_normal, @@ -98,21 +50,12 @@ impl ReadPool { } } - /// Executes a future (generated by the `future_factory`) on specified future pool, - /// returning a success future representing the produced value, or a fail future if - /// the future pool is full. 
-    pub fn future_execute<R, F>(
-        &self,
-        priority: Priority,
-        future_factory: R,
-    ) -> Result<CpuFuture<F::Item, F::Error>, Full>
+    #[inline]
+    fn gate_spawn<F, R>(&self, priority: Priority, f: F) -> Result<R>
     where
-        R: FnOnce(futurepool::ContextDelegators<T>) -> F + Send + 'static,
-        F: Future + Send + 'static,
-        F::Item: Send + 'static,
-        F::Error: Send + 'static,
+        F: FnOnce(&FuturePool) -> R,
     {
-        fail_point!("read_pool_execute_full", |_| Err(Full {
+        fail_point!("read_pool_spawn_full", |_| Err(Full {
             current_tasks: 100,
             max_tasks: 100,
         }));
@@ -120,15 +63,41 @@ impl<T: futurepool::Context + 'static> ReadPool<T> {
         let pool = self.get_pool_by_priority(priority);
         let max_tasks = self.get_max_tasks_by_priority(priority);
         let current_tasks = pool.get_running_task_count();
+
         if current_tasks >= max_tasks {
             Err(Full {
                 current_tasks,
                 max_tasks,
             })
         } else {
-            Ok(pool.spawn(future_factory))
+            Ok(f(pool))
         }
     }
+
+    pub fn spawn<F, R>(&self, priority: Priority, future_fn: F) -> Result<()>
+    where
+        F: FnOnce() -> R + Send + 'static,
+        R: Future + Send + 'static,
+        R::Item: Send + 'static,
+        R::Error: Send + 'static,
+    {
+        self.gate_spawn(priority, |pool| pool.spawn(future_fn))
+    }
+
+    #[must_use]
+    pub fn spawn_handle<F, R>(
+        &self,
+        priority: Priority,
+        future_fn: F,
+    ) -> Result<SpawnHandle<R::Item, R::Error>>
+    where
+        F: FnOnce() -> R + Send + 'static,
+        R: Future + Send + 'static,
+        R::Item: Send + 'static,
+        R::Error: Send + 'static,
+    {
+        self.gate_spawn(priority, |pool| pool.spawn_handle(future_fn))
+    }
 }
 
 #[derive(Clone, Copy, PartialEq, Eq, Debug)]
@@ -137,8 +106,8 @@ pub struct Full {
     pub max_tasks: usize,
 }
 
-impl fmt::Display for Full {
-    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+impl std::fmt::Display for Full {
+    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             fmt,
             "read pool is full, current task count = {}, max task count = {}",
@@ -147,7 +116,7 @@ impl std::fmt::Display for Full {
     }
 }
 
-impl Error for Full {
+impl std::error::Error for Full {
     fn description(&self) -> &str {
         "read pool is full"
     }
@@ -161,6 +130,7 @@ mod tests {
     use std::result;
     use std::sync::mpsc::{channel, Sender};
     use std::thread;
+    use std::time::Duration;
 
     use super::*;
 
@@ -182,19 +152,14 @@ mod tests {
         }
     }
 
-    #[derive(Debug)]
-    struct Context;
-
-    impl futurepool::Context for Context {}
-
     #[test]
-    fn test_future_execute() {
-        let read_pool = ReadPool::new("readpool", &Config::default_for_test(), || Context {});
+    fn test_spawn_handle() {
+        let read_pool = Builder::build_for_test();
 
         expect_val(
             vec![1, 2, 4],
             read_pool
-                .future_execute(Priority::High, |_| {
+                .spawn_handle(Priority::High, || {
                     future::ok::<Vec<u8>, BoxError>(vec![1, 2, 4])
                 })
                 .unwrap() // unwrap Full error
@@ -204,7 +169,7 @@ mod tests {
         expect_err(
             "foobar",
             read_pool
-                .future_execute(Priority::High, |_| {
+                .spawn_handle(Priority::High, || {
                     future::err::<(), BoxError>(box_err!("foobar"))
                 })
                 .unwrap() // unwrap Full error
@@ -213,17 +178,17 @@ mod tests {
     }
 
     fn spawn_long_time_future(
-        pool: &ReadPool<Context>,
+        pool: &ReadPool,
         id: u64,
         future_duration_ms: u64,
-    ) -> Result<CpuFuture<u64, ()>, Full> {
-        pool.future_execute(Priority::High, move |_| {
+    ) -> Result<SpawnHandle<u64, ()>> {
+        pool.spawn_handle(Priority::High, move || {
             thread::sleep(Duration::from_millis(future_duration_ms));
             future::ok::<u64, ()>(id)
         })
     }
 
-    fn wait_on_new_thread<F>(sender: Sender<Result<F::Item, F::Error>>, future: F)
+    fn wait_on_new_thread<F>(sender: Sender<result::Result<F::Item, F::Error>>, future: F)
     where
         F: Future + Send + 'static,
         F::Item: Send + 'static,
@@ -239,15 +204,13 @@ fn test_full() {
         let (tx, rx) = channel();
 
-        let read_pool = ReadPool::new(
-            "readpool",
-            &Config {
-                high_concurrency: 2,
-                max_tasks_per_worker_high: 2,
-                ..Config::default_for_test()
-            },
-            || Context {},
-        );
+        let read_pool = builder::Builder::from_config(&Config {
+            high_concurrency: 2,
+            max_tasks_per_worker_high: 2,
+            ..Config::default_for_test()
+        })
+        .name_prefix("read-test-full")
+        .build();
 
         wait_on_new_thread(
             tx.clone(),
@@ -293,10 +256,10 @@ fn test_full() {
         // full
         assert!(spawn_long_time_future(&read_pool, 8, 100).is_err());
 
-        assert_eq!(rx.recv().unwrap(), Ok(2));
-        assert_eq!(rx.recv().unwrap(), Ok(3));
-        assert_eq!(rx.recv().unwrap(), Ok(7));
-        assert_eq!(rx.recv().unwrap(), Ok(4));
+        assert!(rx.recv().is_ok());
+        assert!(rx.recv().is_ok());
+        assert!(rx.recv().is_ok());
+        assert!(rx.recv().is_ok());
 
         // no more results
         assert!(rx.recv_timeout(Duration::from_millis(500)).is_err());
diff --git a/src/server/readpool/priority.rs b/src/server/readpool/priority.rs
index db9edccfc64..1023662654a 100644
--- a/src/server/readpool/priority.rs
+++ b/src/server/readpool/priority.rs
@@ -11,6 +11,16 @@ pub enum Priority {
     High,
 }
 
+impl Priority {
+    pub fn as_str(self) -> &'static str {
+        match self {
+            Priority::Normal => "normal",
+            Priority::Low => "low",
+            Priority::High => "high",
+        }
+    }
+}
+
 impl From<kvrpcpb::CommandPri> for Priority {
     fn from(p: kvrpcpb::CommandPri) -> Priority {
         match p {
diff --git a/src/server/server.rs b/src/server/server.rs
index 60f1574c44a..5835473f01e 100644
--- a/src/server/server.rs
+++ b/src/server/server.rs
@@ -240,12 +240,12 @@ mod tests {
     use crate::raftstore::store::transport::Transport;
     use crate::raftstore::store::*;
     use crate::raftstore::Result as RaftStoreResult;
-    use crate::server::readpool::{self, ReadPool};
+    use crate::server::readpool;
     use crate::storage::TestStorageBuilder;
+
     use kvproto::raft_cmdpb::RaftCmdRequest;
     use kvproto::raft_serverpb::RaftMessage;
     use tikv_util::security::SecurityConfig;
-    use tikv_util::worker::FutureWorker;
 
     #[derive(Clone)]
     struct MockResolver {
@@ -321,12 +321,7 @@ mod tests {
         let cfg = Arc::new(cfg);
         let security_mgr = Arc::new(SecurityManager::new(&SecurityConfig::default()).unwrap());
 
-        let pd_worker = FutureWorker::new("test-pd-worker");
-        let cop_read_pool = ReadPool::new(
-            "cop-readpool",
-            &readpool::Config::default_for_test(),
-            || coprocessor::ReadPoolContext::new(pd_worker.scheduler()),
-        );
+        let cop_read_pool = readpool::Builder::build_for_test();
         let cop = coprocessor::Endpoint::new(&cfg, storage.get_engine(), cop_read_pool);
 
         let addr = Arc::new(Mutex::new(None));
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 6993b060123..920aef395f4 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -5,7 +5,7 @@ pub mod gc_worker;
 pub mod kv;
 mod metrics;
 pub mod mvcc;
-mod readpool_context;
+pub mod readpool_impl;
 pub mod txn;
 pub mod types;
 
@@ -23,7 +23,7 @@ use futures::{future, Future};
 use kvproto::errorpb;
 use kvproto::kvrpcpb::{CommandPri, Context, KeyRange, LockInfo};
 
-use crate::server::readpool::{self, ReadPool};
+use crate::server::readpool::{self, Builder as ReadPoolBuilder, ReadPool};
 use crate::server::ServerRaftStoreRouter;
 use tikv_util::collections::HashMap;
 use tikv_util::worker::{self, Builder, ScheduleError, Worker};
@@ -42,7 +42,7 @@ pub use self::kv::{
     TestEngineBuilder,
 };
 pub use self::mvcc::Scanner as StoreScanner;
-pub use self::readpool_context::Context as ReadPoolContext;
+pub use self::readpool_impl::*;
 pub use self::txn::{FixtureStore, FixtureStoreScanner};
 pub use self::txn::{Msg, Scanner, Scheduler, SnapshotStore, Store};
 pub use self::types::{Key, KvPair, MvccInfo, Value};
@@ -457,14 +457,7 @@ impl<E: Engine> TestStorageBuilder<E> {
     /// Build a `Storage<E>`.
     pub fn build(self) -> Result<Storage<E>> {
-        use tikv_util::worker::FutureWorker;
-
-        let read_pool = {
-            let pd_worker = FutureWorker::new("test-future-worker");
-            ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
-                ReadPoolContext::new(pd_worker.scheduler())
-            })
-        };
+        let read_pool = ReadPoolBuilder::from_config(&readpool::Config::default_for_test()).build();
         Storage::from_engine(
             self.engine,
             &self.config,
@@ -504,7 +497,7 @@ pub struct Storage<E: Engine> {
     worker_scheduler: worker::Scheduler<Msg>,
 
     /// The thread pool used to run most read operations.
-    read_pool: ReadPool<ReadPoolContext>,
+    read_pool: ReadPool,
 
     /// Used to handle requests related to GC.
     gc_worker: GCWorker<E>,
@@ -577,7 +570,7 @@ impl<E: Engine> Storage<E> {
     pub fn from_engine(
         engine: E,
         config: &Config,
-        read_pool: ReadPool<ReadPoolContext>,
+        read_pool: ReadPool,
         local_storage: Option<Arc<DB>>,
         raft_store_router: Option<ServerRaftStoreRouter>,
     ) -> Result<Self> {
@@ -668,41 +661,37 @@ impl<E: Engine> Storage<E> {
         let engine = self.get_engine();
         let priority = readpool::Priority::from(ctx.get_priority());
 
-        let res = self.read_pool.future_execute(priority, move |ctxd| {
-            let timer = {
-                let ctxd = ctxd.clone();
-                let mut thread_ctx = ctxd.current_thread_context_mut();
-                thread_ctx.start_command_duration_timer(CMD, priority)
-            };
+        let res = self.read_pool.spawn_handle(priority, move || {
+            tls_collect_command_count(CMD, priority);
+            let command_duration = tikv_util::time::Instant::now_coarse();
 
             Self::async_snapshot(engine, &ctx)
                 .and_then(move |snapshot: E::Snap| {
-                    let mut thread_ctx = ctxd.current_thread_context_mut();
-                    let _t_process = thread_ctx.start_processing_read_duration_timer(CMD);
-
-                    let mut statistics = Statistics::default();
-                    let snap_store = SnapshotStore::new(
-                        snapshot,
-                        start_ts,
-                        ctx.get_isolation_level(),
-                        !ctx.get_not_fill_cache(),
-                    );
-                    let result = snap_store
-                        .get(&key, &mut statistics)
-                        // map storage::txn::Error -> storage::Error
-                        .map_err(Error::from)
-                        .map(|r| {
-                            thread_ctx.collect_key_reads(CMD, 1);
-                            r
-                        });
-
-                    thread_ctx.collect_scan_count(CMD, &statistics);
-                    thread_ctx.collect_read_flow(ctx.get_region_id(), &statistics);
-
-                    result
+                    tls_processing_read_observe_duration(CMD, || {
+                        let mut statistics = Statistics::default();
+                        let snap_store = SnapshotStore::new(
+                            snapshot,
+                            start_ts,
+                            ctx.get_isolation_level(),
+                            !ctx.get_not_fill_cache(),
+                        );
+                        let result = snap_store
+                            .get(&key, &mut statistics)
+                            // map storage::txn::Error -> storage::Error
+                            .map_err(Error::from)
+                            .map(|r| {
+                                tls_collect_key_reads(CMD, 1);
+                                r
+                            });
+
+                        tls_collect_scan_count(CMD, &statistics);
+                        tls_collect_read_flow(ctx.get_region_id(), &statistics);
+
+                        result
+                    })
                 })
                 .then(move |r| {
-                    timer.observe_duration();
+                    tls_collect_command_duration(CMD, command_duration.elapsed());
                     r
                 })
         });
@@ -724,45 +713,43 @@ impl<E: Engine> Storage<E> {
         let engine = self.get_engine();
         let priority = readpool::Priority::from(ctx.get_priority());
 
-        let res = self.read_pool.future_execute(priority, move |ctxd| {
-            let timer = {
-                let ctxd = ctxd.clone();
-                let mut thread_ctx = ctxd.current_thread_context_mut();
-                thread_ctx.start_command_duration_timer(CMD, priority)
-            };
+        let res = self.read_pool.spawn_handle(priority, move || {
+            tls_collect_command_count(CMD, priority);
+            let command_duration = tikv_util::time::Instant::now_coarse();
 
             Self::async_snapshot(engine, &ctx)
                 .and_then(move |snapshot: E::Snap| {
-                    let mut thread_ctx = ctxd.current_thread_context_mut();
-                    let _t_process = thread_ctx.start_processing_read_duration_timer(CMD);
-
-                    let mut statistics = Statistics::default();
-                    let snap_store = SnapshotStore::new(
-                        snapshot,
-                        start_ts,
-                        ctx.get_isolation_level(),
-                        !ctx.get_not_fill_cache(),
-                    );
-                    let kv_pairs: Vec<_> = snap_store
-                        .batch_get(&keys, &mut statistics)
-                        .into_iter()
-                        .zip(keys)
-                        .filter(|&(ref v, ref _k)| !(v.is_ok() && v.as_ref().unwrap().is_none()))
-                        .map(|(v, k)| match v {
-                            Ok(Some(x)) => Ok((k.into_raw().unwrap(), x)),
-                            Err(e) => Err(Error::from(e)),
-                            _ => unreachable!(),
-                        })
-                        .collect();
-
-                    thread_ctx.collect_key_reads(CMD, kv_pairs.len() as u64);
-                    thread_ctx.collect_scan_count(CMD, &statistics);
-                    thread_ctx.collect_read_flow(ctx.get_region_id(), &statistics);
-
-                    Ok(kv_pairs)
+                    tls_processing_read_observe_duration(CMD, || {
+                        let mut statistics = Statistics::default();
+                        let snap_store = SnapshotStore::new(
+                            snapshot,
+                            start_ts,
+                            ctx.get_isolation_level(),
+                            !ctx.get_not_fill_cache(),
+                        );
+                        let kv_pairs: Vec<_> = snap_store
+                            .batch_get(&keys, &mut statistics)
+                            .into_iter()
+                            .zip(keys)
+                            .filter(|&(ref v, ref _k)| {
+                                !(v.is_ok() && v.as_ref().unwrap().is_none())
+                            })
+                            .map(|(v, k)| match v {
+                                Ok(Some(x)) => Ok((k.into_raw().unwrap(), x)),
+                                Err(e) => Err(Error::from(e)),
+                                _ => unreachable!(),
+                            })
+                            .collect();
+
+                        tls_collect_key_reads(CMD, kv_pairs.len());
+                        tls_collect_scan_count(CMD, &statistics);
+                        tls_collect_read_flow(ctx.get_region_id(), &statistics);
+
+                        Ok(kv_pairs)
+                    })
                 })
                .then(move |r| {
-                    timer.observe_duration();
+                    tls_collect_command_duration(CMD, command_duration.elapsed());
                     r
                 })
         });
@@ -788,53 +775,53 @@ impl<E: Engine> Storage<E> {
         let engine = self.get_engine();
         let priority = readpool::Priority::from(ctx.get_priority());
 
-        let res = self.read_pool.future_execute(priority, move |ctxd| {
-            let timer = {
-                let ctxd = ctxd.clone();
-                let mut thread_ctx = ctxd.current_thread_context_mut();
-                thread_ctx.start_command_duration_timer(CMD, priority)
-            };
+        let res = self.read_pool.spawn_handle(priority, move || {
+            tls_collect_command_count(CMD, priority);
+            let command_duration = tikv_util::time::Instant::now_coarse();
 
             Self::async_snapshot(engine, &ctx)
                 .and_then(move |snapshot: E::Snap| {
-                    let mut thread_ctx = ctxd.current_thread_context_mut();
-                    let _t_process = thread_ctx.start_processing_read_duration_timer(CMD);
-
-                    let snap_store = SnapshotStore::new(
-                        snapshot,
-                        start_ts,
-                        ctx.get_isolation_level(),
-                        !ctx.get_not_fill_cache(),
-                    );
-
-                    let mut scanner;
-                    if !options.reverse_scan {
-                        scanner = snap_store.scanner(
-                            false,
-                            options.key_only,
-                            Some(start_key),
-                            end_key,
-                        )?;
-                    } else {
-                        scanner =
-                            snap_store.scanner(true, options.key_only, end_key, Some(start_key))?;
-                    };
-                    let res = scanner.scan(limit);
-
-                    let statistics = scanner.take_statistics();
-                    thread_ctx.collect_scan_count(CMD, &statistics);
-                    thread_ctx.collect_read_flow(ctx.get_region_id(), &statistics);
-
-                    res.map_err(Error::from).map(|results| {
-                        thread_ctx.collect_key_reads(CMD, results.len() as u64);
-                        results
-                            .into_iter()
-                            .map(|x| x.map_err(Error::from))
-                            .collect()
+                    tls_processing_read_observe_duration(CMD, || {
+                        let snap_store = SnapshotStore::new(
+                            snapshot,
+                            start_ts,
+                            ctx.get_isolation_level(),
+                            !ctx.get_not_fill_cache(),
+                        );
+
+                        let mut scanner;
+                        if !options.reverse_scan {
+                            scanner = snap_store.scanner(
+                                false,
+                                options.key_only,
+                                Some(start_key),
+                                end_key,
+                            )?;
+                        } else {
+                            scanner = snap_store.scanner(
+                                true,
+                                options.key_only,
+                                end_key,
+                                Some(start_key),
+                            )?;
+                        };
+                        let res = scanner.scan(limit);
+
+                        let statistics = scanner.take_statistics();
+                        tls_collect_scan_count(CMD, &statistics);
+                        tls_collect_read_flow(ctx.get_region_id(), &statistics);
+
+                        res.map_err(Error::from).map(|results| {
+                            tls_collect_key_reads(CMD, results.len());
+                            results
+                                .into_iter()
+                                .map(|x| x.map_err(Error::from))
+                                .collect()
+                        })
                     })
                 })
                 .then(move |r| {
-                    timer.observe_duration();
+                    tls_collect_command_duration(CMD, command_duration.elapsed());
                     r
                 })
         });
@@ -1072,33 +1059,33 @@ impl<E: Engine> Storage<E> {
         let readpool = self.read_pool.clone();
 
         Self::async_snapshot(engine, &ctx).and_then(move |snapshot: E::Snap| {
-            let res = readpool.future_execute(priority, move |ctxd| {
-                let mut thread_ctx = ctxd.current_thread_context_mut();
-                let _t_process = thread_ctx.start_processing_read_duration_timer(CMD);
-                let cf = match Self::rawkv_cf(&cf) {
-                    Ok(x) => x,
-                    Err(e) => return future::err(e),
-                };
-                // no scan_count for this kind of op.
-
-                let key_len = key.len();
-                let result = snapshot
-                    .get_cf(cf, &Key::from_encoded(key))
-                    // map storage::kv::Error -> storage::Error
-                    .map_err(Error::from)
-                    .map(|r| {
-                        if let Some(ref value) = r {
-                            let mut stats = Statistics::default();
-                            stats.data.flow_stats.read_keys = 1;
-                            stats.data.flow_stats.read_bytes = key_len + value.len();
-                            thread_ctx.collect_read_flow(ctx.get_region_id(), &stats);
-                            thread_ctx.collect_key_reads(CMD, 1);
-                        }
-                        r
-                    });
+            let res = readpool.spawn_handle(priority, move || {
+                tls_processing_read_observe_duration(CMD, || {
+                    let cf = match Self::rawkv_cf(&cf) {
+                        Ok(x) => x,
+                        Err(e) => return future::err(e),
+                    };
+                    // no scan_count for this kind of op.
+
+                    let key_len = key.len();
+                    let result = snapshot
+                        .get_cf(cf, &Key::from_encoded(key))
+                        // map storage::engine::Error -> storage::Error
+                        .map_err(Error::from)
+                        .map(|r| {
+                            if let Some(ref value) = r {
+                                let mut stats = Statistics::default();
+                                stats.data.flow_stats.read_keys = 1;
+                                stats.data.flow_stats.read_bytes = key_len + value.len();
+                                tls_collect_read_flow(ctx.get_region_id(), &stats);
+                                tls_collect_key_reads(CMD, 1);
+                            }
+                            r
+                        });
 
-                timer.observe_duration();
-                future::result(result)
+                    timer.observe_duration();
+                    future::result(result)
+                })
             });
             future::result(res)
                 .map_err(|_| Error::SchedTooBusy)
@@ -1124,38 +1111,39 @@ impl<E: Engine> Storage<E> {
         let readpool = self.read_pool.clone();
 
         Self::async_snapshot(engine, &ctx).and_then(move |snapshot: E::Snap| {
-            let res = readpool.future_execute(priority, move |ctxd| {
-                let keys: Vec<Key> = keys.into_iter().map(Key::from_encoded).collect();
-                let mut thread_ctx = ctxd.current_thread_context_mut();
-                let _t_process = thread_ctx.start_processing_read_duration_timer(CMD);
-                let cf = match Self::rawkv_cf(&cf) {
-                    Ok(x) => x,
-                    Err(e) => return future::err(e),
-                };
-                // no scan_count for this kind of op.
-                let mut stats = Statistics::default();
-                let result: Vec<Result<KvPair>> = keys
-                    .into_iter()
-                    .map(|k| {
-                        let v = snapshot.get_cf(cf, &k);
-                        (k, v)
-                    })
-                    .filter(|&(_, ref v)| !(v.is_ok() && v.as_ref().unwrap().is_none()))
-                    .map(|(k, v)| match v {
-                        Ok(Some(v)) => {
-                            stats.data.flow_stats.read_keys += 1;
-                            stats.data.flow_stats.read_bytes += k.as_encoded().len() + v.len();
-                            Ok((k.into_encoded(), v))
-                        }
-                        Err(e) => Err(Error::from(e)),
-                        _ => unreachable!(),
-                    })
-                    .collect();
-                thread_ctx.collect_key_reads(CMD, stats.data.flow_stats.read_keys as u64);
-                thread_ctx.collect_read_flow(ctx.get_region_id(), &stats);
+            let res = readpool.spawn_handle(priority, move || {
+                tls_processing_read_observe_duration(CMD, || {
+                    let keys: Vec<Key> = keys.into_iter().map(Key::from_encoded).collect();
+                    let cf = match Self::rawkv_cf(&cf) {
+                        Ok(x) => x,
+                        Err(e) => return future::err(e),
+                    };
+                    // no scan_count for this kind of op.
+                    let mut stats = Statistics::default();
+                    let result: Vec<Result<KvPair>> = keys
+                        .into_iter()
+                        .map(|k| {
+                            let v = snapshot.get_cf(cf, &k);
+                            (k, v)
+                        })
+                        .filter(|&(_, ref v)| !(v.is_ok() && v.as_ref().unwrap().is_none()))
+                        .map(|(k, v)| match v {
+                            Ok(Some(v)) => {
+                                stats.data.flow_stats.read_keys += 1;
+                                stats.data.flow_stats.read_bytes += k.as_encoded().len() + v.len();
+                                Ok((k.into_encoded(), v))
+                            }
+                            Err(e) => Err(Error::from(e)),
+                            _ => unreachable!(),
+                        })
+                        .collect();
+
+                    tls_collect_key_reads(CMD, stats.data.flow_stats.read_keys as usize);
+                    tls_collect_read_flow(ctx.get_region_id(), &stats);
 
-                timer.observe_duration();
-                future::ok(result)
+                    timer.observe_duration();
+                    future::ok(result)
+                })
             });
             future::result(res)
                 .map_err(|_| Error::SchedTooBusy)
@@ -1401,43 +1389,42 @@ impl<E: Engine> Storage<E> {
         let readpool = self.read_pool.clone();
 
         Self::async_snapshot(engine, &ctx).and_then(move |snapshot: E::Snap| {
-            let res = readpool.future_execute(priority, move |ctxd| {
-                let mut thread_ctx = ctxd.current_thread_context_mut();
-                let _t_process = thread_ctx.start_processing_read_duration_timer(CMD);
+            let res = readpool.spawn_handle(priority, move || {
+                tls_processing_read_observe_duration(CMD, || {
+                    let end_key = end_key.map(Key::from_encoded);
 
-                let end_key = end_key.map(Key::from_encoded);
-
-                let mut statistics = Statistics::default();
-                let result = if reverse {
-                    Self::reverse_raw_scan(
-                        &snapshot,
-                        &cf,
-                        &Key::from_encoded(key),
-                        end_key,
-                        limit,
-                        &mut statistics,
-                        key_only,
-                    )
-                    .map_err(Error::from)
-                } else {
-                    Self::raw_scan(
-                        &snapshot,
-                        &cf,
-                        &Key::from_encoded(key),
-                        end_key,
-                        limit,
-                        &mut statistics,
-                        key_only,
-                    )
-                    .map_err(Error::from)
-                };
+                    let mut statistics = Statistics::default();
+                    let result = if reverse {
+                        Self::reverse_raw_scan(
+                            &snapshot,
+                            &cf,
+                            &Key::from_encoded(key),
+                            end_key,
+                            limit,
+                            &mut statistics,
+                            key_only,
+                        )
+                        .map_err(Error::from)
+                    } else {
+                        Self::raw_scan(
+                            &snapshot,
+                            &cf,
+                            &Key::from_encoded(key),
+                            end_key,
+                            limit,
+                            &mut statistics,
+                            key_only,
+                        )
+                        .map_err(Error::from)
+                    };
 
-                thread_ctx.collect_read_flow(ctx.get_region_id(), &statistics);
-                thread_ctx.collect_key_reads(CMD, statistics.write.flow_stats.read_keys as u64);
-                thread_ctx.collect_scan_count(CMD, &statistics);
+                    tls_collect_read_flow(ctx.get_region_id(), &statistics);
+                    tls_collect_key_reads(CMD, statistics.write.flow_stats.read_keys as usize);
+                    tls_collect_scan_count(CMD, &statistics);
 
-                timer.observe_duration();
-                future::result(result)
+                    timer.observe_duration();
+                    future::result(result)
+                })
             });
             future::result(res)
                 .map_err(|_| Error::SchedTooBusy)
@@ -1502,64 +1489,64 @@ impl<E: Engine> Storage<E> {
         let readpool = self.read_pool.clone();
 
         Self::async_snapshot(engine, &ctx).and_then(move |snapshot: E::Snap| {
-            let res = readpool.future_execute(priority, move |ctxd| {
-                let mut thread_ctx = ctxd.current_thread_context_mut();
-                let _t_process = thread_ctx.start_processing_read_duration_timer(CMD);
-
-                let mut statistics = Statistics::default();
-                if !Self::check_key_ranges(&ranges, reverse) {
-                    return future::result(Err(box_err!("Invalid KeyRanges")));
-                };
-                let mut result = Vec::new();
-                let ranges_len = ranges.len();
-                for i in 0..ranges_len {
-                    let start_key = Key::from_encoded(ranges[i].take_start_key());
-                    let end_key = ranges[i].take_end_key();
-                    let end_key = if end_key.is_empty() {
-                        if i + 1 == ranges_len {
-                            None
-                        } else {
-                            Some(Key::from_encoded_slice(ranges[i + 1].get_start_key()))
-                        }
-                    } else {
-                        Some(Key::from_encoded(end_key))
-                    };
-                    let pairs = if reverse {
-                        match Self::reverse_raw_scan(
-                            &snapshot,
-                            &cf,
-                            &start_key,
-                            end_key,
-                            each_limit,
-                            &mut statistics,
-                            key_only,
-                        ) {
-                            Ok(x) => x,
-                            Err(e) => return future::err(e),
-                        }
-                    } else {
-                        match Self::raw_scan(
-                            &snapshot,
-                            &cf,
-                            &start_key,
-                            end_key,
-                            each_limit,
-                            &mut statistics,
-                            key_only,
-                        ) {
-                            Ok(x) => x,
-                            Err(e) => return future::err(e),
-                        }
+            let res = readpool.spawn_handle(priority, move || {
+                tls_processing_read_observe_duration(CMD, || {
+                    let mut statistics = Statistics::default();
+                    if !Self::check_key_ranges(&ranges, reverse) {
+                        return future::result(Err(box_err!("Invalid KeyRanges")));
                     };
-                    result.extend(pairs.into_iter());
-                }
+                    let mut result = Vec::new();
+                    let ranges_len = ranges.len();
+                    for i in 0..ranges_len {
+                        let start_key = Key::from_encoded(ranges[i].take_start_key());
+                        let end_key = ranges[i].take_end_key();
+                        let end_key = if end_key.is_empty() {
+                            if i + 1 == ranges_len {
+                                None
+                            } else {
+                                Some(Key::from_encoded_slice(ranges[i + 1].get_start_key()))
+                            }
+                        } else {
+                            Some(Key::from_encoded(end_key))
+                        };
+                        let pairs = if reverse {
+                            match Self::reverse_raw_scan(
+                                &snapshot,
+                                &cf,
+                                &start_key,
+                                end_key,
+                                each_limit,
+                                &mut statistics,
+                                key_only,
+                            ) {
+                                Ok(x) => x,
+                                Err(e) => return future::err(e),
+                            }
+                        } else {
+                            match Self::raw_scan(
+                                &snapshot,
+                                &cf,
+                                &start_key,
+                                end_key,
+                                each_limit,
+                                &mut statistics,
+                                key_only,
+                            ) {
+                                Ok(x) => x,
+                                Err(e) => return future::err(e),
+                            }
+                        };
+                        result.extend(pairs.into_iter());
+                    }
 
-                thread_ctx.collect_read_flow(ctx.get_region_id(), &statistics);
-                thread_ctx.collect_key_reads(CMD, statistics.write.flow_stats.read_keys as u64);
-                thread_ctx.collect_scan_count(CMD, &statistics);
+                    tls_collect_read_flow(ctx.get_region_id(), &statistics);
+                    tls_collect_key_reads(CMD, statistics.write.flow_stats.read_keys as usize);
+                    tls_collect_scan_count(CMD, &statistics);
 
-                timer.observe_duration();
-                future::ok(result)
+                    timer.observe_duration();
+                    future::ok(result)
+                })
             });
             future::result(res)
                 .map_err(|_| Error::SchedTooBusy)
diff --git a/src/storage/readpool_context.rs b/src/storage/readpool_context.rs
deleted file mode 100644
index 4d247411043..00000000000
--- a/src/storage/readpool_context.rs
+++ /dev/null
@@ -1,124 +0,0 @@
-// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
-
-use prometheus::local::{LocalHistogramTimer, LocalHistogramVec, LocalIntCounterVec};
-use std::fmt;
-use std::mem;
-
-use crate::pd;
-use crate::server::readpool;
-use crate::storage;
-use tikv_util::collections::HashMap;
-use tikv_util::futurepool;
-use tikv_util::worker;
-
-use super::metrics::*;
-
-pub struct Context {
-    pd_sender: worker::FutureScheduler<pd::PdTask>,
-
-    // TODO: command_duration, processing_read_duration, command_counter can be merged together.
-    command_duration: LocalHistogramVec,
-    processing_read_duration: LocalHistogramVec,
-    command_keyreads: LocalHistogramVec,
-    // TODO: command_counter, command_pri_counter can be merged together.
-    command_counter: LocalIntCounterVec,
-    command_pri_counter: LocalIntCounterVec,
-    scan_details: LocalIntCounterVec,
-
-    read_flow_stats: HashMap<u64, storage::FlowStatistics>,
-}
-
-impl fmt::Debug for Context {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("storage::ReadPoolContext").finish()
-    }
-}
-
-impl Context {
-    pub fn new(pd_sender: worker::FutureScheduler<pd::PdTask>) -> Self {
-        Context {
-            pd_sender,
-            command_duration: SCHED_HISTOGRAM_VEC.local(),
-            processing_read_duration: SCHED_PROCESSING_READ_HISTOGRAM_VEC.local(),
-            command_keyreads: KV_COMMAND_KEYREAD_HISTOGRAM_VEC.local(),
-            command_counter: KV_COMMAND_COUNTER_VEC.local(),
-            command_pri_counter: SCHED_COMMANDS_PRI_COUNTER_VEC.local(),
-            scan_details: KV_COMMAND_SCAN_DETAILS.local(),
-            read_flow_stats: HashMap::default(),
-        }
-    }
-
-    #[inline]
-    pub fn start_command_duration_timer(
-        &mut self,
-        cmd: &str,
-        priority: readpool::Priority,
-    ) -> LocalHistogramTimer {
-        self.command_counter.with_label_values(&[cmd]).inc();
-        self.command_pri_counter
-            .with_label_values(&[&priority.to_string()])
-            .inc();
-        self.command_duration
-            .with_label_values(&[cmd])
-            .start_coarse_timer()
-    }
-
-    #[inline]
-    pub fn start_processing_read_duration_timer(&mut self, cmd: &str) -> LocalHistogramTimer {
-        self.processing_read_duration
-            .with_label_values(&[cmd])
-            .start_coarse_timer()
-    }
-
-    #[inline]
-    pub fn collect_key_reads(&mut self, cmd: &str, count: u64) {
-        self.command_keyreads
-            .with_label_values(&[cmd])
-            .observe(count as f64);
-    }
-
-    #[inline]
-    pub fn collect_scan_count(&mut self, cmd: &str, statistics: &storage::Statistics) {
-        for (cf, details) in statistics.details() {
-            for (tag, count) in details {
-                self.scan_details
-                    .with_label_values(&[cmd, cf, tag])
-                    .inc_by(count as i64);
-            }
-        }
-    }
-
-    #[inline]
-    pub fn collect_read_flow(&mut self, region_id: u64, statistics: &storage::Statistics) {
-        let flow_stats = self
-            .read_flow_stats
-            .entry(region_id)
-            .or_insert_with(storage::FlowStatistics::default);
-        flow_stats.add(&statistics.write.flow_stats);
-        flow_stats.add(&statistics.data.flow_stats);
-    }
-}
-
-impl futurepool::Context for Context {
-    fn on_tick(&mut self) {
-        // Flush Prometheus metrics
-        self.command_duration.flush();
-        self.processing_read_duration.flush();
-        self.command_keyreads.flush();
-        self.command_counter.flush();
-        self.command_pri_counter.flush();
-        self.scan_details.flush();
-
-        // Report PD metrics
-        if !self.read_flow_stats.is_empty() {
-            let mut read_stats = HashMap::default();
-            mem::swap(&mut read_stats, &mut self.read_flow_stats);
-            let result = self
-                .pd_sender
-                .schedule(pd::PdTask::ReadStats { read_stats });
-            if let Err(e) = result {
-                error!("Failed to send readpool read flow statistics"; "err" => ?e);
-            }
-        }
-    }
-}
diff --git a/src/storage/readpool_impl.rs b/src/storage/readpool_impl.rs
new file mode 100644
index 00000000000..d3d6232c2fc
--- /dev/null
+++ b/src/storage/readpool_impl.rs
@@ -0,0 +1,160 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::cell::RefCell;
+use std::time::Duration;
+
+use prometheus::local::*;
+
+use crate::pd::PdTask;
+use crate::server::readpool::{self, Builder, ReadPool};
+use tikv_util::collections::HashMap;
+use tikv_util::worker::FutureScheduler;
+
+use super::metrics::*;
+
+pub struct StorageLocalMetrics {
+    local_sched_histogram_vec: LocalHistogramVec,
+    local_sched_processing_read_histogram_vec: LocalHistogramVec,
+    local_kv_command_keyread_histogram_vec: LocalHistogramVec,
+    local_kv_command_counter_vec: LocalIntCounterVec,
+    local_sched_commands_pri_counter_vec: LocalIntCounterVec,
+    local_kv_command_scan_details: LocalIntCounterVec,
+    local_read_flow_stats: HashMap<u64, crate::storage::FlowStatistics>,
+}
+
+thread_local! {
+    static TLS_STORAGE_METRICS: RefCell<StorageLocalMetrics> = RefCell::new(
+        StorageLocalMetrics {
+            local_sched_histogram_vec: SCHED_HISTOGRAM_VEC.local(),
+            local_sched_processing_read_histogram_vec: SCHED_PROCESSING_READ_HISTOGRAM_VEC.local(),
+            local_kv_command_keyread_histogram_vec: KV_COMMAND_KEYREAD_HISTOGRAM_VEC.local(),
+            local_kv_command_counter_vec: KV_COMMAND_COUNTER_VEC.local(),
+            local_sched_commands_pri_counter_vec: SCHED_COMMANDS_PRI_COUNTER_VEC.local(),
+            local_kv_command_scan_details: KV_COMMAND_SCAN_DETAILS.local(),
+            local_read_flow_stats: HashMap::default(),
+        }
+    );
+}
+
+pub fn build_read_pool(
+    config: &readpool::Config,
+    pd_sender: FutureScheduler<PdTask>,
+    name_prefix: &str,
+) -> ReadPool {
+    let pd_sender2 = pd_sender.clone();
+
+    Builder::from_config(config)
+        .name_prefix(name_prefix)
+        .on_tick(move || tls_flush(&pd_sender))
+        .before_stop(move || tls_flush(&pd_sender2))
+        .build()
+}
+
+#[inline]
+fn tls_flush(pd_sender: &FutureScheduler<PdTask>) {
+    TLS_STORAGE_METRICS.with(|m| {
+        let mut storage_metrics = m.borrow_mut();
+        // Flush Prometheus metrics
+        storage_metrics.local_sched_histogram_vec.flush();
+        storage_metrics
+            .local_sched_processing_read_histogram_vec
+            .flush();
+        storage_metrics
+            .local_kv_command_keyread_histogram_vec
+            .flush();
+        storage_metrics.local_kv_command_counter_vec.flush();
+        storage_metrics.local_sched_commands_pri_counter_vec.flush();
+        storage_metrics.local_kv_command_scan_details.flush();
+
+        // Report PD metrics
+        if storage_metrics.local_read_flow_stats.is_empty() {
+            // Stats to report to PD is empty, ignore.
+            return;
+        }
+
+        let read_stats = storage_metrics.local_read_flow_stats.clone();
+        storage_metrics.local_read_flow_stats = HashMap::default();
+
+        let result = pd_sender.schedule(PdTask::ReadStats { read_stats });
+        if let Err(e) = result {
+            error!("Failed to send read pool read flow statistics"; "err" => ?e);
+        }
+    });
+}
+
+#[inline]
+pub fn tls_collect_command_count(cmd: &str, priority: readpool::Priority) {
+    TLS_STORAGE_METRICS.with(|m| {
+        let mut storage_metrics = m.borrow_mut();
+        storage_metrics
+            .local_kv_command_counter_vec
+            .with_label_values(&[cmd])
+            .inc();
+        storage_metrics
+            .local_sched_commands_pri_counter_vec
+            .with_label_values(&[priority.as_str()])
+            .inc();
+    });
+}
+
+#[inline]
+pub fn tls_collect_command_duration(cmd: &str, duration: Duration) {
+    TLS_STORAGE_METRICS.with(|m| {
+        m.borrow_mut()
+            .local_sched_histogram_vec
+            .with_label_values(&[cmd])
+            .observe(tikv_util::time::duration_to_sec(duration))
+    });
+}
+
+#[inline]
+pub fn tls_collect_key_reads(cmd: &str, count: usize) {
+    TLS_STORAGE_METRICS.with(|m| {
+        m.borrow_mut()
+            .local_kv_command_keyread_histogram_vec
+            .with_label_values(&[cmd])
+            .observe(count as f64)
+    });
+}
+
+#[inline]
+pub fn tls_processing_read_observe_duration<F, R>(cmd: &str, f: F) -> R
+where
+    F: FnOnce() -> R,
+{
+    TLS_STORAGE_METRICS.with(|m| {
+        let now = tikv_util::time::Instant::now_coarse();
+        let ret = f();
+        m.borrow_mut()
+            .local_sched_processing_read_histogram_vec
+            .with_label_values(&[cmd])
+            .observe(now.elapsed_secs());
+        ret
+    })
+}
+
+#[inline]
+pub fn tls_collect_scan_count(cmd: &str, statistics: &crate::storage::Statistics) {
+    TLS_STORAGE_METRICS.with(|m| {
+        let histogram = &mut m.borrow_mut().local_kv_command_scan_details;
+        for (cf, details) in statistics.details() {
+            for (tag, count) in details {
+                histogram
+                    .with_label_values(&[cmd, cf, tag])
+                    .inc_by(count as i64);
+            }
+        }
+    });
+}
+
+#[inline]
+pub fn tls_collect_read_flow(region_id: u64, statistics: &crate::storage::Statistics) {
+    TLS_STORAGE_METRICS.with(|m| {
+        let map = &mut m.borrow_mut().local_read_flow_stats;
+        let flow_stats = map
+            .entry(region_id)
+            .or_insert_with(crate::storage::FlowStatistics::default);
+        flow_stats.add(&statistics.write.flow_stats);
+        flow_stats.add(&statistics.data.flow_stats);
+    });
+}
diff --git a/tests/failpoints/cases/test_coprocessor.rs b/tests/failpoints/cases/test_coprocessor.rs
index cc439a25ec7..340d409b3b8 100644
--- a/tests/failpoints/cases/test_coprocessor.rs
+++ b/tests/failpoints/cases/test_coprocessor.rs
@@ -72,7 +72,7 @@ fn test_readpool_full() {
     let (_, endpoint) = init_with_data(&product, &[]);
     let req = DAGSelect::from(&product).build();
 
-    fail::cfg("read_pool_execute_full", "return()").unwrap();
+    fail::cfg("read_pool_spawn_full", "return()").unwrap();
 
     let resp = handle_request(&endpoint, req);
 
     assert!(resp.get_region_error().has_server_is_busy());
diff --git a/tests/integrations/coprocessor/test_select.rs b/tests/integrations/coprocessor/test_select.rs
index 50c9821d565..2b1a681abd8 100644
--- a/tests/integrations/coprocessor/test_select.rs
+++ b/tests/integrations/coprocessor/test_select.rs
@@ -14,7 +14,6 @@ use tipb::select::Chunk;
 use test_coprocessor::*;
 use test_storage::*;
 use tikv::coprocessor::codec::{datum, Datum};
-use tikv::server::readpool;
 use tikv::server::Config;
 use tikv::storage::TestEngineBuilder;
 use tikv_util::codec::number::*;
@@ -69,15 +68,7 @@ fn test_batch_row_limit() {
         let engine = TestEngineBuilder::new().build().unwrap();
         let mut cfg = Config::default();
         cfg.end_point_batch_row_limit = batch_row_limit;
-        init_data_with_details(
-            Context::new(),
-            engine,
-            &product,
-            &data,
-            true,
-            &cfg,
-            &readpool::Config::default_for_test(),
-        )
+        init_data_with_details(Context::new(), engine, &product, &data, true, &cfg)
     };
 
     // for dag selection
@@ -110,15 +101,7 @@ fn test_stream_batch_row_limit() {
         let engine = TestEngineBuilder::new().build().unwrap();
         let mut cfg = Config::default();
         cfg.end_point_stream_batch_row_limit = stream_row_limit;
-        init_data_with_details(
-            Context::new(),
-            engine,
-            &product,
-            &data,
-            true,
-            &cfg,
-            &readpool::Config::default_for_test(),
-        )
+        init_data_with_details(Context::new(), engine, &product, &data, true, &cfg)
    };
 
     let req = DAGSelect::from(&product).build();
@@ -203,15 +186,7 @@ fn test_scan_detail() {
         let engine = TestEngineBuilder::new().build().unwrap();
         let mut cfg = Config::default();
         cfg.end_point_batch_row_limit = 50;
-        init_data_with_details(
-            Context::new(),
-            engine,
-            &product,
-            &data,
-            true,
-            &cfg,
-            &readpool::Config::default_for_test(),
-        )
+        init_data_with_details(Context::new(), engine, &product, &data, true, &cfg)
     };
 
     let reqs = vec![