Skip to content

Commit

Permalink
obj_storage
Browse files Browse the repository at this point in the history
  • Loading branch information
ThilinaManamgoda committed Sep 20, 2017
1 parent 6566422 commit 4e485e5
Show file tree
Hide file tree
Showing 28 changed files with 316 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class ConfigConstants {
public static final String SYS_SERVICE_CONNECTIONS_CONNECTION_HOST = "sys-service.connections.connection.host";
public static final String SYS_SERVICE_CONNECTIONS_CONNECTION_PORT = "sys-service.connections.connection.port";
public static final String SYS_SERVICE_CONNECTIONS_PATH = "sys-service.connections.path";
public static final String SYS_SERVICE_CONNECTIONS_PROTOCOL ="sys-service.connections.protocol" ;
public static final String SYS_SERVICE_CONNECTIONS_PROTOCOL = "sys-service.connections.protocol";

//etcd
public static final String ETCD_CLUSTER_CONNECTIONS_URL = "etcd-cluster.connections.url";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ public class Server implements Runnable {
public static final boolean ENABLE_SSL = Launcher.getBoolean(ConfigConstants.TRANSPORT_SSL_CONFIG_ENABLED);



@Override
public void run() {
logger.info("Starting Http Transport Service !");
Expand All @@ -31,10 +30,10 @@ public void run() {
EventLoopGroup remoteHostEventLoopGroup = new NioEventLoopGroup();


if(ENABLE_SSL){
if (ENABLE_SSL) {
//Load SSL certs
SSLHandlerProvider.initSSLContext();
}else {
} else {
logger.info("SSL is not enabled !");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();


if(Server.ENABLE_SSL){
if (Server.ENABLE_SSL) {
SslHandler sslHandler = SSLHandlerProvider.getSSLHandler();
channelPipeline.addFirst(sslHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ public class SysServiceHostResolveHandler extends ChannelInboundHandlerAdapter {
private final static String HOST = "Host";
private static final String SYS_HOST = Launcher.getStringList(ConfigConstants.SYS_SERVICE_CONNECTIONS_CONNECTION_HOST).get(0);
private static final int SYS_PORT = Launcher.getIntList(ConfigConstants.SYS_SERVICE_CONNECTIONS_CONNECTION_PORT).get(0);
private static final String SYS_PATH=Launcher.getString(ConfigConstants.SYS_SERVICE_CONNECTIONS_PATH);
private static final String SYS_PATH = Launcher.getString(ConfigConstants.SYS_SERVICE_CONNECTIONS_PATH);
private static final String SYS_PROTOCOL = Launcher.getString(ConfigConstants.SYS_SERVICE_CONNECTIONS_PROTOCOL);
Channel remoteHostChannel = null;
EventLoopGroup remoteHostEventLoopGroup;
String instanceID;

public SysServiceHostResolveHandler(EventLoopGroup remoteHostEventLoopGroup) {
this.remoteHostEventLoopGroup = remoteHostEventLoopGroup;
}
Expand Down Expand Up @@ -58,7 +59,7 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
instanceID= request.headers().get("domain");
instanceID = request.headers().get("domain");

EtcdUtil.getValue(instanceID).thenAccept(x -> {

Expand All @@ -70,7 +71,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
requestIp();
} else if (stateImpl.getState() == InstanceStates.RUNNING) {
logger.info("These instances are up and running");
String remoteIp= LoadBalanceUtil.getRemoteHost(stateImpl);
String remoteIp = LoadBalanceUtil.getRemoteHost(stateImpl);
try {
EtcdUtil.putValue("localhost", StateImplJsonHelp.toString(stateImpl));
} catch (EtcdClientException e) {
Expand All @@ -89,13 +90,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
ctx.fireChannelRead(msg);
}

private void requestIp( ) {
private void requestIp() {
// Prepare the HTTP request.
HttpRequest request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.GET, getURI());
request.headers().set(HttpHeaderNames.HOST, SYS_HOST);
request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
request.headers().set("domain",instanceID);
request.headers().set("domain", instanceID);

// Send the HTTP request.
remoteHostChannel.writeAndFlush(request);
Expand All @@ -107,19 +108,21 @@ private void requestIp( ) {
}
logger.info("Request sent to the System Service");
}
private String getURI(){

String url=null;
private String getURI() {

String url = null;
try {
URI uri=new URI(SYS_PROTOCOL,null,SYS_HOST,SYS_PORT,SYS_PATH,null,null);
url=uri.toURL().toString();
URI uri = new URI(SYS_PROTOCOL, null, SYS_HOST, SYS_PORT, SYS_PATH, null, null);
url = uri.toURL().toString();
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (MalformedURLException e) {
e.printStackTrace();
}
return url;
}

private final class CustomListener implements ChannelFutureListener {
private Channel mainChannel;

Expand All @@ -130,7 +133,7 @@ private final class CustomListener implements ChannelFutureListener {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
logger.info("connected to the System service: " +SYS_HOST+":"+SYS_PORT+SYS_PATH);
logger.info("connected to the System service: " + SYS_HOST + ":" + SYS_PORT + SYS_PATH);
remoteHostChannel = channelFuture.channel();
//Reading the main channel after Sys service is connected
mainChannel.read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import lambda.netty.loadbalancer.core.proxy.ProxyEvent;
import org.apache.log4j.Logger;

import java.nio.charset.StandardCharsets;


public class SysServiceResponseHandler extends SimpleChannelInboundHandler<HttpObject> {
final static Logger logger = Logger.getLogger(SysServiceResponseHandler.class);
Expand All @@ -25,9 +23,9 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex
logger.info("Sys response has received triggering the proxyEvent ");
FullHttpResponse fullHttpResponse = (FullHttpResponse) msg;

String remoteIP=fullHttpResponse.headers().get("remoteIP");
String domain=fullHttpResponse.headers().get("domain");
logger.info("Domain: "+domain+" Remote IP: "+remoteIP);
String remoteIP = fullHttpResponse.headers().get("remoteIP");
String domain = fullHttpResponse.headers().get("domain");
logger.info("Domain: " + domain + " Remote IP: " + remoteIP);

ProxyEvent proxyEvent = new ProxyEvent(remoteIP);
proxyEvent.setDomain(domain);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
public class Launcher {
private static final Logger logger = Logger.getLogger(Launcher.class);
private final static String CONFIG_PROPERTIES_FILE = "config.xml";

static {
Configurations configs = new Configurations();
try {
Expand All @@ -27,13 +28,12 @@ public class Launcher {


// start implementing after the static block. it's loading the configuration
private static ExecutorService service = Executors.newFixedThreadPool(Launcher.getIntValue(ConfigConstants.LAUNCHER_THREADS));
public final static boolean SCALABILITY_ENABLED=Launcher.getBoolean(ConfigConstants.CONFIG_SCALABILITY_ENABLED);
private static ExecutorService service = Executors.newFixedThreadPool(Launcher.getIntValue(ConfigConstants.LAUNCHER_THREADS));
public final static boolean SCALABILITY_ENABLED = Launcher.getBoolean(ConfigConstants.CONFIG_SCALABILITY_ENABLED);

private static XMLConfiguration xmlConfiguration;



public static String getString(String tag) {
return xmlConfiguration.getString(tag);
}
Expand All @@ -57,22 +57,24 @@ public static List<Integer> getIntList(String key) {
if (obj instanceof List) {
List tmp = (List) obj;
List<Integer> tmp_return = new ArrayList<>(tmp.size());
tmp.forEach(x->tmp_return.add(Integer.parseInt((String) x)));
tmp.forEach(x -> tmp_return.add(Integer.parseInt((String) x)));

return tmp_return;
}
return null;
}

public static boolean getBoolean(String key){
public static boolean getBoolean(String key) {
String val = getString(key);

return val.equals("true")? true:false;
return val.equals("true") ? true : false;
}

public static long getLong(String s) {

return xmlConfiguration.getLong(s);
}

public static void main(String[] args) throws InterruptedException {

// State state = new StateImpl();
Expand All @@ -91,9 +93,9 @@ public static void main(String[] args) throws InterruptedException {
// } catch (ExecutionException e) {
// e.printStackTrace();
// }
if(SCALABILITY_ENABLED){
if (SCALABILITY_ENABLED) {
service.submit(new ScalabilityManager());
}else {
} else {
logger.info("Scalability is not enabled !");
}
service.submit(new Server());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

public interface State {
/**
*
* @param host host:port ex: "127.0.0.1:8508"
*/
void pushHost(String host);
Expand All @@ -28,7 +27,8 @@ public interface State {
void setState(InstanceStates state);

Queue<String> getHosts();
public String getDomain() ;

public String getDomain();

public void setDomain(String domain);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class StateImpl implements State {
private Queue<String> hosts = new LinkedList<>();
private InstanceStates state;
private String domain;

public StateImpl() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@


import io.netty.channel.*;
import io.netty.util.AttributeKey;
import lambda.netty.loadbalancer.core.launch.Launcher;
import lambda.netty.loadbalancer.core.scalability.ScaleInfoDAO;
import org.apache.log4j.Logger;
Expand All @@ -16,19 +15,20 @@ public class ProxyBackendHandler extends ChannelInboundHandlerAdapter {
private final Channel inboundChannel;

private long time;

public ProxyBackendHandler(Channel inboundChannel) {
this.inboundChannel = inboundChannel;
}

public ProxyBackendHandler(Channel channel, long time) {
this.inboundChannel=channel;
this.time=time;
this.inboundChannel = channel;
this.time = time;
}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
inboundChannel.writeAndFlush(msg).addListener(new CustomListener((String)ctx.channel().attr(DOMAIN).get()));
inboundChannel.writeAndFlush(msg).addListener(new CustomListener((String) ctx.channel().attr(DOMAIN).get()));
}

@Override
Expand All @@ -55,10 +55,9 @@ public CustomListener(String domain) {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
logger.info("Message redirected to the Client");
if(Launcher.SCALABILITY_ENABLED){
if (Launcher.SCALABILITY_ENABLED) {
logger.info("Putting response time !");
System.out.println(time);
ScaleInfoDAO.putTime(domain,System.currentTimeMillis()-time);
ScaleInfoDAO.putTime(domain, System.currentTimeMillis() - time);
}
inboundChannel.close();
channelFuture.channel().close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,26 @@
public class ProxyBackendHandlersInit extends ChannelInitializer<SocketChannel> {
Channel channel;
long time;

public ProxyBackendHandlersInit(Channel channel, long time) {
this.channel = channel;
this.time=time;
this.time = time;

}

public ProxyBackendHandlersInit(Channel channel) {
this.channel=channel;
this.channel = channel;
}

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline().addFirst(new HttpRequestEncoder());
if(Launcher.SCALABILITY_ENABLED){
socketChannel.pipeline().addLast(new ProxyBackendHandler(channel,time));
}else{
if (Launcher.SCALABILITY_ENABLED) {
socketChannel.pipeline().addLast(new ProxyBackendHandler(channel, time));
} else {

socketChannel.pipeline().addLast( new ProxyBackendHandler(channel));
socketChannel.pipeline().addLast(new ProxyBackendHandler(channel));

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

public class ProxyFrontendHandler extends ChannelInboundHandlerAdapter {
final static Logger logger = Logger.getLogger(ProxyFrontendHandler.class);
public static final AttributeKey DOMAIN =AttributeKey.newInstance("domain");
public static final AttributeKey DOMAIN = AttributeKey.newInstance("domain");
Bootstrap b;
Object requestToProxyServer;
// As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as
Expand Down Expand Up @@ -39,17 +39,15 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
public void channelActive(ChannelHandlerContext ctx) {
final Channel channel = ctx.channel();
b = new Bootstrap();
b= b.group(ctx.channel().eventLoop())
b = b.group(ctx.channel().eventLoop())
.channel(ctx.channel().getClass());
if(Launcher.SCALABILITY_ENABLED){
if (Launcher.SCALABILITY_ENABLED) {
b.handler(new ProxyBackendHandlersInit(channel, System.currentTimeMillis()));
}else{
} else {
b.handler(new ProxyBackendHandlersInit(channel));
}




}

@Override
Expand Down Expand Up @@ -79,7 +77,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
ProxyEvent proxyEvent = (ProxyEvent) evt;
ChannelFuture f = b.connect(proxyEvent.getHost(), proxyEvent.getPort());
f.channel().attr(DOMAIN).set(proxyEvent.getDomain());
f.addListener(new CustomListener(proxyEvent.getHost()+":"+proxyEvent.getPort()));
f.addListener(new CustomListener(proxyEvent.getHost() + ":" + proxyEvent.getPort()));
} else {
System.out.println(evt);
}
Expand All @@ -88,13 +86,14 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
private final class CustomListener implements ChannelFutureListener {
private String backendServer;

CustomListener(String backendServer){
CustomListener(String backendServer) {
this.backendServer = backendServer;
}

@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
logger.info("Connected to the backend server: "+backendServer );
logger.info("Connected to the backend server: " + backendServer);
outboundChannel = channelFuture.channel();
outboundChannel.writeAndFlush(requestToProxyServer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ public class HandleScaling implements Runnable {

private String domain;

HandleScaling(String domain){
HandleScaling(String domain) {
this.domain = domain;
}

@Override
public void run() {
logger.info("scaled");
Expand Down
Loading

0 comments on commit 4e485e5

Please sign in to comment.