Skip to content

Commit

Permalink
Merge pull request #1 from id-regis/upgrade-qbit-1.10.0.RELEASE
Browse files Browse the repository at this point in the history
Upgrade to qbit 1.10.0.RELEASE
  • Loading branch information
RichardHightower authored Jun 18, 2016
2 parents 6e25de3 + ba7170b commit f8b52ab
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 26 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

ext {
projectVersion = '1.1.0.RELEASE'
projectVersion = '1.10.0.RELEASE'
boonVersion = '0.5.7'
boonGroup = "io.advantageous.boon"
springFrameworkVersion = '4.2.5.RELEASE'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import static io.advantageous.qbit.eventbus.EventBusRemoteReplicatorBuilder.eventBusRemoteReplicatorBuilder;
import static io.advantageous.qbit.eventbus.EventBusReplicationClientBuilder.eventBusReplicationClientBuilder;
Expand Down Expand Up @@ -88,8 +85,7 @@ public EventBusCluster(final EventManager eventManager,

this.servicePool = new ServicePool(eventBusName, null);


endpointDefinition = serviceDiscovery.registerWithTTL(eventBusName, replicationPortLocal,
endpointDefinition = serviceDiscovery.registerWithTTL(eventBusName, replicationHostLocal, replicationPortLocal,
(int) replicationServerCheckInTimeUnit.toSeconds(replicationServerCheckInInterval));

serviceDiscovery.checkInOk(endpointDefinition.getId());
Expand Down Expand Up @@ -136,22 +132,19 @@ private EventManager createEventManager() {
EventManager.class, periodicScheduler, 100, TimeUnit.MILLISECONDS);
}


public EventManager createClientEventManager() {
return eventServiceQueue().createProxy(EventManager.class);
}

@Override
public void start() {


if (eventServiceQueue != null) {
eventServiceQueue.start();
}

startServerReplicator();


healthyNodeMonitor = periodicScheduler.repeat(
this::healthyNodeMonitor, peerCheckTimeInterval, peerCheckTimeUnit);

Expand All @@ -168,10 +161,8 @@ private void checkInWithServiceDiscoveryHealth() {
serviceDiscovery.checkInOk(endpointDefinition.getId());
}


private void startServerReplicator() {


final EventBusRemoteReplicatorBuilder replicatorBuilder = eventBusRemoteReplicatorBuilder();
replicatorBuilder.setName(this.eventBusName);
replicatorBuilder.serviceServerBuilder().setPort(replicationPortLocal);
Expand All @@ -184,10 +175,8 @@ private void startServerReplicator() {
serviceEndpointServerForReplicator.start();
}


private void healthyNodeMonitor() {


if (debug) logger.debug("EventBusCluster::healthyNodeMonitor " + eventConnectorHub.size());
final List<EndpointDefinition> endpointDefinitions = serviceDiscovery.loadServices(eventBusName);
final List<EndpointDefinition> removeNodes = new ArrayList<>();
Expand All @@ -212,7 +201,6 @@ public void servicePoolChanged(final String serviceName) {
public void serviceAdded(String serviceName, EndpointDefinition endpointDefinition) {
if (serviceName.equals(eventBusName)) {


if (replicationHostLocal.equals(endpointDefinition.getHost()) &&
replicationPortLocal == endpointDefinition.getPort()) {
if (debug)
Expand Down Expand Up @@ -245,7 +233,6 @@ public void serviceRemoved(String serviceName, EndpointDefinition endpointDefini

});


if (change.get()) {
if (removeNodes.size() > 0) {
removeServices(removeNodes);
Expand All @@ -254,10 +241,8 @@ public void serviceRemoved(String serviceName, EndpointDefinition endpointDefini
removeBadServices();
}


}


private void addEventConnector(final String newHost, final int newPort) {

if (info) logger.info(Str.sputs("Adding new event connector for",
Expand All @@ -283,7 +268,6 @@ private int removeServices(List<EndpointDefinition> removeServicesList) {
while (listIterator.hasNext()) {
final EventConnector connector = listIterator.next();


/** Remove ones in the removeServicesList. */
if (connector instanceof RemoteTCPClientProxy) {

Expand All @@ -309,11 +293,9 @@ private int removeServices(List<EndpointDefinition> removeServicesList) {

connectorsToRemove.forEach(eventConnectorHub::remove);


return removeCount;
}


private int removeBadServices() {
final ListIterator<EventConnector> listIterator = eventConnectorHub.listIterator();

Expand All @@ -323,7 +305,6 @@ private int removeBadServices() {
while (listIterator.hasNext()) {
final EventConnector connector = listIterator.next();


/** Remove connections that are closed. */
if (connector instanceof RemoteTCPClientProxy) {

Expand All @@ -341,7 +322,6 @@ private int removeBadServices() {
continue;
}


}

}
Expand Down Expand Up @@ -392,6 +372,5 @@ public void stop() {
logger.warn("EventBusCluster is unable to stop eventServiceQueue");
}


}
}

0 comments on commit f8b52ab

Please sign in to comment.