Description
Describe the enhancement requested
Based on the proposal in https://docs.google.com/document/d/1jhPyPZSOo2iy0LqIJVUs9KWPyFULVFJXTILDfkadx2g/edit# .
See also the discussion thread: https://lists.apache.org/thread/247z3t06mf132nocngc1jkp3oqglz7jp
In Flight RPC, FlightInfo includes addresses of workers alongside result partition info. This lets clients fetch data directly from workers [1], in parallel or even distributed across multiple machines. But this also comes with tradeoffs.
Queries generally don’t complete instantly (as much as we would like them to). So where can we put the ‘query evaluation time’?
- In GetFlightInfo: block and wait for the query to complete.
  - Con: this is a long-running blocking call, which may fail or time out. Then when the client retries, the server has to redo all the work.
  - Con: parts of the result may be ready before others, but the client can’t do anything until everything is ready.
- In DoGet: return a fixed number of partitions.
  - Con: this makes handling worker failures hard. Systems like Trino support fault-tolerant execution by replacing workers at runtime. But GetFlightInfo has already passed, so we can’t notify the client of new workers [2].
  - Con: we have to know or fix the partitioning up front.
Neither solution is optimal.
Proposal
We can address this by adding a retryable version of GetFlightInfo. First, we add a new RPC call and result message:
service FlightService {
  // ...
  rpc PollFlightInfo(FlightDescriptor) returns (PollInfo);
}

message PollInfo {
  // The currently available results so far.
  FlightInfo info = 1;
  // The descriptor the client should use on the next try.
  // If unset, the query is complete.
  FlightDescriptor flight_descriptor = 2;
  // Query progress. Must be in [0.0, 1.0] but need not be
  // monotonic or nondecreasing. If unknown, do not set.
  optional double progress = 3;
  // Expiration time for this request. After this passes, the server
  // might not accept the retry descriptor anymore (and the query may
  // be cancelled). This may be updated on a call to PollFlightInfo.
  google.protobuf.Timestamp expiration_time = 4;
}

A client executes a query and polls for result completion. The server returns a FlightInfo representing the state of the query execution up to that point.
sequenceDiagram
Client->>Server: PollFlightInfo(FlightDescriptor)
Server->>Client: PollInfo(FlightDescriptor', FlightInfo)
Client->>Server: PollFlightInfo(FlightDescriptor')
Server->>Client: PollInfo(FlightDescriptor'', FlightInfo)
Client->>Server: PollFlightInfo(FlightDescriptor'')
Server->>Client: PollInfo(_, FlightInfo)
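The exchange above can be sketched from the client's side as a simple loop. This is a minimal illustration, not a real Flight client: the `FlightInfo` and `PollInfo` dataclasses are simplified stand-ins for the messages in the proposal, descriptors are plain strings, and `poll_until_complete`, `on_endpoint`, and `make_fake_server` are hypothetical names introduced only for this example. It also assumes the server only appends endpoints, as the proposal recommends.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class FlightInfo:
    # Stand-in for the real FlightInfo; only endpoints are modelled.
    endpoints: List[str] = field(default_factory=list)


@dataclass
class PollInfo:
    info: FlightInfo
    # Descriptor to use on the next call; None means the query is complete.
    flight_descriptor: Optional[str] = None
    progress: Optional[float] = None


def poll_until_complete(
    poll_flight_info: Callable[[str], PollInfo],
    descriptor: str,
    on_endpoint: Callable[[str], None],
) -> FlightInfo:
    """Poll until the server stops returning a retry descriptor."""
    seen = 0
    while True:
        result = poll_flight_info(descriptor)
        # Each response carries the complete FlightInfo. Assuming the server
        # only appends endpoints, everything past `seen` is new.
        for endpoint in result.info.endpoints[seen:]:
            on_endpoint(endpoint)
        seen = len(result.info.endpoints)
        if result.flight_descriptor is None:
            return result.info
        descriptor = result.flight_descriptor


def make_fake_server() -> Callable[[str], PollInfo]:
    """Hypothetical in-memory server that reveals one more endpoint per call."""
    steps = {
        "query": PollInfo(FlightInfo(["worker-a"]), "query'", 0.3),
        "query'": PollInfo(FlightInfo(["worker-a", "worker-b"]), "query''", 0.7),
        "query''": PollInfo(
            FlightInfo(["worker-a", "worker-b", "worker-c"]), None, 1.0
        ),
    }
    return lambda descriptor: steps[descriptor]
```

In a real client, `on_endpoint` would typically start a DoGet against the new endpoint, so fetching can begin before the query has finished.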
The server:
- Must respond with the complete FlightInfo each time, not just the delta between the previous and current FlightInfo.
- Should respond as quickly as possible on the first call.
- Should not respond until the result would be different from last time. (That way, the client can “long poll” for updates without constantly making requests. Clients can set a short timeout to avoid blocking calls if desired.)
- May respond by only updating the `PollInfo.progress` value (though it shouldn’t spam the client with updates).
- Should recognize a `PollInfo.flight_descriptor` that is not necessarily the latest (in case the client misses an update in between).
- Should only append to the endpoints in `FlightInfo` each time. (Otherwise the client has to do extra work to identify what endpoints it has and hasn’t seen.) When `FlightInfo.ordered` is set, this means the server returns endpoints in order.
- Should return an error status instead of a response if the query fails. The client should not retry the request (except for TIMED_OUT and UNAVAILABLE, which may not originate from the server).
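A few of these server-side rules (complete results each time, append-only endpoints, accepting stale descriptors) can be sketched as follows. This is only one possible bookkeeping scheme under stated assumptions: the `QueryTracker` and `PollResult` names, and the `query_id#version` descriptor encoding, are invented for illustration and are not part of the proposal. Long polling, progress reporting, and expiration are left out to keep the sketch short.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class PollResult:
    endpoints: List[str]            # complete list so far, never a delta
    next_descriptor: Optional[str]  # None once the query is complete


class QueryTracker:
    """Hypothetical server-side bookkeeping for one running query."""

    def __init__(self, query_id: str) -> None:
        self.query_id = query_id
        self.endpoints: List[str] = []
        self.version = 0
        self.done = False

    def add_endpoint(self, location: str) -> None:
        # Endpoints are only ever appended, so clients can skip what they
        # have already seen.
        self.endpoints.append(location)
        self.version += 1

    def finish(self) -> None:
        self.done = True

    def poll(self, descriptor: str) -> PollResult:
        # Assumed encoding: "<query_id>" on the first call, then
        # "<query_id>#<version>" on retries.
        query_id, _, version = descriptor.partition("#")
        if query_id != self.query_id:
            raise KeyError(f"unknown query: {query_id}")
        # Any earlier version is accepted, in case the client missed an update.
        if version and int(version) > self.version:
            raise ValueError("descriptor is newer than server state")
        next_descriptor = None if self.done else f"{self.query_id}#{self.version}"
        return PollResult(list(self.endpoints), next_descriptor)
```

Because every response is built from the full endpoint list, a client that retries with an old descriptor still converges on the same final result.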
Prior Art
- Amazon Redshift: executing a query gives an ID that can be used to check the query status and fetch results.
- Google BigQuery Storage: you explicitly create a “read session”, after which you can read subsets of the response with further requests. There is no “query execution time” since BigQuery Storage only queries tables. Instead, running a query (with the base BigQuery API) will cache the result in a table that can be read via BigQuery Storage.
- Snowflake: short queries return synchronously. Longer queries require polling for completion of the query. You cannot retrieve any results until the query is complete.
Component(s)
C++, FlightRPC, Format, Go, Java
Footnotes
[1] Of course, servers are free to return the location of a proxy/load balancer/etc., or omit locations and have the client fetch results from the same server that they issued the query to. Flight RPC offers this flexibility to servers; clients don’t have to know or care.
[2] Again, the server could proxy workers, or depend on Kubernetes DNS routing, or configure gRPC XDS. But this somewhat defeats the point of returning worker locations in the first place, and is much more complicated (operationally, implementation-wise).