Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
02085c1
GH-35500: [C++][Go][FlightRPC] Add support for result set expiration
kou May 31, 2023
2635593
Use {}
kou Jun 9, 2023
e7fa0e0
Use std::nano::den
kou Jun 10, 2023
01cede8
Reorder
kou Jun 10, 2023
55e83ba
Use \deprecated
kou Jun 10, 2023
fc55763
Reorder
kou Jun 10, 2023
aca8f1e
Improve wording
kou Jun 10, 2023
eb87155
Reorder
kou Jun 10, 2023
15a00b3
Add FlightSqlClient::CloseFlightInfo
kou Jun 10, 2023
9a05526
Fix a typo
kou Jun 10, 2023
aa0fc17
Improve error message
kou Jun 10, 2023
04aacb1
Add missing closed check
kou Jun 10, 2023
7069008
Suppress warnings
kou Jun 13, 2023
06007b5
Clear statuses for each GetFlightInfo
kou Jun 13, 2023
f241e47
Use FlightInfo instead of query
kou Jun 13, 2023
bd44381
Order
kou Jun 13, 2023
afe15ce
Order
kou Jun 13, 2023
af3634c
Don't use CancelFlightInfo yet
kou Jun 14, 2023
d56418e
Add Go implementation
kou Jun 14, 2023
8932fe5
Format
kou Jun 14, 2023
ec97b0c
Add ResultStream::Drain()
kou Jun 14, 2023
0cf45f7
Improve name in .proto
kou Jun 14, 2023
a37c041
Add a test for CancelFlightInfo
kou Jun 14, 2023
ddb41d3
Fix format
kou Jun 14, 2023
11c7add
Add mock methods
kou Jun 14, 2023
769f6c9
Fix static check
kou Jun 14, 2023
dd77974
CancelResult -> CancelStatus
kou Jun 16, 2023
c744447
Don't use any for CancelFlightInfo
kou Jun 16, 2023
5d7f50a
Add Flight SQL server tests for built-in actions
kou Jun 16, 2023
294d65d
Make CancelQUery deprecated
kou Jun 16, 2023
9322427
Fix RefreshFlightEndpoint
kou Jun 16, 2023
56fdbce
Add document
kou Jun 16, 2023
39b8290
Fix wording
kou Jun 16, 2023
346f4d7
Add more deprecated
kou Jun 17, 2023
d0acc07
Fix staticcheck
kou Jun 18, 2023
28fe2b4
Move ReadUntilEOF from flightsql to flight
kou Jun 18, 2023
3944a66
Add shortcuts
kou Jun 19, 2023
871a1c1
Mock
kou Jun 19, 2023
0dc1984
Implement Flight SQL server methods
kou Jun 19, 2023
b9ee3e6
Fix style
kou Jun 19, 2023
2f99ac8
Improve comment
kou Jun 19, 2023
8d9a872
Clean up
kou Jun 19, 2023
69fe1cf
Add tests
kou Jun 19, 2023
5786663
Add default CancelQuery implementation
kou Jun 19, 2023
d4f38a0
Add missing FlightEndpoint implementations
kou Jun 19, 2023
e8d0c78
Fix a typo
kou Jun 19, 2023
1ca8e05
Use constructor style
kou Jun 19, 2023
037b715
Use local variable
kou Jun 19, 2023
246cd42
Fix expected
kou Jun 19, 2023
5f633b5
Fix a typo
kou Jun 19, 2023
69fc412
Change style
kou Jun 19, 2023
c4d765b
Specify std::nullopt explicitly
kou Jun 19, 2023
53e069f
Add more std::nullopt
kou Jun 19, 2023
187c417
Add missing cast
kou Jun 19, 2023
0158ebc
Fix CI falures
kou Jun 19, 2023
edc86da
Fix CI failures
kou Jun 19, 2023
d647e06
Update expected
kou Jun 19, 2023
f69de19
Update test value
kou Jun 19, 2023
99b141a
CancelFlightInfo: return only one result
kou Jun 20, 2023
d82fcac
Improve docstring
kou Jun 20, 2023
deeae0c
Simplify
kou Jun 20, 2023
59607f3
Remove CancelQuery
kou Jun 20, 2023
5c1fd0e
Fix wrong deprecated description of CancelQuery()
kou Jun 20, 2023
4a0a6d6
Add Java support for proposal
lidavidm Jun 16, 2023
c9b69a1
Remove unused cancelQueryRequest
kou Jun 21, 2023
7a7beef
Add fallback from cancelQuery to cancelFlightInfo
lidavidm Jun 21, 2023
201ab9d
Implement CancelFlightInfo
kou Jun 21, 2023
c6cf2fc
Deprecate FlightSqlServerBase::CancelQuery()
kou Jun 21, 2023
e0295d1
Fix a typo
kou Jun 22, 2023
73751ad
Remove needless import
kou Jun 22, 2023
fa8f7d4
Remove expired checks
kou Jun 22, 2023
3e21487
Remove needless checks
kou Jun 23, 2023
7d4ca10
Remove unused imports
kou Jun 23, 2023
1f26633
Add support for CancelQuery() method
kou Jun 23, 2023
05bea5f
Remove unused variables
kou Jun 23, 2023
99d4618
Refresh -> Renew
kou Jun 23, 2023
53152dc
Use RenewFlightInfoRequest
kou Jun 23, 2023
d0b44ae
Fix style
kou Jun 23, 2023
54a37e7
Remove unused import
lidavidm Jun 23, 2023
b732b12
Java: refresh -> renew
lidavidm Jun 23, 2023
111ecb9
Fix build error
kou Jun 23, 2023
e964103
Add CancelFlightInfoRequest
kou Jun 24, 2023
95295c5
Remove CloseFlightInfo
kou Jun 24, 2023
5acb080
Format
kou Jun 24, 2023
f0e3ace
Clean up
kou Jun 25, 2023
61f3666
java: Add CancelFlightInfoRequest
kou Jun 25, 2023
3affd5d
java: Remove CloseFlightInfo
kou Jun 25, 2023
18c96ff
Remove unused function
kou Jun 25, 2023
b5ba62b
Add ticket content to error message
kou Jun 26, 2023
c2f2f05
Increase expiration time
kou Jun 26, 2023
8c60daf
Follow name change
kou Jun 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,30 @@ Status FlightClient::DoAction(const FlightCallOptions& options, const Action& ac
return DoAction(options, action).Value(results);
}

arrow::Result<CancelFlightInfoResult> FlightClient::CancelFlightInfo(
const FlightCallOptions& options, const CancelFlightInfoRequest& request) {
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kCancelFlightInfo.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(auto cancel_result, CancelFlightInfoResult::Deserialize(
std::string_view(*result->body)));
ARROW_RETURN_NOT_OK(stream->Drain());
return std::move(cancel_result);
}

arrow::Result<FlightEndpoint> FlightClient::RenewFlightEndpoint(
const FlightCallOptions& options, const RenewFlightEndpointRequest& request) {
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kRenewFlightEndpoint.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(auto renewed_endpoint,
FlightEndpoint::Deserialize(std::string_view(*result->body)));
ARROW_RETURN_NOT_OK(stream->Drain());
return std::move(renewed_endpoint);
}

arrow::Result<std::vector<ActionType>> FlightClient::ListActions(
const FlightCallOptions& options) {
std::vector<ActionType> actions;
Expand Down
26 changes: 26 additions & 0 deletions cpp/src/arrow/flight/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,32 @@ class ARROW_FLIGHT_EXPORT FlightClient {
return DoAction({}, action).Value(results);
}

/// \brief Perform the CancelFlightInfo action, returning a
/// CancelFlightInfoResult
///
/// \param[in] options Per-RPC options
/// \param[in] request The CancelFlightInfoRequest
/// \return Arrow result with a CancelFlightInfoResult
arrow::Result<CancelFlightInfoResult> CancelFlightInfo(
const FlightCallOptions& options, const CancelFlightInfoRequest& request);
arrow::Result<CancelFlightInfoResult> CancelFlightInfo(
const CancelFlightInfoRequest& request) {
return CancelFlightInfo({}, request);
}

/// \brief Perform the RenewFlightEndpoint action, returning a renewed
/// FlightEndpoint
///
/// \param[in] options Per-RPC options
/// \param[in] request The RenewFlightEndpointRequest
/// \return Arrow result with a renewed FlightEndpoint
arrow::Result<FlightEndpoint> RenewFlightEndpoint(
const FlightCallOptions& options, const RenewFlightEndpointRequest& request);
arrow::Result<FlightEndpoint> RenewFlightEndpoint(
const RenewFlightEndpointRequest& request) {
return RenewFlightEndpoint({}, request);
}

/// \brief Retrieve a list of available Action types
/// \param[in] options Per-RPC options
/// \return Arrow result with the available actions
Expand Down
59 changes: 37 additions & 22 deletions cpp/src/arrow/flight/flight_internals_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,27 +178,42 @@ TEST(FlightTypes, FlightDescriptor) {
TEST(FlightTypes, FlightEndpoint) {
ASSERT_OK_AND_ASSIGN(auto location1, Location::ForGrpcTcp("localhost", 1024));
ASSERT_OK_AND_ASSIGN(auto location2, Location::ForGrpcTls("localhost", 1024));
// 2023-06-19 03:14:06.004330100
// We must use microsecond resolution here for portability.
// std::chrono::system_clock::time_point may not provide nanosecond
// resolution on some platforms such as Windows.
const auto expiration_time_duration =
std::chrono::seconds{1687144446} + std::chrono::nanoseconds{4339000};
Timestamp expiration_time(
std::chrono::duration_cast<Timestamp::duration>(expiration_time_duration));
std::vector<FlightEndpoint> values = {
{{""}, {}},
{{"foo"}, {}},
{{"bar"}, {}},
{{"foo"}, {location1}},
{{"bar"}, {location1}},
{{"foo"}, {location2}},
{{"foo"}, {location1, location2}},
{{""}, {}, std::nullopt},
{{"foo"}, {}, std::nullopt},
{{"bar"}, {}, std::nullopt},
{{"foo"}, {}, expiration_time},
{{"foo"}, {location1}, std::nullopt},
{{"bar"}, {location1}, std::nullopt},
{{"foo"}, {location2}, std::nullopt},
{{"foo"}, {location1, location2}, std::nullopt},
};
std::vector<std::string> reprs = {
"<FlightEndpoint ticket=<Ticket ticket=''> locations=[]>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[]>",
"<FlightEndpoint ticket=<Ticket ticket='bar'> locations=[]>",
"<FlightEndpoint ticket=<Ticket ticket=''> locations=[] "
"expiration_time=null>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[] "
"expiration_time=null>",
"<FlightEndpoint ticket=<Ticket ticket='bar'> locations=[] "
"expiration_time=null>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[] "
"expiration_time=2023-06-19 03:14:06.004339000>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
"[grpc+tcp://localhost:1024]>",
"[grpc+tcp://localhost:1024] expiration_time=null>",
"<FlightEndpoint ticket=<Ticket ticket='bar'> locations="
"[grpc+tcp://localhost:1024]>",
"[grpc+tcp://localhost:1024] expiration_time=null>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
"[grpc+tls://localhost:1024]>",
"[grpc+tls://localhost:1024] expiration_time=null>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
"[grpc+tcp://localhost:1024, grpc+tls://localhost:1024]>",
"[grpc+tcp://localhost:1024, grpc+tls://localhost:1024] "
"expiration_time=null>",
};

ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::FlightEndpoint>(values, reprs));
Expand All @@ -210,8 +225,8 @@ TEST(FlightTypes, FlightInfo) {
Schema schema2({});
auto desc1 = FlightDescriptor::Command("foo");
auto desc2 = FlightDescriptor::Command("bar");
auto endpoint1 = FlightEndpoint{Ticket{"foo"}, {}};
auto endpoint2 = FlightEndpoint{Ticket{"foo"}, {location}};
auto endpoint1 = FlightEndpoint{Ticket{"foo"}, {}, std::nullopt};
auto endpoint2 = FlightEndpoint{Ticket{"foo"}, {location}, std::nullopt};
std::vector<FlightInfo> values = {
MakeFlightInfo(schema1, desc1, {}, -1, -1, false),
MakeFlightInfo(schema1, desc2, {}, -1, -1, true),
Expand All @@ -227,13 +242,13 @@ TEST(FlightTypes, FlightInfo) {
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[] total_records=-1 total_bytes=-1 ordered=false>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[]>] "
"total_records=-1 total_bytes=42 ordered=true>",
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[] "
"expiration_time=null>] total_records=-1 total_bytes=42 ordered=true>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='bar'> "
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[]>, "
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
"[grpc+tcp://localhost:1234]>] total_records=64 total_bytes=-1 "
"ordered=false>",
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[] "
"expiration_time=null>, <FlightEndpoint ticket=<Ticket ticket='foo'> "
"locations=[grpc+tcp://localhost:1234] expiration_time=null>] "
"total_records=64 total_bytes=-1 ordered=false>",
};

ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::FlightInfo>(values, reprs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ TEST(FlightIntegration, Middleware) { ASSERT_OK(RunScenario("middleware")); }

TEST(FlightIntegration, Ordered) { ASSERT_OK(RunScenario("ordered")); }

TEST(FlightIntegration, ExpirationTimeDoGet) {
ASSERT_OK(RunScenario("expiration_time:do_get"));
}

TEST(FlightIntegration, ExpirationTimeListActions) {
ASSERT_OK(RunScenario("expiration_time:list_actions"));
}

TEST(FlightIntegration, ExpirationTimeCancelFlightInfo) {
ASSERT_OK(RunScenario("expiration_time:cancel_flight_info"));
}

TEST(FlightIntegration, ExpirationTimeRenewFlightEndpoint) {
ASSERT_OK(RunScenario("expiration_time:renew_flight_endpoint"));
}

TEST(FlightIntegration, FlightSql) { ASSERT_OK(RunScenario("flight_sql")); }

TEST(FlightIntegration, FlightSqlExtension) {
Expand Down
Loading