Skip to content

Commit 5e9ea6d

Browse files
committed
Issue #29 ProtocolMultiplexer checkin and changes to pom netty
dependency
1 parent 1691c50 commit 5e9ea6d

File tree

5 files changed

+405
-1
lines changed

5 files changed

+405
-1
lines changed

jetclient/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
<dependency>
2424
<groupId>io.netty</groupId>
2525
<artifactId>netty</artifactId>
26-
<version>3.3.1.Final</version>
26+
<version>3.5.11.Final</version>
2727
</dependency>
2828
</dependencies>
2929

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
package org.menacheri.jetserver.handlers.netty;
2+
3+
import static org.menacheri.jetserver.event.Events.LOG_IN;
4+
import static org.menacheri.jetserver.event.Events.PROTCOL_VERSION;
5+
6+
import java.util.List;
7+
8+
import org.jboss.netty.buffer.ChannelBuffer;
9+
import org.jboss.netty.channel.ChannelHandler;
10+
import org.jboss.netty.channel.ChannelPipeline;
11+
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
12+
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
13+
import org.menacheri.jetserver.event.Events;
14+
15+
/**
16+
* Applies a protocol to the incoming pipeline which will handle login.
17+
* Subsequent protocol may also be manipulated by this login handlers.
18+
*
19+
* @author Abraham Menacherry
20+
*
21+
*/
22+
public interface LoginProtocol
23+
{
24+
/**
25+
* Apply a protocol on the pipeline to handle login. Implementations will
26+
* first "search" if the incoming bytes correspond to the implementations
27+
* protocol, only if they match, the correspoinding protocol will be
28+
* applied.
29+
*
30+
* @param buffer
31+
* The incoming buffer, by default around 5 bytes will be read
32+
* and passed on to detect the protocol
33+
* @param pipeline
34+
* The channelpipeline on which the login protocol handlers need
35+
* to be set.
36+
* @return Returs true if the protocol was applied, else false.
37+
*/
38+
public boolean applyProtocol(ChannelBuffer buffer, ChannelPipeline pipeline);
39+
40+
/**
41+
* Searches the incoming bytes of a client connection to determine if its an
42+
* HTTP connection, in which case Websocket or HTTP related handlers will be
43+
* applied on the piepline.
44+
*
45+
* @author Abraham Menacherry
46+
*
47+
*/
48+
public static class HTTPProtocol implements LoginProtocol
49+
{
50+
@Override
51+
public boolean applyProtocol(ChannelBuffer buffer,
52+
ChannelPipeline pipeline)
53+
{
54+
final int magic1 = buffer.getUnsignedByte(buffer.readerIndex());
55+
final int magic2 = buffer.getUnsignedByte(buffer.readerIndex() + 1);
56+
if (isHttp(magic1, magic2))
57+
{
58+
// TODO apply the protocol
59+
}
60+
return false;
61+
}
62+
63+
/**
64+
* Method which checks if the first 2 incoming parameters are G, E or
65+
* similar combiantions which signal that its an HTTP protocol, since
66+
* some protocols like jetserver's default protocol send the length
67+
* first (which is 2 arbitrary bytes), its better if this protocol is
68+
* searched last to avoid switching to HTTP protocol prematurely.
69+
*
70+
* @param magic1
71+
* @param magic2
72+
* @return
73+
*/
74+
protected boolean isHttp(int magic1, int magic2)
75+
{
76+
return magic1 == 'G' && magic2 == 'E' || // GET
77+
magic1 == 'P' && magic2 == 'O' || // POST
78+
magic1 == 'P' && magic2 == 'U' || // PUT
79+
magic1 == 'H' && magic2 == 'E' || // HEAD
80+
magic1 == 'O' && magic2 == 'P' || // OPTIONS
81+
magic1 == 'P' && magic2 == 'A' || // PATCH
82+
magic1 == 'D' && magic2 == 'E' || // DELETE
83+
magic1 == 'T' && magic2 == 'R' || // TRACE
84+
magic1 == 'C' && magic2 == 'O'; // CONNECT
85+
}
86+
}
87+
88+
/**
89+
* This is the default protocol of jetserver. If incoming event is of type
90+
* LOG_IN and also has appropriate protocol version as defined in the
91+
* {@link Events} class, then this protocol will be applied. The 3rd and 4th
92+
* bytes of the incoming transmission are searched to get this information.
93+
*
94+
* @author Abraham Menacherry
95+
*
96+
*/
97+
public static class DefaultJetProtocol implements LoginProtocol
98+
{
99+
100+
private int frameSize = 1024;
101+
private EventDecoder eventDecoder;
102+
private LoginHandler loginHandler;
103+
private LengthFieldPrepender lengthFieldPrepender;
104+
105+
@Override
106+
public boolean applyProtocol(ChannelBuffer buffer,
107+
ChannelPipeline pipeline)
108+
{
109+
final int opCode = buffer.getUnsignedByte(buffer.readerIndex() + 2);
110+
final int protocolVersion = buffer.getUnsignedByte(buffer
111+
.readerIndex() + 3);
112+
if (isJetProtocol(opCode, protocolVersion))
113+
{
114+
pipeline.addLast("framer", createLengthBasedFrameDecoder());
115+
pipeline.addLast("eventDecoder", eventDecoder);
116+
pipeline.addLast("loginHandler", loginHandler);
117+
pipeline.addLast("lengthFieldPrepender", lengthFieldPrepender);
118+
}
119+
return true;
120+
}
121+
122+
protected boolean isJetProtocol(int magic1, int magic2)
123+
{
124+
return magic1 == LOG_IN && magic2 == PROTCOL_VERSION;
125+
}
126+
127+
public ChannelHandler createLengthBasedFrameDecoder()
128+
{
129+
return new LengthFieldBasedFrameDecoder(frameSize, 0, 2, 0, 2);
130+
}
131+
132+
public int getFrameSize()
133+
{
134+
return frameSize;
135+
}
136+
137+
public void setFrameSize(int frameSize)
138+
{
139+
this.frameSize = frameSize;
140+
}
141+
142+
public EventDecoder getEventDecoder()
143+
{
144+
return eventDecoder;
145+
}
146+
147+
public void setEventDecoder(EventDecoder eventDecoder)
148+
{
149+
this.eventDecoder = eventDecoder;
150+
}
151+
152+
public LoginHandler getLoginHandler()
153+
{
154+
return loginHandler;
155+
}
156+
157+
public void setLoginHandler(LoginHandler loginHandler)
158+
{
159+
this.loginHandler = loginHandler;
160+
}
161+
162+
public LengthFieldPrepender getLengthFieldPrepender()
163+
{
164+
return lengthFieldPrepender;
165+
}
166+
167+
public void setLengthFieldPrepender(
168+
LengthFieldPrepender lengthFieldPrepender)
169+
{
170+
this.lengthFieldPrepender = lengthFieldPrepender;
171+
}
172+
}
173+
174+
public static class CompositeProtocol implements LoginProtocol
175+
{
176+
private List<LoginProtocol> protocols;
177+
178+
@Override
179+
public boolean applyProtocol(ChannelBuffer buffer,
180+
ChannelPipeline pipeline)
181+
{
182+
if (null != protocols)
183+
{
184+
for (LoginProtocol protocol : protocols)
185+
{
186+
if (protocol.applyProtocol(buffer, pipeline))
187+
{
188+
return true;
189+
}
190+
}
191+
}
192+
return false;
193+
}
194+
195+
public List<LoginProtocol> getProtocols()
196+
{
197+
return protocols;
198+
}
199+
200+
public void setProtocols(List<LoginProtocol> protocols)
201+
{
202+
this.protocols = protocols;
203+
}
204+
}
205+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package org.menacheri.jetserver.handlers.netty;
2+
3+
import org.jboss.netty.buffer.ChannelBuffer;
4+
import org.jboss.netty.channel.Channel;
5+
import org.jboss.netty.channel.ChannelHandlerContext;
6+
import org.jboss.netty.channel.ChannelPipeline;
7+
import org.jboss.netty.handler.codec.frame.FrameDecoder;
8+
import org.menacheri.jetserver.util.BinaryUtils;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
/**
13+
* This class can be used to switch login-protocol based on the incoming bytes
14+
* sent by a client. So, based on the incoming bytes, it is possible to set SSL
15+
* enabled, normal HTTP, default jetserver protocol, or custom user protocol for
16+
* allowing client to login to jetserver. The appropriate protocol searcher
17+
* needs to be injected via spring to this class.
18+
*
19+
* @author Abraham Menacherry
20+
*
21+
*/
22+
public class ProtocolMultiplexerDecoder extends FrameDecoder
23+
{
24+
25+
private static final Logger LOG = LoggerFactory
26+
.getLogger(ProtocolMultiplexerDecoder.class);
27+
28+
private final LoginProtocol loginProtocol;
29+
private final int bytesForProtocolCheck;
30+
31+
public ProtocolMultiplexerDecoder(int bytesForProtocolCheck,
32+
LoginProtocol loginProtocol)
33+
{
34+
this.loginProtocol = loginProtocol;
35+
this.bytesForProtocolCheck = bytesForProtocolCheck;
36+
}
37+
38+
@Override
39+
protected Object decode(ChannelHandlerContext ctx, Channel channel,
40+
ChannelBuffer buffer) throws Exception
41+
{
42+
// Will use the first bytes to detect a protocol.
43+
if (buffer.readableBytes() < bytesForProtocolCheck)
44+
{
45+
return null;
46+
}
47+
48+
ChannelPipeline pipeline = ctx.getPipeline();
49+
50+
if (!loginProtocol.applyProtocol(buffer, pipeline))
51+
{
52+
byte[] headerBytes = new byte[bytesForProtocolCheck];
53+
buffer.getBytes(buffer.readerIndex(), headerBytes, 0,
54+
bytesForProtocolCheck);
55+
LOG.error(
56+
"Unknown protocol, discard everything and close the connection {}. Incoming Bytes {}",
57+
ctx.getChannel().getId(),
58+
BinaryUtils.getHexString(headerBytes));
59+
close(buffer, channel);
60+
return null;
61+
}
62+
else
63+
{
64+
pipeline.remove(this);
65+
}
66+
67+
// Forward the current read buffer as is to the new handlers.
68+
return buffer.readBytes(buffer.readableBytes());
69+
}
70+
71+
protected void close(ChannelBuffer buffer, Channel channel)
72+
{
73+
buffer.skipBytes(buffer.readableBytes());
74+
channel.close();
75+
}
76+
77+
public LoginProtocol getLoginProtocol()
78+
{
79+
return loginProtocol;
80+
}
81+
82+
public int getBytesForProtocolCheck()
83+
{
84+
return bytesForProtocolCheck;
85+
}
86+
87+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package org.menacheri.jetserver.server.netty;
2+
3+
import static org.jboss.netty.channel.Channels.pipeline;
4+
5+
import org.jboss.netty.channel.ChannelHandler;
6+
import org.jboss.netty.channel.ChannelPipeline;
7+
import org.jboss.netty.channel.ChannelPipelineFactory;
8+
import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
9+
import org.jboss.netty.handler.timeout.IdleStateHandler;
10+
import org.jboss.netty.util.Timer;
11+
import org.menacheri.jetserver.handlers.netty.LoginProtocol;
12+
import org.menacheri.jetserver.handlers.netty.ProtocolMultiplexerDecoder;
13+
14+
public class ProtocolMultiplexerPipelineFactory implements
15+
ChannelPipelineFactory
16+
{
17+
private static final int MAX_IDLE_SECONDS = 60;
18+
private Timer timer;
19+
private IdleStateAwareChannelHandler idleCheckHandler;
20+
private int bytesForProtocolCheck;
21+
private LoginProtocol loginProtocol;
22+
23+
@Override
24+
public ChannelPipeline getPipeline() throws Exception
25+
{
26+
// Create a default pipeline implementation.
27+
ChannelPipeline pipeline = pipeline();
28+
pipeline.addLast("idleStateCheck", new IdleStateHandler(timer, 0, 0,
29+
MAX_IDLE_SECONDS));
30+
pipeline.addLast("idleCheckHandler", idleCheckHandler);
31+
pipeline.addLast("multiplexer", createProtcolMultiplexerDecoder());
32+
return pipeline;
33+
}
34+
35+
protected ChannelHandler createProtcolMultiplexerDecoder()
36+
{
37+
return new ProtocolMultiplexerDecoder(bytesForProtocolCheck,loginProtocol);
38+
}
39+
40+
public Timer getTimer()
41+
{
42+
return timer;
43+
}
44+
45+
public void setTimer(Timer timer)
46+
{
47+
this.timer = timer;
48+
}
49+
50+
public IdleStateAwareChannelHandler getIdleCheckHandler()
51+
{
52+
return idleCheckHandler;
53+
}
54+
55+
public void setIdleCheckHandler(IdleStateAwareChannelHandler idleCheckHandler)
56+
{
57+
this.idleCheckHandler = idleCheckHandler;
58+
}
59+
60+
public int getBytesForProtocolCheck()
61+
{
62+
return bytesForProtocolCheck;
63+
}
64+
65+
public void setBytesForProtocolCheck(int bytesForProtocolCheck)
66+
{
67+
this.bytesForProtocolCheck = bytesForProtocolCheck;
68+
}
69+
70+
public LoginProtocol getLoginProtocol()
71+
{
72+
return loginProtocol;
73+
}
74+
75+
public void setLoginProtocol(LoginProtocol loginProtocol)
76+
{
77+
this.loginProtocol = loginProtocol;
78+
}
79+
}

0 commit comments

Comments
 (0)