Skip to content

Commit

Permalink
Merge branch 'worker'
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton Bobukh committed Jul 31, 2014
2 parents a2eacd0 + acdebb7 commit dc85a22
Show file tree
Hide file tree
Showing 50 changed files with 3,979 additions and 203 deletions.
73 changes: 73 additions & 0 deletions cocaine-client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Cocaine Client

## Usage Example

```java
package example;

import cocaine.Locator;
import cocaine.Service;
import org.apache.log4j.Logger;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;

/**
* @author Anton Bobukh <abobukh@yandex-team.ru>
*/
public class Example {

private static final Logger logger = Logger.getLogger(Example.class);

public static void main(String[] args) throws Exception {
try (Locator locator = Locator.create()) {

Service echo = locator.service("echo");
Observable<byte[]> response = echo.invoke("enqueue", "invoke", "10".getBytes());

Observable<String> strings = response.map(new Func1<byte[], String>() {
@Override
public String call(byte[] bytes) {
return new String(bytes);
}
}).doOnNext(new Action1<String>() {
@Override
public void call(String value) {
logger.info("Received: " + value);
}
});

long max = strings.take(5).map(new Func1<String, Long>() {
@Override
public Long call(String value) {
return Long.parseLong(value);
}
}).reduce(new Func2<Long, Long, Long>() {
@Override
public Long call(Long max, Long current) {
return Math.max(max, current);
}
}).toBlocking().single();
logger.info("Max: " + max);

String aggregated = strings.skip(5).filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String value) {
return value.length() < 2;
}
}).reduce("", new Func2<String, String, String>() {
@Override
public String call(String result, String current) {
return result + " " + current;
}
}).toBlocking().single().trim();
logger.info("Aggregated: " + aggregated);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

}

```
66 changes: 66 additions & 0 deletions cocaine-client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>ru.yandex</groupId>
<artifactId>cocaine</artifactId>
<version>0.11.1.0-SNAPSHOT</version>
</parent>

<groupId>ru.yandex.cocaine</groupId>
<artifactId>cocaine-client</artifactId>
<version>0.11.1.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Cocaine-Client</name>
<description>Client for Cocaine Application Engine.</description>

<prerequisites>
<maven>3.0</maven>
</prerequisites>

<properties>
<test.include>*Test.java</test.include>
</properties>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>cocaine-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.rxjava</groupId>
<artifactId>rxjava-core</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void close() {
private ServiceInfo resolve(String name) {
logger.info("Resolving service info for " + name);
try {
byte[] result = service.invoke("resolve", name).toBlockingObservable().single();
byte[] result = service.invoke("resolve", name).toBlocking().single();
return pack.read(result, ServiceInfoTemplate.create(name));
} catch (Exception e) {
throw new LocatorResolveException(name, endpoint, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import cocaine.netty.ServiceMessageHandler;
import com.google.common.base.Joiner;
Expand All @@ -22,20 +23,22 @@
/**
* @author Anton Bobukh <abobukh@yandex-team.ru>
*/
public class Service {
public class Service implements AutoCloseable {

private static final Logger logger = Logger.getLogger(Service.class);

private final String name;
private final ServiceApi api;
private final Sessions sessions;

private AtomicBoolean closed;
private Channel channel;

private Service(String name, ServiceApi api, Bootstrap bootstrap, Supplier<SocketAddress> endpoint) {
this.name = name;
this.sessions = new Sessions(name);
this.api = api;
this.closed = new AtomicBoolean(false);
connect(bootstrap, endpoint, new ServiceMessageHandler(name, sessions));
}

Expand All @@ -54,7 +57,14 @@ public Observable<byte[]> invoke(String method, List<Object> args) {
int requestedMethod = api.getMethod(method);
channel.write(new InvocationRequest(requestedMethod, session.getId(), args));

return session.getObservable();
return session.getInput();
}

@Override
public void close() throws Exception {
if (closed.compareAndSet(false, true)) {
channel.close();
}
}

@Override
Expand All @@ -70,11 +80,11 @@ private void connect(final Bootstrap bootstrap, final Supplier<SocketAddress> en
channel.pipeline().addLast(handler);
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
public void operationComplete(final ChannelFuture future) throws Exception {
future.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
if (!bootstrap.group().isShuttingDown()) {
if (!closed.get() && !bootstrap.group().isShuttingDown()) {
connect(bootstrap, endpoint, handler);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ public class ServiceErrorException extends ServiceException {

private final int code;

public ServiceErrorException(String serviceName, String message, int code) {
super(serviceName, code + " - " + message);
public ServiceErrorException(String service, String message, int code) {
super(service, code + " - " + message);
this.code = code;
}

Expand Down
19 changes: 19 additions & 0 deletions cocaine-client/src/main/java/cocaine/ServiceException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package cocaine;

/**
* @author Anton Bobukh <abobukh@yandex-team.ru>
*/
public class ServiceException extends CocaineException {

private final String service;

public ServiceException(String service, String message) {
super(service + " - " + message);
this.service = service;
}

public String getService() {
return service;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public final class Sessions {
private static final Logger logger = Logger.getLogger(Sessions.class);

private final AtomicLong counter;
private final Map<Long, Observer<byte[]>> sessions;
private final Map<Long, Session> sessions;
private final String service;

public Sessions(String service) {
Expand All @@ -33,35 +33,36 @@ public Session create() {
Subject<byte[], byte[]> subject = ReplaySubject.create();

logger.debug("Creating new session: " + id);
sessions.put(id, subject);
return new Session(id, subject);
Session session = new Session(id, subject);
sessions.put(id, session);
return session;
}

public void onChunk(long id, byte[] chunk) {
Observer<byte[]> session = sessions.get(id);
Session session = sessions.get(id);
if (session != null) {
logger.debug("Pushing new chunk " + Arrays.toString(chunk) + " to session " + id);
session.onNext(chunk);
session.input.onNext(chunk);
} else {
logger.warn("Session " + id + " does not exist");
}
}

public void onCompleted(long id) {
Observer<byte[]> session = sessions.remove(id);
Session session = sessions.remove(id);
if (session != null) {
logger.debug("Closing session " + id);
session.onCompleted();
session.input.onCompleted();
} else {
logger.warn("Session " + id + " does not exist");
}
}

public void onError(long id, ServiceException exception) {
Observer<byte[]> session = sessions.remove(id);
Session session = sessions.remove(id);
if (session != null) {
logger.debug("Setting error " + exception.getMessage() + " for session " + id);
session.onError(exception);
session.input.onError(exception);
} else {
logger.warn("Session " + id + " does not exist");
}
Expand All @@ -84,19 +85,19 @@ public void onError(ServiceException exception) {
public static final class Session {

private final long id;
private final Observable<byte[]> observable;
private final Subject<byte[], byte[]> input;

private Session(long id, Observable<byte[]> observable) {
private Session(long id, Subject<byte[], byte[]> input) {
this.id = id;
this.observable = observable;
this.input = input;
}

public long getId() {
return id;
}

public Observable<byte[]> getObservable() {
return observable;
public Observable<byte[]> getInput() {
return input;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ public class UnexpectedMessageException extends ServiceException {

private final Message msg;

public UnexpectedMessageException(String serviceName, Message msg) {
super(serviceName, "Unexpected message: " + msg.toString());
public UnexpectedMessageException(String service, Message msg) {
super(service, "Unexpected message: " + msg.toString());
this.msg = msg;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
*/
public class UnknownMethodException extends ServiceException {

public UnknownMethodException(String serviceName, String method) {
super(serviceName, "Unknown service method: " + method);
public UnknownMethodException(String service, String method) {
super(service, "Unknown service method: " + method);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import cocaine.message.ChunkMessage;
import cocaine.message.ErrorMessage;
import cocaine.message.Message;
import cocaine.message.MessageType;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.log4j.Logger;
Expand Down
Loading

0 comments on commit dc85a22

Please sign in to comment.