-
Notifications
You must be signed in to change notification settings - Fork 796
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update Flight proto: PollFlightInfo & expiration time #5413
Conversation
pub async fn cancel_flight_info( | ||
&mut self, | ||
request: CancelFlightInfoRequest, | ||
) -> Result<CancelFlightInfoResult> { | ||
let action = Action::new("CancelFlightInfo", request.encode_to_vec()); | ||
let response = self.do_action(action).await?.try_next().await?; | ||
let response = response.ok_or(FlightError::protocol( | ||
"Received no response for cancel_flight_info call", | ||
))?; | ||
CancelFlightInfoResult::decode(response) | ||
.map_err(|e| FlightError::DecodeError(e.to_string())) | ||
} | ||
|
||
/// Make a `RenewFlightEndpoint` call to the server and return | ||
/// the renewed [`FlightEndpoint`]. | ||
/// | ||
/// # Example: | ||
/// ```no_run | ||
/// # async fn run() { | ||
/// # use arrow_flight::{FlightClient, FlightDescriptor, RenewFlightEndpointRequest}; | ||
/// # let channel: tonic::transport::Channel = unimplemented!(); | ||
/// let mut client = FlightClient::new(channel); | ||
/// | ||
/// // Send a 'CMD' request to the server | ||
/// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec()); | ||
/// let flight_endpoint = client | ||
/// .get_flight_info(request) | ||
/// .await | ||
/// .expect("error handshaking") | ||
/// .endpoint[0]; | ||
/// | ||
/// // Renew the endpoint | ||
/// let request = RenewFlightEndpointRequest::new(flight_endpoint); | ||
/// let flight_endpoint = client | ||
/// .renew_flight_endpoint(request) | ||
/// .await | ||
/// .expect("error renewing"); | ||
/// # } | ||
/// ``` | ||
pub async fn renew_flight_endpoint( | ||
&mut self, | ||
request: RenewFlightEndpointRequest, | ||
) -> Result<FlightEndpoint> { | ||
let action = Action::new("RenewFlightEndpoint", request.encode_to_vec()); | ||
let response = self.do_action(action).await?.try_next().await?; | ||
let response = response.ok_or(FlightError::protocol( | ||
"Received no response for renew_flight_endpoint call", | ||
))?; | ||
FlightEndpoint::decode(response).map_err(|e| FlightError::DecodeError(e.to_string())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wasn't sure if to add these to the SQL client & server as well? Maybe in a followup PR to keep size down?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I think the SQL interface should support this feature as well. Let's keep that out of this PR though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Raise #5420
/// .with_descriptor( | ||
/// FlightDescriptor::new_cmd("a command") | ||
/// ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since was also calling with_descriptor(...)
again below
@@ -391,6 +391,7 @@ impl FlightSqlServiceClient<Channel> { | |||
|
|||
/// Explicitly shut down and clean up the client. | |||
pub async fn close(&mut self) -> Result<(), ArrowError> { | |||
// TODO: consume self instead of &mut self to explicitly prevent reuse? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thoughts on this? Since close()
doesn't exactly close anything at the moment, so maybe better to force closure via taking self
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that close
should be consuming 👍
Let's do that in a follow-up PR though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Raised #5421
This looks plausible to me, but I'm not very familiar with this code anymore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for taking care of this!
@@ -391,6 +391,7 @@ impl FlightSqlServiceClient<Channel> { | |||
|
|||
/// Explicitly shut down and clean up the client. | |||
pub async fn close(&mut self) -> Result<(), ArrowError> { | |||
// TODO: consume self instead of &mut self to explicitly prevent reuse? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that close
should be consuming 👍
Let's do that in a follow-up PR though.
pub async fn cancel_flight_info( | ||
&mut self, | ||
request: CancelFlightInfoRequest, | ||
) -> Result<CancelFlightInfoResult> { | ||
let action = Action::new("CancelFlightInfo", request.encode_to_vec()); | ||
let response = self.do_action(action).await?.try_next().await?; | ||
let response = response.ok_or(FlightError::protocol( | ||
"Received no response for cancel_flight_info call", | ||
))?; | ||
CancelFlightInfoResult::decode(response) | ||
.map_err(|e| FlightError::DecodeError(e.to_string())) | ||
} | ||
|
||
/// Make a `RenewFlightEndpoint` call to the server and return | ||
/// the renewed [`FlightEndpoint`]. | ||
/// | ||
/// # Example: | ||
/// ```no_run | ||
/// # async fn run() { | ||
/// # use arrow_flight::{FlightClient, FlightDescriptor, RenewFlightEndpointRequest}; | ||
/// # let channel: tonic::transport::Channel = unimplemented!(); | ||
/// let mut client = FlightClient::new(channel); | ||
/// | ||
/// // Send a 'CMD' request to the server | ||
/// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec()); | ||
/// let flight_endpoint = client | ||
/// .get_flight_info(request) | ||
/// .await | ||
/// .expect("error handshaking") | ||
/// .endpoint[0]; | ||
/// | ||
/// // Renew the endpoint | ||
/// let request = RenewFlightEndpointRequest::new(flight_endpoint); | ||
/// let flight_endpoint = client | ||
/// .renew_flight_endpoint(request) | ||
/// .await | ||
/// .expect("error renewing"); | ||
/// # } | ||
/// ``` | ||
pub async fn renew_flight_endpoint( | ||
&mut self, | ||
request: RenewFlightEndpointRequest, | ||
) -> Result<FlightEndpoint> { | ||
let action = Action::new("RenewFlightEndpoint", request.encode_to_vec()); | ||
let response = self.do_action(action).await?.try_next().await?; | ||
let response = response.ok_or(FlightError::protocol( | ||
"Received no response for renew_flight_endpoint call", | ||
))?; | ||
FlightEndpoint::decode(response).map_err(|e| FlightError::DecodeError(e.to_string())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I think the SQL interface should support this feature as well. Let's keep that out of this PR though.
Thank you @Jefffrey and @crepererum |
Which issue does this PR close?
Closes #5367
Rationale for this change
Update
Flight.proto
to as of: https://github.com/apache/arrow/blob/0993b369c4b91d81a17166d1427e7c26cd9beee4/format/Flight.protoUpdate
FlightSql.proto
to as of: https://github.com/apache/arrow/blob/0993b369c4b91d81a17166d1427e7c26cd9beee4/format/FlightSql.protoTaking inspiration from these arrow commits:
What changes are included in this PR?
prost-types
to get theTimestamp
struct (also bumpprost
version to latest)FlightEndpoint
:expiration_time
andapp_metadata
FlightInfo
:app_metadata
PollInfo
CancelFlightInfoRequest
CancelFlightInfoResult
RenewFlightEndpointRequest
poll_flight_info
method for Flight serverpoll_flight_info
cancel_flight_info
renew_flight_endpoint
Are there any user-facing changes?
Yes