Skip to content

Commit 433fb51

Browse files
Connection pool closing fixed
1 parent 86b7b86 commit 433fb51

File tree

1 file changed

+26
-10
lines changed

1 file changed

+26
-10
lines changed

src/main/java/com/github/pgasync/impl/PgConnectionPool.java

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import javax.annotation.concurrent.GuardedBy;
2222
import java.net.InetSocketAddress;
2323
import java.nio.charset.Charset;
24+
import java.util.ArrayList;
25+
import java.util.Collection;
2426
import java.util.LinkedHashMap;
2527
import java.util.LinkedList;
2628
import java.util.Map;
@@ -31,6 +33,9 @@
3133
import java.util.function.BiConsumer;
3234
import java.util.function.Consumer;
3335
import java.util.function.Function;
36+
import java.util.logging.Level;
37+
import java.util.logging.Logger;
38+
import java.util.stream.Collectors;
3439

3540
/**
3641
* Pool for backend connections.
@@ -106,16 +111,24 @@ boolean isConnected() {
106111
return delegate.isConnected();
107112
}
108113

109-
void shutdown() {
110-
statements.forEach((sql, stmt) -> stmt.delegate.close().join());
114+
CompletableFuture<Void> shutdown() {
115+
CompletableFuture<?>[] closeTasks = statements.values().stream()
116+
.map(stmt -> stmt.delegate.close())
117+
.collect(Collectors.toList()).toArray(new CompletableFuture<?>[]{});
111118
statements.clear();
112-
delegate.close().join();
119+
return CompletableFuture.allOf(closeTasks)
120+
.thenApply(v -> {
121+
if (!statements.isEmpty()) {
122+
Logger.getLogger(PooledPgConnection.class.getName()).log(Level.WARNING, "Stale prepared statements detected {0}", statements.size());
123+
}
124+
return delegate.close();
125+
})
126+
.thenCompose(Function.identity());
113127
}
114128

115129
@Override
116130
public CompletableFuture<Void> close() {
117-
release(this);
118-
return CompletableFuture.completedFuture(null);
131+
return release(this);
119132
}
120133

121134
@Override
@@ -222,6 +235,7 @@ public PgConnectionPool(PoolProperties properties, Executor futuresExecutor) {
222235

223236
@Override
224237
public CompletableFuture<Void> close() {
238+
Collection<CompletableFuture<Void>> shutdownTasks = new ArrayList<>();
225239
lock.lock();
226240
try {
227241
closed = true;
@@ -231,13 +245,13 @@ public CompletableFuture<Void> close() {
231245
}
232246
while (!connections.isEmpty()) {
233247
PooledPgConnection connection = connections.poll();
234-
connection.shutdown();
248+
shutdownTasks.add(connection.shutdown());
235249
size--;
236250
}
237251
} finally {
238252
lock.unlock();
239253
}
240-
return CompletableFuture.completedFuture(null);
254+
return CompletableFuture.allOf(shutdownTasks.toArray(size -> new CompletableFuture<?>[size]));
241255
}
242256

243257
@Override
@@ -297,15 +311,16 @@ private boolean tryIncreaseSize() {
297311
}
298312
}
299313

300-
private void release(PooledPgConnection connection) {
314+
private CompletableFuture<Void> release(PooledPgConnection connection) {
301315
if (connection == null) {
302-
throw new IllegalArgumentException("'connection' should be non null to be returned to the pool");
316+
throw new IllegalArgumentException("'connection' should be not null");
303317
}
318+
CompletableFuture<Void> shutdownTask = CompletableFuture.completedFuture(null);
304319
lock.lock();
305320
try {
306321
if (closed) {
307322
if (connection.isConnected()) {
308-
connection.shutdown();
323+
shutdownTask = connection.shutdown();
309324
}
310325
} else {
311326
if (connection.isConnected()) {
@@ -321,6 +336,7 @@ private void release(PooledPgConnection connection) {
321336
} finally {
322337
lock.unlock();
323338
}
339+
return shutdownTask;
324340
}
325341

326342
@Override

0 commit comments

Comments
 (0)