feat: grpc trait&Server trait
discord9 committed Apr 29, 2024
1 parent 5f1acbd commit 995cf77
Showing 7 changed files with 173 additions and 61 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock


2 changes: 2 additions & 0 deletions src/flow/Cargo.toml
@@ -17,13 +17,15 @@ common-decimal.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-substrait.workspace = true
datatypes.workspace = true
enum_dispatch = "0.3"
futures = "0.3"
# This fork is simply for keeping our dependency in our org, and pin the version
# it is the same with upstream repo
async-trait.workspace = true
67 changes: 18 additions & 49 deletions src/flow/src/adapter.rs
@@ -36,7 +36,7 @@ use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::{broadcast, mpsc, Mutex};
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use tokio::task::LocalSet;

use crate::adapter::error::{EvalSnafu, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu};
@@ -72,8 +72,9 @@ pub fn start_flow_node_and_one_worker(
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
) {
let node_id = Some(1);
let (flow_node_manager, mut worker) =
FlowNodeManager::new_with_worker(frontend_invoker, query_engine, table_meta);
FlowNodeManager::new_with_worker(node_id, frontend_invoker, query_engine, table_meta);
let rt = tokio::runtime::Runtime::new().unwrap();
let local = tokio::task::LocalSet::new();

@@ -88,6 +89,8 @@
});
}

pub type FlowNodeManagerRef = Arc<FlowNodeManager>;

/// FlowNodeManager manages the state of all tasks in the flow node; it should be run on a single thread
///
/// For now, the timestamp used is simply the current system timestamp
@@ -103,18 +106,25 @@ pub struct FlowNodeManager {
/// contains mapping from table name to global id, and table schema
node_context: Mutex<FlowNodeContext>,
tick_manager: FlowTickManager,
node_id: Option<u32>,
run_task_created: Mutex<bool>,
}

impl FlowNodeManager {
/// Trigger the dataflow to run, then send writeback requests to the source senders
///
/// note that this method doesn't handle input mirror requests, as those should be handled by the grpc server
pub async fn run(&self) {
loop {
// TODO(discord9): start a server to listen for incoming requests
self.run_available().await;
self.send_writeback_requests().await;
// TODO(discord9): error handling
let _ = self.send_writeback_requests().await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
pub fn new(
node_id: Option<u32>,
frontend_invoker: Box<dyn FrontendInvoker + Send + Sync>,
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
@@ -133,14 +143,17 @@ impl FlowNodeManager {
frontend_invoker,
node_context: Mutex::new(node_context),
tick_manager,
node_id,
run_task_created: Mutex::new(false),
}
}
pub fn new_with_worker<'s>(
node_id: Option<u32>,
frontend_invoker: Box<dyn FrontendInvoker + Send + Sync>,
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
) -> (Self, Worker<'s>) {
let mut zelf = Self::new(frontend_invoker, query_engine, table_meta);
let mut zelf = Self::new(node_id, frontend_invoker, query_engine, table_meta);
let (handle, worker) = create_worker(zelf.tick_manager.clone());
zelf.add_worker_handle(handle);
(zelf, worker)
@@ -255,51 +268,6 @@ impl TableInfoSource {
}
}

/// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
pub(crate) struct ActiveDataflowState<'subgraph> {
df: Hydroflow<'subgraph>,
state: DataflowState,
err_collector: ErrCollector,
}

impl Default for ActiveDataflowState<'_> {
fn default() -> Self {
ActiveDataflowState {
df: Hydroflow::new(),
state: DataflowState::default(),
err_collector: ErrCollector::default(),
}
}
}

impl<'subgraph> ActiveDataflowState<'subgraph> {
/// Create a new render context, assigned the given global id
pub fn new_ctx<'ctx>(&'ctx mut self, global_id: GlobalId) -> Context<'ctx, 'subgraph>
where
'subgraph: 'ctx,
{
Context {
id: global_id,
df: &mut self.df,
compute_state: &mut self.state,
err_collector: self.err_collector.clone(),
input_collection: Default::default(),
local_scope: Default::default(),
}
}

pub fn set_current_ts(&mut self, ts: repr::Timestamp) {
self.state.set_current_ts(ts);
}

/// Run all available subgraphs
///
/// return true if any subgraph actually executed
pub fn run_available(&mut self) -> bool {
self.state.run_available_with_schedule(&mut self.df)
}
}

pub enum DiffRequest {
Insert(Vec<Row>),
Delete(Vec<Row>),
@@ -369,6 +337,7 @@ impl FlowNodeManager {
table_info.table_info.schema_name,
);
let ctx = QueryContext::with(&catalog, &schema);

let primary_keys = table_info
.table_info
.meta
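Note: `DiffRequest` above is what the writeback path batches row changes into. As a purely hypothetical sketch (the helper name is invented, and the `(Row, Timestamp, i64)` tuple shape of `DiffRow` is inferred from the `(r, now, 1)` mapping in server.rs below), folding a batch of `DiffRow`s into `DiffRequest`s by the sign of the diff could look like:

```rust
use crate::repr::DiffRow;

// Hypothetical helper (not part of this commit): split a batch of DiffRows
// into insert/delete DiffRequests by the sign of the diff component.
fn to_diff_requests(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
    let (mut inserts, mut deletes) = (Vec::new(), Vec::new());
    for (row, _ts, diff) in rows {
        if diff > 0 {
            inserts.push(row);
        } else {
            deletes.push(row);
        }
    }
    let mut reqs = Vec::new();
    if !inserts.is_empty() {
        reqs.push(DiffRequest::Insert(inserts));
    }
    if !deletes.is_empty() {
        reqs.push(DiffRequest::Delete(deletes));
    }
    reqs
}
```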
100 changes: 94 additions & 6 deletions src/flow/src/adapter/server.rs
@@ -14,19 +14,36 @@

//! Implementation of grpc service for flow node

use std::net::SocketAddr;
use std::sync::Arc;

use api::v1::flow::{CreateRequest, RemoveRequest};
use common_telemetry::tracing::info;
use futures::FutureExt;
use greptime_proto::v1::flow::{
flow_request, flow_server, FlowRequest, FlowResponse, InsertRequests,
};
use itertools::Itertools;
use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
use snafu::{ensure, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::{oneshot, Mutex};
use tonic::transport::server::TcpIncoming;
use tonic::transport::Server;
use tonic::{Request, Response, Status};

use crate::adapter::FlowNodeManager;
use crate::adapter::{FlowNodeManager, FlowNodeManagerRef};
use crate::repr::{self, DiffRow};

pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";

#[derive(Clone)]
pub struct FlowService {
pub manager: FlowNodeManagerRef,
}

#[async_trait::async_trait]
impl flow_server::Flow for FlowNodeManager {
impl flow_server::Flow for FlowService {
async fn handle_create_remove(
&self,
request: Request<FlowRequest>,
@@ -44,10 +61,12 @@ impl flow_server::Flow for FlowNodeManager {
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_id = self
.manager
.table_info_source
.get_table_id_from_proto_name(&sink_table_name)
.await?;
let ret = self
.manager
.create_task(
task_id.id as u64,
sink_table_id,
@@ -71,7 +90,8 @@
Some(flow_request::Body::Remove(RemoveRequest {
task_id: Some(task_id),
})) => {
self.remove_task(task_id.id as u64)
self.manager
.remove_task(task_id.id as u64)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;
Ok(Response::new(Default::default()))
@@ -88,19 +108,87 @@
for write_request in request.into_inner().requests {
let region_id = write_request.region_id;
let rows_proto = write_request.rows.map(|r| r.rows).unwrap_or(vec![]);
let now = self.tick_manager.tick();
// TODO(discord9): reconsider time assignment mechanism
let now = self.manager.tick_manager.tick();
let rows: Vec<DiffRow> = rows_proto
.into_iter()
.map(repr::Row::from)
.map(|r| (r, now, 1))
.collect_vec();
self.handle_write_request(region_id.into(), rows)
self.manager
.handle_write_request(region_id.into(), rows)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;
}
// since `run_available` doesn't block, we can just trigger a run here
self.run_available().await;
self.manager.run_available().await;
// writeback should be configured to run on a timer elsewhere, e.g. one attempt per second
Ok(Response::new(Default::default()))
}
}

pub struct FlowNodeServer {
pub shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
pub flow_service: FlowService,
}

impl FlowNodeServer {
pub fn create_flow_service(&self) -> flow_server::FlowServer<impl flow_server::Flow> {
flow_server::FlowServer::new(self.flow_service.clone())
}
}

#[async_trait::async_trait]
impl servers::server::Server for FlowNodeServer {
async fn shutdown(&self) -> Result<(), servers::error::Error> {
let mut shutdown_tx = self.shutdown_tx.lock().await;
if let Some(tx) = shutdown_tx.take() {
if tx.send(()).is_err() {
info!("Receiver dropped, the flow node server has already shutdown");
}
}
info!("Shutdown flow node server");

Ok(())
}
async fn start(&self, addr: SocketAddr) -> Result<SocketAddr, servers::error::Error> {
let (tx, rx) = oneshot::channel::<()>();
let (incoming, addr) = {
let mut shutdown_tx = self.shutdown_tx.lock().await;
ensure!(
shutdown_tx.is_none(),
AlreadyStartedSnafu { server: "flow" }
);
let listener = TcpListener::bind(addr)
.await
.context(TcpBindSnafu { addr })?;
let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
let incoming =
TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
info!("flow server is bound to {}", addr);

*shutdown_tx = Some(tx);

(incoming, addr)
};

let builder = tonic::transport::Server::builder().add_service(self.create_flow_service());
let _handle = common_runtime::spawn_bg(async move {
let _result = builder
.serve_with_incoming_shutdown(incoming, rx.map(drop))
.await
.context(StartGrpcSnafu);
// TODO(discord9): better place for dataflow to run per second
});
let manager_ref = self.flow_service.manager.clone();
let _handle_trigger_run = common_runtime::spawn_bg(async move {
manager_ref.run().await;
});

Ok(addr)
}

fn name(&self) -> &str {
FLOW_NODE_SERVER_NAME
}
}
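For orientation, a minimal usage sketch of the new `Server` implementation, assuming a caller that already holds a `FlowNodeManagerRef`; the function name and bind address are invented placeholders, not part of this commit:

```rust
use std::net::SocketAddr;

use common_telemetry::tracing::info;
use servers::server::Server; // brings the trait's `start`/`shutdown` into scope
use tokio::sync::Mutex;

// Hypothetical wiring (not in this diff): share the manager between the
// gRPC service and the background run loop, then start the server.
async fn start_flow_grpc(manager: FlowNodeManagerRef) -> Result<SocketAddr, servers::error::Error> {
    let server = FlowNodeServer {
        shutdown_tx: Mutex::new(None),
        flow_service: FlowService { manager },
    };
    // Placeholder bind address, for illustration only.
    let addr: SocketAddr = "127.0.0.1:6800".parse().unwrap();
    // `start` binds the listener, spawns the tonic server plus the manager's
    // run loop, and returns the locally bound address.
    let bound = server.start(addr).await?;
    info!("flow node server listening on {}", bound);
    Ok(bound)
}
```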
52 changes: 50 additions & 2 deletions src/flow/src/adapter/worker.rs
@@ -17,17 +17,65 @@
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;

use hydroflow::scheduled::graph::Hydroflow;
use snafu::ResultExt;
use tokio::sync::{broadcast, mpsc, Mutex};

use crate::adapter::error::{Error, EvalSnafu};
use crate::adapter::{ActiveDataflowState, FlowTickManager, TaskId};
use crate::adapter::{FlowTickManager, TaskId};
use crate::compute::{Context, DataflowState, ErrCollector};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::plan::TypedPlan;
use crate::repr::DiffRow;
use crate::repr::{self, DiffRow};

pub type SharedBuf = Arc<Mutex<VecDeque<DiffRow>>>;

/// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
pub(crate) struct ActiveDataflowState<'subgraph> {
df: Hydroflow<'subgraph>,
state: DataflowState,
err_collector: ErrCollector,
}

impl Default for ActiveDataflowState<'_> {
fn default() -> Self {
ActiveDataflowState {
df: Hydroflow::new(),
state: DataflowState::default(),
err_collector: ErrCollector::default(),
}
}
}

impl<'subgraph> ActiveDataflowState<'subgraph> {
/// Create a new render context, assigned the given global id
pub fn new_ctx<'ctx>(&'ctx mut self, global_id: GlobalId) -> Context<'ctx, 'subgraph>
where
'subgraph: 'ctx,
{
Context {
id: global_id,
df: &mut self.df,
compute_state: &mut self.state,
err_collector: self.err_collector.clone(),
input_collection: Default::default(),
local_scope: Default::default(),
}
}

pub fn set_current_ts(&mut self, ts: repr::Timestamp) {
self.state.set_current_ts(ts);
}

/// Run all available subgraphs
///
/// return true if any subgraph actually executed
pub fn run_available(&mut self) -> bool {
self.state.run_available_with_schedule(&mut self.df)
}
}

pub struct WorkerHandle {
itc_client: Mutex<InterThreadCallClient>,
}
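With `ActiveDataflowState` now owned by the worker module, a brief sketch of driving it for a single tick, using only the methods shown above; the global id and timestamp are placeholders, and rendering an actual `TypedPlan` is elided:

```rust
use crate::expr::GlobalId;
use crate::repr;

// Sketch only (not part of this commit): render into the state, advance the
// logical clock, then run whatever subgraphs became ready.
fn render_and_tick(state: &mut ActiveDataflowState<'_>, now: repr::Timestamp) -> bool {
    {
        // The render context mutably borrows the dataflow graph and state;
        // GlobalId::User(0) is a placeholder id for illustration.
        let _ctx = state.new_ctx(GlobalId::User(0));
        // ... a TypedPlan would be rendered into the dataflow here ...
    }
    state.set_current_ts(now);
    // Returns true if any subgraph actually executed.
    state.run_available()
}
```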
2 changes: 1 addition & 1 deletion src/flow/src/compute/render.rs
@@ -238,7 +238,7 @@ mod test {
for now in time_range {
state.set_current_ts(now);
state.run_available_with_schedule(df);
assert!(state.get_err_collector().inner.borrow().is_empty());
assert!(state.get_err_collector().is_empty());
if let Some(expected) = expected.get(&now) {
assert_eq!(*output.borrow(), *expected, "at ts={}", now);
} else {
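The updated assertion suggests `ErrCollector` gained an `is_empty` convenience method; its body isn't shown in this diff, but judging from the old call site it would presumably be:

```rust
impl ErrCollector {
    /// Presumed accessor (implementation not shown in this diff): true when
    /// no errors have been collected.
    pub fn is_empty(&self) -> bool {
        self.inner.borrow().is_empty()
    }
}
```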