Skip to content

Commit

Permalink
Merge pull request #37 from gilescope/refactor
Browse files Browse the repository at this point in the history
move webworker code to own module
  • Loading branch information
gilescope authored Sep 19, 2022
2 parents 2ac0cc8 + 6ed32a1 commit 3b2fb76
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 107 deletions.
115 changes: 8 additions & 107 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ mod datasource;
mod movement;
mod style;
mod ui;

#[cfg(target_family = "wasm")]
pub mod webworker;
#[cfg(target_family = "wasm")]
pub use webworker::IOWorker;

use crate::ui::{Details, DotUrl};
// use bevy_inspector_egui::RegisterInspectable;
// use bevy_inspector_egui::WorldInspectorPlugin;
Expand Down Expand Up @@ -542,8 +548,8 @@ fn source_data(

#[cfg(target_arch = "wasm32")]
use gloo_worker::WorkerBridge;
#[cfg(target_arch = "wasm32")]
let bridge: WorkerBridge<IOWorker> = IOWorker::spawner()
#[cfg(target_family = "wasm")]
let bridge: WorkerBridge<IOWorker> = crate::webworker::IOWorker::spawner()
.callback(|result| {
UPDATE_QUEUE.lock().unwrap().extend(result);
})
Expand Down Expand Up @@ -2010,113 +2016,8 @@ pub mod html_body {
// }
}



#[cfg(target_arch = "wasm32")]
use gloo_worker::{HandlerId, Worker};

#[cfg(target_arch = "wasm32")]
pub struct IOWorker {}

#[cfg(target_arch = "wasm32")]
impl IOWorker {
pub async fn async_update(_msg: <Self as Worker>::Message) {
log!("Got update");
async_std::task::sleep(Duration::from_secs(5)).await;
async_std::task::sleep(Duration::from_secs(5)).await;
log!("Finished waiting");
}

async fn send_it_too(blocks: Vec<datasource::DataUpdate>) {
// web_sys::console::log_1(&format!("got block. add to worker queue{}",
// blocks.len()).into());

// Could move this earlier to when a block is produced by relay chain?
let mut base_time = *BASETIME.lock().unwrap();
if base_time == 0 {
if let datasource::DataUpdate::NewBlock(block) = &blocks[0] {
base_time = block.timestamp.unwrap_or(0);
web_sys::console::log_1(&format!("BASETIME set to {}", base_time).into());
*BASETIME.lock().unwrap() = base_time;
}
}

UPDATE_QUEUE.lock().unwrap().extend(blocks);
// web_sys::console::log_1(&format!("added to worker queue").into());
}
}

#[derive(Deserialize, Serialize)]
pub enum BridgeMessage {
SetDatasource(Vec<Vec<ChainInfo>>, Option<DotUrl>, u32), //data epoc
GetNewBlocks,
}

#[cfg(target_arch = "wasm32")]
use gloo_worker::WorkerScope;

#[cfg(target_arch = "wasm32")]
impl Worker for IOWorker {
type Input = BridgeMessage;
type Message = Vec<()>;
type Output = Vec<datasource::DataUpdate>;

fn create(_scope: &WorkerScope<Self>) -> Self {
Self {}
}

fn update(&mut self, _scope: &WorkerScope<Self>, msg: Self::Message) {
async_std::task::block_on(Self::async_update(msg));
}

fn received(&mut self, scope: &WorkerScope<Self>, msg: Self::Input, id: HandlerId) {
match msg {
BridgeMessage::SetDatasource(s, as_of, data_epoc) => {
DATASOURCE_EPOC.store(data_epoc, Ordering::Relaxed);
// web_sys::console::log_1(&format!("got input from bridge basetime {}",
// basetime).into()); let link_clone : Arc<async_std::sync::Mutex<WorkerLink<Self>>>
// = scope.clone();
async_std::task::block_on(do_datasources(s, as_of, &Self::send_it_too));
// async |_|{
// web_sys::console::log_1(&format!("got block. send to bridge").into());
// self.t();
// // scope.send_message(vec![]);
// }
},
BridgeMessage::GetNewBlocks => {
// let t = async move || {
let vec = &mut *UPDATE_QUEUE.lock().unwrap();
let mut results = vec![];
core::mem::swap(vec, &mut results);
scope.respond(id, results);
// };
// async_std::task::block_on(t());
},
}

// let chain_info = ChainInfo{
// chain_ws: String::from("kusama-rpc.polkadot.io"),
// // pub chain_id: Option<NonZeroU32>,
// // pub chain_drawn: bool,
// // Negative is other direction from center.
// chain_index: 1,
// chain_url: DotUrl{ sovereign:Some(1), env:Env::Prod, ..DotUrl::default() },
// };
// // let url = chain_name_to_url(&chain_info.chain_ws);
// // let source = datasource::RawDataSource::new(&url);
// let block_watcher = datasource::BlockWatcher{
// tx: None,
// chain_info ,
// as_of: None,
// receive_channel: None,
// sender: None,
// };

// async_std::task::block_on(block_watcher.watch_blocks());
// self.link.respond(id, (msg, 42));
}

// fn name_of_resource() -> &'static str {
// "worker.js"
// }
}
110 changes: 110 additions & 0 deletions src/webworker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use crate::DATASOURCE_EPOC;
use crate::do_datasources;
use crate::UPDATE_QUEUE;
use crate::BASETIME;
use crate::BridgeMessage;
use core::sync::atomic::Ordering;
use crate::datasource;
use crate::log;

macro_rules! log {
// Note that this is using the `log` function imported above during
// `bare_bones`
($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
}

use core::time::Duration;
use gloo_worker::WorkerScope;


use gloo_worker::{HandlerId, Worker};

pub struct IOWorker {}

impl IOWorker {
pub async fn async_update(_msg: <Self as Worker>::Message) {
log!("Got update");
async_std::task::sleep(Duration::from_secs(5)).await;
async_std::task::sleep(Duration::from_secs(5)).await;
log!("Finished waiting");
}

async fn send_it_too(blocks: Vec<datasource::DataUpdate>) {
// web_sys::console::log_1(&format!("got block. add to worker queue{}",
// blocks.len()).into());

// Could move this earlier to when a block is produced by relay chain?
let mut base_time = *BASETIME.lock().unwrap();
if base_time == 0 {
if let datasource::DataUpdate::NewBlock(block) = &blocks[0] {
base_time = block.timestamp.unwrap_or(0);
web_sys::console::log_1(&format!("BASETIME set to {}", base_time).into());
*BASETIME.lock().unwrap() = base_time;
}
}

UPDATE_QUEUE.lock().unwrap().extend(blocks);
// web_sys::console::log_1(&format!("added to worker queue").into());
}
}

impl Worker for IOWorker {
type Input = BridgeMessage;
type Message = Vec<()>;
type Output = Vec<datasource::DataUpdate>;

fn create(_scope: &WorkerScope<Self>) -> Self {
Self {}
}

fn update(&mut self, _scope: &WorkerScope<Self>, msg: Self::Message) {
async_std::task::block_on(Self::async_update(msg));
}

fn received(&mut self, scope: &WorkerScope<Self>, msg: Self::Input, id: HandlerId) {
match msg {
BridgeMessage::SetDatasource(s, as_of, data_epoc) => {
DATASOURCE_EPOC.store(data_epoc, Ordering::Relaxed);
// web_sys::console::log_1(&format!("got input from bridge basetime {}",
// basetime).into()); let link_clone : Arc<async_std::sync::Mutex<WorkerLink<Self>>>
// = scope.clone();
async_std::task::block_on(do_datasources(s, as_of, &Self::send_it_too));
// async |_|{
// web_sys::console::log_1(&format!("got block. send to bridge").into());
// self.t();
// // scope.send_message(vec![]);
// }
},
BridgeMessage::GetNewBlocks => {
// let t = async move || {
let vec = &mut *UPDATE_QUEUE.lock().unwrap();
let mut results = vec![];
core::mem::swap(vec, &mut results);
scope.respond(id, results);
// };
// async_std::task::block_on(t());
},
}

// let chain_info = ChainInfo{
// chain_ws: String::from("kusama-rpc.polkadot.io"),
// // pub chain_id: Option<NonZeroU32>,
// // pub chain_drawn: bool,
// // Negative is other direction from center.
// chain_index: 1,
// chain_url: DotUrl{ sovereign:Some(1), env:Env::Prod, ..DotUrl::default() },
// };
// // let url = chain_name_to_url(&chain_info.chain_ws);
// // let source = datasource::RawDataSource::new(&url);
// let block_watcher = datasource::BlockWatcher{
// tx: None,
// chain_info ,
// as_of: None,
// receive_channel: None,
// sender: None,
// };

// async_std::task::block_on(block_watcher.watch_blocks());
// self.link.respond(id, (msg, 42));
}
}

0 comments on commit 3b2fb76

Please sign in to comment.