Skip to content

Commit

Permalink
jooby: update reactive db tests (TechEmpower#5854)
Browse files Browse the repository at this point in the history
- Upgrade reactive driver
- Use a thread-local pgpool
- jooby 2.8.9
  • Loading branch information
jknack authored Jul 9, 2020
1 parent 4c550f0 commit 90a40a6
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 100 deletions.
11 changes: 5 additions & 6 deletions frameworks/Java/jooby/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
<name>jooby</name>

<properties>
<jooby.version>2.8.8</jooby.version>
<!-- downgrade netty and make pg-client happy -->
<netty.version>4.1.34.Final</netty.version>
<jooby.version>2.8.9</jooby.version>
<netty.version>4.1.49.Final</netty.version>
<postgresql.version>42.2.13</postgresql.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
Expand Down Expand Up @@ -67,9 +66,9 @@
</dependency>

<dependency>
<groupId>io.reactiverse</groupId>
<artifactId>reactive-pg-client</artifactId>
<version>0.11.4</version>
<groupId>io.vertx</groupId>
<artifactId>vertx-pg-client</artifactId>
<version>3.9.1</version>
</dependency>
</dependencies>

Expand Down
50 changes: 30 additions & 20 deletions frameworks/Java/jooby/src/main/java/com/techempower/PgClients.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,42 @@
package com.techempower;

import io.reactiverse.pgclient.PgClient;
import io.reactiverse.pgclient.PgPool;
import io.reactiverse.pgclient.PgPoolOptions;
import com.typesafe.config.Config;
import io.vertx.core.Vertx;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
import io.vertx.core.VertxOptions;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.PoolOptions;

public class PgClients {
private final Iterator<PgPool> iterator;
private static final PoolOptions SINGLE = new PoolOptions().setMaxSize(1);

private final PgConnectOptions connectOptions;

private final Vertx vertx;

private ThreadLocal<PgPool> sqlClient = ThreadLocal.withInitial(this::sqlClientPool);

public PgClients(Config config) {
this.vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true).setWorkerPoolSize(4));
this.connectOptions = pgPoolOptions(config);
}

private PgClients(Collection<PgPool> clients) {
iterator = Stream.generate(() -> clients).flatMap(Collection::stream).iterator();
public PgPool next() {
return sqlClient.get();
}

public synchronized PgPool next() {
return iterator.next();
private PgConnectOptions pgPoolOptions(Config config) {
PgConnectOptions options = new PgConnectOptions();
options.setDatabase(config.getString("databaseName"));
options.setHost(config.getString("serverName"));
options.setPort(config.getInt("portNumber"));
options.setUser(config.getString("user"));
options.setPassword(config.getString("password"));
options.setCachePreparedStatements(true);
return options;
}

public static PgClients create(Vertx vertx, PgPoolOptions options) {
List<PgPool> clients = new ArrayList<>();
for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
clients.add(PgClient.pool(vertx, options));
}
return new PgClients(clients);
private PgPool sqlClientPool() {
return PgPool.pool(vertx, connectOptions, SINGLE);
}
}
120 changes: 47 additions & 73 deletions frameworks/Java/jooby/src/main/java/com/techempower/ReactivePg.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
package com.techempower;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import io.jooby.Jooby;
import io.jooby.json.JacksonModule;
import io.jooby.rocker.RockerModule;
import io.reactiverse.pgclient.PgClient;
import io.reactiverse.pgclient.PgConnection;
import io.reactiverse.pgclient.PgIterator;
import io.reactiverse.pgclient.PgPoolOptions;
import io.reactiverse.pgclient.Row;
import io.reactiverse.pgclient.Tuple;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowIterator;
import io.vertx.sqlclient.Tuple;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -33,9 +28,7 @@ public class ReactivePg extends Jooby {

{
/** PG client: */
Vertx vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));
PgPoolOptions options = pgPoolOptions(getConfig().getConfig("db"));
PgClients clients = PgClients.create(vertx, new PgPoolOptions(options).setMaxSize(1));
PgClients clients = new PgClients(getConfig().getConfig("db"));

/** Template engine: */
install(new RockerModule());
Expand All @@ -46,13 +39,14 @@ public class ReactivePg extends Jooby {

/** Single query: */
get("/db", ctx -> {
clients.next().preparedQuery(SELECT_WORLD, Tuple.of(randomWorld()), rsp -> {
clients.next().preparedQuery(SELECT_WORLD).execute(Tuple.of(randomWorld()), rsp -> {
try {
if (rsp.succeeded()) {
PgIterator rs = rsp.result().iterator();
RowIterator<Row> rs = rsp.result().iterator();
Row row = rs.next();
ctx.setResponseType(JSON)
.send(mapper.writeValueAsBytes(new World(row.getInteger(0), row.getInteger(1))));
.send(
mapper.writeValueAsBytes(new World(row.getInteger(0), row.getInteger(1))));
} else {
ctx.sendError(rsp.cause());
}
Expand All @@ -69,11 +63,11 @@ public class ReactivePg extends Jooby {
AtomicInteger counter = new AtomicInteger();
AtomicBoolean failed = new AtomicBoolean(false);
World[] result = new World[queries];
PgClient client = clients.next();
PgPool client = clients.next();
for (int i = 0; i < result.length; i++) {
client.preparedQuery(SELECT_WORLD, Tuple.of(randomWorld()), rsp -> {
client.preparedQuery(SELECT_WORLD).execute(Tuple.of(randomWorld()), rsp -> {
if (rsp.succeeded()) {
PgIterator rs = rsp.result().iterator();
RowIterator<Row> rs = rsp.result().iterator();
Row row = rs.next();
result[counter.get()] = new World(row.getInteger(0), row.getInteger(1));
} else {
Expand Down Expand Up @@ -101,60 +95,51 @@ public class ReactivePg extends Jooby {
World[] result = new World[queries];
AtomicInteger counter = new AtomicInteger(0);
AtomicBoolean failed = new AtomicBoolean(false);
clients.next().getConnection(ar -> {
if (ar.failed()) {
if (failed.compareAndSet(false, true)) {
ctx.sendError(ar.cause());
PgPool pool = clients.next();
for (int i = 0; i < queries; i++) {
pool.preparedQuery(SELECT_WORLD).execute(Tuple.of(randomWorld()), query -> {
if (query.succeeded()) {
RowIterator<Row> rs = query.result().iterator();
Tuple row = rs.next();
World world = new World(row.getInteger(0), randomWorld());
result[counter.get()] = world;
} else {
if (failed.compareAndSet(false, true)) {
ctx.sendError(query.cause());
return;
}
}
return;
}
PgConnection conn = ar.result();
for (int i = 0; i < queries; i++) {
conn.preparedQuery(SELECT_WORLD, Tuple.of(randomWorld()), query -> {
if (query.succeeded()) {
PgIterator rs = query.result().iterator();
Tuple row = rs.next();
World world = new World(row.getInteger(0), randomWorld());
result[counter.get()] = world;
} else {
conn.close();
if (failed.compareAndSet(false, true)) {
ctx.sendError(query.cause());
return;
}

if (counter.incrementAndGet() == queries && !failed.get()) {
List<Tuple> batch = new ArrayList<>(queries);
for (World world : result) {
batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
}

if (counter.incrementAndGet() == queries && !failed.get()) {
List<Tuple> batch = new ArrayList<>(queries);
for (World world : result) {
batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
}

conn.preparedBatch(UPDATE_WORLD, batch, update -> {
conn.close();
if (update.failed()) {
ctx.sendError(update.cause());
} else {
try {
ctx.setResponseType(JSON)
.send(mapper.writeValueAsBytes(result));
} catch (IOException x) {
ctx.sendError(x);
pool.preparedQuery(UPDATE_WORLD)
.executeBatch(batch, update -> {
if (update.failed()) {
ctx.sendError(update.cause());
} else {
try {
ctx.setResponseType(JSON)
.send(mapper.writeValueAsBytes(result));
} catch (IOException x) {
ctx.sendError(x);
}
}
}
});
}
});
}
});
});
}
});
}
return ctx;
});

/** Fortunes: */
get("/fortunes", ctx -> {
clients.next().preparedQuery(SELECT_FORTUNE, rsp -> {
clients.next().preparedQuery(SELECT_FORTUNE).execute(rsp -> {
if (rsp.succeeded()) {
PgIterator rs = rsp.result().iterator();
RowIterator<Row> rs = rsp.result().iterator();
List<Fortune> fortunes = new ArrayList<>();

while (rs.hasNext()) {
Expand All @@ -176,17 +161,6 @@ public class ReactivePg extends Jooby {
});
}

private PgPoolOptions pgPoolOptions(Config config) {
PgPoolOptions options = new PgPoolOptions();
options.setDatabase(config.getString("databaseName"));
options.setHost(config.getString("serverName"));
options.setPort(config.getInt("portNumber"));
options.setUser(config.getString("user"));
options.setPassword(config.getString("password"));
options.setCachePreparedStatements(true);
return options;
}

public static void main(String[] args) {
runApp(args, EVENT_LOOP, ReactivePg::new);
}
Expand Down
2 changes: 1 addition & 1 deletion frameworks/Kotlin/kooby/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<name>kooby: jooby+kotlin</name>

<properties>
<jooby.version>2.8.8</jooby.version>
<jooby.version>2.8.9</jooby.version>
<postgresql.version>42.2.13</postgresql.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
Expand Down

0 comments on commit 90a40a6

Please sign in to comment.