Skip to content

Commit 85b97ce

Browse files
Refactoring of EventGroup using
1 parent 4235468 commit 85b97ce

17 files changed

+538
-580
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
<dependency>
6767
<groupId>io.netty</groupId>
6868
<artifactId>netty-handler</artifactId>
69-
<version>4.1.11.Final</version>
69+
<version>4.1.33.Final</version>
7070
</dependency>
7171
<!-- https://mvnrepository.com/artifact/javax.xml.bind/jaxb-api -->
7272
<dependency>

src/main/java/com/github/pgasync/PgConnectible.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22

33
import com.github.pgasync.conversion.DataConverter;
44
import com.pgasync.Connection;
5-
import com.pgasync.ConnectibleBuilder;
5+
import com.pgasync.NettyConnectibleBuilder;
66
import com.pgasync.Connectible;
77
import com.pgasync.Row;
88
import com.pgasync.Transaction;
99

10-
import java.net.InetSocketAddress;
1110
import java.nio.charset.Charset;
1211
import java.util.Map;
1312
import java.util.concurrent.CompletableFuture;
@@ -19,23 +18,23 @@
1918
public abstract class PgConnectible implements Connectible {
2019

2120
final String validationQuery;
22-
final InetSocketAddress address;
2321
final String username;
2422
final DataConverter dataConverter;
23+
final Executor futuresExecutor;
24+
final Function<Executor, ProtocolStream> toStream;
25+
2526
protected final String password;
2627
protected final String database;
2728
protected final Charset encoding;
2829

29-
protected final Executor futuresExecutor;
30-
31-
PgConnectible(ConnectibleBuilder.ConnectibleProperties properties, Executor futuresExecutor) {
32-
this.address = InetSocketAddress.createUnresolved(properties.getHostname(), properties.getPort());
30+
PgConnectible(NettyConnectibleBuilder.ConnectibleProperties properties, Function<Executor, ProtocolStream> toStream, Executor futuresExecutor) {
3331
this.username = properties.getUsername();
3432
this.password = properties.getPassword();
3533
this.database = properties.getDatabase();
3634
this.dataConverter = properties.getDataConverter();
3735
this.validationQuery = properties.getValidationQuery();
3836
this.encoding = Charset.forName(properties.getEncoding());
37+
this.toStream = toStream;
3938
this.futuresExecutor = futuresExecutor;
4039
}
4140

@@ -83,11 +82,4 @@ public CompletableFuture<Integer> query(BiConsumer<Map<String, PgColumn>, PgColu
8382
.thenCompose(Function.identity());
8483
}
8584

86-
/**
87-
* Creates a new socket stream to the backend.
88-
*
89-
* @param address Server address
90-
* @return Stream with no pending messages
91-
*/
92-
protected abstract PgProtocolStream openStream(InetSocketAddress address);
9385
}

src/main/java/com/github/pgasync/PgConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,13 +129,13 @@ private String next() {
129129

130130
private static final NameSequence preparedStatementNames = new NameSequence("s-");
131131

132-
private final PgProtocolStream stream;
132+
private final ProtocolStream stream;
133133
private final DataConverter dataConverter;
134134
private final Charset encoding;
135135

136136
private Columns currentColumns;
137137

138-
PgConnection(PgProtocolStream stream, DataConverter dataConverter, Charset encoding) {
138+
PgConnection(ProtocolStream stream, DataConverter dataConverter, Charset encoding) {
139139
this.stream = stream;
140140
this.dataConverter = dataConverter;
141141
this.encoding = encoding;

src/main/java/com/github/pgasync/PgConnectionPool.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import com.pgasync.PreparedStatement;
2020
import com.pgasync.Row;
2121
import com.pgasync.SqlException;
22-
import com.pgasync.ConnectibleBuilder;
22+
import com.pgasync.NettyConnectibleBuilder;
2323
import com.pgasync.ResultSet;
2424
import com.pgasync.Transaction;
2525

@@ -45,7 +45,7 @@
4545
*
4646
* @author Antti Laisi
4747
*/
48-
public abstract class PgConnectionPool extends PgConnectible {
48+
public class PgConnectionPool extends PgConnectible {
4949

5050
private class PooledPgConnection implements Connection {
5151

@@ -254,8 +254,8 @@ public CompletableFuture<Integer> fetch(BiConsumer<Map<String, PgColumn>, PgColu
254254
@GuardedBy("lock")
255255
private final Queue<PooledPgConnection> connections = new LinkedList<>();
256256

257-
public PgConnectionPool(ConnectibleBuilder.ConnectibleProperties properties, Executor futuresExecutor) {
258-
super(properties, futuresExecutor);
257+
public PgConnectionPool(NettyConnectibleBuilder.ConnectibleProperties properties, Function<Executor, ProtocolStream> addressToStream, Executor futuresExecutor) {
258+
super(properties, addressToStream, futuresExecutor);
259259
this.maxConnections = properties.getMaxConnections();
260260
this.maxStatements = properties.getMaxStatements();
261261
}
@@ -295,7 +295,7 @@ public CompletableFuture<Connection> getConnection() {
295295
uponAvailable.completeAsync(() -> connection, futuresExecutor);
296296
} else {
297297
if (tryIncreaseSize()) {
298-
new PooledPgConnection(new PgConnection(openStream(address), dataConverter, encoding))
298+
new PooledPgConnection(new PgConnection(toStream.apply(futuresExecutor), dataConverter, encoding))
299299
.connect(username, password, database)
300300
.thenApply(pooledConnection -> {
301301
if (validationQuery != null && !validationQuery.isBlank()) {

src/main/java/com/github/pgasync/PgDatabase.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
11
package com.github.pgasync;
22

33
import com.pgasync.Connection;
4-
import com.pgasync.ConnectibleBuilder;
4+
import com.pgasync.NettyConnectibleBuilder;
5+
56
import java.util.concurrent.CompletableFuture;
67
import java.util.concurrent.Executor;
78
import java.util.function.Function;
89

9-
public abstract class PgDatabase extends PgConnectible {
10+
public class PgDatabase extends PgConnectible {
1011

11-
public PgDatabase(ConnectibleBuilder.ConnectibleProperties properties, Executor futuresExecutor) {
12-
super(properties, futuresExecutor);
12+
public PgDatabase(NettyConnectibleBuilder.ConnectibleProperties properties, Function<Executor, ProtocolStream> toStream, Executor futuresExecutor) {
13+
super(properties, toStream, futuresExecutor);
1314
}
1415

1516
@Override
1617
public CompletableFuture<Connection> getConnection() {
17-
return new PgConnection(openStream(address), dataConverter, encoding)
18+
return new PgConnection(toStream.apply(futuresExecutor), dataConverter, encoding)
1819
.connect(username, password, database)
1920
.thenApply(connection -> {
2021
if (validationQuery != null && !validationQuery.isBlank()) {

0 commit comments

Comments
 (0)