Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions java/flight/flight-sql-jdbc-driver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>me.alexpanov</groupId>
<artifactId>free-port-finder</artifactId>
<version>1.1.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>commons-io</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightStatusCode;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.auth2.BearerCredentialWriter;
Expand All @@ -49,12 +50,14 @@
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.Meta.StatementType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link FlightSqlClient} handler.
*/
public final class ArrowFlightSqlClientHandler implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(ArrowFlightSqlClientHandler.class);
private final FlightSqlClient sqlClient;
private final Set<CallOption> options = new HashSet<>();

Expand Down Expand Up @@ -189,7 +192,18 @@ public Schema getDataSetSchema() {

@Override
public void close() {
preparedStatement.close(getOptions());
try {
preparedStatement.close(getOptions());
} catch (FlightRuntimeException fre) {
// ARROW-17785: suppress exceptions caused by flaky gRPC layer
if (fre.status().code().equals(FlightStatusCode.UNAVAILABLE) ||
(fre.status().code().equals(FlightStatusCode.INTERNAL) &&
fre.getMessage().contains("Connection closed after GOAWAY"))) {
LOGGER.warn("Supressed error closing PreparedStatement", fre);
return;
}
throw fre;
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public class ArrowFlightJdbcConnectionPoolDataSourceTest {
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.host("localhost")
.randomPort()
.authentication(authentication)
.producer(PRODUCER)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ public class ArrowFlightJdbcDriverTest {
new UserPasswordAuthentication.Builder().user("user1", "pass1").user("user2", "pass2")
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder().host("localhost").randomPort()
.authentication(authentication).producer(PRODUCER).build();
FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.authentication(authentication)
.producer(PRODUCER)
.build();
}

private BufferAllocator allocator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ public class ArrowFlightJdbcFactoryTest {
new UserPasswordAuthentication.Builder().user("user1", "pass1").user("user2", "pass2")
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder().host("localhost").randomPort()
.authentication(authentication).producer(PRODUCER).build();
FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.authentication(authentication)
.producer(PRODUCER)
.build();
}

private BufferAllocator allocator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ public class ConnectionTest {
.user(userTest, passTest)
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder().host("localhost").randomPort()
.authentication(authentication).producer(PRODUCER).build();
FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.authentication(authentication)
.producer(PRODUCER)
.build();
}

private BufferAllocator allocator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public class ConnectionTlsTest {
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.host("localhost")
.randomPort()
.authentication(authentication)
.useEncryption(certKey.cert, certKey.key)
.producer(PRODUCER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import me.alexpanov.net.FreePortFinder;

/**
* Utility class for unit tests that need to instantiate a {@link FlightServer}
* and interact with it.
Expand Down Expand Up @@ -95,8 +93,6 @@ public static FlightServerTestRule createStandardTestRule(final FlightSqlProduce
.build();

return new Builder()
.host("localhost")
.randomPort()
.authentication(authentication)
.producer(producer)
.build();
Expand All @@ -106,11 +102,6 @@ ArrowFlightJdbcDataSource createDataSource() {
return ArrowFlightJdbcDataSource.createNewDataSource(properties);
}

ArrowFlightJdbcDataSource createDataSource(String token) {
properties.put("token", token);
return ArrowFlightJdbcDataSource.createNewDataSource(properties);
}

public ArrowFlightJdbcConnectionPoolDataSource createConnectionPoolDataSource() {
return ArrowFlightJdbcConnectionPoolDataSource.createNewDataSource(properties);
}
Expand Down Expand Up @@ -159,9 +150,8 @@ public Statement apply(Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
try (FlightServer flightServer =
getStartServer(location ->
initiateServer(location), 3)) {
try (FlightServer flightServer = getStartServer(location -> initiateServer(location), 3)) {
properties.put("port", flightServer.getPort());
LOGGER.info("Started " + FlightServer.class.getName() + " as " + flightServer);
base.evaluate();
} finally {
Expand All @@ -174,12 +164,9 @@ public void evaluate() throws Throwable {
private FlightServer getStartServer(CheckedFunction<Location, FlightServer> newServerFromLocation,
int retries)
throws IOException {

final Deque<ReflectiveOperationException> exceptions = new ArrayDeque<>();

for (; retries > 0; retries--) {
final Location location = Location.forGrpcInsecure(config.getHost(), config.getPort());
final FlightServer server = newServerFromLocation.apply(location);
final FlightServer server = newServerFromLocation.apply(Location.forGrpcInsecure("localhost", 0));
try {
Method start = server.getClass().getMethod("start");
start.setAccessible(true);
Expand All @@ -189,9 +176,7 @@ private FlightServer getStartServer(CheckedFunction<Location, FlightServer> newS
exceptions.add(e);
}
}

exceptions.forEach(
e -> LOGGER.error("Failed to start a new " + FlightServer.class.getName() + ".", e));
exceptions.forEach(e -> LOGGER.error("Failed to start FlightServer", e));
throw new IOException(exceptions.pop().getCause());
}

Expand Down Expand Up @@ -223,44 +208,14 @@ public void close() throws Exception {
* Builder for {@link FlightServerTestRule}.
*/
public static final class Builder {
private final Properties properties = new Properties();
private final Properties properties;
private FlightSqlProducer producer;
private Authentication authentication;
private CertKeyPair certKeyPair;

/**
* Sets the host for the server rule.
*
* @param host the host value.
* @return the Builder.
*/
public Builder host(final String host) {
properties.put(ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.HOST.camelName(),
host);
return this;
}

/**
* Sets a random port to be used by the server rule.
*
* @return the Builder.
*/
public Builder randomPort() {
properties.put(ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.PORT.camelName(),
FreePortFinder.findFreeLocalPort());
return this;
}

/**
* Sets a specific port to be used by the server rule.
*
* @param port the port value.
* @return the Builder.
*/
public Builder port(final int port) {
properties.put(ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.PORT.camelName(),
port);
return this;
public Builder() {
this.properties = new Properties();
this.properties.put("host", "localhost");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ public void testShouldRunSelectQuerySettingLargeMaxRowLimit() throws Exception {
try (Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(
CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) {

final long maxRowsLimit = 3;
statement.setLargeMaxRows(maxRowsLimit);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public class TokenAuthenticationTest {

static {
FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.host("localhost")
.randomPort()
.authentication(new TokenAuthentication.Builder()
.token("1234")
.build())
Expand Down