Skip to content

Commit 25ed16f

Browse files
Describe is sent only once
1 parent e318d07 commit 25ed16f

File tree

9 files changed

+53
-54
lines changed

9 files changed

+53
-54
lines changed

src/main/java/com/github/pgasync/PgConnection.java

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.logging.Level;
3131
import java.util.logging.Logger;
3232

33+
import com.github.pgasync.message.backend.DataRow;
3334
import com.pgasync.Connection;
3435
import com.pgasync.Listening;
3536
import com.pgasync.PreparedStatement;
@@ -38,11 +39,9 @@
3839
import com.pgasync.Transaction;
3940
import com.github.pgasync.conversion.DataConverter;
4041
import com.github.pgasync.message.backend.Authentication;
41-
import com.github.pgasync.message.backend.RowDescription;
4242
import com.github.pgasync.message.frontend.Bind;
4343
import com.github.pgasync.message.frontend.Close;
4444
import com.github.pgasync.message.frontend.Describe;
45-
import com.github.pgasync.message.frontend.Execute;
4645
import com.github.pgasync.message.Message;
4746
import com.github.pgasync.message.frontend.Parse;
4847
import com.github.pgasync.message.frontend.PasswordMessage;
@@ -63,37 +62,34 @@ public class PgConnection implements Connection {
6362
public class PgPreparedStatement implements PreparedStatement {
6463

6564
private final String sname;
66-
private final String pname;
65+
private Columns columns;
6766

68-
PgPreparedStatement(String sname, String pname) {
67+
PgPreparedStatement(String sname) {
6968
this.sname = sname;
70-
this.pname = pname;
7169
}
7270

7371
@Override
7472
public CompletableFuture<ResultSet> query(Object... params) {
75-
AtomicReference<Map<String, PgColumn>> columnsByNameRef = new AtomicReference<>();
76-
AtomicReference<PgColumn[]> orderedColumnsRef = new AtomicReference<>();
7773
List<Row> rows = new ArrayList<>();
7874
return fetch((columnsByName, orderedColumns) -> {
79-
columnsByNameRef.set(columnsByName);
80-
orderedColumnsRef.set(orderedColumns);
8175
}, rows::add, params)
82-
.thenApply(v -> new PgResultSet(columnsByNameRef.get(), List.of(orderedColumnsRef.get()), rows, 0));
76+
.thenApply(v -> new PgResultSet(columns.byName, List.of(columns.ordered), rows, 0));
8377
}
8478

8579
@Override
8680
public CompletableFuture<Integer> fetch(BiConsumer<Map<String, PgColumn>, PgColumn[]> onColumns, Consumer<Row> processor, Object... params) {
87-
return stream
88-
.send(new Bind(sname, pname, dataConverter.fromParameters(params)))
89-
.thenApply(bindComplete -> stream.send(Describe.portal(pname)))
90-
.thenCompose(Function.identity())
91-
.thenApply(rowDescription -> {
92-
Columns columns = calcColumns(((RowDescription) rowDescription).getColumns());
93-
onColumns.accept(columns.byName, columns.ordered);
94-
return stream.send(new Execute(pname), dataRow -> processor.accept(new PgRow(dataRow, columns.byName, columns.ordered, dataConverter)));
95-
})
96-
.thenCompose(Function.identity());
81+
Bind bind = new Bind(sname, dataConverter.fromParameters(params));
82+
Consumer<DataRow> rowProcessor = dataRow -> processor.accept(new PgRow(dataRow, columns.byName, columns.ordered, dataConverter));
83+
if (columns != null) {
84+
return stream
85+
.send(bind, rowProcessor);
86+
} else {
87+
return stream
88+
.send(bind, Describe.portal(), columnDescriptions -> {
89+
columns = calcColumns(columnDescriptions);
90+
onColumns.accept(columns.byName, columns.ordered);
91+
}, rowProcessor);
92+
}
9793
}
9894

9995
@Override
@@ -132,7 +128,6 @@ private String next() {
132128
}
133129

134130
private static final NameSequence preparedStatementNames = new NameSequence("s-");
135-
private static final NameSequence portalNames = new NameSequence("p-");
136131

137132
private final PgProtocolStream stream;
138133
private final DataConverter dataConverter;
@@ -176,7 +171,7 @@ CompletableFuture<PgPreparedStatement> preparedStatementOf(String sql, Oid... pa
176171
String statementName = preparedStatementNames.next();
177172
return stream
178173
.send(new Parse(sql, statementName, parametersTypes))
179-
.thenApply(parseComplete -> new PgPreparedStatement(statementName, portalNames.next()));
174+
.thenApply(parseComplete -> new PgPreparedStatement(statementName));
180175
}
181176

182177
@Override

src/main/java/com/github/pgasync/PgProtocolStream.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import com.github.pgasync.message.backend.CommandComplete;
1919
import com.github.pgasync.message.backend.DataRow;
2020
import com.github.pgasync.message.backend.RowDescription;
21+
import com.github.pgasync.message.frontend.Bind;
22+
import com.github.pgasync.message.frontend.Describe;
2123
import com.github.pgasync.message.frontend.Execute;
2224
import com.github.pgasync.message.frontend.PasswordMessage;
2325
import com.github.pgasync.message.frontend.Query;
@@ -41,7 +43,9 @@ public interface PgProtocolStream {
4143

4244
CompletableFuture<Void> send(Query query, Consumer<RowDescription.ColumnDescription[]> onColumns, Consumer<DataRow> onRow, Consumer<CommandComplete> onAffected);
4345

44-
CompletableFuture<Integer> send(Execute execute, Consumer<DataRow> onRow);
46+
CompletableFuture<Integer> send(Bind bind, Describe describe, Consumer<RowDescription.ColumnDescription[]> onColumns, Consumer<DataRow> onRow);
47+
48+
CompletableFuture<Integer> send(Bind bind, Consumer<DataRow> onRow);
4549

4650
Runnable subscribe(String channel, Consumer<String> onNotification);
4751

src/main/java/com/github/pgasync/io/IO.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ public static String getCString(ByteBuffer buffer, Charset charset) {
3434
}
3535

3636
public static void putCString(ByteBuffer buffer, String value, Charset charset) {
37-
buffer.put(value.getBytes(charset));
37+
if (!value.isEmpty()) {
38+
buffer.put(value.getBytes(charset));
39+
}
3840
buffer.put((byte) 0);
3941
}
4042

src/main/java/com/github/pgasync/io/frontend/BindEncoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ protected byte getMessageId() {
7979

8080
@Override
8181
public void writeBody(Bind msg, ByteBuffer buffer, Charset encoding) {
82-
IO.putCString(buffer, msg.getPname(), encoding); // portal
82+
IO.putCString(buffer, "", encoding); // portal
8383
IO.putCString(buffer, msg.getSname(), encoding); // prepared statement
8484
buffer.putShort((short) 0); // number of format codes
8585
buffer.putShort((short) msg.getParams().length); // number of parameters

src/main/java/com/github/pgasync/io/frontend/ExecuteEncoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ protected byte getMessageId() {
5454

5555
@Override
5656
public void writeBody(Execute msg, ByteBuffer buffer, Charset encoding) {
57-
IO.putCString(buffer, msg.getPname(), encoding); // portal
57+
IO.putCString(buffer, "", encoding); // unnamed portal
5858
buffer.putInt(0); // unlimited maximum rows
5959
}
6060
}

src/main/java/com/github/pgasync/message/frontend/Bind.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,17 @@
2222
public class Bind implements ExtendedQueryMessage {
2323

2424
private final String sname;
25-
private final String pname;
2625
private final byte[][] params;
2726

28-
public Bind(String sname, String pname, byte[][] params) {
27+
public Bind(String sname, byte[][] params) {
2928
this.sname = sname;
30-
this.pname = pname;
3129
this.params = params;
3230
}
3331

3432
public String getSname() {
3533
return sname;
3634
}
3735

38-
public String getPname() {
39-
return pname;
40-
}
41-
4236
public byte[][] getParams() {
4337
return params;
4438
}

src/main/java/com/github/pgasync/message/frontend/Describe.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ public String getName() {
5252
return name;
5353
}
5454

55-
public static Describe portal(String name) {
56-
return new Describe(name, Kind.PORTAL);
55+
public static Describe portal() {
56+
return new Describe("", Kind.PORTAL);
5757
}
5858

5959
public static Describe statement(String name) {

src/main/java/com/github/pgasync/message/frontend/Execute.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,7 @@
2121
*/
2222
public class Execute implements ExtendedQueryMessage {
2323

24-
private final String pname;
25-
26-
public Execute(String pname) {
27-
this.pname = pname;
24+
public Execute() {
2825
}
2926

30-
public String getPname() {
31-
return pname;
32-
}
3327
}

src/main/java/com/github/pgasync/netty/NettyPgProtocolStream.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package com.github.pgasync.netty;
1616

17+
import com.github.pgasync.message.frontend.Bind;
18+
import com.github.pgasync.message.frontend.Describe;
1719
import com.pgasync.SqlException;
1820
import com.github.pgasync.PgProtocolStream;
1921
import com.github.pgasync.message.ExtendedQueryMessage;
@@ -147,11 +149,25 @@ public CompletableFuture<Void> send(Query query, Consumer<RowDescription.ColumnD
147149
}
148150

149151
@Override
150-
public CompletableFuture<Integer> send(Execute execute, Consumer<DataRow> onRow) {
152+
public CompletableFuture<Integer> send(Bind bind, Describe describe, Consumer<RowDescription.ColumnDescription[]> onColumns, Consumer<DataRow> onRow) {
153+
this.onColumns = onColumns;
154+
this.onRow = onRow;
155+
this.onAffected = null;
156+
return offerRoundTrip(() -> {
157+
lastSentMessage = new Execute();
158+
write(bind, describe, lastSentMessage);
159+
}).thenApply(commandComplete -> ((CommandComplete) commandComplete).getAffectedRows());
160+
}
161+
162+
@Override
163+
public CompletableFuture<Integer> send(Bind bind, Consumer<DataRow> onRow) {
151164
this.onColumns = null;
152165
this.onRow = onRow;
153166
this.onAffected = null;
154-
return send(execute).thenApply(commandComplete -> ((CommandComplete) commandComplete).getAffectedRows());
167+
return offerRoundTrip(() -> {
168+
lastSentMessage = new Execute();
169+
write(bind, lastSentMessage);
170+
}).thenApply(commandComplete -> ((CommandComplete) commandComplete).getAffectedRows());
155171
}
156172

157173
@Override
@@ -212,18 +228,12 @@ private void respondWithMessage(Message message) {
212228
publish((NotificationResponse) message);
213229
} else if (message instanceof NoticeResponse) {
214230
Logger.getLogger(NettyPgProtocolStream.class.getName()).log(Level.WARNING, message.toString());
231+
} else if (message == BIndicators.BIND_COMPLETE) {
232+
// op op since bulk message sequence
215233
} else if (message instanceof RowDescription) {
216-
if (isSimpleQueryInProgress()) {
217-
onColumns.accept(((RowDescription) message).getColumns());
218-
} else {
219-
consumeOnResponse().completeAsync(() -> message, futuresExecutor);
220-
}
234+
onColumns.accept(((RowDescription) message).getColumns());
221235
} else if (message == BIndicators.NO_DATA) {
222-
if (isSimpleQueryInProgress()) {
223-
onColumns.accept(new RowDescription.ColumnDescription[]{});
224-
} else {
225-
consumeOnResponse().completeAsync(() -> new RowDescription(new RowDescription.ColumnDescription[]{}), futuresExecutor);
226-
}
236+
onColumns.accept(new RowDescription.ColumnDescription[]{});
227237
} else if (message instanceof DataRow) {
228238
onRow.accept((DataRow) message);
229239
} else if (message instanceof ErrorResponse) {

0 commit comments

Comments
 (0)