Skip to content

Commit aa6b398

Browse files
authored
GH-38573: [Java][FlightRPC] Try all locations in JDBC driver (#40104)
### Rationale for this change

This brings the JDBC driver up to par with other Flight SQL clients.

### What changes are included in this PR?

The Flight SQL JDBC driver now tries each Location reported for an endpoint in turn, instead of trying only the first Location and treating its failure as fatal.

### Are these changes tested?

Yes

### Are there any user-facing changes?

No

* Closes: #38573

Authored-by: David Li <li.davidm96@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
1 parent 11ef68d commit aa6b398

File tree

4 files changed

+137
-19
lines changed

4 files changed

+137
-19
lines changed

java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -909,7 +909,8 @@ public FlightClient build() {
909909

910910
builder
911911
.maxTraceEvents(MAX_CHANNEL_TRACE_EVENTS)
912-
.maxInboundMessageSize(maxInboundMessageSize);
912+
.maxInboundMessageSize(maxInboundMessageSize)
913+
.maxInboundMetadataSize(maxInboundMessageSize);
913914
return new FlightClient(allocator, builder.build(), middleware);
914915
}
915916
}

java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -116,26 +116,47 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
116116
sqlClient.getStream(endpoint.getTicket(), getOptions()), null));
117117
} else {
118118
// Clone the builder and then set the new endpoint on it.
119-
// GH-38573: This code currently only tries the first Location and treats a failure as fatal.
120-
// This should be changed to try other Locations that are available.
121-
119+
122120
// GH-38574: Currently a new FlightClient will be made for each partition that returns a non-empty Location
123121
// then disposed of. It may be better to cache clients because a server may report the same Locations.
124122
// It would also be good to identify when the reported location is the same as the original connection's
125123
// Location and skip creating a FlightClient in that scenario.
126-
final URI endpointUri = endpoint.getLocations().get(0).getUri();
127-
final Builder builderForEndpoint = new Builder(ArrowFlightSqlClientHandler.this.builder)
128-
.withHost(endpointUri.getHost())
129-
.withPort(endpointUri.getPort())
130-
.withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS));
131-
132-
final ArrowFlightSqlClientHandler endpointHandler = builderForEndpoint.build();
133-
try {
134-
endpoints.add(new CloseableEndpointStreamPair(
135-
endpointHandler.sqlClient.getStream(endpoint.getTicket(),
136-
endpointHandler.getOptions()), endpointHandler.sqlClient));
137-
} catch (Exception ex) {
138-
AutoCloseables.close(endpointHandler);
124+
List<Exception> exceptions = new ArrayList<>();
125+
CloseableEndpointStreamPair stream = null;
126+
for (Location location : endpoint.getLocations()) {
127+
final URI endpointUri = location.getUri();
128+
final Builder builderForEndpoint = new Builder(ArrowFlightSqlClientHandler.this.builder)
129+
.withHost(endpointUri.getHost())
130+
.withPort(endpointUri.getPort())
131+
.withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS));
132+
133+
ArrowFlightSqlClientHandler endpointHandler = null;
134+
try {
135+
endpointHandler = builderForEndpoint.build();
136+
stream = new CloseableEndpointStreamPair(
137+
endpointHandler.sqlClient.getStream(endpoint.getTicket(),
138+
endpointHandler.getOptions()), endpointHandler.sqlClient);
139+
// Make sure we actually get data from the server
140+
stream.getStream().getSchema();
141+
} catch (Exception ex) {
142+
if (endpointHandler != null) {
143+
AutoCloseables.close(endpointHandler);
144+
}
145+
exceptions.add(ex);
146+
continue;
147+
}
148+
break;
149+
}
150+
if (stream != null) {
151+
endpoints.add(stream);
152+
} else if (exceptions.isEmpty()) {
153+
// This should never happen...
154+
throw new IllegalStateException("Could not connect to endpoint and no errors occurred");
155+
} else {
156+
Exception ex = exceptions.remove(0);
157+
while (!exceptions.isEmpty()) {
158+
ex.addSuppressed(exceptions.remove(exceptions.size() - 1));
159+
}
139160
throw ex;
140161
}
141162
}

java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueue.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,14 @@ private CloseableEndpointStreamPair next(final EndpointStreamSupplier endpointSt
108108
if (endpoint != null) {
109109
return endpoint;
110110
}
111-
} catch (final ExecutionException | InterruptedException | CancellationException e) {
111+
} catch (final ExecutionException e) {
112+
// Unwrap one layer
113+
final Throwable cause = e.getCause();
114+
if (cause instanceof FlightRuntimeException) {
115+
throw (FlightRuntimeException) cause;
116+
}
117+
throw AvaticaConnection.HELPER.wrap(e.getMessage(), e);
118+
} catch (InterruptedException | CancellationException e) {
112119
throw AvaticaConnection.HELPER.wrap(e.getMessage(), e);
113120
}
114121
}

java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.sql.Statement;
4040
import java.util.ArrayList;
4141
import java.util.Arrays;
42+
import java.util.Collections;
4243
import java.util.HashSet;
4344
import java.util.List;
4445
import java.util.Random;
@@ -49,7 +50,10 @@
4950
import org.apache.arrow.driver.jdbc.utils.PartitionedFlightSqlProducer;
5051
import org.apache.arrow.flight.FlightEndpoint;
5152
import org.apache.arrow.flight.FlightProducer;
53+
import org.apache.arrow.flight.FlightRuntimeException;
5254
import org.apache.arrow.flight.FlightServer;
55+
import org.apache.arrow.flight.FlightStatusCode;
56+
import org.apache.arrow.flight.Location;
5357
import org.apache.arrow.flight.Ticket;
5458
import org.apache.arrow.memory.BufferAllocator;
5559
import org.apache.arrow.memory.RootAllocator;
@@ -63,6 +67,7 @@
6367
import org.junit.ClassRule;
6468
import org.junit.Rule;
6569
import org.junit.Test;
70+
import org.junit.jupiter.api.Assertions;
6671
import org.junit.rules.ErrorCollector;
6772

6873
import com.google.common.collect.ImmutableSet;
@@ -351,7 +356,7 @@ public void testShouldInterruptFlightStreamsIfQueryIsCancelledMidProcessingForTi
351356
.toString(),
352357
anyOf(is(format("Error while executing SQL \"%s\": Query canceled", query)),
353358
allOf(containsString(format("Error while executing SQL \"%s\"", query)),
354-
containsString("CANCELLED"))));
359+
anyOf(containsString("CANCELLED"), containsString("Cancelling")))));
355360
}
356361
}
357362

@@ -455,6 +460,90 @@ allocator, forGrpcInsecure("localhost", 0), rootProducer)
455460
}
456461
}
457462

463+
@Test
464+
public void testPartitionedFlightServerIgnoreFailure() throws Exception {
465+
final Schema schema = new Schema(
466+
Collections.singletonList(Field.nullablePrimitive("int_column", new ArrowType.Int(32, true))));
467+
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
468+
final FlightEndpoint firstEndpoint =
469+
new FlightEndpoint(new Ticket("first".getBytes(StandardCharsets.UTF_8)),
470+
Location.forGrpcInsecure("127.0.0.2", 1234),
471+
Location.forGrpcInsecure("127.0.0.3", 1234));
472+
473+
try (final PartitionedFlightSqlProducer rootProducer = new PartitionedFlightSqlProducer(
474+
schema, firstEndpoint);
475+
FlightServer rootServer = FlightServer.builder(
476+
allocator, forGrpcInsecure("localhost", 0), rootProducer)
477+
.build()
478+
.start();
479+
Connection newConnection = DriverManager.getConnection(String.format(
480+
"jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
481+
rootServer.getLocation().getUri().getHost(), rootServer.getPort()));
482+
Statement newStatement = newConnection.createStatement()) {
483+
final SQLException e = Assertions.assertThrows(SQLException.class, () -> {
484+
ResultSet result = newStatement.executeQuery("Select partitioned_data");
485+
while (result.next()) {
486+
}
487+
});
488+
final Throwable cause = e.getCause();
489+
Assertions.assertTrue(cause instanceof FlightRuntimeException);
490+
final FlightRuntimeException fre = (FlightRuntimeException) cause;
491+
Assertions.assertEquals(FlightStatusCode.UNAVAILABLE, fre.status().code());
492+
}
493+
}
494+
}
495+
496+
@Test
497+
public void testPartitionedFlightServerAllFailure() throws Exception {
498+
// Arrange
499+
final Schema schema = new Schema(
500+
Collections.singletonList(Field.nullablePrimitive("int_column", new ArrowType.Int(32, true))));
501+
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
502+
VectorSchemaRoot firstPartition = VectorSchemaRoot.create(schema, allocator)) {
503+
firstPartition.setRowCount(1);
504+
((IntVector) firstPartition.getVector(0)).set(0, 1);
505+
506+
// Construct the data-only nodes first.
507+
FlightProducer firstProducer = new PartitionedFlightSqlProducer.DataOnlyFlightSqlProducer(
508+
new Ticket("first".getBytes(StandardCharsets.UTF_8)), firstPartition);
509+
510+
final FlightServer.Builder firstBuilder = FlightServer.builder(
511+
allocator, forGrpcInsecure("localhost", 0), firstProducer);
512+
513+
// Run the data-only nodes so that we can get the Locations they are running at.
514+
try (FlightServer firstServer = firstBuilder.build()) {
515+
firstServer.start();
516+
final Location badLocation = Location.forGrpcInsecure("127.0.0.2", 1234);
517+
final FlightEndpoint firstEndpoint =
518+
new FlightEndpoint(new Ticket("first".getBytes(StandardCharsets.UTF_8)),
519+
badLocation, firstServer.getLocation());
520+
521+
// Finally start the root node.
522+
try (final PartitionedFlightSqlProducer rootProducer = new PartitionedFlightSqlProducer(
523+
schema, firstEndpoint);
524+
FlightServer rootServer = FlightServer.builder(
525+
allocator, forGrpcInsecure("localhost", 0), rootProducer)
526+
.build()
527+
.start();
528+
Connection newConnection = DriverManager.getConnection(String.format(
529+
"jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
530+
rootServer.getLocation().getUri().getHost(), rootServer.getPort()));
531+
Statement newStatement = newConnection.createStatement();
532+
// Act
533+
ResultSet result = newStatement.executeQuery("Select partitioned_data")) {
534+
List<Integer> resultData = new ArrayList<>();
535+
while (result.next()) {
536+
resultData.add(result.getInt(1));
537+
}
538+
539+
// Assert
540+
assertEquals(firstPartition.getRowCount(), resultData.size());
541+
assertTrue(resultData.contains(((IntVector) firstPartition.getVector(0)).get(0)));
542+
}
543+
}
544+
}
545+
}
546+
458547
@Test
459548
public void testShouldRunSelectQueryWithEmptyVectorsEmbedded() throws Exception {
460549
try (Statement statement = connection.createStatement();

0 commit comments

Comments
 (0)