Skip to content

Commit

Permalink
Merge pull request #324 from FISCO-BCOS/dev
Browse files Browse the repository at this point in the history
sync code
  • Loading branch information
bxq2011hust authored Jun 24, 2019
2 parents 287dee6 + e14dd79 commit f21ea53
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 202 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
### Java template
*.class


# Package Files #
*.war
*.ear
Expand Down Expand Up @@ -40,6 +39,7 @@ src/test/resources/node.key
src/test/java/org/fisco/bcos/temp
build.gradle.bak
.settings/
.classpath
.project
bin/

28 changes: 14 additions & 14 deletions src/main/java/org/fisco/bcos/channel/client/AmopException.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package org.fisco.bcos.channel.client;

public class AmopException extends Exception {

private String errmsg;

public String getErrmsg() {
return errmsg;
}

public void setErrmsg(String errmsg) {
this.errmsg = errmsg;
}
private String errmsg;

public AmopException(String string) {

errmsg = string;
}
}
public String getErrmsg() {
return errmsg;
}

public void setErrmsg(String errmsg) {
this.errmsg = errmsg;
}

public AmopException(String string) {

errmsg = string;
}
}
143 changes: 75 additions & 68 deletions src/main/java/org/fisco/bcos/channel/client/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.fisco.bcos.channel.dto.BcosMessage;
import org.fisco.bcos.channel.dto.BcosRequest;
import org.fisco.bcos.channel.dto.BcosResponse;
Expand All @@ -35,6 +34,7 @@
import org.fisco.bcos.channel.handler.ConnectionInfo;
import org.fisco.bcos.channel.handler.GroupChannelConnectionsConfig;
import org.fisco.bcos.channel.handler.Message;
import org.fisco.bcos.web3j.protocol.ObjectMapperFactory;
import org.fisco.bcos.web3j.protocol.core.methods.response.TransactionReceipt;
import org.fisco.bcos.web3j.protocol.exceptions.TransactionException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -470,29 +470,33 @@ public void run(Timeout timeout) throws Exception {
public void asyncSendChannelMessage2(
ChannelRequest request, ChannelResponseCallback2 callback) {
try {

if(request.getContentByteArray().length >= 32*1024*1024)
{
logger.error("send byte length should not greater than 32M now length:{}",
request.getContentByteArray().length);
System.out.println("send byte length should not greater than 32M now length:"+
request.getContentByteArray().length);
throw new AmopException("send byte length should not greater than 32M");
}

logger.debug("ChannelRequest: " + request.getMessageID());
callback.setService(this);

ChannelMessage2 channelMessage = new ChannelMessage2();

channelMessage.setSeq(request.getMessageID());
channelMessage.setResult(0);
channelMessage.setType((short) 0x30); // 链上链下请求0x30
channelMessage.setData(request.getContentByteArray());

System.out.println("value length:"+request.getContentByteArray().length
+":"+request.getContent().getBytes().length);
channelMessage.setTopic(request.getToTopic());

if (request.getContentByteArray().length >= 32 * 1024 * 1024) {
logger.error(
"send byte length should not greater than 32M now length:{}",
request.getContentByteArray().length);
System.out.println(
"send byte length should not greater than 32M now length:"
+ request.getContentByteArray().length);
throw new AmopException("send byte length should not greater than 32M");
}

logger.debug("ChannelRequest: " + request.getMessageID());
callback.setService(this);

ChannelMessage2 channelMessage = new ChannelMessage2();

channelMessage.setSeq(request.getMessageID());
channelMessage.setResult(0);
channelMessage.setType((short) 0x30); // 链上链下请求0x30
channelMessage.setData(request.getContentByteArray());

System.out.println(
"value length:"
+ request.getContentByteArray().length
+ ":"
+ request.getContent().getBytes().length);
channelMessage.setTopic(request.getToTopic());

try {
List<ConnectionInfo> fromConnectionInfos = new ArrayList<ConnectionInfo>();
Expand Down Expand Up @@ -564,9 +568,9 @@ public void run(Timeout timeout) throws Exception {
logger.error("system error", e);
}
}

public void asyncMulticastChannelMessage2(ChannelRequest request) {
try {
try {
logger.debug("ChannelRequest: " + request.getMessageID());

ChannelMessage2 channelMessage = new ChannelMessage2();
Expand Down Expand Up @@ -597,34 +601,37 @@ public void asyncMulticastChannelMessage2(ChannelRequest request) {
throw new Exception("not found agencyName");
}
}

logger.debug(
"FromOrg:{} nodes:{}",
request.getFromOrg(),
fromChannelConnections.getConnections().size());

for(ConnectionInfo connectionInfo: fromChannelConnections.getConnections()) {
ChannelHandlerContext ctx =
fromChannelConnections.getNetworkConnectionByHost(
connectionInfo.getHost(), connectionInfo.getPort());

if (ctx != null && ctx.channel().isActive()) {
ByteBuf out = ctx.alloc().buffer();
channelMessage.writeHeader(out);
channelMessage.writeExtra(out);

ctx.writeAndFlush(out);

logger.debug(
"send message to "
+ connectionInfo.getHost()
+ ":"
+ String.valueOf(connectionInfo.getPort())
+ " 成功");
} else {
logger.error("sending node unavailable, {}",
connectionInfo.getHost() + ":" + String.valueOf(connectionInfo.getPort()));
}

for (ConnectionInfo connectionInfo : fromChannelConnections.getConnections()) {
ChannelHandlerContext ctx =
fromChannelConnections.getNetworkConnectionByHost(
connectionInfo.getHost(), connectionInfo.getPort());

if (ctx != null && ctx.channel().isActive()) {
ByteBuf out = ctx.alloc().buffer();
channelMessage.writeHeader(out);
channelMessage.writeExtra(out);

ctx.writeAndFlush(out);

logger.debug(
"send message to "
+ connectionInfo.getHost()
+ ":"
+ String.valueOf(connectionInfo.getPort())
+ " 成功");
} else {
logger.error(
"sending node unavailable, {}",
connectionInfo.getHost()
+ ":"
+ String.valueOf(connectionInfo.getPort()));
}
}
} catch (Exception e) {
logger.error("send message fail:", e);
Expand Down Expand Up @@ -803,22 +810,22 @@ public void onReceiveChannelMessage2(ChannelHandlerContext ctx, ChannelMessage2
push.setCtx(ctx);
push.setTopic(message.getTopic());

push.setSeq(message.getSeq());
push.setMessageID(message.getSeq());
System.out.println("data length:"+message.getData().length);
logger.info("msg:"+Arrays.toString(message.getData()));
push.setContent(message.getData());
pushCallback.onPush(push);
} else {
logger.error("can not push,unset push callback");
}
} catch (Exception e) {
logger.error("push error:", e);
}
} else if (message.getType() == 0x31) { // 链上链下回包
logger.debug("channel message:{}", message.getSeq());
if (callback != null) {
logger.debug("found callback response");
push.setSeq(message.getSeq());
push.setMessageID(message.getSeq());
System.out.println("data length:" + message.getData().length);
logger.info("msg:" + Arrays.toString(message.getData()));
push.setContent(message.getData());
pushCallback.onPush(push);
} else {
logger.error("can not push,unset push callback");
}
} catch (Exception e) {
logger.error("push error:", e);
}
} else if (message.getType() == 0x31) { // 链上链下回包
logger.debug("channel message:{}", message.getSeq());
if (callback != null) {
logger.debug("found callback response");

ChannelResponse response = new ChannelResponse();
if (message.getResult() != 0) {
Expand Down Expand Up @@ -890,8 +897,8 @@ public void onReceiveTransactionMessage(ChannelHandlerContext ctx, BcosMessage m

try {
TransactionReceipt receipt =
objectMapper.readValue(message.getData(), TransactionReceipt.class);

ObjectMapperFactory.getObjectMapper()
.readValue(message.getData(), TransactionReceipt.class);
callback.onResponse(receipt);
} catch (Exception e) {
TransactionReceipt receipt = new TransactionReceipt();
Expand Down
56 changes: 25 additions & 31 deletions src/main/java/org/fisco/bcos/channel/dto/ChannelPush.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
public class ChannelPush {
static Logger logger = LoggerFactory.getLogger(ChannelPush.class);


private String keyID; // 链ID
private String orgApp; // 来源标识
private String version; // 版本
Expand All @@ -23,8 +22,7 @@ public class ChannelPush {
private Integer ttl; // TTL

private byte[] content; // 请求包体



public String getKeyID() {
return keyID;
}
Expand Down Expand Up @@ -97,34 +95,30 @@ public void setTtl(Integer ttl) {
this.ttl = ttl;
}

public byte[] getContent2() {
return content;
}


public String getContent()
{
if(content == null) {
return null;
}

String _content = new String(content);
return _content;
}

public void setContent(String content) {
if(content == null)
{
this.content = null;
return;
}
this.content = content.getBytes();
}


public void setContent(byte[] content) {
this.content = content;
}
public byte[] getContent2() {
return content;
}

public String getContent() {
if (content == null) {
return null;
}

String _content = new String(content);
return _content;
}

public void setContent(String content) {
if (content == null) {
this.content = null;
return;
}
this.content = content.getBytes();
}

public void setContent(byte[] content) {
this.content = content;
}

public void sendResponse(ChannelResponse response) {
logger.debug("send ChannelResponse seq:{}", response.getMessageID());
Expand Down
Loading

0 comments on commit f21ea53

Please sign in to comment.