Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.arrow.driver.jdbc;

import static org.apache.arrow.driver.jdbc.utils.FlightStreamQueue.createNewQueue;
import static org.apache.arrow.driver.jdbc.utils.FlightEndpointDataQueue.createNewQueue;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
Expand All @@ -26,7 +26,8 @@
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import org.apache.arrow.driver.jdbc.utils.FlightStreamQueue;
import org.apache.arrow.driver.jdbc.client.CloseableEndpointStreamPair;
import org.apache.arrow.driver.jdbc.utils.FlightEndpointDataQueue;
import org.apache.arrow.driver.jdbc.utils.VectorSchemaRootTransformer;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
Expand All @@ -47,8 +48,8 @@ public final class ArrowFlightJdbcFlightStreamResultSet
extends ArrowFlightJdbcVectorSchemaRootResultSet {

private final ArrowFlightConnection connection;
private FlightStream currentFlightStream;
private FlightStreamQueue flightStreamQueue;
private CloseableEndpointStreamPair currentEndpointData;
private FlightEndpointDataQueue flightEndpointDataQueue;

private VectorSchemaRootTransformer transformer;
private VectorSchemaRoot currentVectorSchemaRoot;
Expand Down Expand Up @@ -102,20 +103,20 @@ static ArrowFlightJdbcFlightStreamResultSet fromFlightInfo(

resultSet.transformer = transformer;

resultSet.execute(flightInfo);
resultSet.populateData(flightInfo);
return resultSet;
}

private void loadNewQueue() {
Optional.ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
flightStreamQueue = createNewQueue(connection.getExecutorService());
Optional.ofNullable(flightEndpointDataQueue).ifPresent(AutoCloseables::closeNoChecked);
flightEndpointDataQueue = createNewQueue(connection.getExecutorService());
}

private void loadNewFlightStream() throws SQLException {
if (currentFlightStream != null) {
AutoCloseables.closeNoChecked(currentFlightStream);
if (currentEndpointData != null) {
AutoCloseables.closeNoChecked(currentEndpointData);
}
this.currentFlightStream = getNextFlightStream(true);
this.currentEndpointData = getNextEndpointStream(true);
}

@Override
Expand All @@ -124,24 +125,24 @@ protected AvaticaResultSet execute() throws SQLException {

if (flightInfo != null) {
schema = flightInfo.getSchemaOptional().orElse(null);
execute(flightInfo);
populateData(flightInfo);
}
return this;
}

private void execute(final FlightInfo flightInfo) throws SQLException {
private void populateData(final FlightInfo flightInfo) throws SQLException {
loadNewQueue();
flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
flightEndpointDataQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
loadNewFlightStream();

// Ownership of the root will be passed onto the cursor.
if (currentFlightStream != null) {
executeForCurrentFlightStream();
if (currentEndpointData != null) {
populateDataForCurrentFlightStream();
}
}

private void executeForCurrentFlightStream() throws SQLException {
final VectorSchemaRoot originalRoot = currentFlightStream.getRoot();
private void populateDataForCurrentFlightStream() throws SQLException {
final VectorSchemaRoot originalRoot = currentEndpointData.getStream().getRoot();

if (transformer != null) {
try {
Expand All @@ -154,9 +155,9 @@ private void executeForCurrentFlightStream() throws SQLException {
}

if (schema != null) {
execute(currentVectorSchemaRoot, schema);
populateData(currentVectorSchemaRoot, schema);
} else {
execute(currentVectorSchemaRoot);
populateData(currentVectorSchemaRoot);
}
}

Expand All @@ -179,20 +180,20 @@ public boolean next() throws SQLException {
return true;
}

if (currentFlightStream != null) {
currentFlightStream.getRoot().clear();
if (currentFlightStream.next()) {
executeForCurrentFlightStream();
if (currentEndpointData != null) {
currentEndpointData.getStream().getRoot().clear();
if (currentEndpointData.getStream().next()) {
populateDataForCurrentFlightStream();
continue;
}

flightStreamQueue.enqueue(currentFlightStream);
flightEndpointDataQueue.enqueue(currentEndpointData);
}

currentFlightStream = getNextFlightStream(false);
currentEndpointData = getNextEndpointStream(false);

if (currentFlightStream != null) {
executeForCurrentFlightStream();
if (currentEndpointData != null) {
populateDataForCurrentFlightStream();
continue;
}

Expand All @@ -207,14 +208,14 @@ public boolean next() throws SQLException {
@Override
protected void cancel() {
super.cancel();
final FlightStream currentFlightStream = this.currentFlightStream;
if (currentFlightStream != null) {
currentFlightStream.cancel("Cancel", null);
final CloseableEndpointStreamPair currentEndpoint = this.currentEndpointData;
if (currentEndpoint != null) {
currentEndpoint.getStream().cancel("Cancel", null);
}

if (flightStreamQueue != null) {
if (flightEndpointDataQueue != null) {
try {
flightStreamQueue.close();
flightEndpointDataQueue.close();
} catch (final Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -224,27 +225,28 @@ protected void cancel() {
@Override
public synchronized void close() {
try {
if (flightStreamQueue != null) {
if (flightEndpointDataQueue != null) {
// flightStreamQueue should close currentFlightStream internally
flightStreamQueue.close();
} else if (currentFlightStream != null) {
flightEndpointDataQueue.close();
} else if (currentEndpointData != null) {
// close is only called for currentFlightStream if there's no queue
currentFlightStream.close();
currentEndpointData.close();
}

} catch (final Exception e) {
throw new RuntimeException(e);
} finally {
super.close();
}
}

private FlightStream getNextFlightStream(final boolean isExecution) throws SQLException {
if (isExecution) {
private CloseableEndpointStreamPair getNextEndpointStream(final boolean canTimeout) throws SQLException {
if (canTimeout) {
final int statementTimeout = statement != null ? statement.getQueryTimeout() : 0;
return statementTimeout != 0 ?
flightStreamQueue.next(statementTimeout, TimeUnit.SECONDS) : flightStreamQueue.next();
flightEndpointDataQueue.next(statementTimeout, TimeUnit.SECONDS) : flightEndpointDataQueue.next();
} else {
return flightStreamQueue.next();
return flightEndpointDataQueue.next();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static ArrowFlightJdbcVectorSchemaRootResultSet fromVectorSchemaRoot(
new ArrowFlightJdbcVectorSchemaRootResultSet(null, state, signature, resultSetMetaData,
timeZone, null);

resultSet.execute(vectorSchemaRoot);
resultSet.populateData(vectorSchemaRoot);
return resultSet;
}

Expand All @@ -92,7 +92,7 @@ protected AvaticaResultSet execute() throws SQLException {
throw new RuntimeException("Can only execute with execute(VectorSchemaRoot)");
}

void execute(final VectorSchemaRoot vectorSchemaRoot) {
void populateData(final VectorSchemaRoot vectorSchemaRoot) {
final List<Field> fields = vectorSchemaRoot.getSchema().getFields();
final List<ColumnMetaData> columns = ConvertUtils.convertArrowFieldsToColumnMetaDataList(fields);
signature.columns.clear();
Expand All @@ -102,7 +102,7 @@ void execute(final VectorSchemaRoot vectorSchemaRoot) {
execute2(new ArrowFlightJdbcCursor(vectorSchemaRoot), this.signature.columns);
}

void execute(final VectorSchemaRoot vectorSchemaRoot, final Schema schema) {
void populateData(final VectorSchemaRoot vectorSchemaRoot, final Schema schema) {
final List<ColumnMetaData> columns = ConvertUtils.convertArrowFieldsToColumnMetaDataList(schema.getFields());
signature.columns.clear();
signature.columns.addAll(columns);
Expand Down
Loading