-
Notifications
You must be signed in to change notification settings - Fork 140
[Doc] QBit Java microservice lib introducing EventBus replication and EventBus connectors
QBit has an event bus.
It is useful for services to listen to events. Events are treated like method calls and that they can be handled by the same thread that handles the service method calls so modification of the state of the service is handled in one thread negating the need for complex thread management and synchronization.
If you are new to QBit events please start here:
- QBit Event Bus more detailed example
- QBit Microservice working with a private event bus
- QBit Java Microservice lib using your own interface to the event bus.
Then come back to this more advanced topic and for some background on the QBit event bus.
QBit now has the ability to send events over a remote connection.
Every event manager EventManager
talks to an EventBus
which talks to a series of Channels
. Channels
are like topics and they manage a collection of listeners. There are adapters that can move an event from a Channel
to a ServiceQueue
.
To support integration with other event busses and remote event busses we added the concept of an EventConnector
. The EventBus
sends all events to its EventConnector
. The default EventConnector
is a no-op EventConnector
. You can plug in additional event connection with the EventConnector
.
The EventConnector
interface is a simple one.
package io.advantageous.qbit.events.spi;
import io.advantageous.qbit.service.ServiceFlushable;
public interface EventConnector extends ServiceFlushable{
/** Forwards the event.
* @param event event
*/
void forwardEvent(EventTransferObject<Object> event);
default void flush() {
}
}
##EventManagerBuilder
Since constructing an event manager is more complicated now, we added an EventManager builder which allows you to specify Predicates which then determine if an event should be replicated by the event connector.
####Using Event manager builder
EventManagerBuilder eventManagerBuilder = eventManagerBuilder();
eventManagerBuilder.setEventConnector(eventConnector);
eventManagerBuilder.addEventConnectorPredicate(objectEvent -> apples.isColor("Red") );
eventManagerBuilder.addEventConnectorPredicate(objectEvent -> day.is("Tuesday") );
EventManager build = eventManagerBuilder.build();
We also added another SPI class for events that implements the Event interface.
####Event Transfer Object for sending events
public class EventTransferObject<T> implements Event<T> {
This is a concrete class that gets passed for connecting events.
We also added a utility class for connecting event managers easily.
####SimpleEventConnector for connecting EventManagers.
public class SimpleEventConnector implements EventConnector {
private final EventManager eventManager;
public SimpleEventConnector(EventManager eventManager) {
this.eventManager = eventManager;
}
@Override
public void forwardEvent(EventTransferObject<Object> event) {
this.eventManager.forwardEvent(event);
}
@Override
public void flush() {
flushServiceProxy(eventManager);
}
}
One can imagine using the predicate of the builder and SimpleEventConnector to channel events to certain event managers.
##Remote Event Bus replication and connection
None of this is required for the simple cases.
There is now a subproject that provides remote event replication support: qbit-eventbus-replicator
.
You can use this to replicate events:
Let's say that you have two event busses:
####EventManager A
/** Two event managers A and B. Event on A gets replicated to B. */
EventManager eventManagerA;
####EventManager B who will listen remotely to A
EventManager eventManagerB;
You want events that happen on A to be transmitted to B. Now A and B are on two different server instances on two different JVMs.
Let's setup our EventB service bus.
####EventManager B Start up
EventManagerBuilder eventManagerBuilderB = new EventManagerBuilder();
/** Build B. */
EventManager eventManagerBImpl = eventManagerBuilderB.build();
Create a service bundle and add event manager B to it.
####EventManager B Start up 2
serviceBundle = serviceBundleBuilder().build(); //build service bundle
serviceBundle.addServiceObject("eventManagerB", eventManagerBImpl);
Create a service queue client proxy to B and register for events.
####EventManager B create service queue proxy and listen for an event foo.bar
eventManagerB = serviceBundle.createLocalProxy(EventManager.class, "eventManagerB");
eventManagerB.register("foo.bar", event -> body.set(event.body()));
Now we want B to listen to calls from a remote event bus. To do this we use a EventRemoteReplicatorService
which is a service that is easy to expose via WebSockets/REST for handing remote event calls.
To make EventRemoteReplicatorService
easy to use, we added a EventBusRemoteReplicatorBuilder
.
####Using EventRemoteReplicatorService via EventBusRemoteReplicatorBuilder
EventBusRemoteReplicatorBuilder replicatorBuilder = eventBusRemoteReplicatorBuilder();
replicatorBuilder.endpointServerBuilder().setPort(9097);
replicatorBuilder.setEventManager(eventManagerB);
ServiceEndpointServer
ServiceEndpointServer
ServiceEndpointServer serviceServer = replicatorBuilder.build();
You can pass EventBusRemoteReplicatorBuilder
a endpointServerBuilder or it will construct one.
Notice above we call
####Telling EventRemoteReplicatorService which event bus to replicate to
replicatorBuilder.setEventManager(eventManagerB);
This connects the eventManagerB
to the EventRemoteReplicatorService
which is exposed via WebSockets to handle remote calls.
EventRemoteReplicatorService
is a EventConnector
. This means when we start up eventManagerA
on the other server, all we have to do is pass the proxy client to EventRemoteReplicatorService
into eventManagerA
and all the events from A will be replicated to eventManagerB
.
To connect A to B, we need a WebSocket QBit proxy client to the replication running in the B server. To do this we provided a utility builder class called EventBusReplicationClientBuilder
as follows:
####Creating an EventConnector that talks to the EventRemoteReplicatorService running on B
EventConnector replicatorClient;
...
EventBusReplicationClientBuilder clientReplicatorBuilder = eventBusReplicationClientBuilder();
clientReplicatorBuilder.clientBuilder().setPort(9097);
Client client = clientReplicatorBuilder.build();
replicatorClient = clientReplicatorBuilder.build(client);
Calls to the replicatorClient
get sent to the remote replicator running in server B.
Now we just wire the A event bus to call the replicatorClient.
####Wiring up event bus A
/* Create A that connects to the replicator client. */
EventManager eventManagerAImpl =
eventManagerBuilderA.setEventConnector(replicatorClient).build();
serviceBundleA = serviceBundleBuilder().build(); //build service bundle
/* Wire in the bus. */
serviceBundleA.addServiceObject("eventManagerA", eventManagerAImpl);
/* Create a proxy client so we can use it over the service queue. */
eventManagerA = serviceBundleA.createLocalProxy(EventManager.class, "eventManagerA");
Now we can listen on B running on another machine:
####Listening on B
eventManagerB.register("foo.bar", event -> handleEvent(event.body()));
And when events get sent to A, they are also sent to B.
####Sending on A
eventManagerA.send("foo.bar", "hello");
We just created a listener running on a remote server.
Here is a complete example:
package io.advantageous.qbit.events.impl;
import io.advantageous.qbit.client.Client;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.events.EventManagerBuilder;
import io.advantageous.qbit.events.spi.EventConnector;
import io.advantageous.qbit.server.ServiceEndpointServer
ServiceEndpointServer
ServiceEndpointServer;
import io.advantageous.qbit.service.ServiceBundle;
import io.advantageous.qbit.service.ServiceProxyUtils;
import io.advantageous.qbit.test.TimedTesting;
import org.boon.core.Sys;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicReference;
import static io.advantageous.qbit.events.impl.EventBusRemoteReplicatorBuilder.eventBusRemoteReplicatorBuilder;
import static io.advantageous.qbit.events.impl.EventBusReplicationClientBuilder.eventBusReplicationClientBuilder;
import static io.advantageous.qbit.service.ServiceBundleBuilder.serviceBundleBuilder;
import static org.junit.Assert.assertEquals;
/**
* @author rhightower
*/
public class EventManagerReplicationOverWebSocket extends TimedTesting {
EventConnector replicatorClient;
ServiceBundle serviceBundleB;
ServiceBundle serviceBundleA;
@Test
public void test() {
/** Two event managers A and B. Event on A gets replicated to B. */
EventManager eventManagerA;
EventManager eventManagerB;
EventManagerBuilder eventManagerBuilderA = new EventManagerBuilder();
EventManagerBuilder eventManagerBuilderB = new EventManagerBuilder();
/** Build B. */
EventManager eventManagerBImpl = eventManagerBuilderB.build();
serviceBundleB = serviceBundleBuilder().build(); //build service bundle
serviceBundleB.addServiceObject("eventManagerB", eventManagerBImpl);
eventManagerB = serviceBundleB.createLocalProxy(EventManager.class, "eventManagerB"); //wire B to Service Bundle
EventBusRemoteReplicatorBuilder replicatorBuilder = eventBusRemoteReplicatorBuilder();
replicatorBuilder.endpointServerBuilder().setPort(9097);
replicatorBuilder.setEventManager(eventManagerB);
ServiceEndpointServer
ServiceEndpointServer
ServiceEndpointServer serviceServer = replicatorBuilder.build();
EventBusReplicationClientBuilder clientReplicatorBuilder = eventBusReplicationClientBuilder();
clientReplicatorBuilder.clientBuilder().setPort(9097);
Client client = clientReplicatorBuilder.build();
replicatorClient = clientReplicatorBuilder.build(client);
serviceServer.start();
client.start();
/* Create A that connects to the replicator client. */
EventManager eventManagerAImpl = eventManagerBuilderA.setEventConnector(replicatorClient).build();
serviceBundleA = serviceBundleBuilder().build(); //build service bundle
serviceBundleA.addServiceObject("eventManagerA", eventManagerAImpl);
eventManagerA = serviceBundleA.createLocalProxy(EventManager.class, "eventManagerA"); //wire A to Service Bundle
serviceBundleA.start();
serviceBundleB.start();
final AtomicReference<Object> body = new AtomicReference<>();
eventManagerB.register("foo.bar", event -> body.set(event.body()));
eventManagerA.send("foo.bar", "hello");
ServiceProxyUtils.flushServiceProxy(eventManagerA);
waitForTrigger(20, o -> body.get()!=null);
assertEquals("hello", body.get());
serviceBundleA.stop();
serviceBundleB.stop();
client.stop();
Sys.sleep(100);
serviceServer.stop();
}
}
QBit Website What is Microservices Architecture?
QBit Java Micorservices lib tutorials
The Java microservice lib. QBit is a reactive programming lib for building microservices - JSON, HTTP, WebSocket, and REST. QBit uses reactive programming to build elastic REST, and WebSockets based cloud friendly, web services. SOA evolved for mobile and cloud. ServiceDiscovery, Health, reactive StatService, events, Java idiomatic reactive programming for Microservices.
Reactive Programming, Java Microservices, Rick Hightower
Java Microservices Architecture
[Microservice Service Discovery with Consul] (http://www.mammatustech.com/Microservice-Service-Discovery-with-Consul)
Microservices Service Discovery Tutorial with Consul
[Reactive Microservices] (http://www.mammatustech.com/reactive-microservices)
[High Speed Microservices] (http://www.mammatustech.com/high-speed-microservices)
Reactive Microservices Tutorial, using the Reactor
QBit is mentioned in the Restlet blog
All code is written using JetBrains Idea - the best IDE ever!
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting
Tutorials
- QBit tutorials
- Microservices Intro
- Microservice KPI Monitoring
- Microservice Batteries Included
- RESTful APIs
- QBit and Reakt Promises
- Resourceful REST
- Microservices Reactor
- Working with JSON maps and lists
__
Docs
Getting Started
- First REST Microservice
- REST Microservice Part 2
- ServiceQueue
- ServiceBundle
- ServiceEndpointServer
- REST with URI Params
- Simple Single Page App
Basics
- What is QBit?
- Detailed Overview of QBit
- High level overview
- Low-level HTTP and WebSocket
- Low level WebSocket
- HttpClient
- HTTP Request filter
- HTTP Proxy
- Queues and flushing
- Local Proxies
- ServiceQueue remote and local
- ManagedServiceBuilder, consul, StatsD, Swagger support
- Working with Service Pools
- Callback Builders
- Error Handling
- Health System
- Stats System
- Reactor callback coordination
- Early Service Examples
Concepts
REST
Callbacks and Reactor
Event Bus
Advanced
Integration
- Using QBit in Vert.x
- Reactor-Integrating with Cassandra
- Using QBit with Spring Boot
- SolrJ and service pools
- Swagger support
- MDC Support
- Reactive Streams
- Mesos, Docker, Heroku
- DNS SRV
QBit case studies
QBit 2 Roadmap
-- Related Projects
- QBit Reactive Microservices
- Reakt Reactive Java
- Reakt Guava Bridge
- QBit Extensions
- Reactive Microservices
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting