Skip to content

Commit

Permalink
add task params (FuelLabs#1159)
Browse files Browse the repository at this point in the history
Adding optional Task parameters when initializing `Task:RunnableTask`
from `RunnableService`.
  • Loading branch information
leviathanbeak authored Apr 26, 2023
1 parent 2cd6c94 commit 0643189
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 33 deletions.
33 changes: 21 additions & 12 deletions crates/fuel-core/src/graphql_api/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ use tower_http::{
trace::TraceLayer,
};

pub type Service = fuel_core_services::ServiceRunner<NotInitializedTask>;
pub type Service = fuel_core_services::ServiceRunner<GraphqlService>;

pub type Database = Box<dyn DatabasePort>;

Expand All @@ -85,10 +85,13 @@ pub struct SharedState {
pub bound_address: SocketAddr,
}

pub struct NotInitializedTask {
pub struct GraphqlService {
bound_address: SocketAddr,
}

pub struct ServerParams {
router: Router,
listener: TcpListener,
bound_address: SocketAddr,
}

pub struct Task {
Expand All @@ -97,23 +100,30 @@ pub struct Task {
}

#[async_trait::async_trait]
impl RunnableService for NotInitializedTask {
impl RunnableService for GraphqlService {
const NAME: &'static str = "GraphQL";

type SharedData = SharedState;
type Task = Task;
type TaskParams = ServerParams;

fn shared_data(&self) -> Self::SharedData {
SharedState {
bound_address: self.bound_address,
}
}

async fn into_task(self, state: &StateWatcher) -> anyhow::Result<Self::Task> {
async fn into_task(
self,
state: &StateWatcher,
params: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
let mut state = state.clone();
let server = axum::Server::from_tcp(self.listener)
let ServerParams { router, listener } = params;

let server = axum::Server::from_tcp(listener)
.unwrap()
.serve(self.router.into_make_service())
.serve(router.into_make_service())
.with_graceful_shutdown(async move {
state
.while_started()
Expand Down Expand Up @@ -198,11 +208,10 @@ pub fn new_service(

tracing::info!("Binding GraphQL provider to {}", bound_address);

Ok(Service::new(NotInitializedTask {
router,
listener,
bound_address,
}))
Ok(Service::new_with_params(
GraphqlService { bound_address },
ServerParams { router, listener },
))
}

async fn graphql_playground() -> impl IntoResponse {
Expand Down
11 changes: 8 additions & 3 deletions crates/fuel-core/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,17 @@ impl RunnableService for Task {
const NAME: &'static str = "FuelService";
type SharedData = SharedState;
type Task = Task;
type TaskParams = ();

fn shared_data(&self) -> Self::SharedData {
self.shared.clone()
}

async fn into_task(self, _: &StateWatcher) -> anyhow::Result<Self::Task> {
async fn into_task(
self,
_: &StateWatcher,
_: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
for service in &self.services {
service.start_and_await().await?;
}
Expand Down Expand Up @@ -288,7 +293,7 @@ mod tests {
let task = Task::new(Default::default(), Config::local_node()).unwrap();
let (_, receiver) = tokio::sync::watch::channel(State::NotStarted);
let mut watcher = receiver.into();
let mut task = task.into_task(&watcher).await.unwrap();
let mut task = task.into_task(&watcher, ()).await.unwrap();
sleep(Duration::from_secs(1));
for service in task.sub_services() {
assert_eq!(service.state(), State::Started);
Expand Down Expand Up @@ -324,7 +329,7 @@ mod tests {
#[tokio::test]
async fn shutdown_stops_all_services() {
let task = Task::new(Default::default(), Config::local_node()).unwrap();
let mut task = task.into_task(&Default::default()).await.unwrap();
let mut task = task.into_task(&Default::default(), ()).await.unwrap();
let sub_services_watchers: Vec<_> = task
.sub_services()
.iter()
Expand Down
7 changes: 6 additions & 1 deletion crates/services/consensus_module/poa/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,12 +431,17 @@ where

type SharedData = SharedState;
type Task = Task<T, B, I>;
type TaskParams = ();

fn shared_data(&self) -> Self::SharedData {
self.shared_state.clone()
}

async fn into_task(self, _: &StateWatcher) -> anyhow::Result<Self::Task> {
async fn into_task(
self,
_: &StateWatcher,
_: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
match self.trigger {
Trigger::Never | Trigger::Instant => {}
Trigger::Interval { block_time } => {
Expand Down
7 changes: 6 additions & 1 deletion crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,17 @@ where

type SharedData = SharedState;
type Task = Task<D>;
type TaskParams = ();

fn shared_data(&self) -> Self::SharedData {
self.shared.clone()
}

async fn into_task(mut self, _: &StateWatcher) -> anyhow::Result<Self::Task> {
async fn into_task(
mut self,
_: &StateWatcher,
_: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
self.p2p_service.start()?;
Ok(self)
}
Expand Down
7 changes: 6 additions & 1 deletion crates/services/relayer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ where

type SharedData = SharedState<D>;
type Task = Task<P, D>;
type TaskParams = ();

fn shared_data(&self) -> Self::SharedData {
let synced = self.synced.subscribe();
Expand All @@ -188,7 +189,11 @@ where
}
}

async fn into_task(mut self, watcher: &StateWatcher) -> anyhow::Result<Self::Task> {
async fn into_task(
mut self,
watcher: &StateWatcher,
_: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
let shutdown = watcher.clone();
let NotInitializedTask {
synced,
Expand Down
4 changes: 2 additions & 2 deletions crates/services/relayer/src/service/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn deploy_height_does_not_override() {
};
let eth_node = MockMiddleware::default();
let relayer = NotInitializedTask::new(eth_node, mock_db.clone(), config);
let _ = relayer.into_task(&Default::default()).await;
let _ = relayer.into_task(&Default::default(), ()).await;

assert_eq!(*mock_db.get_finalized_da_height().unwrap(), 50);
}
Expand All @@ -72,7 +72,7 @@ async fn deploy_height_does_override() {
};
let eth_node = MockMiddleware::default();
let relayer = NotInitializedTask::new(eth_node, mock_db.clone(), config);
let _ = relayer.into_task(&Default::default()).await;
let _ = relayer.into_task(&Default::default(), ()).await;

assert_eq!(*mock_db.get_finalized_da_height().unwrap(), 52);
}
42 changes: 32 additions & 10 deletions crates/services/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,21 @@ pub trait RunnableService: Send {
/// The initialized runnable task type.
type Task: RunnableTask;

/// Optional parameters used to when initializing into task.
type TaskParams: Send;

/// A cloned instance of the shared data
fn shared_data(&self) -> Self::SharedData;

/// Converts the service into a runnable task before the main run loop.
///
/// The `state` is a `State` watcher of the service. Some tasks may handle state changes
/// on their own.
async fn into_task(self, state_watcher: &StateWatcher) -> anyhow::Result<Self::Task>;
async fn into_task(
self,
state_watcher: &StateWatcher,
params: Self::TaskParams,
) -> anyhow::Result<Self::Task>;
}

/// The trait is implemented by the service task and contains a single iteration of the infinity
Expand Down Expand Up @@ -124,11 +131,24 @@ where
impl<S> ServiceRunner<S>
where
S: RunnableService + 'static,
S::TaskParams: Default,
{
/// Initializes a new `ServiceRunner` containing a `RunnableService`
pub fn new(service: S) -> Self {
let shared = service.shared_data();
let state = initialize_loop(service);
let state = initialize_loop(service, S::TaskParams::default());
Self { shared, state }
}
}

impl<S> ServiceRunner<S>
where
S: RunnableService + 'static,
{
/// Initializes a new `ServiceRunner` containing a `RunnableService` with parameters for underlying `Task`
pub fn new_with_params(service: S, params: S::TaskParams) -> Self {
let shared = service.shared_data();
let state = initialize_loop(service, params);
Self { shared, state }
}

Expand Down Expand Up @@ -225,7 +245,7 @@ where

#[tracing::instrument(skip_all, fields(service = S::NAME))]
/// Initialize the background loop as a spawned task.
fn initialize_loop<S>(service: S) -> Shared<watch::Sender<State>>
fn initialize_loop<S>(service: S, params: S::TaskParams) -> Shared<watch::Sender<State>>
where
S: RunnableService + 'static,
{
Expand All @@ -236,7 +256,8 @@ where
tokio::task::spawn(
async move {
tracing::debug!("running");
let run = std::panic::AssertUnwindSafe(run(service, stop_sender.clone()));
let run =
std::panic::AssertUnwindSafe(run(service, stop_sender.clone(), params));
tracing::debug!("awaiting run");
let result = run.catch_unwind().await;

Expand Down Expand Up @@ -270,7 +291,7 @@ where
}

/// Runs the main loop.
async fn run<S>(service: S, sender: Shared<watch::Sender<State>>)
async fn run<S>(service: S, sender: Shared<watch::Sender<State>>, params: S::TaskParams)
where
S: RunnableService + 'static,
{
Expand All @@ -287,7 +308,7 @@ where

// We can panic here, because it is inside of the task.
let mut task = service
.into_task(&state)
.into_task(&state, params)
.await
.expect("The initialization of the service failed.");

Expand Down Expand Up @@ -388,10 +409,11 @@ mod tests {

type SharedData = EmptyShared;
type Task = MockTask;
type TaskParams = ();

fn shared_data(&self) -> EmptyShared;

async fn into_task(self, state: &StateWatcher) -> anyhow::Result<MockTask>;
async fn into_task(self, state: &StateWatcher, params: <MockService as RunnableService>::TaskParams) -> anyhow::Result<MockTask>;
}
}

Expand All @@ -417,7 +439,7 @@ mod tests {
fn new_empty() -> Self {
let mut mock = MockService::default();
mock.expect_shared_data().returning(|| EmptyShared);
mock.expect_into_task().returning(|_| {
mock.expect_into_task().returning(|_, _| {
let mut mock = MockTask::default();
mock.expect_run().returning(|watcher| {
let mut watcher = watcher.clone();
Expand Down Expand Up @@ -468,7 +490,7 @@ mod tests {
async fn panic_during_run() {
let mut mock = MockService::default();
mock.expect_shared_data().returning(|| EmptyShared);
mock.expect_into_task().returning(|_| {
mock.expect_into_task().returning(|_, _| {
let mut mock = MockTask::default();
mock.expect_run().returning(|_| panic!("Should fail"));
mock.expect_shutdown().times(1).returning(|| Ok(()));
Expand All @@ -486,7 +508,7 @@ mod tests {
async fn panic_during_shutdown() {
let mut mock = MockService::default();
mock.expect_shared_data().returning(|| EmptyShared);
mock.expect_into_task().returning(|_| {
mock.expect_into_task().returning(|_, _| {
let mut mock = MockTask::default();
mock.expect_run().returning(|_| {
Box::pin(async move {
Expand Down
14 changes: 12 additions & 2 deletions crates/services/sync/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,15 @@ where
type SharedData = ();

type Task = SyncTask<P, E, C>;
type TaskParams = ();

fn shared_data(&self) -> Self::SharedData {}

async fn into_task(mut self, watcher: &StateWatcher) -> anyhow::Result<Self::Task> {
async fn into_task(
mut self,
watcher: &StateWatcher,
_: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
let mut sync_watcher = watcher.clone();
self.import_task_handle.start_and_await().await?;
let mut import_watcher = self.import_task_handle.state_watcher();
Expand Down Expand Up @@ -193,12 +198,17 @@ where
const NAME: &'static str = "fuel-core-sync/import-task";

type SharedData = ();
type TaskParams = ();

type Task = ImportTask<P, E, C>;

fn shared_data(&self) -> Self::SharedData {}

async fn into_task(self, _: &StateWatcher) -> anyhow::Result<Self::Task> {
async fn into_task(
self,
_: &StateWatcher,
_: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
Ok(self)
}
}
7 changes: 6 additions & 1 deletion crates/services/txpool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,17 @@ where

type SharedData = SharedState<P2P, DB>;
type Task = Task<P2P, DB>;
type TaskParams = ();

fn shared_data(&self) -> Self::SharedData {
self.shared.clone()
}

async fn into_task(mut self, _: &StateWatcher) -> anyhow::Result<Self::Task> {
async fn into_task(
mut self,
_: &StateWatcher,
_: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
self.ttl_timer.reset();
Ok(self)
}
Expand Down

0 comments on commit 0643189

Please sign in to comment.