From 9e611c007f7f91913dc2458f3275801cba1b1d48 Mon Sep 17 00:00:00 2001 From: Steve Lord Date: Tue, 12 Mar 2024 14:15:04 -0700 Subject: [PATCH] GH-37720: [Java][FlightSQL] Implement stateless prepared statements --- .../arrow/flight/sql/FlightSqlClient.java | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java index 6fe31fae9216b..9a46cbf6f7c77 100644 --- a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java @@ -78,6 +78,7 @@ import org.apache.arrow.flight.SetSessionOptionsResult; import org.apache.arrow.flight.SyncPutListener; import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.sql.impl.FlightSql; import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementResult; import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementQuery; import org.apache.arrow.flight.sql.util.TableRef; @@ -1048,15 +1049,31 @@ private Schema deserializeSchema(final ByteString bytes) { public FlightInfo execute(final CallOption... options) { checkOpen(); - final FlightDescriptor descriptor = FlightDescriptor + FlightDescriptor descriptor = FlightDescriptor .command(Any.pack(CommandPreparedStatementQuery.newBuilder() .setPreparedStatementHandle(preparedStatementResult.getPreparedStatementHandle()) .build()) .toByteArray()); if (parameterBindingRoot != null && parameterBindingRoot.getRowCount() > 0) { - try (final SyncPutListener listener = putParameters(descriptor, options)) { - listener.getResult(); + try (final SyncPutListener putListener = putParameters(descriptor, options)) { + final PutResult read = putListener.read(); + if (read != null) { + try (final ArrowBuf metadata = read.getApplicationMetadata()) { + final FlightSql.DoPutPreparedStatementResult doPutPreparedStatementResult = + FlightSql.DoPutPreparedStatementResult.parseFrom(metadata.nioBuffer()); + descriptor = FlightDescriptor + .command(Any.pack(CommandPreparedStatementQuery.newBuilder() + .setPreparedStatementHandle( + doPutPreparedStatementResult.getPreparedStatementHandle()) + .build()) + .toByteArray()); + } + } + } catch (final InterruptedException | ExecutionException e) { + throw CallStatus.CANCELLED.withCause(e).toRuntimeException(); + } catch (final InvalidProtocolBufferException e) { + throw CallStatus.INVALID_ARGUMENT.withCause(e).toRuntimeException(); } }