Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.transport.AbstractClient;

import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -86,79 +88,100 @@ public ChannelPipeline getPipeline() {
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
InetSocketAddress connectAddress = getConnectAddress();
ChannelFuture future = bootstrap.connect(connectAddress);
long connectTimeout = getConnectTimeout();
long deadline = start + connectTimeout;
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);

if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
newChannel.setInterestOps(Channel.OP_READ_WRITE);
try {
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel "
+ newChannel);
while (true) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should limit the maximum times here

boolean ret = future.awaitUninterruptibly(connectTimeout, TimeUnit.MILLISECONDS);

if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
newChannel.setInterestOps(Channel.OP_READ_WRITE);
try {
// copy reference
Channel oldChannel = NettyClient.this.channel;
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel
+ " on create new netty channel " + newChannel);
}
// Close old channel
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info(
"Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
} else {
NettyClient.this.channel = newChannel;
}
}
break;
} else if (future.getCause() != null) {
Throwable cause = future.getCause();

if (cause instanceof ClosedChannelException) {
// Netty3.2.10 ClosedChannelException issue, see https://github.com/netty/netty/issues/138
connectTimeout = deadline - System.currentTimeMillis();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AlbumenJ
connectTimeout will be recalculated at here.

if (connectTimeout > 0) {
// 6-1 - Retry connect to provider server by Netty3.2.10 ClosedChannelException issue#138.
logger.warn(
TRANSPORT_FAILED_CONNECT_PROVIDER,
"Netty3.2.10 ClosedChannelException issue#138",
"",
"Retry connect to provider server.");
future = bootstrap.connect(connectAddress);
continue;
}
} else {
NettyClient.this.channel = newChannel;
}
RemotingException remotingException = new RemotingException(
this,
"client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress()
+ ", error message is:" + cause.getMessage(),
cause);

// 6-1 - Failed to connect to provider server by other reason.
logger.error(
TRANSPORT_FAILED_CONNECT_PROVIDER,
"network disconnected",
"",
"Failed to connect to provider server by other reason.",
cause);

throw remotingException;
} else {

RemotingException remotingException = new RemotingException(
this,
"client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress()
+ " client-side timeout " + getConnectTimeout() + "ms (elapsed: "
+ (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());

// 6-2 - Client-side timeout.
logger.error(
TRANSPORT_CLIENT_CONNECT_TIMEOUT,
"provider crash",
"",
"Client-side timeout.",
remotingException);

throw remotingException;
}
} else if (future.getCause() != null) {
Throwable cause = future.getCause();

RemotingException remotingException = new RemotingException(
this,
"client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress()
+ ", error message is:" + cause.getMessage(),
cause);

// 6-1 - Failed to connect to provider server by other reason.
logger.error(
TRANSPORT_FAILED_CONNECT_PROVIDER,
"network disconnected",
"",
"Failed to connect to provider server by other reason.",
cause);

throw remotingException;
} else {

RemotingException remotingException = new RemotingException(
this,
"client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start)
+ "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());

// 6-2 - Client-side timeout.
logger.error(
TRANSPORT_CLIENT_CONNECT_TIMEOUT,
"provider crash",
"",
"Client-side timeout.",
remotingException);

throw remotingException;
}
} finally {
if (!isConnected()) {
Expand Down
Loading