Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Flight proto: PollFlightInfo & expiration time #5413

Merged
merged 1 commit into from
Feb 22, 2024

Conversation

Jefffrey
Copy link
Contributor

Which issue does this PR close?

Closes #5367

Rationale for this change

Update Flight.proto to as of: https://github.com/apache/arrow/blob/0993b369c4b91d81a17166d1427e7c26cd9beee4/format/Flight.proto

Update FlightSql.proto to as of: https://github.com/apache/arrow/blob/0993b369c4b91d81a17166d1427e7c26cd9beee4/format/FlightSql.proto

Taking inspiration from these arrow commits:

What changes are included in this PR?

  • New dependency on prost-types to get the Timestamp struct (also bump prost version to latest)
  • New fields for following structs (with relevant builder and display impl amended):
    • FlightEndpoint: expiration_time and app_metadata
    • FlightInfo: app_metadata
  • New structs:
    • PollInfo
    • CancelFlightInfoRequest
    • CancelFlightInfoResult
    • RenewFlightEndpointRequest
  • New poll_flight_info method for Flight server
  • New methods on Flight client
    • poll_flight_info
    • cancel_flight_info
    • renew_flight_endpoint

Are there any user-facing changes?

Yes

@github-actions github-actions bot added arrow Changes to the arrow crate arrow-flight Changes to the arrow-flight crate labels Feb 20, 2024
Comment on lines +630 to +678
pub async fn cancel_flight_info(
&mut self,
request: CancelFlightInfoRequest,
) -> Result<CancelFlightInfoResult> {
let action = Action::new("CancelFlightInfo", request.encode_to_vec());
let response = self.do_action(action).await?.try_next().await?;
let response = response.ok_or(FlightError::protocol(
"Received no response for cancel_flight_info call",
))?;
CancelFlightInfoResult::decode(response)
.map_err(|e| FlightError::DecodeError(e.to_string()))
}

/// Make a `RenewFlightEndpoint` call to the server and return
/// the renewed [`FlightEndpoint`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use arrow_flight::{FlightClient, FlightDescriptor, RenewFlightEndpointRequest};
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// // Send a 'CMD' request to the server
/// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
/// let flight_endpoint = client
/// .get_flight_info(request)
/// .await
/// .expect("error handshaking")
/// .endpoint[0];
///
/// // Renew the endpoint
/// let request = RenewFlightEndpointRequest::new(flight_endpoint);
/// let flight_endpoint = client
/// .renew_flight_endpoint(request)
/// .await
/// .expect("error renewing");
/// # }
/// ```
pub async fn renew_flight_endpoint(
&mut self,
request: RenewFlightEndpointRequest,
) -> Result<FlightEndpoint> {
let action = Action::new("RenewFlightEndpoint", request.encode_to_vec());
let response = self.do_action(action).await?.try_next().await?;
let response = response.ok_or(FlightError::protocol(
"Received no response for renew_flight_endpoint call",
))?;
FlightEndpoint::decode(response).map_err(|e| FlightError::DecodeError(e.to_string()))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't sure if to add these to the SQL client & server as well? Maybe in a followup PR to keep size down?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I think the SQL interface should support this feature as well. Let's keep that out of this PR though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raise #5420

Comment on lines -475 to -477
/// .with_descriptor(
/// FlightDescriptor::new_cmd("a command")
/// )
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since was also calling with_descriptor(...) again below

@@ -391,6 +391,7 @@ impl FlightSqlServiceClient<Channel> {

/// Explicitly shut down and clean up the client.
pub async fn close(&mut self) -> Result<(), ArrowError> {
// TODO: consume self instead of &mut self to explicitly prevent reuse?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on this? Since close() doesn't exactly close anything at the moment, so maybe better to force closure via taking self

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that close should be consuming 👍

Let's do that in a follow-up PR though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raised #5421

@tustvold tustvold requested a review from alamb February 21, 2024 16:52
@tustvold
Copy link
Contributor

This looks plausible to me, but I'm not very familiar with this code anymore

Copy link
Contributor

@crepererum crepererum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking care of this!

@@ -391,6 +391,7 @@ impl FlightSqlServiceClient<Channel> {

/// Explicitly shut down and clean up the client.
pub async fn close(&mut self) -> Result<(), ArrowError> {
// TODO: consume self instead of &mut self to explicitly prevent reuse?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that close should be consuming 👍

Let's do that in a follow-up PR though.

Comment on lines +630 to +678
pub async fn cancel_flight_info(
&mut self,
request: CancelFlightInfoRequest,
) -> Result<CancelFlightInfoResult> {
let action = Action::new("CancelFlightInfo", request.encode_to_vec());
let response = self.do_action(action).await?.try_next().await?;
let response = response.ok_or(FlightError::protocol(
"Received no response for cancel_flight_info call",
))?;
CancelFlightInfoResult::decode(response)
.map_err(|e| FlightError::DecodeError(e.to_string()))
}

/// Make a `RenewFlightEndpoint` call to the server and return
/// the renewed [`FlightEndpoint`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use arrow_flight::{FlightClient, FlightDescriptor, RenewFlightEndpointRequest};
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// // Send a 'CMD' request to the server
/// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
/// let flight_endpoint = client
/// .get_flight_info(request)
/// .await
/// .expect("error handshaking")
/// .endpoint[0];
///
/// // Renew the endpoint
/// let request = RenewFlightEndpointRequest::new(flight_endpoint);
/// let flight_endpoint = client
/// .renew_flight_endpoint(request)
/// .await
/// .expect("error renewing");
/// # }
/// ```
pub async fn renew_flight_endpoint(
&mut self,
request: RenewFlightEndpointRequest,
) -> Result<FlightEndpoint> {
let action = Action::new("RenewFlightEndpoint", request.encode_to_vec());
let response = self.do_action(action).await?.try_next().await?;
let response = response.ok_or(FlightError::protocol(
"Received no response for renew_flight_endpoint call",
))?;
FlightEndpoint::decode(response).map_err(|e| FlightError::DecodeError(e.to_string()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I think the SQL interface should support this feature as well. Let's keep that out of this PR though.

@crepererum crepererum merged commit e7ce4bb into apache:master Feb 22, 2024
16 checks passed
@Jefffrey Jefffrey deleted the update_flight_proto branch February 22, 2024 20:02
@alamb
Copy link
Contributor

alamb commented Feb 26, 2024

Thank you @Jefffrey and @crepererum

@tustvold tustvold mentioned this pull request Mar 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
arrow Changes to the arrow crate arrow-flight Changes to the arrow-flight crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Update Flight proto
4 participants