14
14
15
15
package com .github .pgasync ;
16
16
17
+ import com .pgasync .Connection ;
17
18
import com .pgasync .ConnectionPool ;
18
- import com .pgasync .PreparedStatement ;
19
19
import org .junit .*;
20
20
import org .junit .runner .RunWith ;
21
21
import org .junit .runners .Parameterized ;
22
22
import org .junit .runners .Parameterized .Parameters ;
23
23
24
24
import java .util .*;
25
25
import java .util .concurrent .*;
26
- import java .util .concurrent .atomic .LongAdder ;
27
26
import java .util .function .Function ;
27
+ import java .util .stream .Collectors ;
28
28
import java .util .stream .IntStream ;
29
29
import java .util .stream .LongStream ;
30
30
31
31
import static com .github .pgasync .DatabaseRule .createPoolBuilder ;
32
32
import static java .lang .System .currentTimeMillis ;
33
33
import static java .lang .System .out ;
34
- import static org .junit .runners .MethodSorters .NAME_ASCENDING ;
35
34
36
35
@ RunWith (Parameterized .class )
37
- @ FixMethodOrder (NAME_ASCENDING )
38
36
public class PerformanceTest {
39
37
40
38
private static final String SELECT_42 = "select 42" ;
@@ -66,49 +64,33 @@ private static String key(int poolSize) {
66
64
private final int poolSize ;
67
65
private final int numThreads ;
68
66
private ConnectionPool pool ;
69
- private PreparedStatement stmt ;
70
67
71
68
public PerformanceTest (int poolSize , int numThreads ) {
72
69
this .poolSize = poolSize ;
73
70
this .numThreads = numThreads ;
74
71
}
75
72
76
73
@ Before
77
- public void setup () throws Exception {
74
+ public void setup () {
78
75
pool = dbr .builder
79
76
.password ("async-pg" )
80
77
.maxConnections (poolSize )
81
78
.build ();
82
- stmt = pool .getConnection ().get ().prepareStatement (SELECT_42 ).get ();
79
+ List <Connection > connections = IntStream .range (0 , poolSize )
80
+ .mapToObj (i -> pool .getConnection ().join ()).collect (Collectors .toList ());
81
+ connections .forEach (connection -> {
82
+ connection .prepareStatement (SELECT_42 ).join ().close ().join ();
83
+ connection .close ().join ();
84
+ });
83
85
}
84
86
85
87
@ After
86
88
public void tearDown () {
87
- stmt .close ().join ();
88
89
pool .close ().join ();
89
90
}
90
91
91
- /*
92
- @Test(timeout = 2000)
93
- public void t1_preAllocatePool() throws Exception {
94
- CompletableFuture.allOf((CompletableFuture<?>[]) IntStream.range(0, poolSize)
95
- .mapToObj(i -> pool.getConnection()
96
- .thenApply(connection ->
97
- connection.prepareStatement(SELECT_42)
98
- .thenApply(PreparedStatement::close)
99
- .thenCompose(Function.identity())
100
- .thenApply(v -> connection.close())
101
- .thenCompose(Function.identity())
102
- )
103
- .thenCompose(Function.identity())
104
- )
105
- .toArray(size -> new CompletableFuture<?>[size])
106
- ).get();
107
- }
108
- */
109
-
110
92
@ Test
111
- public void t3_run () {
93
+ public void observeSomeBatches () {
112
94
double mean = LongStream .range (0 , repeats )
113
95
.map (i -> {
114
96
try {
@@ -141,7 +123,24 @@ private CompletableFuture<Long> perform() {
141
123
}
142
124
143
125
private void nextSample () {
144
- stmt .query ()
126
+ pool .getConnection ()
127
+ .thenApply (connection ->
128
+ connection .prepareStatement (SELECT_42 )
129
+ .thenApply (stmt -> stmt .query ()
130
+ .thenApply (rs -> stmt .close ())
131
+ .exceptionally (th -> stmt .close ().whenComplete ((v , _th ) -> {
132
+ throw new RuntimeException (th );
133
+ }))
134
+ .thenCompose (Function .identity ())
135
+ .thenApply (v -> connection .close ())
136
+ .exceptionally (th -> connection .close ().whenComplete ((v , _th ) -> {
137
+ throw new RuntimeException (th );
138
+ }))
139
+ .thenCompose (Function .identity ())
140
+ )
141
+ .thenCompose (Function .identity ())
142
+ )
143
+ .thenCompose (Function .identity ())
145
144
.thenAccept (v -> {
146
145
if (++performed < batchSize ) {
147
146
nextSample ();
@@ -154,54 +153,10 @@ private void nextSample() {
154
153
onBatch .completeExceptionally (th );
155
154
return null ;
156
155
});
157
- }
158
- }
159
-
160
- /*
161
- private long performBatch() throws Exception {
162
- List<CompletableFuture<Void>> batchFutures = new ArrayList<>();
163
- long startTime = currentTimeMillis();
164
- for (int i = 0; i < batchSize; i++) {
165
-
166
- batchFutures.add(pool.getConnection()
167
- .thenApply(connection -> connection.prepareStatement(SELECT_42)
168
- .thenApply(stmt -> {
169
- return stmt.query()
170
- .handle((v, th) ->
171
- stmt.close()
172
- .thenAccept(_v -> {
173
- if (th != null) {
174
- throw new RuntimeException(th);
175
- }
176
- })
177
- )
178
- .thenCompose(Function.identity());
179
- }
180
- )
181
- .thenCompose(Function.identity())
182
- .handle((v, th) -> connection.close()
183
- .thenAccept(_v -> {
184
- if (th != null) {
185
- throw new RuntimeException(th);
186
- }
187
- }))
188
- .thenCompose(Function.identity())
189
- )
190
- .thenCompose(Function.identity())
191
- .exceptionally(th -> {
192
- throw new AssertionError(th);
193
- }));
194
156
195
- // batchFutures.add(pool.completeScript(SELECT_42).thenAccept(rs -> {
196
- // }));
197
157
}
198
- CompletableFuture
199
- .allOf(batchFutures.toArray(new CompletableFuture<?>[]{}))
200
- .get();
201
- long duration = currentTimeMillis() - startTime;
202
- return duration;
203
158
}
204
- */
159
+
205
160
@ AfterClass
206
161
public static void printResults () {
207
162
out .println ();
0 commit comments