Skip to content

Commit f413c15

Browse files
Merge pull request #2 from marat-gainullin/soft-locks
Lock have been softend. Waiting policy has been changed
2 parents 5ae3f6f + cba56a0 commit f413c15

File tree

2 files changed

+131
-149
lines changed

2 files changed

+131
-149
lines changed

src/main/java/com/github/pgasync/PgConnectionPool.java

Lines changed: 130 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,15 @@
2424
import com.pgasync.Transaction;
2525

2626
import javax.annotation.concurrent.GuardedBy;
27-
import java.util.ArrayList;
28-
import java.util.Collection;
29-
import java.util.Iterator;
30-
import java.util.LinkedHashMap;
31-
import java.util.LinkedList;
32-
import java.util.Map;
33-
import java.util.Queue;
27+
import java.util.*;
3428
import java.util.concurrent.CompletableFuture;
3529
import java.util.concurrent.Executor;
3630
import java.util.concurrent.locks.Lock;
3731
import java.util.concurrent.locks.ReentrantLock;
38-
import java.util.function.BiConsumer;
39-
import java.util.function.Consumer;
40-
import java.util.function.Function;
41-
import java.util.function.Supplier;
32+
import java.util.function.*;
4233
import java.util.logging.Level;
4334
import java.util.logging.Logger;
35+
import java.util.stream.Collectors;
4436

4537
/**
4638
* Resource pool for backend connections.
@@ -128,7 +120,7 @@ private void closeNextStatement(Iterator<PooledPgPreparedStatement> statementsSo
128120
return null;
129121
});
130122
} else {
131-
onComplete.completeAsync(() -> null, futuresExecutor);
123+
futuresExecutor.execute(() -> onComplete.complete(null));
132124
}
133125
}
134126

@@ -147,7 +139,8 @@ CompletableFuture<Void> shutdown() {
147139

148140
@Override
149141
public CompletableFuture<Void> close() {
150-
return release(this);
142+
release(this);
143+
return CompletableFuture.completedFuture(null);
151144
}
152145

153146
@Override
@@ -245,173 +238,162 @@ public CompletableFuture<Integer> fetch(BiConsumer<Map<String, PgColumn>, PgColu
245238

246239
private final int maxConnections;
247240
private final int maxStatements;
248-
private final Lock lock = new ReentrantLock();
249241

250-
@GuardedBy("lock")
242+
private final Lock guard = new ReentrantLock();
243+
@GuardedBy("guard")
251244
private int size;
252-
@GuardedBy("lock")
253-
private boolean closed;
254-
@GuardedBy("lock")
255-
private final Queue<CompletableFuture<? super Connection>> uponAvailableSubscribers = new LinkedList<>();
256-
@GuardedBy("lock")
257-
private final Queue<PooledPgConnection> availableConnections = new LinkedList<>();
258-
@GuardedBy("lock")
259-
private CompletableFuture<Void> uponFullyAvailable;
245+
@GuardedBy("guard")
246+
private final Queue<CompletableFuture<? super Connection>> pending = new LinkedList<>();
247+
@GuardedBy("guard")
248+
private final Queue<PooledPgConnection> connections = new LinkedList<>();
249+
@GuardedBy("guard")
250+
private CompletableFuture<Void> closing;
260251

261252
public PgConnectionPool(ConnectibleBuilder.ConnectibleProperties properties, Supplier<CompletableFuture<ProtocolStream>> obtainStream, Executor futuresExecutor) {
262253
super(properties, obtainStream, futuresExecutor);
263254
this.maxConnections = properties.getMaxConnections();
264255
this.maxStatements = properties.getMaxStatements();
265256
}
266257

267-
private CompletableFuture<Void> fullyAvailable() {
268-
if (uponFullyAvailable == null) {
269-
if (size <= availableConnections.size()) {
270-
return CompletableFuture.completedFuture(null);
271-
} else {
272-
uponFullyAvailable = new CompletableFuture<>();
273-
return uponFullyAvailable;
274-
}
275-
} else {
276-
return CompletableFuture.failedFuture(new IllegalStateException("Only a single 'fullyAvailable' request at a time is supported"));
258+
private <T> T locked(Supplier<T> action) {
259+
guard.lock();
260+
try {
261+
return action.get();
262+
} finally {
263+
guard.unlock();
277264
}
278265
}
279266

280-
@Override
281-
public CompletableFuture<Void> close() {
282-
lock.lock();
283-
try {
284-
closed = true;
285-
while (!uponAvailableSubscribers.isEmpty()) {
286-
CompletableFuture<? super Connection> queued = uponAvailableSubscribers.poll();
287-
futuresExecutor.execute(() -> queued.completeExceptionally(new SqlException("Connection pool is closing")));
288-
}
289-
return fullyAvailable()
290-
.thenApply(v -> {
291-
lock.lock();
292-
try {
293-
uponFullyAvailable = null;
294-
Collection<CompletableFuture<Void>> shutdownTasks = new ArrayList<>();
295-
while (!availableConnections.isEmpty()) {
296-
PooledPgConnection connection = availableConnections.poll();
297-
shutdownTasks.add(connection.shutdown());
298-
size--;
299-
}
300-
return CompletableFuture.allOf(shutdownTasks.toArray(CompletableFuture<?>[]::new));
301-
} finally {
302-
lock.unlock();
303-
}
304-
})
305-
.thenCompose(Function.identity());
306-
} finally {
307-
lock.unlock();
267+
private void release(PooledPgConnection connection) {
268+
if (connection == null) {
269+
throw new IllegalArgumentException("'connection' should be not null");
308270
}
271+
Runnable lucky = locked(() -> {
272+
CompletableFuture<? super Connection> nextUser = pending.poll();
273+
if (nextUser != null) {
274+
return () -> nextUser.complete(connection);
275+
} else {
276+
connections.add(connection);
277+
return checkClosed();
278+
}
279+
});
280+
futuresExecutor.execute(lucky);
309281
}
310282

311283
@Override
312284
public CompletableFuture<Connection> getConnection() {
313-
CompletableFuture<Connection> uponAvailable = new CompletableFuture<>();
314-
lock.lock();
315-
try {
316-
if (closed) {
317-
futuresExecutor.execute(() -> uponAvailable.completeExceptionally(new SqlException("Connection pool is closed")));
285+
if (locked(() -> closing != null)) {
286+
return CompletableFuture.failedFuture(new SqlException("Connection pool is closed"));
287+
} else {
288+
Connection cached = locked(this::firstAliveConnection);
289+
if (cached != null) {
290+
return CompletableFuture.completedFuture(cached);
318291
} else {
319-
Connection connection = firstAliveConnection();
320-
if (connection != null) {
321-
uponAvailable.completeAsync(() -> connection, futuresExecutor);
322-
} else {
323-
if (tryIncreaseSize()) {
324-
obtainStream.get()
325-
.thenApply(stream -> new PooledPgConnection(new PgConnection(stream, dataConverter))
326-
.connect(username, password, database))
327-
.thenCompose(Function.identity())
328-
.thenApply(pooledConnection -> {
329-
if (validationQuery != null && !validationQuery.isBlank()) {
330-
return pooledConnection.completeScript(validationQuery)
331-
.handle((rss, th) -> {
332-
if (th != null) {
333-
return ((PooledPgConnection) pooledConnection).delegate.close()
334-
.thenApply(v -> CompletableFuture.<Connection>failedFuture(th))
335-
.thenCompose(Function.identity());
336-
} else {
337-
return CompletableFuture.completedFuture(pooledConnection);
338-
}
339-
})
340-
.thenCompose(Function.identity());
341-
} else {
342-
return CompletableFuture.completedFuture(pooledConnection);
343-
}
344-
})
345-
.thenCompose(Function.identity())
346-
.thenAccept(pooledConnection -> uponAvailable.completeAsync(() -> pooledConnection, futuresExecutor))
347-
.exceptionally(th -> {
348-
lock.lock();
349-
try {
350-
size--;
351-
futuresExecutor.execute(() -> uponAvailable.completeExceptionally(th));
352-
if (uponFullyAvailable != null && size <= availableConnections.size()) {
353-
uponFullyAvailable.completeAsync(null, futuresExecutor);
354-
}
355-
return null;
356-
} finally {
357-
lock.unlock();
358-
}
359-
});
292+
CompletableFuture<Connection> deferred = new CompletableFuture<>();
293+
boolean makeNewConnection = locked(() -> {
294+
pending.add(deferred);
295+
if (size < maxConnections) {
296+
size++;
297+
return true;
360298
} else {
361-
// Pool is full now and all connections are busy
362-
uponAvailableSubscribers.offer(uponAvailable);
299+
return false;
363300
}
301+
});
302+
if (makeNewConnection) {
303+
obtainStream.get()
304+
.thenApply(stream -> new PooledPgConnection(new PgConnection(stream, dataConverter))
305+
.connect(username, password, database))
306+
.thenCompose(Function.identity())
307+
.thenApply(pooledConnection -> {
308+
if (validationQuery != null && !validationQuery.isBlank()) {
309+
return pooledConnection.completeScript(validationQuery)
310+
.handle((rss, th) -> {
311+
if (th != null) {
312+
return ((PooledPgConnection) pooledConnection).delegate.close()
313+
.thenApply(v -> CompletableFuture.<Connection>failedFuture(th))
314+
.thenCompose(Function.identity());
315+
} else {
316+
return CompletableFuture.completedFuture(pooledConnection);
317+
}
318+
})
319+
.thenCompose(Function.identity());
320+
} else {
321+
return CompletableFuture.completedFuture(pooledConnection);
322+
}
323+
})
324+
.thenCompose(Function.identity())
325+
.whenComplete((connected, th) -> {
326+
if (th == null) {
327+
release((PooledPgConnection) connected);
328+
} else {
329+
Collection<Runnable> actions = locked(() -> {
330+
size--;
331+
List<Runnable> unlucky = pending.stream()
332+
.<Runnable>map(item -> () ->
333+
item.completeExceptionally(th))
334+
.collect(Collectors.toList());
335+
unlucky.add(checkClosed());
336+
pending.clear();
337+
return unlucky;
338+
});
339+
actions.forEach(futuresExecutor::execute);
340+
}
341+
});
364342
}
343+
return deferred;
365344
}
366-
} finally {
367-
lock.unlock();
368345
}
369-
370-
return uponAvailable;
371346
}
372347

373-
private Connection firstAliveConnection() {
374-
Connection connection = availableConnections.poll();
375-
while (connection != null && !connection.isConnected()) {
376-
size--;
377-
connection = availableConnections.poll();
348+
private static class CloseTuple {
349+
private final CompletableFuture<Void> closing;
350+
private final Runnable immediate;
351+
352+
public CloseTuple(CompletableFuture<Void> closing, Runnable immediate) {
353+
this.closing = closing;
354+
this.immediate = immediate;
378355
}
379-
return connection;
380356
}
381357

382-
private boolean tryIncreaseSize() {
383-
if (size < maxConnections) {
384-
size++;
385-
return true;
358+
@Override
359+
public CompletableFuture<Void> close() {
360+
CloseTuple tuple = locked(() -> {
361+
if (closing == null) {
362+
closing = new CompletableFuture<>()
363+
.thenApply(v -> locked(() ->
364+
CompletableFuture.allOf(connections.stream()
365+
.map(PooledPgConnection::shutdown)
366+
.toArray(CompletableFuture[]::new)
367+
)
368+
))
369+
.thenCompose(Function.identity());
370+
return new CloseTuple(closing, checkClosed());
371+
} else {
372+
return new CloseTuple(CompletableFuture.failedFuture(new IllegalStateException("PG pool is already shutting down")), NO_OP);
373+
}
374+
});
375+
futuresExecutor.execute(tuple.immediate);
376+
return tuple.closing;
377+
}
378+
379+
private static final Runnable NO_OP = () -> {
380+
};
381+
382+
private Runnable checkClosed() {
383+
if (closing != null && size <= connections.size()) {
384+
assert pending.isEmpty();
385+
return () -> closing.complete(null);
386386
} else {
387-
return false;
387+
return NO_OP;
388388
}
389389
}
390390

391-
private CompletableFuture<Void> release(PooledPgConnection connection) {
392-
if (connection == null) {
393-
throw new IllegalArgumentException("'connection' should be not null");
394-
}
395-
lock.lock();
396-
try {
397-
if (connection.isConnected()) {
398-
if (!uponAvailableSubscribers.isEmpty()) {
399-
uponAvailableSubscribers.poll().completeAsync(() -> connection, futuresExecutor);
400-
} else {
401-
availableConnections.offer(connection);
402-
if (uponFullyAvailable != null && size <= availableConnections.size()) {
403-
uponFullyAvailable.complete(null);
404-
}
405-
}
406-
} else {
407-
size--;
408-
if (uponFullyAvailable != null && size <= availableConnections.size()) {
409-
uponFullyAvailable.complete(null);
410-
}
411-
}
412-
} finally {
413-
lock.unlock();
391+
private Connection firstAliveConnection() {
392+
Connection connection = connections.poll();
393+
while (connection != null && !connection.isConnected()) {
394+
size--;
395+
connection = connections.poll();
414396
}
415-
return CompletableFuture.completedFuture(null);
397+
return connection;
416398
}
417399
}

src/test/java/com/github/pgasync/PerformanceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ private void performBatches(SortedMap<Integer, SortedMap<Integer, Long>> results
117117

118118
private class Batch {
119119

120-
private long batchSize;
120+
private final long batchSize;
121121
private long performed;
122122
private long startedAt;
123123
private CompletableFuture<Long> onBatch;

0 commit comments

Comments
 (0)