Skip to content

Commit ad59035

Browse files
authored
GH-990: [JDBC] Fix memory leak on Connection#close due to unclosed ResultSet(s) (#991)
## What's Changed Closing a Connection when there was one or more unclosed ResultSet that had been obtained via methods of the interface DatabaseMetaData would generate exceptions due to memory leaks. Now, closing a Connection will first close all the ResultSet instances obtained from DatabaseMetadata instances associated with that Connection. Closes #990.
1 parent 0eb50b5 commit ad59035

File tree

4 files changed

+113
-7
lines changed

4 files changed

+113
-7
lines changed

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import io.netty.util.concurrent.DefaultThreadFactory;
2222
import java.sql.SQLException;
2323
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.Map;
2426
import java.util.Properties;
2527
import java.util.concurrent.ExecutorService;
2628
import java.util.concurrent.Executors;
@@ -42,6 +44,8 @@ public final class ArrowFlightConnection extends AvaticaConnection {
4244
private final ArrowFlightSqlClientHandler clientHandler;
4345
private final ArrowFlightConnectionConfigImpl config;
4446
private ExecutorService executorService;
47+
private int metadataResultSetCount;
48+
private Map<Integer, ArrowFlightJdbcFlightStreamResultSet> metadataResultSetMap = new HashMap<>();
4549

4650
/**
4751
* Creates a new {@link ArrowFlightConnection}.
@@ -66,6 +70,7 @@ private ArrowFlightConnection(
6670
this.config = Preconditions.checkNotNull(config, "Config cannot be null.");
6771
this.allocator = Preconditions.checkNotNull(allocator, "Allocator cannot be null.");
6872
this.clientHandler = Preconditions.checkNotNull(clientHandler, "Handler cannot be null.");
73+
this.metadataResultSetCount = 0;
6974
}
7075

7176
/**
@@ -173,6 +178,31 @@ synchronized ExecutorService getExecutorService() {
173178
: executorService;
174179
}
175180

181+
/**
182+
* Registers a new metadata ResultSet and assigns it a unique ID. Metadata ResultSets are those
183+
* created without an associated Statement.
184+
*
185+
* @param resultSet the ResultSet to register
186+
* @return the assigned ID
187+
*/
188+
int getNewMetadataResultSetId(ArrowFlightJdbcFlightStreamResultSet resultSet) {
189+
metadataResultSetMap.put(metadataResultSetCount, resultSet);
190+
return metadataResultSetCount++;
191+
}
192+
193+
/**
194+
* Unregisters a metadata ResultSet when it is closed. This method is called by metadata
195+
* ResultSets during their close operation to remove themselves from the tracking map.
196+
*
197+
* @param id the ID of the ResultSet to unregister, or null if not a metadata ResultSet
198+
*/
199+
void onResultSetClose(Integer id) {
200+
if (id == null) {
201+
return;
202+
}
203+
metadataResultSetMap.remove(id);
204+
}
205+
176206
@Override
177207
public Properties getClientInfo() {
178208
final Properties copy = new Properties();
@@ -190,7 +220,9 @@ public void close() throws SQLException {
190220
} catch (final Exception e) {
191221
topLevelException = e;
192222
}
223+
// copies of the collections are used to avoid concurrent modification problems
193224
ArrayList<AutoCloseable> closeables = new ArrayList<>(statementMap.values());
225+
closeables.addAll(new ArrayList<>(metadataResultSetMap.values()));
194226
closeables.add(clientHandler);
195227
closeables.addAll(allocator.getChildAllocators());
196228
closeables.add(allocator);

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public final class ArrowFlightJdbcFlightStreamResultSet
5454
private VectorSchemaRoot currentVectorSchemaRoot;
5555

5656
private Schema schema;
57+
private Integer id = null; // used for metadata result sets only
5758

5859
/** Public constructor used by ArrowFlightJdbcFactory. */
5960
ArrowFlightJdbcFlightStreamResultSet(
@@ -82,6 +83,7 @@ private ArrowFlightJdbcFlightStreamResultSet(
8283
super(null, state, signature, resultSetMetaData, timeZone, firstFrame);
8384
this.connection = connection;
8485
this.flightInfo = flightInfo;
86+
this.id = connection.getNewMetadataResultSetId(this);
8587
}
8688

8789
/**
@@ -234,7 +236,12 @@ protected void cancel() {
234236

235237
@Override
236238
public synchronized void close() {
239+
237240
try {
241+
if (isClosed()) {
242+
return;
243+
}
244+
this.connection.onResultSetClose(id);
238245
if (flightEndpointDataQueue != null) {
239246
// flightStreamQueue should close currentFlightStream internally
240247
flightEndpointDataQueue.close();

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.sql.Types;
2323
import java.util.HashSet;
2424
import java.util.List;
25-
import java.util.Objects;
2625
import java.util.Set;
2726
import java.util.TimeZone;
2827
import org.apache.arrow.driver.jdbc.utils.ConvertUtils;
@@ -159,12 +158,10 @@ public void close() {
159158
} catch (final Exception e) {
160159
exceptions.add(e);
161160
}
162-
if (!Objects.isNull(statement)) {
163-
try {
164-
super.close();
165-
} catch (final Exception e) {
166-
exceptions.add(e);
167-
}
161+
try {
162+
super.close();
163+
} catch (final Exception e) {
164+
exceptions.add(e);
168165
}
169166
exceptions.parallelStream().forEach(e -> LOGGER.error(e.getMessage(), e));
170167
exceptions.stream()

flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ConnectionTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,42 @@
1616
*/
1717
package org.apache.arrow.driver.jdbc;
1818

19+
import static java.lang.String.format;
20+
import static java.util.stream.IntStream.range;
1921
import static org.junit.jupiter.api.Assertions.assertEquals;
2022
import static org.junit.jupiter.api.Assertions.assertFalse;
2123
import static org.junit.jupiter.api.Assertions.assertNotNull;
2224
import static org.junit.jupiter.api.Assertions.assertThrows;
2325
import static org.junit.jupiter.api.Assertions.assertTrue;
2426
import static org.junit.jupiter.api.Assertions.fail;
2527

28+
import com.google.protobuf.Message;
2629
import java.net.URISyntaxException;
2730
import java.sql.Connection;
2831
import java.sql.Driver;
2932
import java.sql.DriverManager;
33+
import java.sql.ResultSet;
3034
import java.sql.SQLException;
3135
import java.sql.Statement;
3236
import java.util.Map;
3337
import java.util.Properties;
38+
import java.util.function.Consumer;
3439
import org.apache.arrow.driver.jdbc.authentication.UserPasswordAuthentication;
3540
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
3641
import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty;
3742
import org.apache.arrow.driver.jdbc.utils.MockFlightSqlProducer;
3843
import org.apache.arrow.flight.FlightMethod;
44+
import org.apache.arrow.flight.FlightProducer.ServerStreamListener;
3945
import org.apache.arrow.flight.NoOpSessionOptionValueVisitor;
4046
import org.apache.arrow.flight.SessionOptionValue;
47+
import org.apache.arrow.flight.sql.FlightSqlProducer.Schemas;
48+
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTableTypes;
4149
import org.apache.arrow.memory.BufferAllocator;
4250
import org.apache.arrow.memory.RootAllocator;
4351
import org.apache.arrow.util.AutoCloseables;
52+
import org.apache.arrow.vector.VarCharVector;
53+
import org.apache.arrow.vector.VectorSchemaRoot;
54+
import org.apache.arrow.vector.util.Text;
4455
import org.junit.jupiter.api.AfterEach;
4556
import org.junit.jupiter.api.BeforeEach;
4657
import org.junit.jupiter.api.Test;
@@ -698,4 +709,63 @@ public void testStatementsClosedOnConnectionClose() throws Exception {
698709
assertTrue(statements[i].isClosed());
699710
}
700711
}
712+
713+
@Test
714+
public void testResultSetsFromDatabaseMetadataClosedOnConnectionClose() throws Exception {
715+
// set up the FlightProducer to respond to metadata queries
716+
// getTableTypes() is being used, but any other method would work
717+
int rowCount = 3;
718+
final Message commandGetTableTypes = CommandGetTableTypes.getDefaultInstance();
719+
final Consumer<ServerStreamListener> commandGetTableTypesResultProducer =
720+
listener -> {
721+
try (final BufferAllocator allocator = new RootAllocator();
722+
final VectorSchemaRoot root =
723+
VectorSchemaRoot.create(Schemas.GET_TABLE_TYPES_SCHEMA, allocator)) {
724+
final VarCharVector tableType = (VarCharVector) root.getVector("table_type");
725+
range(0, rowCount)
726+
.forEach(i -> tableType.setSafe(i, new Text(format("table_type #%d", i))));
727+
root.setRowCount(rowCount);
728+
listener.start(root);
729+
listener.putNext();
730+
} catch (final Throwable throwable) {
731+
listener.error(throwable);
732+
} finally {
733+
listener.completed();
734+
}
735+
};
736+
PRODUCER.addCatalogQuery(commandGetTableTypes, commandGetTableTypesResultProducer);
737+
738+
// create a connection
739+
final Properties properties = new Properties();
740+
properties.put(ArrowFlightConnectionProperty.HOST.camelName(), "localhost");
741+
properties.put(
742+
ArrowFlightConnectionProperty.PORT.camelName(), FLIGHT_SERVER_TEST_EXTENSION.getPort());
743+
properties.put(ArrowFlightConnectionProperty.USER.camelName(), userTest);
744+
properties.put(ArrowFlightConnectionProperty.PASSWORD.camelName(), passTest);
745+
properties.put("useEncryption", false);
746+
747+
Connection connection =
748+
DriverManager.getConnection(
749+
"jdbc:arrow-flight-sql://"
750+
+ FLIGHT_SERVER_TEST_EXTENSION.getHost()
751+
+ ":"
752+
+ FLIGHT_SERVER_TEST_EXTENSION.getPort(),
753+
properties);
754+
755+
// create ResultSets from DatabaseMetadata
756+
int numResultSets = 3;
757+
ResultSet[] resultSets = new ResultSet[numResultSets];
758+
for (int i = 0; i < numResultSets; i++) {
759+
resultSets[i] = connection.getMetaData().getTableTypes();
760+
assertFalse(resultSets[i].isClosed());
761+
}
762+
763+
// close the connection
764+
connection.close();
765+
766+
// assert the ResultSets are closed
767+
for (int i = 0; i < numResultSets; i++) {
768+
assertTrue(resultSets[i].isClosed());
769+
}
770+
}
701771
}

0 commit comments

Comments
 (0)