14
14
15
15
package com .github .pgasync ;
16
16
17
- import com .github .pgasync .conversion .DataConverter ;
18
17
import com .pgasync .Connection ;
19
18
import com .pgasync .Listening ;
20
19
import com .pgasync .PreparedStatement ;
21
20
import com .pgasync .Row ;
22
21
import com .pgasync .SqlException ;
23
- import com .pgasync .ConnectionPool ;
24
- import com .pgasync .ConnectionPoolBuilder ;
22
+ import com .pgasync .ConnectibleBuilder ;
25
23
import com .pgasync .ResultSet ;
26
24
import com .pgasync .Transaction ;
27
25
28
26
import javax .annotation .concurrent .GuardedBy ;
29
- import java .net .InetSocketAddress ;
30
- import java .nio .charset .Charset ;
31
27
import java .util .ArrayList ;
32
28
import java .util .Collection ;
29
+ import java .util .Iterator ;
33
30
import java .util .LinkedHashMap ;
34
31
import java .util .LinkedList ;
35
32
import java .util .Map ;
42
39
import java .util .function .Function ;
43
40
import java .util .logging .Level ;
44
41
import java .util .logging .Logger ;
45
- import java .util .stream .Collectors ;
46
42
47
43
/**
48
- * Pool for backend connections.
44
+ * Resource pool for backend connections.
49
45
*
50
46
* @author Antti Laisi
51
47
*/
52
- public abstract class PgConnectionPool implements ConnectionPool {
48
+ public abstract class PgConnectionPool extends PgConnectible {
53
49
54
50
private class PooledPgConnection implements Connection {
55
51
@@ -118,15 +114,29 @@ boolean isConnected() {
118
114
return delegate .isConnected ();
119
115
}
120
116
117
+ private void closeNextStatement (Iterator <PooledPgPreparedStatement > statementsSource , CompletableFuture <Void > onComplete ) {
118
+ if (statementsSource .hasNext ()) {
119
+ statementsSource .next ().delegate .close ()
120
+ .thenAccept (v -> {
121
+ statementsSource .remove ();
122
+ closeNextStatement (statementsSource , onComplete );
123
+ })
124
+ .exceptionally (th -> {
125
+ futuresExecutor .execute (() -> onComplete .completeExceptionally (th ));
126
+ return null ;
127
+ });
128
+ } else {
129
+ onComplete .completeAsync (() -> null , futuresExecutor );
130
+ }
131
+ }
132
+
121
133
CompletableFuture <Void > shutdown () {
122
- CompletableFuture <?>[] closeTasks = statements .values ().stream ()
123
- .map (stmt -> stmt .delegate .close ())
124
- .collect (Collectors .toList ()).toArray (new CompletableFuture <?>[]{});
125
- statements .clear ();
126
- return CompletableFuture .allOf (closeTasks )
134
+ CompletableFuture <Void > onComplete = new CompletableFuture <>();
135
+ closeNextStatement (statements .values ().iterator (), onComplete );
136
+ return onComplete
127
137
.thenApply (v -> {
128
138
if (!statements .isEmpty ()) {
129
- Logger . getLogger ( PooledPgConnection . class . getName ()). log ( Level . WARNING , "Stale prepared statements detected {0}" , statements .size ());
139
+ throw new IllegalStateException ( "Stale prepared statements detected (" + statements .size () + ")" );
130
140
}
131
141
return delegate .close ();
132
142
})
@@ -155,7 +165,20 @@ public CompletableFuture<Void> script(BiConsumer<Map<String, PgColumn>, PgColumn
155
165
156
166
@ Override
157
167
public CompletableFuture <Integer > query (BiConsumer <Map <String , PgColumn >, PgColumn []> onColumns , Consumer <Row > onRow , String sql , Object ... params ) {
158
- return delegate .query (onColumns , onRow , sql , params );
168
+ return prepareStatement (sql , dataConverter .assumeTypes (params ))
169
+ .thenApply (stmt ->
170
+ stmt .fetch (onColumns , onRow , params )
171
+ .handle ((affected , th ) ->
172
+ stmt .close ()
173
+ .thenApply (v -> {
174
+ if (th == null ) {
175
+ return affected ;
176
+ } else {
177
+ throw new RuntimeException (th );
178
+ }
179
+ })
180
+ ).thenCompose (Function .identity ())
181
+ ).thenCompose (Function .identity ());
159
182
}
160
183
161
184
@ Override
@@ -171,6 +194,7 @@ public CompletableFuture<PreparedStatement> prepareStatement(String sql, Oid...
171
194
172
195
private class PooledPgPreparedStatement implements PreparedStatement {
173
196
197
+ private static final String DUPLICATED_PREPARED_STATEMENT_DETECTED = "Duplicated prepared statement detected. Closing extra instance. \n {0}" ;
174
198
private final String sql ;
175
199
private final PgConnection .PgPreparedStatement delegate ;
176
200
@@ -181,15 +205,27 @@ private PooledPgPreparedStatement(String sql, PgConnection.PgPreparedStatement d
181
205
182
206
@ Override
183
207
public CompletableFuture <Void > close () {
184
- statements .put (sql , this );
208
+ PooledPgPreparedStatement already = statements .put (sql , this );
185
209
if (evicted != null ) {
186
210
try {
187
- return evicted .delegate .close ();
211
+ if (already != null && already != evicted ) {
212
+ Logger .getLogger (PgConnectionPool .class .getName ()).log (Level .WARNING , DUPLICATED_PREPARED_STATEMENT_DETECTED , already .sql );
213
+ return evicted .delegate .close ()
214
+ .thenApply (v -> already .delegate .close ())
215
+ .thenCompose (Function .identity ());
216
+ } else {
217
+ return evicted .delegate .close ();
218
+ }
188
219
} finally {
189
220
evicted = null ;
190
221
}
191
222
} else {
192
- return CompletableFuture .completedFuture (null );
223
+ if (already != null ) {
224
+ Logger .getLogger (PgConnectionPool .class .getName ()).log (Level .WARNING , DUPLICATED_PREPARED_STATEMENT_DETECTED , already .sql );
225
+ return already .delegate .close ();
226
+ } else {
227
+ return CompletableFuture .completedFuture (null );
228
+ }
193
229
}
194
230
}
195
231
@@ -207,9 +243,7 @@ public CompletableFuture<Integer> fetch(BiConsumer<Map<String, PgColumn>, PgColu
207
243
208
244
private final int maxConnections ;
209
245
private final int maxStatements ;
210
- private final String validationQuery ;
211
246
private final ReentrantLock lock = new ReentrantLock ();
212
- protected final Charset encoding ;
213
247
214
248
@ GuardedBy ("lock" )
215
249
private int size ;
@@ -220,25 +254,10 @@ public CompletableFuture<Integer> fetch(BiConsumer<Map<String, PgColumn>, PgColu
220
254
@ GuardedBy ("lock" )
221
255
private final Queue <PooledPgConnection > connections = new LinkedList <>();
222
256
223
- private final InetSocketAddress address ;
224
- private final String username ;
225
- private final String password ;
226
- private final String database ;
227
- private final DataConverter dataConverter ;
228
-
229
- protected final Executor futuresExecutor ;
230
-
231
- public PgConnectionPool (ConnectionPoolBuilder .PoolProperties properties , Executor futuresExecutor ) {
232
- this .address = InetSocketAddress .createUnresolved (properties .getHostname (), properties .getPort ());
233
- this .username = properties .getUsername ();
234
- this .password = properties .getPassword ();
235
- this .database = properties .getDatabase ();
257
+ public PgConnectionPool (ConnectibleBuilder .ConnectibleProperties properties , Executor futuresExecutor ) {
258
+ super (properties , futuresExecutor );
236
259
this .maxConnections = properties .getMaxConnections ();
237
260
this .maxStatements = properties .getMaxStatements ();
238
- this .dataConverter = properties .getDataConverter ();
239
- this .validationQuery = properties .getValidationQuery ();
240
- this .encoding = Charset .forName (properties .getEncoding ());
241
- this .futuresExecutor = futuresExecutor ;
242
261
}
243
262
244
263
@ Override
@@ -259,7 +278,7 @@ public CompletableFuture<Void> close() {
259
278
} finally {
260
279
lock .unlock ();
261
280
}
262
- return CompletableFuture .allOf (shutdownTasks .toArray (size -> new CompletableFuture <?>[size ] ));
281
+ return CompletableFuture .allOf (shutdownTasks .toArray (CompletableFuture <?>[]:: new ));
263
282
}
264
283
265
284
@ Override
@@ -280,7 +299,17 @@ public CompletableFuture<Connection> getConnection() {
280
299
.connect (username , password , database )
281
300
.thenApply (pooledConnection -> {
282
301
if (validationQuery != null && !validationQuery .isBlank ()) {
283
- return pooledConnection .completeQuery (validationQuery ).thenApply (rs -> pooledConnection );
302
+ return pooledConnection .completeScript (validationQuery )
303
+ .handle ((rss , th ) -> {
304
+ if (th != null ) {
305
+ return ((PooledPgConnection ) pooledConnection ).delegate .close ()
306
+ .thenApply (v -> CompletableFuture .<Connection >failedFuture (th ))
307
+ .thenCompose (Function .identity ());
308
+ } else {
309
+ return CompletableFuture .completedFuture (pooledConnection );
310
+ }
311
+ })
312
+ .thenCompose (Function .identity ());
284
313
} else {
285
314
return CompletableFuture .completedFuture (pooledConnection );
286
315
}
@@ -347,56 +376,4 @@ private CompletableFuture<Void> release(PooledPgConnection connection) {
347
376
return shutdownTask ;
348
377
}
349
378
350
- @ Override
351
- public CompletableFuture <Transaction > begin () {
352
- return getConnection ()
353
- .thenApply (Connection ::begin )
354
- .thenCompose (Function .identity ());
355
- }
356
-
357
- @ Override
358
- public CompletableFuture <Void > script (BiConsumer <Map <String , PgColumn >, PgColumn []> onColumns , Consumer <Row > onRow , Consumer <Integer > onAffected , String sql ) {
359
- return getConnection ()
360
- .thenApply (connection ->
361
- connection .script (onColumns , onRow , onAffected , sql )
362
- .handle ((message , th ) ->
363
- connection .close ()
364
- .thenApply (v -> {
365
- if (th == null ) {
366
- return message ;
367
- } else {
368
- throw new RuntimeException (th );
369
- }
370
- })
371
- ).thenCompose (Function .identity ())
372
- )
373
- .thenCompose (Function .identity ());
374
- }
375
-
376
- @ Override
377
- public CompletableFuture <Integer > query (BiConsumer <Map <String , PgColumn >, PgColumn []> onColumns , Consumer <Row > onRow , String sql , Object ... params ) {
378
- return getConnection ()
379
- .thenApply (connection ->
380
- connection .query (onColumns , onRow , sql , params )
381
- .handle ((affected , th ) ->
382
- connection .close ()
383
- .thenApply (v -> {
384
- if (th == null ) {
385
- return affected ;
386
- } else {
387
- throw new RuntimeException (th );
388
- }
389
- })
390
- ).thenCompose (Function .identity ())
391
- )
392
- .thenCompose (Function .identity ());
393
- }
394
-
395
- /**
396
- * Creates a new socket stream to the backend.
397
- *
398
- * @param address Server address
399
- * @return Stream with no pending messages
400
- */
401
- protected abstract PgProtocolStream openStream (InetSocketAddress address );
402
379
}
0 commit comments