Skip to content

Boon QBit remote Websocket proxy and how to integrate QBit and Vertx

RichardHightower edited this page Oct 3, 2014 · 1 revision

It is easy to integration Boon QBit and Vertx Websocket. Boon will even create the client proxies.

Here is a very small example in Vertx.

package org.boon.qbit.vertx.integration.server;

import org.boon.qbit.vertx.integration.model.EmployeeManagerImpl;
import org.qbit.QBit;
import org.qbit.message.MethodCall;
import org.qbit.message.Response;
import org.qbit.queue.ReceiveQueue;
import org.qbit.service.ServiceBundle;
import org.qbit.spi.ProtocolEncoder;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.http.HttpServer;
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.core.http.ServerWebSocket;
import org.vertx.java.platform.Verticle;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.boon.Boon.puts;

public class QBitVerticle extends Verticle {

    private ServiceBundle serviceBundle;

    private  HttpServer httpServer;

    private ReceiveQueue<Response<Object>> responses;

    private ProtocolEncoder encoder;


    private Map<String, ServerWebSocket> webSocketMap = new ConcurrentHashMap<>();

    public void start() {


        container.logger().info("QBitVerticle started");

        serviceBundle = QBit.factory().createBundle("/services");

        serviceBundle.addService("/employeeService", new EmployeeManagerImpl());

        encoder = QBit.factory().createEncoder();

        httpServer = vertx.createHttpServer();
        httpServer.setTCPKeepAlive(true);
        httpServer.setTCPNoDelay(true);
        httpServer.setSoLinger(0);
        httpServer.setMaxWebSocketFrameSize(100_000_000);


        httpServer.websocketHandler(new Handler<ServerWebSocket>() {
            @Override
            public void handle(ServerWebSocket event) {

                puts("GOT CONNECTION", event.path(), event.uri(), serviceBundle.address());

                if (event.uri().startsWith(serviceBundle.address())) {
                    handleWebSocket(event);
                }
            }
        }).requestHandler(new Handler<HttpServerRequest>() {
            @Override
            public void handle(HttpServerRequest event) {

                event.response().end("pong\n");
            }
        });

        httpServer.listen(8080);

        vertx.setPeriodic(50, new Handler<Long>() {
            @Override
            public void handle(Long event) {
                handleServiceBundleFlush();
            }
        });

        vertx.setPeriodic(5, new Handler<Long>() {
            @Override
            public void handle(Long event) {
                drainServiceQueue();
            }
        });


        responses = serviceBundle.responses();



    }

    private void drainServiceQueue() {
        final Iterable<Response<Object>> responsesBatch = responses.readBatch();

        for (Response<Object> response : responsesBatch) {
            final ServerWebSocket serverWebSocket = webSocketMap.get(response.returnAddress());

            if (serverWebSocket != null) {
                String responseAsText = encoder.encodeAsString(response);
                serverWebSocket.writeTextFrame(responseAsText);
            }
        }

    }


    private  void handleWebSocket(final ServerWebSocket websocket) {

        websocket.dataHandler(new Handler<Buffer>() {
            @Override
            public void handle(Buffer event) {
                handleWebSocketData(websocket, event.toString());
            }
        });

        websocket.closeHandler(new Handler<Void>() {
            @Override
            public void handle(Void event) {

                handleWebSocketClosed(websocket);
            }
        });

    }

    private  void handleWebSocketClosed(ServerWebSocket websocket) {

    }

    private  void handleWebSocketData(ServerWebSocket websocket, String message) {


        final MethodCall<Object> methodCall = QBit.factory().createMethodCallToBeParsedFromBody(websocket.remoteAddress().toString(), message);
        serviceBundle.call(methodCall);

        puts("Websocket data", methodCall.returnAddress(), websocket, message);

        webSocketMap.put(methodCall.returnAddress(), websocket);


    }


    private void handleServiceBundleFlush() {
        serviceBundle.flushSends();
    }

}

Once you have a service bundle, you can expose any number of services to vertx for both HTTP and WebSocket development.

Here is a sample service that we exposed above.

package org.boon.qbit.vertx.integration.model;

import org.boon.Lists;

import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class EmployeeManagerImpl implements EmployeeManager {


    Map<Long, Employee> employeeMap = new HashMap<>();

    @Override
    public void addEmployee(Employee employee) {

        employeeMap.put(employee.getEmployeeId(), employee);
    }

    @Override
    public List<Employee> list() {
        return Lists.list(employeeMap.values());
    }
}

Which implements this interface

package org.boon.qbit.vertx.integration.model;

import java.util.List;

/**
 * Created by Richard on 10/2/14.
 */
public interface EmployeeManager {
    void addEmployee(Employee employee);

    List<Employee> list();
}

Which uses this class

package org.boon.qbit.vertx.integration.model;

/**
 * Created by Richard on 10/2/14.
 */
public class Employee {

    private  String firstName;
    private  String lastName;
    private  int salary;
    private  long employeeId;


    public Employee() {

    }

    public Employee(String firstName, String lastName, int salary, long employeeId) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.salary = salary;
        this.employeeId = employeeId;
    }

    public String getFirstName() {
        return firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public int getSalary() {
        return salary;
    }

    public long getEmployeeId() {
        return employeeId;
    }

    @Override
    public String toString() {
        return "Employee{" +
                "firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                ", salary=" + salary +
                ", employeeId=" + employeeId +
                '}';
    }
}

Once you have a class defined, and it is bound into Vertx, it is easy to write clients.

package org.boon.qbit.vertx.integration.client;

import org.boon.Boon;
import org.boon.core.Sys;
import org.boon.core.reflection.MapObjectConversion;
import org.boon.qbit.vertx.QBitClient;
import org.boon.qbit.vertx.integration.model.Employee;
import org.boon.qbit.vertx.integration.model.EmployeeManager;
import org.qbit.QBit;
import org.qbit.message.Response;
import org.qbit.queue.ReceiveQueue;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.VertxFactory;

import java.util.List;
import java.util.Map;

import static org.boon.Boon.puts;

/**
 * Created by Richard on 10/2/14.
 */
public class QBitClientMain {



    public static void main (String... args) throws InterruptedException {


        /* Create a new instance of Vertx. */
        Vertx vertx = VertxFactory.newVertx();


        final QBitClient qBitClient = new QBitClient("localhost", 8080, "/services", vertx);

        final EmployeeManager remoteProxy = qBitClient.createProxy(EmployeeManager.class,
                "employeeService");


        remoteProxy.addEmployee(new Employee("Rick", "Hightower", 10, 1L));


        remoteProxy.list();

        final ReceiveQueue<String> receiveQueue = qBitClient.receiveQueue();

        Sys.sleep(1000);

        final String message = receiveQueue.pollWait();

        puts(message);

        final Response<Object> response = QBit.factory().createResponse(message);

        final List<Employee> employees = MapObjectConversion.convertListOfMapsToObjects(Employee.class, (List<Map>) response.body());


        puts(employees);

        Boon.gets();

    }




}

Calling the proxy, sends a websocket call to the backend.

If you do not like using the queue. You can define a proxy interface that has an async handler like so:

package org.boon.qbit.vertx.integration.client;

import org.boon.core.Handler;
import org.boon.qbit.vertx.integration.model.Employee;

import java.util.List;

public interface EmployeeManagerProxy {

    void addEmployee(Employee employee);

    void list(Handler<List<Employee>> employees );
}
package org.boon.qbit.vertx.integration.client;

import org.boon.Boon;
import org.boon.core.Handler;
import org.boon.qbit.vertx.QBitClient;
import org.boon.qbit.vertx.integration.model.Employee;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.VertxFactory;

import java.util.List;

import static org.boon.Boon.puts;

/**
 * Created by Richard on 10/3/14.
 */
public class QBitClientMain2 {

    public static void main(String... args) throws InterruptedException {


        /* Create a new instance of Vertx. */
        Vertx vertx = VertxFactory.newVertx();


        final QBitClient qBitClient = new QBitClient("localhost", 8080, "/services", vertx);

        qBitClient.startReturnProcessing();

        final EmployeeManagerProxy remoteProxy = qBitClient.createProxy(EmployeeManagerProxy.class,
                "employeeService");


        remoteProxy.addEmployee(new Employee("Rick", "Hightower", 10, 1L));


        remoteProxy.list(new Handler<List<Employee>>() {
            @Override
            public void handle(List<Employee> employees) {
                puts(employees);

            }
        });

        Boon.gets();

    }


}
Clone this wiki locally