Skip to content

Commit

Permalink
[#109] WIP on exposing threading config. See notes in blocking_ops.rs…
Browse files Browse the repository at this point in the history
… for issues encountered.
  • Loading branch information
clintfred committed Jan 27, 2020
1 parent 606aecd commit 4374db8
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 39 deletions.
76 changes: 64 additions & 12 deletions src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//!
//! # Optional
//! This requires the optional `blocking` feature to be enabled.
pub use crate::internal::IronOxideErr;
pub use crate::internal::{
document_api::{
AssociationType, DocAccessEditErr, DocumentAccessResult, DocumentDecryptResult,
Expand Down Expand Up @@ -227,7 +228,7 @@ impl BlockingIronOxide {
password: &str,
user_create_opts: &UserCreateOpts,
) -> Result<UserCreateResult> {
let rt = create_runtime();
let rt = create_runtime(&Default::default())?;
rt.enter(|| block_on(IronOxide::user_create(jwt, password, user_create_opts)))
}
/// See [ironoxide::user::UserOps::user_list_devices()](trait.UserOps.html#tymethod.user_list_devices)
Expand All @@ -241,7 +242,7 @@ impl BlockingIronOxide {
password: &str,
device_create_options: &DeviceCreateOpts,
) -> Result<DeviceAddResult> {
let rt = create_runtime();
let rt = create_runtime(&Default::default())?;
rt.enter(|| {
block_on(IronOxide::generate_new_device(
jwt,
Expand All @@ -257,7 +258,7 @@ impl BlockingIronOxide {
}
/// See [ironoxide::user::UserOps::user_verify()](trait.UserOps.html#tymethod.user_verify)
pub fn user_verify(jwt: &str) -> Result<Option<UserResult>> {
let rt = create_runtime();
let rt = create_runtime(&Default::default())?;
rt.enter(|| block_on(IronOxide::user_verify(jwt)))
}
/// See [ironoxide::user::UserOps::user_get_public_key()](trait.UserOps.html#tymethod.user_get_public_key)
Expand All @@ -274,19 +275,69 @@ impl BlockingIronOxide {

/// Creates a tokio runtime with the default number of core threads (num of cores on a machine)
/// and an elevated number of blocking_threads as we expect heavy concurrency to be network-bound
fn create_runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new()
fn create_runtime(rt_config: &RuntimeConfig) -> Result<tokio::runtime::Runtime> {
let mut builder = tokio::runtime::Builder::new();

dbg!(&rt_config);

builder
.threaded_scheduler() // use multi-threaded scheduler
.enable_all() // enable both I/O and time drivers
.max_threads(250) // core_threads default to number of cores, blocking threads are max - core
.build()
.expect("tokio runtime failed to initialize")
.enable_all(); // enable both I/O and time drivers
rt_config
.core_threads
.map(|core_threads| builder.core_threads(core_threads));
rt_config
.max_threads
.map(|max_threads| builder.max_threads(max_threads));
rt_config
.thread_stack_size_bytes
.map(|thread_stack_size| builder.thread_stack_size(thread_stack_size));
dbg!(&builder);

let rt = builder.build().map_err(|_| IronOxideErr::InitializeError);
dbg!(&rt);
rt
}

pub struct BlockingIronOxideConfig {
pub rt_config: RuntimeConfig,
}

#[derive(Debug, Clone)]
pub struct RuntimeConfig {
pub core_threads: Option<usize>,
pub max_threads: Option<usize>,
pub thread_stack_size_bytes: Option<usize>,
}

impl Default for RuntimeConfig {
fn default() -> Self {
RuntimeConfig {
core_threads: None,
max_threads: None,
thread_stack_size_bytes: None,
}
}
}

impl Default for BlockingIronOxideConfig {
fn default() -> Self {
BlockingIronOxideConfig {
rt_config: Default::default(),
}
}
}

/// Initialize the BlockingIronOxide SDK with a device. Verifies that the provided user/segment exists and the provided device
/// keys are valid and exist for the provided account. If successful, returns instance of the BlockingIronOxide SDK.
pub fn initialize(device_context: &DeviceContext) -> Result<BlockingIronOxide> {
let rt = create_runtime();
pub fn initialize(
device_context: &DeviceContext,
config: BlockingIronOxideConfig,
) -> Result<BlockingIronOxide> {
println!("initialize: {:?}", &config.rt_config);
let rt = create_runtime(&config.rt_config)?;
println!("initialize RT: {:?}", &rt.handle());

let maybe_io = rt.enter(|| block_on(crate::initialize(device_context)));
maybe_io.map(|io| BlockingIronOxide {
ironoxide: io,
Expand All @@ -299,8 +350,9 @@ pub fn initialize(device_context: &DeviceContext) -> Result<BlockingIronOxide> {
/// for private key rotation.
pub fn initialize_check_rotation(
device_context: &DeviceContext,
config: BlockingIronOxideConfig,
) -> Result<InitAndRotationCheck<BlockingIronOxide>> {
let rt = create_runtime();
let rt = create_runtime(&config.rt_config)?;
let maybe_init = rt.enter(|| block_on(crate::initialize_check_rotation(device_context)));
maybe_init.map(|init| match init {
NoRotationNeeded(io) => NoRotationNeeded(BlockingIronOxide {
Expand Down
3 changes: 3 additions & 0 deletions src/document/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
Result,
};
use itertools::{Either, EitherOrBoth, Itertools};
use tokio::time::Duration;

/// Advanced document operations
pub mod advanced;
Expand Down Expand Up @@ -215,8 +216,10 @@ impl DocumentOps for crate::IronOxide {
document_data: &[u8],
encrypt_opts: &DocumentEncryptOpts,
) -> Result<DocumentEncryptResult> {
// std::thread::sl
let encrypt_opts = encrypt_opts.clone();

// std::thread::sleep(Duration::from_millis(200));
let (explicit_users, explicit_groups, grant_to_author, policy_grants) =
match encrypt_opts.grants {
EitherOrBoth::Left(explicit_grants) => {
Expand Down
49 changes: 45 additions & 4 deletions tests/blocking_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ mod common;
#[cfg(feature = "blocking")]
mod integration_tests {
use crate::common::{create_id_all_classes, gen_jwt, USER_PASSWORD};
use ironoxide::blocking::{BlockingIronOxideConfig, RuntimeConfig};
use ironoxide::{
blocking::BlockingIronOxide,
group::GroupCreateOpts,
user::{UserCreateOpts, UserId},
InitAndRotationCheck, IronOxideErr,
};
use std::convert::TryInto;
use std::sync::Arc;
use tokio::time::Duration;

// Tests a UserOp (user_create/generate_new_device), a GroupOp (group_create),
// and ironoxide::blocking functions (initialize/initialize_check_rotation)
Expand All @@ -26,7 +29,7 @@ mod integration_tests {
&Default::default(),
)?
.into();
let creator_sdk = ironoxide::blocking::initialize(&device)?;
let creator_sdk = ironoxide::blocking::initialize(&device, Default::default())?;
// making non-default groups so I can specify needs_rotation of true
let group_create = creator_sdk.group_create(&GroupCreateOpts::new(
None,
Expand All @@ -40,7 +43,8 @@ mod integration_tests {
))?;
assert_eq!(group_create.needs_rotation(), Some(true));

let init_and_rotation_check = ironoxide::blocking::initialize_check_rotation(&device)?;
let init_and_rotation_check =
ironoxide::blocking::initialize_check_rotation(&device, Default::default())?;
let (user_result, group_result) = match init_and_rotation_check {
InitAndRotationCheck::NoRotationNeeded(_) => {
panic!("both user and groups should need rotation!");
Expand Down Expand Up @@ -74,8 +78,45 @@ mod integration_tests {
&Default::default(),
)?
.into();
let sdk = ironoxide::blocking::initialize(&device)?;
let doc = [0u8; 64];
// let sdk = ironoxide::blocking::initialize(&device, Default::default())?;
let sdk = Arc::new(ironoxide::blocking::initialize(
&device,
BlockingIronOxideConfig {
rt_config: RuntimeConfig {
// TODO this doesn't seem to have the intended effect when executing inside of std::thread::spawn below
core_threads: Some(2),
max_threads: Some(3),
thread_stack_size_bytes: None,
},
},
)?);
let doc = [42u8; 64];

// Spin up an absurd number of threads in hopes that the Runtime internal to the sdk will limit the number
// that concurrently execute.
// TODO: It would be expected that all the threads spin up, but are then blocked when they try to use the SDK's
// TODO: internal Runtime, but that doesn't seem to be happening. The config above seems to have no affect
// TODO: on the number of CPU cores that are concurrently utilized.
// TODO See the test in document_ops where the shared Runtime is created in the test and the configs do seem to
// TODO: limit the number of concurrently executing threads
// TODO: Perhaps this has to do with the different between the blocking API's use of `block_on` vs the
// TODO: async test's usage of `tokio::spawn` to run tasks. More investigation is needed.
let mut threads = vec![];
for _i in 0..2000 {
let sdk_ref = sdk.clone();

threads.push(std::thread::spawn(move || {
dbg!(&_i);
let _result = sdk_ref.document_encrypt(&doc, &Default::default()).unwrap();
}));
}
let mut joined_count = 0;
for t in threads {
t.join().expect("couldn't join");
joined_count += 1;
}

assert_eq!(joined_count, 2000);
let doc_result = sdk.document_encrypt(&doc, &Default::default())?;
assert_eq!(doc_result.grants().len(), 1);
assert_eq!(doc_result.access_errs().len(), 0);
Expand Down
65 changes: 42 additions & 23 deletions tests/document_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use ironoxide::{
};
use itertools::EitherOrBoth;
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;

#[tokio::test]
async fn doc_roundtrip_empty_data() -> Result<(), IronOxideErr> {
Expand Down Expand Up @@ -785,29 +786,47 @@ async fn doc_revoke_access() -> Result<(), IronOxideErr> {
Ok(())
}

//#[tokio::test]
//async fn doc_encrypt_concurrent() -> Result<(), IronOxideErr> {
// let sdk = Arc::new(initialize_sdk()?);
// let doc = [43u8; 64];
// let _encrypted_doc = sdk.document_encrypt(&doc, &Default::default()).await?;
//
// let mut threads = vec![];
// for _i in 0..10 {
// let sdk_ref = sdk.clone();
// threads.push(std::thread::spawn(move || {
// let _result = sdk_ref.document_encrypt(&doc, &Default::default()).unwrap();
// }));
// }
//
// let mut joined_count = 0;
// for t in threads {
// t.join().expect("couldn't join");
// joined_count += 1;
// }
//
// assert_eq!(joined_count, 10);
// Ok(())
//}
#[test]
fn doc_encrypt_concurrent() -> Result<(), IronOxideErr> {
use tokio::prelude::*;
let mut rt = tokio::runtime::Builder::default()
.enable_all()
.threaded_scheduler()
.core_threads(2)
.max_threads(3)
.build()
.unwrap();

let doc = [43u8; 64];
// let _encrypted_doc = sdk.document_encrypt(&doc, &Default::default()).await?;

// lots of Futures are spawned onto the shared Runtime, but they execute according to the
// config above
rt.block_on(async move {
let mut handles = vec![];
let sdk = Arc::new(initialize_sdk().await.unwrap());
for _i in 0..10000 {
let sdk = sdk.clone();
handles.push(tokio::spawn(async move {
let opts = DocumentEncryptOpts::default();
if _i % 100 == 0 {
dbg!(&_i);
}
sdk.document_encrypt(&doc, &opts).await;
}));
}
futures::future::join_all(handles).await;

// let mut joined_count = 0;
// for t in threads {
// t.await;
// joined_count += 1;
// }
});
//
// assert_eq!(joined_count, 10);
Ok(())
}

trait WithGrantsAndErrs {
fn grants(&self) -> Vec<UserOrGroup>;
Expand Down

0 comments on commit 4374db8

Please sign in to comment.