diff --git a/build.gradle b/build.gradle index 1a956d6..9a1c1c4 100644 --- a/build.gradle +++ b/build.gradle @@ -17,7 +17,7 @@ */ ext { - projectVersion = '1.10.0.RELEASE' + projectVersion = '1.10.6' boonVersion = '0.5.7' boonGroup = "io.advantageous.boon" springFrameworkVersion = '4.2.5.RELEASE' diff --git a/examples/standalone/src/main/java/io/advantageous/qbit/example/perf/websocket/TradeService.java b/examples/standalone/src/main/java/io/advantageous/qbit/example/perf/websocket/TradeService.java index 1bb84bc..9820610 100644 --- a/examples/standalone/src/main/java/io/advantageous/qbit/example/perf/websocket/TradeService.java +++ b/examples/standalone/src/main/java/io/advantageous/qbit/example/perf/websocket/TradeService.java @@ -5,6 +5,12 @@ import io.advantageous.qbit.annotation.Service; import io.advantageous.qbit.annotation.http.GET; import io.advantageous.qbit.annotation.http.PUT; +import io.advantageous.qbit.reactive.Callback; +import io.advantageous.reakt.promise.Promise; +import io.advantageous.reakt.promise.Promises; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static io.advantageous.qbit.admin.ManagedServiceBuilder.managedServiceBuilder; @@ -18,12 +24,62 @@ public class TradeService { private long count; + final ExecutorService executorService = Executors.newFixedThreadPool(10); + @PUT("/trade") - public boolean t(final Trade trade) { - trade.getNm().hashCode(); - trade.getAmt(); - count++; - return true; + public void t(Callback cb, final Trade trade) { + executorService.submit(() -> { + trade.getNm().hashCode(); + trade.getAmt(); + count++; + cb.resolve(true); + try { + Thread.sleep(199); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + } + + + + @PUT("/a") + public void a(Callback cb, final Trade trade) { + trade.getNm().hashCode(); + trade.getAmt(); + count++; + cb.resolve(true); + } + + + + @PUT("/trade2") + public Promise t2(final Trade trade) { + return Promises.invokablePromise(promise -> { + executorService.submit(() -> { + trade.getNm().hashCode(); + trade.getAmt(); + count++; + promise.resolve(true); + try { + Thread.sleep(201); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + }); + } + + + @PUT("/trade2") + public Promise b(final Trade trade) { + return Promises.invokablePromise(promise -> { + trade.getNm().hashCode(); + trade.getAmt(); + count++; + promise.resolve(true); + }); } @GET("/count") diff --git a/examples/standalone/src/main/java/io/advantageous/qbit/example/perf/websocket/TradeServiceAsync.java b/examples/standalone/src/main/java/io/advantageous/qbit/example/perf/websocket/TradeServiceAsync.java index c3b5023..35ac8b6 100644 --- a/examples/standalone/src/main/java/io/advantageous/qbit/example/perf/websocket/TradeServiceAsync.java +++ b/examples/standalone/src/main/java/io/advantageous/qbit/example/perf/websocket/TradeServiceAsync.java @@ -1,10 +1,13 @@ package io.advantageous.qbit.example.perf.websocket; import io.advantageous.qbit.reactive.Callback; +import io.advantageous.reakt.promise.Promise; public interface TradeServiceAsync { - + void a(Callback callback, final Trade trade); void t(Callback callback, final Trade trade); + Promise b(final Trade trade); + Promise t2(final Trade trade); void count(Callback callback); } diff --git a/examples/standalone/src/main/java/io/advantageous/qbit/example/perf/websocket/TradeServiceLoadTestWebSocket.java b/examples/standalone/src/main/java/io/advantageous/qbit/example/perf/websocket/TradeServiceLoadTestWebSocket.java index a1c2bc5..98b0b43 100644 --- a/examples/standalone/src/main/java/io/advantageous/qbit/example/perf/websocket/TradeServiceLoadTestWebSocket.java +++ b/examples/standalone/src/main/java/io/advantageous/qbit/example/perf/websocket/TradeServiceLoadTestWebSocket.java @@ -54,9 +54,12 @@ public static void main(final String... args) { totalCount += counts.get(c).get(); } + long seconds = (System.currentTimeMillis()-startTime)/1000; + puts("total", Str.num(totalCount), + "\tseconds", seconds, "\telapsed time", Str.num(System.currentTimeMillis()-startTime), - "\trate", Str.num(totalCount/(System.currentTimeMillis()-startTime)*1000)); + "\trate", Str.num(totalCount/seconds)); } } @@ -67,27 +70,37 @@ public static void main(final String... args) { * @param count holds the total count */ private static void runCalls(final int numCalls, final AtomicInteger count) { - final Client client = clientBuilder().setUri("/") - //.setHost("192.168.0.1") - .setAutoFlush(false).build(); + final Client client = clientBuilder().setUri("/").build(); final TradeServiceAsync tradeService = client.createProxy(TradeServiceAsync.class, "t"); client.startClient(); for (int call=0; call < numCalls; call++) { - tradeService.t(response -> { + tradeService.a(response -> { if (response) { count.incrementAndGet(); + //System.out.println("count " + count.get()); } }, new Trade("IBM", 1)); - /** Apply some back pressure. */ - if (call % 10 == 0) { - while (call - 5_000 > count.get()) { - Sys.sleep(10); +// tradeService.t2(new Trade("IBM", 1)) +// .then(response -> { +// if (response) { +// count.incrementAndGet(); +// } +// } +// ) +// .catchError(Throwable::printStackTrace) +// .invoke(); + + if (call % 100 == 0) { + if (call > count.get() - 2000) { + Sys.sleep(1); } + //flushServiceProxy(tradeService); } + } flushServiceProxy(tradeService); diff --git a/examples/standalone/src/main/resources/logback.xml b/examples/standalone/src/main/resources/logback.xml index befb3c5..e27ac4e 100644 --- a/examples/standalone/src/main/resources/logback.xml +++ b/examples/standalone/src/main/resources/logback.xml @@ -26,7 +26,7 @@ - + diff --git a/qbit-ext/scala/src/test/scala/io/advantageous/qbit/scala/QBitImplicitConversionsTest.scala b/qbit-ext/scala/src/test/scala/io/advantageous/qbit/scala/QBitImplicitConversionsTest.scala index 3751775..b53f517 100644 --- a/qbit-ext/scala/src/test/scala/io/advantageous/qbit/scala/QBitImplicitConversionsTest.scala +++ b/qbit-ext/scala/src/test/scala/io/advantageous/qbit/scala/QBitImplicitConversionsTest.scala @@ -19,9 +19,9 @@ class QBitImplicitConversionsTest extends FlatSpec with Matchers { } - "A Function" should "convert to a consumer" in { - CallbackBuilder.newCallbackBuilder().withErrorHandler((error: Throwable) => {}) - } +// "A Function" should "convert to a consumer" in { +// CallbackBuilder.newCallbackBuilder().withErrorHandler((error: Throwable) => {}) +// }