Skip to content

Commit f6fff7a

Browse files
author
Evan Hu
committed
编码解码器
1 parent 88a3524 commit f6fff7a

12 files changed

+144
-68
lines changed

src/main/java/info/xiaomo/gengine/network/IMessageAndHandler.java renamed to src/main/java/info/xiaomo/gengine/network/IMessagePool.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,25 @@
11
package info.xiaomo.gengine.network;
22

3-
import com.google.protobuf.AbstractMessage;
3+
import com.google.protobuf.Message;
44

55
/** @author xiaomo */
6-
public interface IMessageAndHandler {
6+
public interface IMessagePool {
77

88
/**
99
* 获取消息
1010
*
1111
* @param messageId messageId
1212
* @return AbstractMessage
1313
*/
14-
AbstractMessage getMessage(int messageId);
14+
Message getMessage(int messageId);
1515

1616
/**
1717
* 获取消息id
1818
*
1919
* @param message message
2020
* @return int
2121
*/
22-
int getMessageId(AbstractMessage message);
22+
int getMessageId(Message message);
2323

2424
/**
2525
* 获取handler
@@ -35,14 +35,13 @@ public interface IMessageAndHandler {
3535
* @param messageClazz messageClazz
3636
* @param handler handler
3737
*/
38-
void register(
39-
int messageId, AbstractMessage messageClazz, Class<? extends AbstractHandler> handler);
38+
void register(int messageId, Message messageClazz, Class<? extends AbstractHandler> handler);
4039

4140
/**
4241
* 注册
4342
*
4443
* @param messageId messageId
4544
* @param messageClazz messageClazz
4645
*/
47-
void register(int messageId, AbstractMessage messageClazz);
46+
void register(int messageId, Message messageClazz);
4847
}
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package info.xiaomo.gengine.network;
22

3+
import com.google.protobuf.Message;
34
import io.netty.channel.Channel;
4-
55
/** @author xiaomo */
66
public interface INetworkConsumer {
77

@@ -11,5 +11,5 @@ public interface INetworkConsumer {
1111
* @param msg msg
1212
* @param channel channel
1313
*/
14-
void consume(MsgPack msg, Channel channel);
14+
void consume(Message msg, Channel channel);
1515
}

src/main/java/info/xiaomo/gengine/network/NetworkServiceBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class NetworkServiceBuilder {
2727
private INetworkEventListener listener;
2828

2929
/** 消息池 */
30-
private IMessageAndHandler imessageandhandler;
30+
private IMessagePool messagePool;
3131

3232
/** 默认为false */
3333
private boolean isWebSocket = false;

src/main/java/info/xiaomo/gengine/network/NetworkServiceImpl.java

+15-11
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,12 @@ protected void initChannel(Channel ch) {
148148
pip.addLast(new WebSocketServerProtocolHandler("/"));
149149
pip.addLast(new WebSocketDecoder());
150150
pip.addLast(new WebSocketEncoder());
151-
pip.addLast(new MessageDecoder(builder.getUpLimit()));
152-
pip.addLast(new MessageExecutor(builder.getConsumer(), builder.getListener()));
151+
pip.addLast(new DefaultProtobufDecoder(builder.getMessagePool()));
152+
pip.addLast(
153+
new MessageExecutor(
154+
builder.getConsumer(),
155+
builder.getListener(),
156+
builder.getMessagePool()));
153157
for (ChannelHandler handler : builder.getExtraHandlers()) {
154158
pip.addLast(handler);
155159
}
@@ -167,15 +171,15 @@ static class SocketHandler extends ChannelInitializer<Channel> {
167171
@Override
168172
protected void initChannel(Channel ch) {
169173
ChannelPipeline pip = ch.pipeline();
170-
int maxLength = 1048576;
171-
int lengthFieldLength = 4;
172-
int ignoreLength = -4;
173-
int offset = 0;
174-
// pip.addLast(new LengthFieldBasedFrameDecoder(maxLength, offset, lengthFieldLength, ignoreLength, lengthFieldLength));
175-
pip.addLast(new MessageDecoder(builder.getUpLimit()));
176-
// pip.addLast(new LengthFieldPrepender(4, true));
177-
pip.addLast(new MessageEncoder());
178-
pip.addLast(new MessageExecutor(builder.getConsumer(), builder.getListener()));
174+
// pip.addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
175+
pip.addLast(new DefaultProtobufDecoder(builder.getMessagePool()));
176+
pip.addLast(new LengthFieldPrepender(4));
177+
pip.addLast(new DefaultProtobufEncoder(builder.getMessagePool()));
178+
pip.addLast(
179+
new MessageExecutor(
180+
builder.getConsumer(),
181+
builder.getListener(),
182+
builder.getMessagePool()));
179183
for (ChannelHandler handler : builder.getExtraHandlers()) {
180184
pip.addLast(handler);
181185
}

src/main/java/info/xiaomo/gengine/network/client/Client.java

+1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
8585
new ClientMessageExecutor(
8686
builder.getConsumer(),
8787
builder.getEventListener(),
88+
builder.getMsgPool(),
8889
futureMap,
8990
idleCheck));
9091
}

src/main/java/info/xiaomo/gengine/network/client/ClientMessageExecutor.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,34 @@
11
package info.xiaomo.gengine.network.client;
22

33
import java.util.Map;
4+
import info.xiaomo.gengine.network.IMessagePool;
45
import info.xiaomo.gengine.network.INetworkConsumer;
56
import info.xiaomo.gengine.network.INetworkEventListener;
67
import info.xiaomo.gengine.network.MsgPack;
78
import info.xiaomo.gengine.network.handler.MessageExecutor;
89
import io.netty.channel.ChannelHandlerContext;
910
import io.netty.handler.timeout.IdleState;
1011
import io.netty.handler.timeout.IdleStateEvent;
12+
import lombok.extern.slf4j.Slf4j;
1113
import org.slf4j.Logger;
1214
import org.slf4j.LoggerFactory;
1315

16+
@Slf4j
1417
public class ClientMessageExecutor extends MessageExecutor {
1518

16-
public static final Logger LOGGER = LoggerFactory.getLogger(ClientMessageExecutor.class);
17-
1819
protected Map<Short, ClientFuture<MsgPack>> futureMap;
1920

21+
protected IMessagePool pool;
22+
2023
protected boolean idleCheck;
2124

2225
public ClientMessageExecutor(
2326
INetworkConsumer consumer,
2427
INetworkEventListener listener,
28+
IMessagePool pool,
2529
Map<Short, ClientFuture<MsgPack>> futureMap,
2630
boolean idleCheck) {
27-
super(consumer, listener);
31+
super(consumer, listener, pool);
2832
this.futureMap = futureMap;
2933
this.idleCheck = idleCheck;
3034
}

src/main/java/info/xiaomo/gengine/network/client/DefaultNetworkConsumerAdapter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package info.xiaomo.gengine.network.client;
22

3+
import com.google.protobuf.Message;
34
import info.xiaomo.gengine.network.INetworkConsumer;
4-
import info.xiaomo.gengine.network.MsgPack;
55
import io.netty.channel.Channel;
66

77
/**
@@ -13,7 +13,7 @@
1313
public class DefaultNetworkConsumerAdapter implements INetworkConsumer {
1414

1515
@Override
16-
public void consume(MsgPack msg, Channel channel) {
16+
public void consume(Message msg, Channel channel) {
1717
// Nothing to do
1818
}
1919
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package info.xiaomo.gengine.network.handler;
2+
3+
import com.google.protobuf.Message;
4+
import java.util.List;
5+
import java.util.Objects;
6+
import info.xiaomo.gengine.network.IMessagePool;
7+
import io.netty.buffer.ByteBuf;
8+
import io.netty.channel.ChannelHandlerContext;
9+
import io.netty.handler.codec.ByteToMessageDecoder;
10+
import lombok.extern.slf4j.Slf4j;
11+
12+
@Slf4j
13+
public class DefaultProtobufDecoder extends ByteToMessageDecoder {
14+
15+
private IMessagePool pool;
16+
17+
public DefaultProtobufDecoder(IMessagePool pool) {
18+
this.pool = pool;
19+
}
20+
21+
@Override
22+
protected void decode(
23+
ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list)
24+
throws Exception {
25+
int msgId = byteBuf.readInt();
26+
byte[] bytes = new byte[byteBuf.readableBytes()];
27+
byteBuf.readBytes(bytes);
28+
29+
Message message = pool.getMessage(msgId);
30+
31+
if (message == null) {
32+
log.error("解码到未注册的消息id:{}", msgId);
33+
}
34+
list.add(Objects.requireNonNull(message).getParserForType().parseFrom(bytes));
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package info.xiaomo.gengine.network.handler;
2+
3+
import com.google.protobuf.Message;
4+
import info.xiaomo.gengine.network.IMessagePool;
5+
import io.netty.buffer.ByteBuf;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.handler.codec.MessageToByteEncoder;
8+
import lombok.extern.slf4j.Slf4j;
9+
10+
@Slf4j
11+
public class DefaultProtobufEncoder extends MessageToByteEncoder<Message> {
12+
13+
private IMessagePool pool;
14+
15+
public DefaultProtobufEncoder(IMessagePool pool) {
16+
this.pool = pool;
17+
}
18+
19+
@Override
20+
protected void encode(
21+
ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf)
22+
throws Exception {
23+
int msgId = pool.getMessageId(message);
24+
if (msgId == 0) {
25+
log.error("消息未注册:{}", message.getClass().getSimpleName());
26+
return;
27+
}
28+
29+
byte[] bytes = message.toByteArray();
30+
int length = Integer.BYTES + bytes.length;
31+
byteBuf.ensureWritable(length);
32+
boolean writable = byteBuf.isWritable(length);
33+
if (!writable) {
34+
log.error("消息过大,编码失败:{}:{}", msgId, length);
35+
}
36+
37+
byteBuf.writeInt(msgId);
38+
byteBuf.writeBytes(bytes);
39+
}
40+
}

src/main/java/info/xiaomo/gengine/network/handler/MessageExecutor.java

+11-22
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,27 @@
33
import com.google.protobuf.AbstractMessage;
44
import com.google.protobuf.Message;
55
import java.lang.reflect.Method;
6+
import info.xiaomo.gengine.network.IMessagePool;
67
import info.xiaomo.gengine.network.INetworkConsumer;
78
import info.xiaomo.gengine.network.INetworkEventListener;
8-
import info.xiaomo.gengine.network.MsgPack;
9-
import info.xiaomo.gengine.network.pool.MessagePool;
109
import info.xiaomo.gengine.utils.ClassUtil;
1110
import io.netty.channel.ChannelHandlerContext;
1211
import io.netty.channel.SimpleChannelInboundHandler;
1312
import lombok.extern.slf4j.Slf4j;
1413

1514
/** @author xiaomo */
1615
@Slf4j
17-
public class MessageExecutor extends SimpleChannelInboundHandler<MsgPack> {
16+
public class MessageExecutor extends SimpleChannelInboundHandler<Message> {
1817

1918
protected final INetworkEventListener listener;
2019
private final INetworkConsumer consumer;
20+
private final IMessagePool pool;
2121

22-
public MessageExecutor(INetworkConsumer consumer, INetworkEventListener listener) {
22+
public MessageExecutor(
23+
INetworkConsumer consumer, INetworkEventListener listener, IMessagePool pool) {
2324
this.consumer = consumer;
2425
this.listener = listener;
26+
this.pool = pool;
2527
}
2628

2729
@Override
@@ -30,27 +32,14 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
3032
}
3133

3234
@Override
33-
public void channelRead0(ChannelHandlerContext ctx, MsgPack msgPack) throws Exception {
34-
AbstractMessage abstractMessage = MessagePool.messages.get(msgPack.getMsgId());
35+
public void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {
36+
int msgId = pool.getMessageId(message);
3537

36-
if (abstractMessage == null) {
37-
if (msgPack.getMsgId() == 0) {
38-
log.error("请求消息未设置msgId");
39-
} else {
40-
log.error("消息未注册,请检查");
41-
}
38+
if (msgId == 0) {
39+
log.error("请求消息未设置msgId");
4240
return;
4341
}
44-
45-
Method m = ClassUtil.findProtobufMsg(abstractMessage.getClass());
46-
if (m != null) {
47-
AbstractMessage message = (AbstractMessage) m.invoke(null);
48-
Message msg = message.newBuilderForType().mergeFrom(msgPack.getBytes()).build();
49-
msgPack.setMsg(msg);
50-
consumer.consume(msgPack, ctx.channel());
51-
} else {
52-
log.error("找有找到消息体:{}", msgPack.getMsgId());
53-
}
42+
consumer.consume(message, ctx.channel());
5443
}
5544

5645
@Override

src/main/java/info/xiaomo/gengine/network/pool/MessagePool.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,29 @@
11
package info.xiaomo.gengine.network.pool;
22

3-
import com.google.protobuf.AbstractMessage;
3+
import com.google.protobuf.Message;
44
import java.util.HashMap;
55
import java.util.Map;
66
import info.xiaomo.gengine.network.AbstractHandler;
7-
import info.xiaomo.gengine.network.IMessageAndHandler;
7+
import info.xiaomo.gengine.network.IMessagePool;
88

99
/** desc : 消息池的父类 Copyright(©) 2017 by xiaomo. */
10-
public class MessagePool implements IMessageAndHandler {
10+
public class MessagePool implements IMessagePool {
1111

1212
/** 消息类字典 */
13-
public static final Map<Integer, AbstractMessage> messages = new HashMap<>(10);
13+
public static final Map<Integer, Message> messages = new HashMap<>(10);
1414

1515
/** 类和 */
1616
private final Map<String, Integer> ids = new HashMap<>(10);
1717

1818
private final Map<Integer, Class<? extends AbstractHandler>> handlers = new HashMap<>(10);
1919

2020
@Override
21-
public AbstractMessage getMessage(int messageId) {
21+
public Message getMessage(int messageId) {
2222
return messages.get(messageId);
2323
}
2424

2525
@Override
26-
public int getMessageId(AbstractMessage message) {
26+
public int getMessageId(Message message) {
2727
return ids.get(message.getClass().getName());
2828
}
2929

@@ -42,14 +42,14 @@ public AbstractHandler getHandler(int messageId) {
4242

4343
@Override
4444
public void register(
45-
int messageId, AbstractMessage messageClazz, Class<? extends AbstractHandler> handler) {
45+
int messageId, Message messageClazz, Class<? extends AbstractHandler> handler) {
4646
messages.put(messageId, messageClazz);
4747
handlers.put(messageId, handler);
4848
ids.put(messageClazz.getClass().getName(), messageId);
4949
}
5050

5151
@Override
52-
public void register(int messageId, AbstractMessage messageClazz) {
52+
public void register(int messageId, Message messageClazz) {
5353
messages.put(messageId, messageClazz);
5454
ids.put(messageClazz.getClass().getName(), messageId);
5555
}

0 commit comments

Comments
 (0)