Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
MammatusPlatypus committed May 3, 2016
1 parent dc25248 commit c15a9d5
Show file tree
Hide file tree
Showing 14 changed files with 147 additions and 189 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

ext {
projectVersion = '1.7.1'
projectVersion = '1.8.0'
boonVersion = '0.5.7'
boonGroup = "io.advantageous.boon"
springFrameworkVersion = '4.2.5.RELEASE'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.sender.Sender;
import io.advantageous.qbit.service.BeforeMethodCall;
import io.advantageous.reakt.promise.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -259,33 +260,8 @@ public <T> T createProxy(final Class<T> serviceInterface, final String serviceNa

/** Use this before call to register an async handler with the handlers map. */
BeforeMethodCall beforeMethodCall = call -> {

final Object body = call.body();
if (body instanceof Object[]) {

Object[] list = (Object[]) body;

if (list.length > 0) {
final Object o = list[0];
if (o instanceof Callback) {
//noinspection unchecked
handlers.put(new HandlerKey(call.returnAddress(), call.id()), createHandler(serviceInterface, call, (Callback) o));

if (list.length - 1 == 0) {
list = new Object[0];
} else {
list = Arry.slc(list, 1); //Skip first arg it was a handler.
}

}
if (call instanceof MethodCallImpl) {
MethodCallImpl impl = (MethodCallImpl) call;
impl.setBody(list);
}

}
}

registerCallback(serviceInterface, call);
prepareBody(call);
return true;
};

Expand Down Expand Up @@ -316,6 +292,40 @@ public void stop() {
return proxy;
}

private void prepareBody(MethodCall call) {
final Object body = call.body();
if (body instanceof Object[]) {

Object[] list = (Object[]) body;

if (list.length > 0) {
final Object o = list[0];
if (o instanceof Callback) {
//noinspection unchecked
if (list.length - 1 == 0) {
list = new Object[0];
} else {
list = Arry.slc(list, 1); //Skip first arg it was a handler.
}
}
if (call instanceof MethodCallImpl) {
MethodCallImpl impl = (MethodCallImpl) call;
impl.setBody(list);
}

}
}
}

private <T> void registerCallback(Class<T> serviceInterface, MethodCall call) {
final Callback callback = call.callback();

if (callback != null) {
handlers.put(new HandlerKey(call.returnAddress(), call.id()),
createHandler(serviceInterface, call, callback));
}
}

/**
* Create an async handler. Uses some generics reflection to see what the actual type is
*
Expand All @@ -332,25 +342,46 @@ private <T> Callback createHandler(final Class<T> serviceInterface, final Method
final MethodAccess method = clsMeta.method(call.name());

Class<?> returnType = null;

Class<?> compType = null;
if (method.parameterTypes().length > 0) {
Type[] genericParameterTypes = method.getGenericParameterTypes();
ParameterizedType parameterizedType = genericParameterTypes.length > 0 ? (ParameterizedType) genericParameterTypes[0] : null;

Type type = ((parameterizedType != null ? parameterizedType.getActualTypeArguments().length : 0) > 0 ? (parameterizedType != null ? parameterizedType.getActualTypeArguments() : new Type[0])[0] : null);
if (method.returnType() == Promise.class) {

Type t0 = method.method().getGenericReturnType();
if (t0 instanceof ParameterizedType) {
ParameterizedType parameterizedType = ((ParameterizedType) t0);
Type type = ((parameterizedType != null ? parameterizedType.getActualTypeArguments().length : 0) > 0 ? (parameterizedType != null ? parameterizedType.getActualTypeArguments() : new Type[0])[0] : null);

if (type instanceof ParameterizedType) {
returnType = (Class) ((ParameterizedType) type).getRawType();
final Type type1 = ((ParameterizedType) type).getActualTypeArguments()[0];
if (type instanceof ParameterizedType) {
returnType = (Class) ((ParameterizedType) type).getRawType();
final Type type1 = ((ParameterizedType) type).getActualTypeArguments()[0];

if (type1 instanceof Class) {
compType = (Class) type1;
if (type1 instanceof Class) {
compType = (Class) type1;
}
} else if (type instanceof Class) {
returnType = (Class<?>) type;
}
} else if (type instanceof Class) {
returnType = (Class<?>) type;

}
} else {
if (method.parameterTypes().length > 0) {
Type[] genericParameterTypes = method.getGenericParameterTypes();
ParameterizedType parameterizedType = genericParameterTypes.length > 0 ? (ParameterizedType) genericParameterTypes[0] : null;

Type type = ((parameterizedType != null ? parameterizedType.getActualTypeArguments().length : 0) > 0 ? (parameterizedType != null ? parameterizedType.getActualTypeArguments() : new Type[0])[0] : null);

if (type instanceof ParameterizedType) {
returnType = (Class) ((ParameterizedType) type).getRawType();
final Type type1 = ((ParameterizedType) type).getActualTypeArguments()[0];

if (type1 instanceof Class) {
compType = (Class) type1;
}
} else if (type instanceof Class) {
returnType = (Class<?>) type;
}

}
}
final Class<?> actualReturnType = returnType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,8 @@ public BasicQueue(final String name,

if (this.batchSize == 1) {

if (queue instanceof LinkedTransferQueue) {
sendQueueSupplier = () -> new NoBatchSendQueue<>((LinkedTransferQueue<Object>) queue, this, name);
} else {
throw new IllegalStateException("If batch size 1 queue must be a linked transfer queue");
}
sendQueueSupplier = () -> new NoBatchSendQueue<>(queue, this, name);

} else if (queue instanceof LinkedTransferQueue) {

if (tryTransfer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.BlockingQueue;


public class NoBatchSendQueue<T> implements SendQueue<T> {


protected final LinkedTransferQueue<Object> queue;
protected final BlockingQueue<Object> queue;
protected final Queue<T> owner;
protected final String name;
private final Logger logger = LoggerFactory.getLogger(NoBatchSendQueue.class);
Expand All @@ -24,7 +24,7 @@ public class NoBatchSendQueue<T> implements SendQueue<T> {
protected int checkEveryStarted = 0;
protected int index;

public NoBatchSendQueue(final LinkedTransferQueue<Object> queue,
public NoBatchSendQueue(final BlockingQueue<Object> queue,
final Queue<T> owner,
final String name) {
this.queue = queue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ public EndpointServerBuilder setStatsCollector(StatsCollector statsCollector) {
public QueueBuilder getRequestQueueBuilder() {

if (requestQueueBuilder == null) {
requestQueueBuilder = QueueBuilder.queueBuilder();
requestQueueBuilder = QueueBuilder.queueBuilder().setArrayBlockingQueue().setBatchSize(100);
}

return requestQueueBuilder;
Expand All @@ -454,7 +454,7 @@ public EndpointServerBuilder setRequestQueueBuilder(QueueBuilder requestQueueBui

public QueueBuilder getWebResponseQueueBuilder() {
if (webResponseQueueBuilder == null) {
webResponseQueueBuilder = QueueBuilder.queueBuilder();
webResponseQueueBuilder = QueueBuilder.queueBuilder().setArrayBlockingQueue().setBatchSize(100);
}
return webResponseQueueBuilder;
}
Expand Down Expand Up @@ -620,7 +620,7 @@ public QueueBuilder getResponseQueueBuilder() {
if (responseQueueBuilder == null) {

if (responseQueue == null) {
responseQueueBuilder = QueueBuilder.queueBuilder();
responseQueueBuilder = QueueBuilder.queueBuilder().setArrayBlockingQueue().setBatchSize(100);
} else {


Expand Down Expand Up @@ -677,7 +677,8 @@ public ServiceEndpointServer build() {
getCallbackManager(),
getEventManager(),
getBeforeMethodSent(),
getBeforeMethodCallOnServiceQueue(), getAfterMethodCallOnServiceQueue());
getBeforeMethodCallOnServiceQueue(),
getAfterMethodCallOnServiceQueue());


final ServiceEndpointServer serviceEndpointServer = new ServiceEndpointServerImpl(getHttpServer(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ default ServiceQueue startAll() {
return this;
}

default ServiceQueue startServiceQueue() {
return this;
}


default ServiceQueue start(boolean joinEventManager) {
return this;
}

/**
* Return a list of addresses.
*
Expand All @@ -129,14 +138,6 @@ <T> T createProxyWithAutoFlush(Class<T> serviceInterface, PeriodicScheduler peri
void flush();


default ServiceQueue startServiceQueue() {
return this;
}


default ServiceQueue start(boolean joinEventManager) {
return this;
}

Queue<MethodCall<Object>> requestQueue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,62 +711,11 @@ private <T> T proxy(final Class<T> serviceInterface,

validateInterface(serviceInterface);

// final String uuid = serviceInterface.getName() + "::" + UUID.randomUUID().toString();
if (!started.get()) {
logger.info("ServiceQueue::create(...), A proxy is being asked for a service that is not started ", name());
}
final InvocationHandler invocationHandler = new BoonInvocationHandlerForSendQueue(methodCallSendQueue,
serviceInterface, serviceInterface.getSimpleName(), beforeMethodSent);

//
// = new InvocationHandler() {
//
// private long messageId = 0;
// private long timestamp = Timer.timer().now();
// private int times = 10;
//
// @Override
// public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
//
// if (method.getName().equals("toString")) {
// return "PROXY OBJECT " + address();
// }
// if (method.getName().equals("clientProxyFlush")) {
// methodCallSendQueue.flushSends();
// return null;
// }
//
// if (method.getName().equals("stop")) {
// methodCallSendQueue.stop();
// return null;
// }
// messageId++;
// times--;
// if (times == 0) {
// timestamp = Timer.timer().now();
// times = 10;
// } else {
// timestamp++;
// }
//// if (beforeMethodSent == null) {
//// /** TODO LEFT OFF HERE. */
//// final MethodCallLocal call = new MethodCallLocal(method.getName(), uuid,
//// timestamp, messageId, args, null, null);
//// methodCallSendQueue.send(call);
//// } else {
// final String name = method.getName();
// MethodCallBuilder methodCallBuilder = MethodCallBuilder.methodCallBuilder()
// .setLocal(true).setAddress(name)
// .setName(name).setReturnAddress(uuid)
// .setTimestamp(timestamp).setId(messageId)
// .setBodyArgs(args);
// if (beforeMethodSent!=null) beforeMethodSent.beforeMethodSent(methodCallBuilder);
// final MethodCall<Object> call = methodCallBuilder.build();
// methodCallSendQueue.send(call);
//// }
// return null;
// }
// };
final Object o = Proxy.newProxyInstance(serviceInterface.getClassLoader(),
new Class[]{serviceInterface, ClientProxy.class}, invocationHandler
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@

/**
* Manages a collection of services.
*
* NOTE NOTE NOTE NOTE.
* NOTE if you want to debug why a method is not getting called but a break point on {@code doCall}.
*/
public class ServiceBundleImpl implements ServiceBundle {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import io.advantageous.qbit.service.ServiceProxyUtils;
import io.advantageous.qbit.time.Duration;
import io.advantageous.qbit.util.TestTimer;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import java.util.Optional;
Expand All @@ -17,6 +19,7 @@

import static org.junit.Assert.*;

@Ignore
public class LocalKeyValueStoreServiceTest {

KeyValueStoreService<Todo> kvStore;
Expand All @@ -40,6 +43,12 @@ public void setup() {

Sys.sleep(100);

}

@After
public void after() {


}

public void setupWithDebug() {
Expand Down Expand Up @@ -336,6 +345,7 @@ public void testTimeoutWithPutConfirmation() throws InterruptedException {

timer.seconds(6);

Sys.sleep(1000);

final Optional<Todo> todoOptional2 = getTodoForKey("testKey2");
assertFalse(todoOptional2.isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public void testSendTooMany() throws Exception {
.setEnqueueTimeout(1)
.setName("unable to send")
.setSize(10)
.setBatchSize(10)
.setBatchSize(10).setArrayBlockingQueue()
.build();

final SendQueue<Object> sendQueue = queue.sendQueue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,14 @@ public void before() {
empURI = URI.create("marathon://default/employeeService?env=staging");


serviceQueue = ServiceBuilder.serviceBuilder().setServiceObject(impl).buildAndStartAll();
serviceBundle = ServiceBundleBuilder.serviceBundleBuilder().build();
ServiceBuilder serviceBuilder = ServiceBuilder.serviceBuilder();
serviceBuilder.getRequestQueueBuilder().setArrayBlockingQueue().setBatchSize(10);

serviceQueue = serviceBuilder.setServiceObject(impl).buildAndStartAll();

ServiceBundleBuilder serviceBundleBuilder = ServiceBundleBuilder.serviceBundleBuilder();
serviceBundleBuilder.getRequestQueueBuilder().setArrayBlockingQueue().setBatchSize(10);
serviceBundle = serviceBundleBuilder.build();
serviceBundle.addServiceObject("myservice", impl);
serviceQueue2 = ServiceBuilder.serviceBuilder().setInvokeDynamic(false).setServiceObject(impl)
.buildAndStartAll();
Expand Down
Loading

0 comments on commit c15a9d5

Please sign in to comment.