-
Notifications
You must be signed in to change notification settings - Fork 786
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement FlightSQL spec change to support stateless prepared statements #5433
Changes from 4 commits
1d1b800
1e6dfe0
8ba176e
3dcbeda
b738348
041ea6d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,17 +32,18 @@ use arrow_flight::{ | |
CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, | ||
CommandGetTables, CommandGetXdbcTypeInfo, CommandPreparedStatementQuery, | ||
CommandPreparedStatementUpdate, CommandStatementQuery, CommandStatementSubstraitPlan, | ||
CommandStatementUpdate, ProstMessageExt, SqlInfo, TicketStatementQuery, | ||
CommandStatementUpdate, DoPutPreparedStatementResult, ProstMessageExt, SqlInfo, | ||
TicketStatementQuery, | ||
}, | ||
utils::batches_to_flight_data, | ||
Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, | ||
HandshakeResponse, IpcMessage, PutResult, SchemaAsIpc, Ticket, | ||
HandshakeResponse, IpcMessage, SchemaAsIpc, Ticket, | ||
}; | ||
use arrow_ipc::writer::IpcWriteOptions; | ||
use arrow_schema::{ArrowError, DataType, Field, Schema}; | ||
use assert_cmd::Command; | ||
use bytes::Bytes; | ||
use futures::{Stream, StreamExt, TryStreamExt}; | ||
use futures::{Stream, TryStreamExt}; | ||
use prost::Message; | ||
use tokio::{net::TcpListener, task::JoinHandle}; | ||
use tonic::{Request, Response, Status, Streaming}; | ||
|
@@ -51,7 +52,7 @@ const QUERY: &str = "SELECT * FROM table;"; | |
|
||
#[tokio::test] | ||
async fn test_simple() { | ||
let test_server = FlightSqlServiceImpl {}; | ||
let test_server = FlightSqlServiceImpl::default(); | ||
let fixture = TestFixture::new(&test_server).await; | ||
let addr = fixture.addr; | ||
|
||
|
@@ -92,10 +93,9 @@ async fn test_simple() { | |
|
||
const PREPARED_QUERY: &str = "SELECT * FROM table WHERE field = $1"; | ||
const PREPARED_STATEMENT_HANDLE: &str = "prepared_statement_handle"; | ||
const UPDATED_PREPARED_STATEMENT_HANDLE: &str = "updated_prepared_statement_handle"; | ||
|
||
#[tokio::test] | ||
async fn test_do_put_prepared_statement() { | ||
let test_server = FlightSqlServiceImpl {}; | ||
async fn test_do_put_prepared_statement(test_server: FlightSqlServiceImpl) { | ||
let fixture = TestFixture::new(&test_server).await; | ||
let addr = fixture.addr; | ||
|
||
|
@@ -136,11 +136,40 @@ async fn test_do_put_prepared_statement() { | |
); | ||
} | ||
|
||
#[tokio::test] | ||
pub async fn test_do_put_prepared_statement_stateless() { | ||
test_do_put_prepared_statement(FlightSqlServiceImpl { | ||
stateless_prepared_statements: true, | ||
}) | ||
.await | ||
} | ||
|
||
#[tokio::test] | ||
pub async fn test_do_put_prepared_statement_stateful() { | ||
test_do_put_prepared_statement(FlightSqlServiceImpl { | ||
stateless_prepared_statements: false, | ||
}) | ||
.await | ||
} | ||
|
||
/// All tests must complete within this many seconds or else the test server is shutdown | ||
const DEFAULT_TIMEOUT_SECONDS: u64 = 30; | ||
|
||
#[derive(Clone, Default)] | ||
pub struct FlightSqlServiceImpl {} | ||
#[derive(Clone)] | ||
pub struct FlightSqlServiceImpl { | ||
/// Whether to emulate stateless (true) or stateful (false) behavior for | ||
/// prepared statements. stateful servers will not return an updated | ||
/// handle after executing `DoPut(CommandPreparedStatementQuery)` | ||
stateless_prepared_statements: bool, | ||
} | ||
|
||
impl Default for FlightSqlServiceImpl { | ||
fn default() -> Self { | ||
Self { | ||
stateless_prepared_statements: true, | ||
} | ||
} | ||
} | ||
|
||
impl FlightSqlServiceImpl { | ||
/// Return an [`FlightServiceServer`] that can be used with a | ||
|
@@ -274,10 +303,17 @@ impl FlightSqlService for FlightSqlServiceImpl { | |
cmd: CommandPreparedStatementQuery, | ||
_request: Request<FlightDescriptor>, | ||
) -> Result<Response<FlightInfo>, Status> { | ||
assert_eq!( | ||
cmd.prepared_statement_handle, | ||
PREPARED_STATEMENT_HANDLE.as_bytes() | ||
); | ||
if self.stateless_prepared_statements { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd like to do this kind of testing (simulate both stateful and stateless server behavior) in the Go implementation as well, but I am less confident around the Arrow Go test suite and general Go testing practices. |
||
assert_eq!( | ||
cmd.prepared_statement_handle, | ||
UPDATED_PREPARED_STATEMENT_HANDLE.as_bytes() | ||
); | ||
} else { | ||
assert_eq!( | ||
cmd.prepared_statement_handle, | ||
PREPARED_STATEMENT_HANDLE.as_bytes() | ||
); | ||
} | ||
let resp = Response::new(self.fake_flight_info().unwrap()); | ||
Ok(resp) | ||
} | ||
|
@@ -524,7 +560,7 @@ impl FlightSqlService for FlightSqlServiceImpl { | |
&self, | ||
_query: CommandPreparedStatementQuery, | ||
request: Request<PeekableFlightDataStream>, | ||
) -> Result<Response<<Self as FlightService>::DoPutStream>, Status> { | ||
) -> Result<DoPutPreparedStatementResult, Status> { | ||
// just make sure decoding the parameters works | ||
let parameters = FlightRecordBatchStream::new_from_flight_data( | ||
request.into_inner().map_err(|e| e.into()), | ||
|
@@ -543,10 +579,15 @@ impl FlightSqlService for FlightSqlServiceImpl { | |
))); | ||
} | ||
} | ||
|
||
Ok(Response::new( | ||
futures::stream::once(async { Ok(PutResult::default()) }).boxed(), | ||
)) | ||
let handle = if self.stateless_prepared_statements { | ||
UPDATED_PREPARED_STATEMENT_HANDLE.to_string().into() | ||
} else { | ||
PREPARED_STATEMENT_HANDLE.to_string().into() | ||
}; | ||
let result = DoPutPreparedStatementResult { | ||
prepared_statement_handle: Some(handle), | ||
}; | ||
Ok(result) | ||
} | ||
|
||
async fn do_put_prepared_statement_update( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment says errors are ignored, but the code doesn't seem to ignore errors. I wonder if I am misreading this or if the comment or code should be updated 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comments never lie ;)
updated this to explain that we ignore the lack of a response from legacy servers, rather than any error.