Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impl/handler service endpoints logic workflow abstraction #73

Open
wants to merge 18 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
7f50289
Move is_any_handler_alive as a Server struct method
MS-Painter Jun 14, 2021
acea667
Extract get handler (immutable) logic to Server struct
MS-Painter Jun 15, 2021
2222caf
Extract type TraceHandlerStream to separate file in module
MS-Painter Jun 15, 2021
9f22a0f
Add basis for service endpoints submodule and outsource to it trace_h…
MS-Painter Jun 15, 2021
85f8138
[WIP] Extract handler service server implementations to separate endp…
MS-Painter Jun 15, 2021
972eba1
[WIP] Apply rustfmt
MS-Painter Jun 15, 2021
a4fd853
[WIP] Remove async constraint for is_any_handler_alive logic by movin…
MS-Painter Jun 15, 2021
e398b62
[WIP] cleanup endpoints instantiation with impl new
MS-Painter Jun 15, 2021
09e7c86
Remove unneeded async constraint on Mapping stop_handler logic
MS-Painter Jun 15, 2021
420159a
Rename foldend server module to handler_server
MS-Painter Jun 15, 2021
9931d69
Rename is_any_handler_alive to any_handler_alive (clippy self usage c…
MS-Painter Jun 15, 2021
24725e1
Improve startup_handlers iterations by iterating once
MS-Painter Jun 15, 2021
48cba66
Fix StartHandlerEndpoint and StopHandlerEndpoint PascalCase naming
MS-Painter Jun 15, 2021
2bb7cd3
Use iter_live_handlers in is_concurrent_handlers_limit_reached
MS-Painter Jun 15, 2021
9e0c7ee
Extract is_concurrent_handlers_limit_reached to server utils
MS-Painter Jun 15, 2021
8735555
Removed reliance on Server struct in StartHandlerEndpoint
MS-Painter Jun 15, 2021
86f209b
Outsource spawn_handler_thread logic to HandlerMapping struct
MS-Painter Jun 15, 2021
de0311e
Remove unneeded comment made for spawn handler thread
MS-Painter Jun 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Outsource spawn_handler_thread logic to HandlerMapping struct
  • Loading branch information
MS-Painter committed Jun 15, 2021
commit 86f209b089b6a49414e148963580c1475745a809
45 changes: 45 additions & 0 deletions src/foldend/handler_mapping.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
use std::convert::TryFrom;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;

use crossbeam::channel::Sender;
use notify::ErrorKind as NotifyErrorKind;
use notify::RecommendedWatcher;
use notify::Watcher;
use notify::{Event, EventKind};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;

use generated_types::{HandlerSummary, ModifyHandlerRequest};
use pipelines::pipeline_config::PipelineConfig;
use pipelines::pipeline_handler::PipelineHandler;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HandlerMapping {
Expand Down Expand Up @@ -40,6 +51,40 @@ impl HandlerMapping {
}
}

pub fn spawn_handler_thread(
&mut self,
directory_path: String,
trace_tx: Arc<
broadcast::Sender<Result<generated_types::TraceHandlerResponse, tonic::Status>>,
>,
) -> Result<(), String> {
let path = PathBuf::from(directory_path);
let config_path = PathBuf::from(&self.handler_config_path);
match fs::read(&config_path) {
Ok(data) => {
match PipelineConfig::try_from(data) {
Ok(config) => {
let (events_tx, events_rx) = crossbeam::channel::unbounded();
let events_thread_tx = events_tx.clone();
let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res| events_thread_tx.send(res).unwrap()).unwrap();
let _ = watcher.configure(notify::Config::PreciseEvents(true));
thread::spawn(move || {
let mut handler = PipelineHandler::new(config, trace_tx);
handler.watch(&path, watcher, events_rx);
});
// Insert or update the value of the current handled directory
self.watcher_tx = Option::Some(events_tx);
Ok(())
}
Err(err) => Err(format!("Pipeline config parsing failure.\nPath: {:?}\nError: {:?}", config_path, err))
}
}
Err(err) => {
Err(format!("Pipeline file read failure.\nMake sure the file is at the registered path\nPath: {:?}\nError: {:?}", config_path, err))
}
}
}

pub fn stop_handler_thread(&self) -> Result<String, String> {
match self
.watcher_tx
Expand Down
11 changes: 6 additions & 5 deletions src/foldend/handler_server/endpoints/register_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ impl ServiceEndpoint<Request, Response> for RegisterEndpoint<'_> {
}));
}
let trace_tx = self.server.handlers_trace_tx.clone();
match self.mapping.spawn_handler_thread(
request.directory_path,
&mut handler_mapping,
trace_tx,
) {
match handler_mapping
.spawn_handler_thread(request.directory_path.to_string(), trace_tx)
{
Ok(_) => {
self.mapping
.directory_mapping
.insert(request.directory_path, handler_mapping);
let _result = self.mapping.save(&self.server.config.mapping_state_path);
Ok(Response::new(HandlerStateResponse {
is_alive: true,
Expand Down
62 changes: 10 additions & 52 deletions src/foldend/mapping.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
use std::{
collections::HashMap,
convert::TryFrom,
fs,
path::{Path, PathBuf},
result::Result,
sync::Arc,
thread,
};
use std::{collections::HashMap, convert::TryFrom, fs, path::Path, result::Result, sync::Arc};

use notify::{RecommendedWatcher, Watcher};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;

use crate::{config::Config, handler_mapping::HandlerMapping};
use generated_types::HandlerStateResponse;
use pipelines::{pipeline_config::PipelineConfig, pipeline_handler::PipelineHandler};

// Mapping data used to handle known directories to handle
// If a handler thread has ceased isn't known at realtime rather will be verified via channel whenever needed to check given a client request
Expand Down Expand Up @@ -50,11 +40,15 @@ impl Mapping {
message: String::from("Handler already up"),
}
} else {
match self.spawn_handler_thread(directory_path.to_string(), handler_mapping, trace_tx) {
Ok(_) => HandlerStateResponse {
is_alive: true,
message: String::from("Started handler"),
},
match handler_mapping.spawn_handler_thread(directory_path.to_string(), trace_tx) {
Ok(_) => {
self.directory_mapping
.insert(directory_path.to_string(), handler_mapping.to_owned());
HandlerStateResponse {
is_alive: true,
message: String::from("Started handler"),
}
}
Err(err) => HandlerStateResponse {
is_alive: false,
message: format!("Failed to start handler.\nError: {}", err),
Expand All @@ -63,42 +57,6 @@ impl Mapping {
}
}

pub fn spawn_handler_thread(
&mut self,
directory_path: String,
handler_mapping: &mut HandlerMapping,
trace_tx: Arc<
broadcast::Sender<Result<generated_types::TraceHandlerResponse, tonic::Status>>,
>,
) -> Result<(), String> {
let path = PathBuf::from(directory_path.clone());
let config_path = PathBuf::from(&handler_mapping.handler_config_path);
match fs::read(&config_path) {
Ok(data) => {
match PipelineConfig::try_from(data) {
Ok(config) => {
let (events_tx, events_rx) = crossbeam::channel::unbounded();
let events_thread_tx = events_tx.clone();
let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res| events_thread_tx.send(res).unwrap()).unwrap();
let _ = watcher.configure(notify::Config::PreciseEvents(true));
thread::spawn(move || {
let mut handler = PipelineHandler::new(config, trace_tx);
handler.watch(&path, watcher, events_rx);
});
// Insert or update the value of the current handled directory
handler_mapping.watcher_tx = Option::Some(events_tx);
self.directory_mapping.insert(directory_path, handler_mapping.to_owned());
Ok(())
}
Err(err) => Err(format!("Pipeline config parsing failure.\nPath: {:?}\nError: {:?}", config_path, err))
}
}
Err(err) => {
Err(format!("Pipeline file read failure.\nMake sure the file is at the registered path\nPath: {:?}\nError: {:?}", config_path, err))
}
}
}

pub fn stop_handler(
&mut self,
config: &Config,
Expand Down