Skip to content

Commit

Permalink
make DoPutPreparedStatenentResult mandatory
Browse files Browse the repository at this point in the history
  • Loading branch information
erratic-pattern committed Mar 10, 2024
1 parent 8ba176e commit e8067f7
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 24 deletions.
2 changes: 1 addition & 1 deletion arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
&self,
_query: CommandPreparedStatementQuery,
_request: Request<PeekableFlightDataStream>,
) -> Result<Option<DoPutPreparedStatementResult>, Status> {
) -> Result<DoPutPreparedStatementResult, Status> {
Err(Status::unimplemented(
"do_put_prepared_statement_query not implemented",
))
Expand Down
23 changes: 8 additions & 15 deletions arrow-flight/src/sql/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,15 +399,14 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {

/// Bind parameters to given prepared statement.
///
/// Note that `DoPutPreparedStatementResult` contains an optional
/// opaque handle that the client should pass
/// Returns an opaque handle that the client should pass
/// back to the server during subsequent requests with this
/// prepared statement.
async fn do_put_prepared_statement_query(
&self,
_query: CommandPreparedStatementQuery,
_request: Request<PeekableFlightDataStream>,
) -> Result<Option<DoPutPreparedStatementResult>, Status> {
) -> Result<DoPutPreparedStatementResult, Status> {
Err(Status::unimplemented(
"do_put_prepared_statement_query has no default implementation",
))
Expand Down Expand Up @@ -715,19 +714,13 @@ where
Ok(Response::new(Box::pin(output)))
}
Command::CommandPreparedStatementQuery(command) => {
if let Some(result) = self
let result = self
.do_put_prepared_statement_query(command, request)
.await?
{
let output = futures::stream::iter(vec![Ok(PutResult {
app_metadata: result.as_any().encode_to_vec().into(),
})]);
Ok(Response::new(Box::pin(output)))
} else {
Ok(Response::new(
futures::stream::once(async { Ok(PutResult::default()) }).boxed(),
))
}
.await?;
let output = futures::stream::iter(vec![Ok(PutResult {
app_metadata: result.as_any().encode_to_vec().into(),
})]);
Ok(Response::new(Box::pin(output)))
}
Command::CommandStatementSubstraitPlan(command) => {
let record_count = self.do_put_substrait_plan(command, request).await?;
Expand Down
17 changes: 9 additions & 8 deletions arrow-flight/tests/flight_sql_client_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
&self,
_query: CommandPreparedStatementQuery,
request: Request<PeekableFlightDataStream>,
) -> Result<Option<DoPutPreparedStatementResult>, Status> {
) -> Result<DoPutPreparedStatementResult, Status> {
// just make sure decoding the parameters works
let parameters = FlightRecordBatchStream::new_from_flight_data(
request.into_inner().map_err(|e| e.into()),
Expand All @@ -579,14 +579,15 @@ impl FlightSqlService for FlightSqlServiceImpl {
)));
}
}
if self.stateless_prepared_statements {
let result = DoPutPreparedStatementResult {
prepared_statement_handle: UPDATED_PREPARED_STATEMENT_HANDLE.to_string().into(),
};
Ok(Some(result))
let prepared_statement_handle = if self.stateless_prepared_statements {
UPDATED_PREPARED_STATEMENT_HANDLE.to_string().into()
} else {
Ok(None)
}
PREPARED_STATEMENT_HANDLE.to_string().into()
};
let result = DoPutPreparedStatementResult {
prepared_statement_handle,
};
Ok(result)
}

async fn do_put_prepared_statement_update(
Expand Down

0 comments on commit e8067f7

Please sign in to comment.