Skip to content

Commit

Permalink
Merge pull request #299 from fqliao/release-2.0.0-rc3
Browse files Browse the repository at this point in the history
 remove static nodeToBlockNumberMap
  • Loading branch information
fqliao authored Jun 12, 2019
2 parents 20b469b + 499bc74 commit 0c575d6
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 44 deletions.
27 changes: 20 additions & 7 deletions src/main/java/org/fisco/bcos/channel/client/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import org.fisco.bcos.channel.dto.ChannelPush2;
import org.fisco.bcos.channel.dto.ChannelRequest;
import org.fisco.bcos.channel.dto.ChannelResponse;
import org.fisco.bcos.channel.handler.*;
import org.fisco.bcos.channel.handler.ChannelConnections;
import org.fisco.bcos.channel.handler.ConnectionCallback;
import org.fisco.bcos.channel.handler.ConnectionInfo;
import org.fisco.bcos.channel.handler.GroupChannelConnectionsConfig;
import org.fisco.bcos.channel.handler.Message;
import org.fisco.bcos.web3j.protocol.core.methods.response.TransactionReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,7 +50,7 @@ public class Service {
private int groupId;
private static ObjectMapper objectMapper = new ObjectMapper();
private BigInteger number = BigInteger.valueOf(0);

private ConcurrentHashMap<String, Integer> nodeToBlockNumberMap = new ConcurrentHashMap<>();
/** add transaction seq callback */
private Map<String, Object> seq2TransactionCallback = new ConcurrentHashMap<String, Object>();

Expand All @@ -63,6 +67,14 @@ public void setTopics(Set<String> topics) {
}
}

public ConcurrentHashMap<String, Integer> getNodeToBlockNumberMap() {
return nodeToBlockNumberMap;
}

public void setNodeToBlockNumberMap(ConcurrentHashMap<String, Integer> nodeToBlockNumberMap) {
this.nodeToBlockNumberMap = nodeToBlockNumberMap;
}

public boolean flushTopics(List<String> topics) {

try {
Expand Down Expand Up @@ -398,7 +410,8 @@ public void asyncSendEthereumMessage(BcosRequest request, BcosResponseCallback c
throw new Exception("not found agencyName");
}
}
ChannelHandlerContext ctx = channelConnections.randomNetworkConnection();
ChannelHandlerContext ctx =
channelConnections.randomNetworkConnection(nodeToBlockNumberMap);

ByteBuf out = ctx.alloc().buffer();
bcosMessage.writeHeader(out);
Expand Down Expand Up @@ -440,7 +453,7 @@ public void run(Timeout timeout) throws Exception {
response.setErrorCode(-1);
response.setErrorMessage(
e.getMessage()
+ "Requset send failed! Can not connect to nodes success, please checkout the node status and the sdk config!");
+ " Requset send failed! Can not connect to nodes success, please checkout the node status and the sdk config!");
response.setContent("");
response.setMessageID(request.getMessageID());

Expand Down Expand Up @@ -754,11 +767,11 @@ public void onReceiveBlockNotify(ChannelHandlerContext ctx, ChannelMessage2 mess
int port = socketChannel.remoteAddress().getPort();
Integer number = Integer.parseInt(split[1]);

ChannelConnections.nodeToBlockNumberMap.put(hostAddress + port, number);
nodeToBlockNumberMap.put(hostAddress + port, number);
// get max blockNumber to set blocklimit
Integer maxBlockNumber = number;
for (String key : ChannelConnections.nodeToBlockNumberMap.keySet()) {
int blockNumber = ChannelConnections.nodeToBlockNumberMap.get(key);
for (String key : nodeToBlockNumberMap.keySet()) {
int blockNumber = nodeToBlockNumberMap.get(key);
if (blockNumber >= maxBlockNumber) {
maxBlockNumber = blockNumber;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ public class ChannelConnections {
new HashMap<String, ChannelHandlerContext>();
private int groupId;
private Bootstrap bootstrap = new Bootstrap();
public static ConcurrentHashMap<String, Integer> nodeToBlockNumberMap =
new ConcurrentHashMap<>();
ServerBootstrap serverBootstrap = new ServerBootstrap();

public int getGroupId() {
Expand Down Expand Up @@ -149,7 +147,8 @@ public void setCaCertPath(String caCertPath) {
this.caCertPath = caCertPath;
}

public ChannelHandlerContext randomNetworkConnection() throws Exception {
public ChannelHandlerContext randomNetworkConnection(
ConcurrentHashMap<String, Integer> nodeToBlockNumberMap) throws Exception {
List<ChannelHandlerContext> activeConnections = new ArrayList<ChannelHandlerContext>();

for (String key : networkConnections.keySet()) {
Expand All @@ -167,31 +166,33 @@ public ChannelHandlerContext randomNetworkConnection() throws Exception {
List<ChannelHandlerContext> maxBlockNumberConnections =
new ArrayList<ChannelHandlerContext>();
long maxBlockNumber = 0;
for (String key : nodeToBlockNumberMap.keySet()) {
int blockNumber = nodeToBlockNumberMap.get(key);
if (blockNumber >= maxBlockNumber) {
if (blockNumber > maxBlockNumber) {
maxBlockNumberConnections.clear();
}
if (nodeToBlockNumberMap != null) {
for (String key : nodeToBlockNumberMap.keySet()) {
int blockNumber = nodeToBlockNumberMap.get(key);
if (blockNumber >= maxBlockNumber) {
if (blockNumber > maxBlockNumber) {
maxBlockNumberConnections.clear();
}

Optional<ChannelHandlerContext> optionalCtx =
activeConnections
.stream()
.filter(
x ->
key.equals(
((SocketChannel) x.channel())
.remoteAddress()
.getAddress()
.getHostAddress()
+ ((SocketChannel) x.channel())
.remoteAddress()
.getPort()))
.findFirst();
if (optionalCtx.isPresent()) {
ChannelHandlerContext channelHandlerContext = optionalCtx.get();
maxBlockNumberConnections.add(channelHandlerContext);
maxBlockNumber = blockNumber;
Optional<ChannelHandlerContext> optionalCtx =
activeConnections
.stream()
.filter(
x ->
key.equals(
((SocketChannel) x.channel())
.remoteAddress()
.getAddress()
.getHostAddress()
+ ((SocketChannel) x.channel())
.remoteAddress()
.getPort()))
.findFirst();
if (optionalCtx.isPresent()) {
ChannelHandlerContext channelHandlerContext = optionalCtx.get();
maxBlockNumberConnections.add(channelHandlerContext);
maxBlockNumber = blockNumber;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,12 @@ public void onResponse(BcosResponse response) {
response.getContent(), BlockNumber.class);
SocketChannel socketChannel = (SocketChannel) ctx.channel();
InetSocketAddress socketAddress = socketChannel.remoteAddress();
ChannelConnections.nodeToBlockNumberMap.put(
socketAddress.getAddress().getHostAddress()
+ socketAddress.getPort(),
blockNumber.getBlockNumber().intValue());
channelService
.getNodeToBlockNumberMap()
.put(
socketAddress.getAddress().getHostAddress()
+ socketAddress.getPort(),
blockNumber.getBlockNumber().intValue());
} catch (Exception e) {
throw new MessageDecodingException(response.getContent());
}
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/org/fisco/bcos/channel/proxy/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@
import io.netty.util.Timer;
import java.io.UnsupportedEncodingException;
import java.security.SecureRandom;
import java.util.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.ssl.SSLException;
import org.fisco.bcos.channel.handler.ChannelConnections;
Expand Down Expand Up @@ -462,7 +468,7 @@ public void onRemoteMessage(ChannelHandlerContext ctx, Message message) {
// 没有这个seq,可能是新发请求或者新收到的push

// 其他消息(链上链下一期),随机发
localCtx = localConnections.randomNetworkConnection();
localCtx = localConnections.randomNetworkConnection(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ private String getWeb3jVersion() {

private FieldSpec createBinaryDefinition(String binary) {
return FieldSpec.builder(String.class, BINARY)
.addModifiers(Modifier.PRIVATE, Modifier.FINAL, Modifier.STATIC)
.addModifiers(Modifier.PUBLIC, Modifier.FINAL, Modifier.STATIC)
.initializer("$S", binary)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public class PrecompiledCommon {
public static final int PermissionDenied_RC1 = 80;
public static final int PermissionDenied = 50000;
public static final int PermissionDenied_RC3 = -50000;
public static final int TableExist = -50001;
public static final int TableExist = 50001;
public static final int TableExist_RC3 = -50001;
public static final int TableNameAndAddressExist_RC1 = 56;
public static final int TableNameAndAddressExist = 51000;
public static final int TableNameAndAddressExist_RC3 = -51000;
Expand Down Expand Up @@ -77,6 +78,8 @@ public static String transferToJson(int code) throws IOException {
msg = "table name and address does not exist";
} else if (code == LastSealer) {
msg = "the last sealer cannot be removed";
} else if (code == TableExist) {
msg = "table already exist";
} else if (code == InvalidKey) {
msg = "invalid configuration entry";
}
Expand All @@ -89,6 +92,8 @@ public static String transferToJson(int code) throws IOException {
msg = "table name and address does not exist";
} else if (code == LastSealer_RC3) {
msg = "the last sealer cannot be removed";
} else if (code == TableExist_RC3) {
msg = "table already exist";
} else if (code == InvalidKey_RC3) {
msg = "invalid configuration entry";
}
Expand All @@ -107,8 +112,6 @@ public static String transferToJson(int code) throws IOException {
msg = "the node is already in the observer list";
} else if (code == ContractNameAndVersionExist) {
msg = "contract name and version already exist";
} else if (code == TableExist) {
msg = "table already exist";
} else if (code == VersionExceeds) {
msg = "version string length exceeds the maximum limit";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public Table desc(String tableName) throws Exception {
List<Map<String, String>> userTable = select(table, condition);
Table tableInfo = new Table();
if (userTable.size() != 0) {
tableInfo.setTableName(tableName);
tableInfo.setKey(userTable.get(0).get("key_field"));
tableInfo.setValueFields(userTable.get(0).get("value_field"));
} else {
Expand Down

0 comments on commit 0c575d6

Please sign in to comment.