From 4374db8f75fa141f83ab6bde1d650c80122008b3 Mon Sep 17 00:00:00 2001 From: Clint Frederickson Date: Mon, 27 Jan 2020 10:51:46 -0700 Subject: [PATCH] [#109] WIP on exposing threading config. See notes in blocking_ops.rs for issues encountered. --- src/blocking.rs | 76 ++++++++++++++++++++++++++++++++++++------- src/document/mod.rs | 3 ++ tests/blocking_ops.rs | 49 +++++++++++++++++++++++++--- tests/document_ops.rs | 65 +++++++++++++++++++++++------------- 4 files changed, 154 insertions(+), 39 deletions(-) diff --git a/src/blocking.rs b/src/blocking.rs index 452d364e..b4f20940 100644 --- a/src/blocking.rs +++ b/src/blocking.rs @@ -7,6 +7,7 @@ //! //! # Optional //! This requires the optional `blocking` feature to be enabled. +pub use crate::internal::IronOxideErr; pub use crate::internal::{ document_api::{ AssociationType, DocAccessEditErr, DocumentAccessResult, DocumentDecryptResult, @@ -227,7 +228,7 @@ impl BlockingIronOxide { password: &str, user_create_opts: &UserCreateOpts, ) -> Result { - let rt = create_runtime(); + let rt = create_runtime(&Default::default())?; rt.enter(|| block_on(IronOxide::user_create(jwt, password, user_create_opts))) } /// See [ironoxide::user::UserOps::user_list_devices()](trait.UserOps.html#tymethod.user_list_devices) @@ -241,7 +242,7 @@ impl BlockingIronOxide { password: &str, device_create_options: &DeviceCreateOpts, ) -> Result { - let rt = create_runtime(); + let rt = create_runtime(&Default::default())?; rt.enter(|| { block_on(IronOxide::generate_new_device( jwt, @@ -257,7 +258,7 @@ impl BlockingIronOxide { } /// See [ironoxide::user::UserOps::user_verify()](trait.UserOps.html#tymethod.user_verify) pub fn user_verify(jwt: &str) -> Result> { - let rt = create_runtime(); + let rt = create_runtime(&Default::default())?; rt.enter(|| block_on(IronOxide::user_verify(jwt))) } /// See [ironoxide::user::UserOps::user_get_public_key()](trait.UserOps.html#tymethod.user_get_public_key) @@ -274,19 +275,69 @@ impl BlockingIronOxide { /// Creates a tokio runtime with the default number of core threads (num of cores on a machine) /// and an elevated number of blocking_threads as we expect heavy concurrency to be network-bound -fn create_runtime() -> tokio::runtime::Runtime { - tokio::runtime::Builder::new() +fn create_runtime(rt_config: &RuntimeConfig) -> Result { + let mut builder = tokio::runtime::Builder::new(); + + dbg!(&rt_config); + + builder .threaded_scheduler() // use multi-threaded scheduler - .enable_all() // enable both I/O and time drivers - .max_threads(250) // core_threads default to number of cores, blocking threads are max - core - .build() - .expect("tokio runtime failed to initialize") + .enable_all(); // enable both I/O and time drivers + rt_config + .core_threads + .map(|core_threads| builder.core_threads(core_threads)); + rt_config + .max_threads + .map(|max_threads| builder.max_threads(max_threads)); + rt_config + .thread_stack_size_bytes + .map(|thread_stack_size| builder.thread_stack_size(thread_stack_size)); + dbg!(&builder); + + let rt = builder.build().map_err(|_| IronOxideErr::InitializeError); + dbg!(&rt); + rt +} + +pub struct BlockingIronOxideConfig { + pub rt_config: RuntimeConfig, +} + +#[derive(Debug, Clone)] +pub struct RuntimeConfig { + pub core_threads: Option, + pub max_threads: Option, + pub thread_stack_size_bytes: Option, +} + +impl Default for RuntimeConfig { + fn default() -> Self { + RuntimeConfig { + core_threads: None, + max_threads: None, + thread_stack_size_bytes: None, + } + } +} + +impl Default for BlockingIronOxideConfig { + fn default() -> Self { + BlockingIronOxideConfig { + rt_config: Default::default(), + } + } } /// Initialize the BlockingIronOxide SDK with a device. Verifies that the provided user/segment exists and the provided device /// keys are valid and exist for the provided account. If successful, returns instance of the BlockingIronOxide SDK. -pub fn initialize(device_context: &DeviceContext) -> Result { - let rt = create_runtime(); +pub fn initialize( + device_context: &DeviceContext, + config: BlockingIronOxideConfig, +) -> Result { + println!("initialize: {:?}", &config.rt_config); + let rt = create_runtime(&config.rt_config)?; + println!("initialize RT: {:?}", &rt.handle()); + let maybe_io = rt.enter(|| block_on(crate::initialize(device_context))); maybe_io.map(|io| BlockingIronOxide { ironoxide: io, @@ -299,8 +350,9 @@ pub fn initialize(device_context: &DeviceContext) -> Result { /// for private key rotation. pub fn initialize_check_rotation( device_context: &DeviceContext, + config: BlockingIronOxideConfig, ) -> Result> { - let rt = create_runtime(); + let rt = create_runtime(&config.rt_config)?; let maybe_init = rt.enter(|| block_on(crate::initialize_check_rotation(device_context))); maybe_init.map(|init| match init { NoRotationNeeded(io) => NoRotationNeeded(BlockingIronOxide { diff --git a/src/document/mod.rs b/src/document/mod.rs index 05bac524..05f84eb4 100644 --- a/src/document/mod.rs +++ b/src/document/mod.rs @@ -13,6 +13,7 @@ use crate::{ Result, }; use itertools::{Either, EitherOrBoth, Itertools}; +use tokio::time::Duration; /// Advanced document operations pub mod advanced; @@ -215,8 +216,10 @@ impl DocumentOps for crate::IronOxide { document_data: &[u8], encrypt_opts: &DocumentEncryptOpts, ) -> Result { + // std::thread::sl let encrypt_opts = encrypt_opts.clone(); + // std::thread::sleep(Duration::from_millis(200)); let (explicit_users, explicit_groups, grant_to_author, policy_grants) = match encrypt_opts.grants { EitherOrBoth::Left(explicit_grants) => { diff --git a/tests/blocking_ops.rs b/tests/blocking_ops.rs index 3ecbfc63..37760350 100644 --- a/tests/blocking_ops.rs +++ b/tests/blocking_ops.rs @@ -5,6 +5,7 @@ mod common; #[cfg(feature = "blocking")] mod integration_tests { use crate::common::{create_id_all_classes, gen_jwt, USER_PASSWORD}; + use ironoxide::blocking::{BlockingIronOxideConfig, RuntimeConfig}; use ironoxide::{ blocking::BlockingIronOxide, group::GroupCreateOpts, @@ -12,6 +13,8 @@ mod integration_tests { InitAndRotationCheck, IronOxideErr, }; use std::convert::TryInto; + use std::sync::Arc; + use tokio::time::Duration; // Tests a UserOp (user_create/generate_new_device), a GroupOp (group_create), // and ironoxide::blocking functions (initialize/initialize_check_rotation) @@ -26,7 +29,7 @@ mod integration_tests { &Default::default(), )? .into(); - let creator_sdk = ironoxide::blocking::initialize(&device)?; + let creator_sdk = ironoxide::blocking::initialize(&device, Default::default())?; // making non-default groups so I can specify needs_rotation of true let group_create = creator_sdk.group_create(&GroupCreateOpts::new( None, @@ -40,7 +43,8 @@ mod integration_tests { ))?; assert_eq!(group_create.needs_rotation(), Some(true)); - let init_and_rotation_check = ironoxide::blocking::initialize_check_rotation(&device)?; + let init_and_rotation_check = + ironoxide::blocking::initialize_check_rotation(&device, Default::default())?; let (user_result, group_result) = match init_and_rotation_check { InitAndRotationCheck::NoRotationNeeded(_) => { panic!("both user and groups should need rotation!"); @@ -74,8 +78,45 @@ mod integration_tests { &Default::default(), )? .into(); - let sdk = ironoxide::blocking::initialize(&device)?; - let doc = [0u8; 64]; + // let sdk = ironoxide::blocking::initialize(&device, Default::default())?; + let sdk = Arc::new(ironoxide::blocking::initialize( + &device, + BlockingIronOxideConfig { + rt_config: RuntimeConfig { + // TODO this doesn't seem to have the intended effect when executing inside of std::thread::spawn below + core_threads: Some(2), + max_threads: Some(3), + thread_stack_size_bytes: None, + }, + }, + )?); + let doc = [42u8; 64]; + + // Spin up an absurd number of threads in hopes that the Runtime internal to the sdk will limit the number + // that concurrently execute. + // TODO: It would be expected that all the threads spin up, but are then blocked when they try to use the SDK's + // TODO: internal Runtime, but that doesn't seem to be happening. The config above seems to have no affect + // TODO: on the number of CPU cores that are concurrently utilized. + // TODO See the test in document_ops where the shared Runtime is created in the test and the configs do seem to + // TODO: limit the number of concurrently executing threads + // TODO: Perhaps this has to do with the different between the blocking API's use of `block_on` vs the + // TODO: async test's usage of `tokio::spawn` to run tasks. More investigation is needed. + let mut threads = vec![]; + for _i in 0..2000 { + let sdk_ref = sdk.clone(); + + threads.push(std::thread::spawn(move || { + dbg!(&_i); + let _result = sdk_ref.document_encrypt(&doc, &Default::default()).unwrap(); + })); + } + let mut joined_count = 0; + for t in threads { + t.join().expect("couldn't join"); + joined_count += 1; + } + + assert_eq!(joined_count, 2000); let doc_result = sdk.document_encrypt(&doc, &Default::default())?; assert_eq!(doc_result.grants().len(), 1); assert_eq!(doc_result.access_errs().len(), 0); diff --git a/tests/document_ops.rs b/tests/document_ops.rs index a322f96a..54015410 100644 --- a/tests/document_ops.rs +++ b/tests/document_ops.rs @@ -13,6 +13,7 @@ use ironoxide::{ }; use itertools::EitherOrBoth; use std::convert::{TryFrom, TryInto}; +use std::sync::Arc; #[tokio::test] async fn doc_roundtrip_empty_data() -> Result<(), IronOxideErr> { @@ -785,29 +786,47 @@ async fn doc_revoke_access() -> Result<(), IronOxideErr> { Ok(()) } -//#[tokio::test] -//async fn doc_encrypt_concurrent() -> Result<(), IronOxideErr> { -// let sdk = Arc::new(initialize_sdk()?); -// let doc = [43u8; 64]; -// let _encrypted_doc = sdk.document_encrypt(&doc, &Default::default()).await?; -// -// let mut threads = vec![]; -// for _i in 0..10 { -// let sdk_ref = sdk.clone(); -// threads.push(std::thread::spawn(move || { -// let _result = sdk_ref.document_encrypt(&doc, &Default::default()).unwrap(); -// })); -// } -// -// let mut joined_count = 0; -// for t in threads { -// t.join().expect("couldn't join"); -// joined_count += 1; -// } -// -// assert_eq!(joined_count, 10); -// Ok(()) -//} +#[test] +fn doc_encrypt_concurrent() -> Result<(), IronOxideErr> { + use tokio::prelude::*; + let mut rt = tokio::runtime::Builder::default() + .enable_all() + .threaded_scheduler() + .core_threads(2) + .max_threads(3) + .build() + .unwrap(); + + let doc = [43u8; 64]; + // let _encrypted_doc = sdk.document_encrypt(&doc, &Default::default()).await?; + + // lots of Futures are spawned onto the shared Runtime, but they execute according to the + // config above + rt.block_on(async move { + let mut handles = vec![]; + let sdk = Arc::new(initialize_sdk().await.unwrap()); + for _i in 0..10000 { + let sdk = sdk.clone(); + handles.push(tokio::spawn(async move { + let opts = DocumentEncryptOpts::default(); + if _i % 100 == 0 { + dbg!(&_i); + } + sdk.document_encrypt(&doc, &opts).await; + })); + } + futures::future::join_all(handles).await; + + // let mut joined_count = 0; + // for t in threads { + // t.await; + // joined_count += 1; + // } + }); + // + // assert_eq!(joined_count, 10); + Ok(()) +} trait WithGrantsAndErrs { fn grants(&self) -> Vec;