Skip to content
This repository was archived by the owner on Oct 21, 2024. It is now read-only.

Commit 6beb4ea

Browse files
committed
WIP example Flight server for DataFusion
1 parent 8b7911b commit 6beb4ea

File tree

2 files changed

+165
-0
lines changed

2 files changed

+165
-0
lines changed

rust/datafusion/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ crossbeam = "0.7.1"
5656
[dev-dependencies]
5757
criterion = "0.2.0"
5858
tempdir = "0.3.7"
59+
futures = "0.3"
60+
prost = "0.5"
61+
tokio = { version = "0.2", features = ["full"] }
62+
tonic = "0.1.1"
63+
flatbuffers = "0.6.0"
64+
arrow-flight = { path = "../arrow-flight", version = "0.16.0-SNAPSHOT" }
5965

6066
[[bench]]
6167
name = "aggregate_query_sql"
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
use std::pin::Pin;
2+
3+
use futures::Stream;
4+
use tonic::transport::Server;
5+
use tonic::{Request, Response, Status, Streaming};
6+
7+
use datafusion::execution::context::ExecutionContext;
8+
9+
use arrow::record_batch::RecordBatch;
10+
use flight::{
11+
flight_service_server::FlightService, flight_service_server::FlightServiceServer,
12+
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
13+
HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket,
14+
};
15+
16+
/// Stateless Arrow Flight service used by this example server.
///
/// The type carries no fields; every request handler builds whatever state it
/// needs on its own.
#[derive(Clone, Debug, Default)]
pub struct FlightServiceImpl {}
18+
19+
#[tonic::async_trait]
20+
impl FlightService for FlightServiceImpl {
21+
type HandshakeStream = Pin<
22+
Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send + Sync + 'static>,
23+
>;
24+
type ListFlightsStream =
25+
Pin<Box<dyn Stream<Item = Result<FlightInfo, Status>> + Send + Sync + 'static>>;
26+
type DoGetStream =
27+
Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync + 'static>>;
28+
type DoPutStream =
29+
Pin<Box<dyn Stream<Item = Result<PutResult, Status>> + Send + Sync + 'static>>;
30+
type DoActionStream = Pin<
31+
Box<dyn Stream<Item = Result<flight::Result, Status>> + Send + Sync + 'static>,
32+
>;
33+
type ListActionsStream =
34+
Pin<Box<dyn Stream<Item = Result<ActionType, Status>> + Send + Sync + 'static>>;
35+
36+
async fn do_get(
37+
&self,
38+
request: Request<Ticket>,
39+
) -> Result<Response<Self::DoGetStream>, Status> {
40+
let ticket = request.into_inner();
41+
match String::from_utf8(ticket.ticket.to_vec()) {
42+
Ok(sql) => {
43+
println!("do_get: {}", sql);
44+
45+
// create local execution context
46+
let mut ctx = ExecutionContext::new();
47+
48+
ctx.register_parquet("alltypes_plain", "alltypes_plain.snappy.parquet")
49+
.unwrap();
50+
51+
// create the query plan
52+
let plan = ctx
53+
.create_logical_plan(&sql)
54+
.map_err(|e| to_tonic_err(&e))?;
55+
let plan = ctx.optimize(&plan).map_err(|e| to_tonic_err(&e))?;
56+
let plan = ctx
57+
.create_physical_plan(&plan, 1024 * 1024)
58+
.map_err(|e| to_tonic_err(&e))?;
59+
60+
//TODO make this async
61+
62+
// execute the query
63+
let results = ctx.collect(plan.as_ref()).map_err(|e| to_tonic_err(&e))?;
64+
65+
let flights: Vec<Result<FlightData, Status>> =
66+
results.iter().map(|batch| to_flight_data(batch)).collect();
67+
68+
let output = futures::stream::iter(flights);
69+
70+
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
71+
}
72+
Err(e) => Err(Status::unimplemented(format!("Invalid ticket: {:?}", e))),
73+
}
74+
}
75+
76+
async fn handshake(
77+
&self,
78+
_request: Request<Streaming<HandshakeRequest>>,
79+
) -> Result<Response<Self::HandshakeStream>, Status> {
80+
Err(Status::unimplemented("Not yet implemented"))
81+
}
82+
83+
async fn list_flights(
84+
&self,
85+
_request: Request<Criteria>,
86+
) -> Result<Response<Self::ListFlightsStream>, Status> {
87+
Err(Status::unimplemented("Not yet implemented"))
88+
}
89+
90+
async fn get_flight_info(
91+
&self,
92+
_request: Request<FlightDescriptor>,
93+
) -> Result<Response<FlightInfo>, Status> {
94+
Err(Status::unimplemented("Not yet implemented"))
95+
}
96+
97+
async fn get_schema(
98+
&self,
99+
_request: Request<FlightDescriptor>,
100+
) -> Result<Response<SchemaResult>, Status> {
101+
Err(Status::unimplemented("Not yet implemented"))
102+
}
103+
104+
async fn do_put(
105+
&self,
106+
_request: Request<Streaming<FlightData>>,
107+
) -> Result<Response<Self::DoPutStream>, Status> {
108+
Err(Status::unimplemented("Not yet implemented"))
109+
}
110+
111+
async fn do_action(
112+
&self,
113+
_request: Request<Action>,
114+
) -> Result<Response<Self::DoActionStream>, Status> {
115+
Err(Status::unimplemented("Not yet implemented"))
116+
}
117+
118+
async fn list_actions(
119+
&self,
120+
_request: Request<Empty>,
121+
) -> Result<Response<Self::ListActionsStream>, Status> {
122+
Err(Status::unimplemented("Not yet implemented"))
123+
}
124+
}
125+
126+
fn to_flight_data(_batch: &RecordBatch) -> Result<FlightData, Status> {
127+
//TODO implement .. need help on how to encode the batches using IPC here
128+
129+
Ok(FlightData {
130+
flight_descriptor: None,
131+
app_metadata: vec![],
132+
/// Header for message data as described in Message.fbs::Message
133+
data_header: vec![],
134+
/// The actual batch of Arrow data. Preferably handled with minimal-copies
135+
/// coming last in the definition to help with sidecar patterns (it is
136+
/// expected that some implementations will fetch this field off the wire
137+
/// with specialized code to avoid extra memory copies).
138+
///
139+
data_body: vec![],
140+
})
141+
}
142+
143+
fn to_tonic_err(e: &datafusion::error::ExecutionError) -> Status {
144+
Status::unimplemented(format!("{:?}", e))
145+
}
146+
147+
#[tokio::main]
148+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
149+
let addr = "0.0.0.0:50051".parse()?;
150+
let service = FlightServiceImpl {};
151+
152+
let svc = FlightServiceServer::new(service);
153+
154+
println!("Listening on {:?}", addr);
155+
156+
Server::builder().add_service(svc).serve(addr).await?;
157+
158+
Ok(())
159+
}

0 commit comments

Comments
 (0)