Skip to content

Commit 1b4b896

Browse files
Completable futures added everywhere
1 parent f2c1817 commit 1b4b896

File tree

81 files changed

+1756
-844
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

81 files changed

+1756
-844
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ Prepared statements use native PostgreSQL syntax `$index`. Supported parameter t
6666

6767
```java
6868
db.querySet("insert into message(id, body) values($1, $2)", 123, "hello")
69-
.subscribe(result -> out.printf("Inserted %d rows", result.updatedRows() ));
69+
.subscribe(result -> out.printf("Inserted %d rows", result.affectedRows() ));
7070
```
7171

7272
### Transactions

src/main/java/com/github/pgasync/Connection.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,19 @@
1414

1515
package com.github.pgasync;
1616

17+
import com.github.pgasync.impl.Oid;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.function.Consumer;
21+
1722
/**
18-
* A single physical connection to PostgreSQL backend.
23+
* A single physical connection to Postgres backend.
1924
*
2025
* @author Antti Laisi
2126
*/
2227
public interface Connection extends Db {
2328

24-
/**
25-
* Closes the connection. Doesn't block current thread.
26-
*/
27-
void close();
29+
CompletableFuture<PreparedStatement> prepareStatement(String sql, Oid... parametersTypes);
2830

29-
}
31+
CompletableFuture<Listening> subscribe(String channel, Consumer<String> onNotification);
32+
}

src/main/java/com/github/pgasync/ConnectionPool.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,15 @@
2020
* Pool of backend {@link Connection}s. Pools implement {@link Db} so
2121
* queries can be issued directly to the pool if using the same connection
2222
* is not required.
23-
*
23+
*
2424
* @author Antti Laisi
2525
*/
2626
public interface ConnectionPool extends Db {
2727

2828
/**
29-
* Executes a {@link java.util.function.Consumer} callback when a connection is
30-
* available. Connection passed to callback must be freed with
31-
* {@link com.github.pgasync.ConnectionPool#release(Connection)}
29+
* Gets a connection when available.
30+
* {@link Connection#close()} method will return the connection into this pool instead of closing it.
3231
*/
3332
CompletableFuture<Connection> getConnection();
3433

35-
/**
36-
* Releases a connection back to the pool.
37-
*
38-
* @param connection Connection fetched with getConnection
39-
*/
40-
void release(Connection connection);
41-
4234
}

src/main/java/com/github/pgasync/ConnectionPoolBuilder.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public ConnectionPoolBuilder database(String database) {
6363
}
6464

6565
public ConnectionPoolBuilder poolSize(int poolSize) {
66-
properties.poolSize = poolSize;
66+
properties.maxConnections = poolSize;
6767
return this;
6868
}
6969

@@ -82,11 +82,6 @@ public ConnectionPoolBuilder ssl(boolean ssl) {
8282
return this;
8383
}
8484

85-
public ConnectionPoolBuilder pipeline(boolean pipeline) {
86-
properties.usePipelining = pipeline;
87-
return this;
88-
}
89-
9085
public ConnectionPoolBuilder validationQuery(String validationQuery) {
9186
properties.validationQuery = validationQuery;
9287
return this;
@@ -102,11 +97,11 @@ public static class PoolProperties {
10297
String username;
10398
String password;
10499
String database;
105-
int poolSize = 20;
100+
int maxConnections = 20;
101+
int maxStatements = 20;
106102
DataConverter dataConverter = null;
107103
List<Converter<?>> converters = new ArrayList<>();
108104
boolean useSsl;
109-
boolean usePipelining;
110105
String validationQuery;
111106

112107
public String getHostname() {
@@ -129,16 +124,16 @@ public String getDatabase() {
129124
return database;
130125
}
131126

132-
public int getPoolSize() {
133-
return poolSize;
127+
public int getMaxConnections() {
128+
return maxConnections;
134129
}
135130

136-
public boolean getUseSsl() {
137-
return useSsl;
131+
public int getMaxStatements() {
132+
return maxStatements;
138133
}
139134

140-
public boolean getUsePipelining() {
141-
return usePipelining;
135+
public boolean getUseSsl() {
136+
return useSsl;
142137
}
143138

144139
public DataConverter getDataConverter() {
Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
package com.github.pgasync;
22

3+
import java.util.concurrent.CompletableFuture;
4+
35
/**
4-
* Main interface to PostgreSQL backend.
6+
* Main interface to Postgres backend.
57
*
68
* @author Antti Laisi
79
*/
8-
public interface Db extends QueryExecutor, AutoCloseable {
9-
10-
/**
11-
* Closes the pool, blocks the calling thread until connections are closed.
12-
*/
13-
@Override
14-
void close() throws Exception;
10+
public interface Db extends QueryExecutor {
1511

12+
CompletableFuture<Void> close();
1613
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.github.pgasync;
2+
3+
import java.util.concurrent.CompletableFuture;
4+
5+
@FunctionalInterface
6+
public interface Listening {
7+
8+
CompletableFuture<Void> unlisten();
9+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.github.pgasync;
2+
3+
import com.github.pgasync.impl.PgColumn;
4+
5+
import java.util.Map;
6+
import java.util.concurrent.CompletableFuture;
7+
import java.util.function.Consumer;
8+
9+
/**
10+
* Prepared statement in terms of Postgres.
11+
* It lives during database session. It should be reused multiple times and it should be closed after using.
12+
* Doesn't support function call feature because of its deprecation See <a href="https://www.postgresql.org/docs/11/protocol-flow.html#id-1.10.5.7.6"/>.
13+
*/
14+
public interface PreparedStatement {
15+
16+
/**
17+
* Fetches the whole row set and returns a {@link CompletableFuture} with an instance of {@link ResultSet}.
18+
* This future may be completed with an error. Use this method if you are sure, that all data, returned by the query can be placed into memory.
19+
*
20+
* @param params Array of query parameters values.
21+
* @return An instance of {@link ResultSet} with data.
22+
*/
23+
CompletableFuture<ResultSet> query(Object... params);
24+
25+
/**
26+
* Fetches data rows from Postgres one by one. Use this method when you are unsure, that all data, returned by the query can be placed into memory.
27+
*
28+
* @param onColumns {@link Consumer} of parameters by name map. Gets called when bind/describe chain succeeded.
29+
* @param processor {@link Consumer} of single data row. Performs some processing of data.
30+
* @param params Array of query parameters values.
31+
* @return CompletableFuture that completes when the whole process ends or when an error occurs. Future's value will indicate the number of affected rows by the query.
32+
*/
33+
CompletableFuture<Integer> fetch(Consumer<Map<String, PgColumn>> onColumns, Consumer<Row> processor, Object... params);
34+
35+
CompletableFuture<Void> close();
36+
37+
}
Lines changed: 66 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
package com.github.pgasync;
22

3+
import com.github.pgasync.impl.PgColumn;
4+
import com.github.pgasync.impl.PgResultSet;
5+
6+
import java.util.ArrayList;
7+
import java.util.Collection;
38
import java.util.List;
9+
import java.util.Map;
410
import java.util.concurrent.CompletableFuture;
11+
import java.util.concurrent.atomic.AtomicReference;
512
import java.util.function.Consumer;
613

714
/**
@@ -17,69 +24,79 @@ public interface QueryExecutor {
1724
CompletableFuture<Transaction> begin();
1825

1926
/**
20-
* Executes an anonymous prepared statement. Uses native PostgreSQL syntax with $arg instead of ?
21-
* to mark parameters. Supported parameter types are String, Character, Number, Time, Date, Timestamp
22-
* and byte[].
27+
* Sends parameter less query script. The script may be multi query. Queries are separated with semicolons.
28+
* Accumulates fetched columns, rows and affected rows counts into memory and transforms them into a ResultSet when each {@link ResultSet} is fetched.
29+
* Completes returned {@link CompletableFuture} when the whole process of multiple {@link ResultSet}s fetching ends.
2330
*
24-
* @param sql SQL to execute
25-
* @param params Parameter values
26-
* @return Cold observable that emits 0-n rows.
31+
* @param sql Sql Script text.
32+
* @return CompletableFuture that is completed with a collection of fetched {@link ResultSet}s.
2733
*/
28-
CompletableFuture<Row> queryRows(String sql, Object... params);
34+
default CompletableFuture<Collection<ResultSet>> completeScript(String sql) {
35+
List<ResultSet> results = new ArrayList<>();
36+
AtomicReference<Map<String, PgColumn>> columnsRef = new AtomicReference<>();
37+
AtomicReference<List<Row>> rowsRef = new AtomicReference<>();
38+
return script(
39+
columns -> {
40+
columnsRef.set(columns);
41+
rowsRef.set(new ArrayList<>());
42+
},
43+
rowsRef.get()::add,
44+
affected -> results.add(new PgResultSet(columnsRef.get(), rowsRef.get(), affected)),
45+
sql
46+
)
47+
.thenApply(v -> results);
2948

30-
/**
31-
* Executes an anonymous prepared statement. Uses native PostgreSQL syntax with $arg instead of ?
32-
* to mark parameters. Supported parameter types are String, Character, Number, Time, Date, Timestamp
33-
* and byte[].
34-
*
35-
* @param sql SQL to execute
36-
* @param params Parameter values
37-
* @return Cold observable that emits a single result set.
38-
*/
39-
CompletableFuture<ResultSet> querySet(String sql, Object... params);
49+
}
4050

4151
/**
42-
* Begins a transaction.
52+
* Sends parameter less query script. The script may be multi query. Queries are separated with semicolons.
53+
* Unlike {@link #completeScript(String)} doesn't accumulate fetched columns, rows and affected rows counts into memory.
54+
* Instead it calls passed in consumers, when columns, or particular row or an affected rows count is fetched from Postgres.
55+
* Completes returned {@link CompletableFuture} when the whole process of multiple {@link ResultSet}s fetching ends.
4356
*
44-
* @param onTransaction Called when transaction is successfully started.
45-
* @param onError Called on exception thrown
57+
* @param onColumns Columns fetched callback consumer.
58+
* @param onRow A row fetched callback consumer.
59+
* @param onAffected An affected rows callback consumer. It is called when a particular {@link ResultSet} is completely fetched with its affected rows count. This callback should be used to create a {@link ResultSet} instance from already fetched columns, rows and affected rows count.
60+
* @param sql Sql Script text.
61+
* @return CompletableFuture that is completed when the whole process of multiple {@link ResultSet}s fetching ends.
4662
*/
47-
default void begin(Consumer<Transaction> onTransaction, Consumer<Throwable> onError) {
48-
begin()
49-
.thenAccept(onTransaction)
50-
.exceptionally(th -> {
51-
onError.accept(th);
52-
return null;
53-
});
54-
}
63+
CompletableFuture<Void> script(Consumer<Map<String, PgColumn>> onColumns, Consumer<Row> onRow, Consumer<Integer> onAffected, String sql);
5564

5665
/**
57-
* Executes a simple query.
66+
* Sends single query with parameters. Uses extended query protocol of Postgres.
67+
* Accumulates fetched columns, rows and affected rows count into memory and transforms them into ResultSet when it is fetched.
68+
* Completes returned {@link CompletableFuture} when the whole process of {@link ResultSet} fetching ends.
5869
*
59-
* @param sql SQL to execute.
60-
* @param onResult Called when query is completed
61-
* @param onError Called on exception thrown
70+
* @param sql Sql query text with parameters substituted with ?.
71+
* @param params Parameters of the query.
72+
* @return CompletableFuture of {@link ResultSet}.
6273
*/
63-
default void query(String sql, Consumer<ResultSet> onResult, Consumer<Throwable> onError) {
64-
query(sql, null, onResult, onError);
74+
default CompletableFuture<ResultSet> completeQuery(String sql, Object... params) {
75+
AtomicReference<Map<String, PgColumn>> columnsRef = new AtomicReference<>();
76+
List<Row> rows = new ArrayList<>();
77+
return query(
78+
columnsRef::set,
79+
rows::add,
80+
sql,
81+
params
82+
)
83+
.thenApply(affected -> new PgResultSet(columnsRef.get(), rows, affected));
84+
6585
}
6686

6787
/**
68-
* Executes an anonymous prepared statement. Uses native PostgreSQL syntax with $arg instead of ?
69-
* to mark parameters. Supported parameter types are String, Character, Number, Time, Date, Timestamp
70-
* and byte[].
88+
* Sends single query with parameters. Uses extended query protocol of Postgres.
89+
* Unlike {@link #completeQuery(String, Object...)} doesn't accumulate columns, rows and affected rows counts into memory.
90+
* Instead it calls passed in consumers, when columns, or particular row is fetched from Postgres.
91+
* Completes returned {@link CompletableFuture} when the process of single {@link ResultSet}s fetching ends.
7192
*
72-
* @param sql SQL to execute
73-
* @param params List of parameters
74-
* @param onResult Called when query is completed
75-
* @param onError Called on exception thrown
93+
* @param onColumns Columns fetched callback consumer.
94+
* @param onRow A row fetched callback consumer.
95+
* @param sql Sql query text with parameters substituted with ?.
96+
* @param params Parameters of the query
97+
* @return CompletableFuture of affected rows count.
98+
* This future is used by implementation to create a {@link ResultSet} instance from already fetched columns, rows and affected rows count.
99+
* Affected rows count is this future's completion value.
76100
*/
77-
default void query(String sql, List/*<Object>*/ params, Consumer<ResultSet> onResult, Consumer<Throwable> onError) {
78-
querySet(sql, params != null ? params.toArray() : null)
79-
.thenAccept(onResult)
80-
.exceptionally(th -> {
81-
onError.accept(th);
82-
return null;
83-
});
84-
}
101+
CompletableFuture<Integer> query(Consumer<Map<String, PgColumn>> onColumns, Consumer<Row> onRow, String sql, Object... params);
85102
}

src/main/java/com/github/pgasync/ResultSet.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
import java.util.Iterator;
1919

2020
/**
21-
* SQL result set. Consists of 0-n result rows and amount of updated
21+
* SQL result set. Consists of 0-n result rows and amount of affected
2222
* (INSERT/UPDATE/DELETE) rows.
23-
*
23+
*
2424
* @author Antti Laisi
2525
*/
2626
public interface ResultSet extends Iterable<Row> {
@@ -30,16 +30,11 @@ public interface ResultSet extends Iterable<Row> {
3030
*/
3131
Collection<String> getColumns();
3232

33-
/**
34-
* @return Row iterator
35-
*/
36-
Iterator<Row> iterator();
37-
3833
/**
3934
* @param index Row index starting from 0
4035
* @return Row, never null
4136
*/
42-
Row row(int index);
37+
Row at(int index);
4338

4439
/**
4540
* @return Amount of result rows.
@@ -49,6 +44,6 @@ public interface ResultSet extends Iterable<Row> {
4944
/**
5045
* @return Amount of modified rows.
5146
*/
52-
int updatedRows();
47+
int affectedRows();
5348

5449
}

src/main/java/com/github/pgasync/Row.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.sql.Timestamp;
2222

2323
/**
24-
* Row in a queryRows result set. A row consist of 0-n columns of a single type.
24+
* Row in a query result set. A row consist of 0-n columns of a single type.
2525
* Column values can be accessed with a 0-based index or column label.
2626
*
2727
* @author Antti Laisi

0 commit comments

Comments
 (0)