Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,20 @@ public class TransportClient implements Closeable {
private final Channel channel;
private final TransportResponseHandler handler;
@Nullable private String clientId;
private volatile boolean timedOut;

/**
 * Creates a client bound to the given channel and response handler.
 *
 * @param channel the channel this client wraps; checked with {@code Preconditions.checkNotNull}
 * @param handler the response handler for this client; checked with
 *     {@code Preconditions.checkNotNull}
 */
public TransportClient(Channel channel, TransportResponseHandler handler) {
  // Validate both collaborators up front, before any field is used.
  this.channel = Preconditions.checkNotNull(channel);
  this.handler = Preconditions.checkNotNull(handler);
  // A freshly constructed client has not timed out yet.
  this.timedOut = false;
}

/** Returns the channel this client was constructed with. */
public Channel getChannel() {
  return this.channel;
}

/**
 * Reports whether this client can still be used.
 *
 * @return {@code true} only if the client has not been marked as timed out and the
 *     underlying channel reports itself as open or active
 */
public boolean isActive() {
  // The timedOut flag is checked first so a client that has been flagged as timed out is
  // reported inactive regardless of what the channel itself reports.
  return !timedOut && (channel.isOpen() || channel.isActive());
}

public SocketAddress getSocketAddress() {
Expand Down Expand Up @@ -263,6 +265,11 @@ public void onFailure(Throwable e) {
}
}

/**
 * Flags this client as timed out by setting the {@code timedOut} field, which
 * {@link #isActive()} consults.
 */
public void timeOut() {
  timedOut = true;
}

@Override
public void close() {
// close is a local operation and should finish within milliseconds; timeout just to be safe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,19 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO
TransportClient cachedClient = clientPool.clients[clientIndex];

if (cachedClient != null && cachedClient.isActive()) {
logger.trace("Returning cached connection to {}: {}", address, cachedClient);
return cachedClient;
// Make sure that the channel will not timeout by updating the last use time of the
// handler. Then check that the client is still alive, in case it timed out before
// this code was able to update things.
TransportChannelHandler handler = cachedClient.getChannel().pipeline()
.get(TransportChannelHandler.class);
synchronized (handler) {
handler.getResponseHandler().updateTimeOfLastRequest();
}

if (cachedClient.isActive()) {
logger.trace("Returning cached connection to {}: {}", address, cachedClient);
return cachedClient;
}
}

// If we reach here, we don't have an existing connection open. Let's create a new one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public TransportResponseHandler(Channel channel) {
}

/**
 * Registers a callback for an outstanding chunk fetch.
 *
 * @param streamChunkId identifier of the chunk being fetched; used as the lookup key
 * @param callback callback stored against {@code streamChunkId}
 */
public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) {
  // Refresh the last-request timestamp exactly once, through the shared helper, before the
  // request becomes visible in the outstanding-fetches map.
  updateTimeOfLastRequest();
  outstandingFetches.put(streamChunkId, callback);
}

Expand All @@ -78,7 +78,7 @@ public void removeFetchRequest(StreamChunkId streamChunkId) {
}

/**
 * Registers a callback for an outstanding RPC.
 *
 * @param requestId identifier of the RPC; used as the lookup key
 * @param callback callback stored against {@code requestId}
 */
public void addRpcRequest(long requestId, RpcResponseCallback callback) {
  // Refresh the last-request timestamp exactly once, through the shared helper, before the
  // request becomes visible in the outstanding-RPCs map.
  updateTimeOfLastRequest();
  outstandingRpcs.put(requestId, callback);
}

Expand Down Expand Up @@ -216,4 +216,9 @@ public long getTimeOfLastRequestNs() {
return timeOfLastRequestNs.get();
}

/**
 * Records "now" as the time of the last request, using the current
 * {@link System#nanoTime()} reading.
 */
public void updateTimeOfLastRequest() {
  final long nowNs = System.nanoTime();
  timeOfLastRequestNs.set(nowNs);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,32 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
// there are outstanding requests, we also do a secondary consistency check to ensure
// there's no race between the idle timeout and incrementing the numOutstandingRequests
// (see SPARK-7003).
boolean isActuallyOverdue =
System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
if (responseHandler.numOutstandingRequests() > 0) {
String address = NettyUtils.getRemoteAddress(ctx.channel());
logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
"requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
"is wrong.", address, requestTimeoutNs / 1000 / 1000);
ctx.close();
} else if (closeIdleConnections) {
// While closeIdleConnections is enabled, we also close idle connections
ctx.close();
//
// To avoid a race between TransportClientFactory.createClient() and this code which could
// result in an inactive client being returned, this needs to run in a synchronized block.
synchronized (this) {
boolean isActuallyOverdue =
System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
if (responseHandler.numOutstandingRequests() > 0) {
String address = NettyUtils.getRemoteAddress(ctx.channel());
logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
"requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
"is wrong.", address, requestTimeoutNs / 1000 / 1000);
client.timeOut();
ctx.close();
} else if (closeIdleConnections) {
// While closeIdleConnections is enabled, we also close idle connections
client.timeOut();
ctx.close();
}
}
}
}
}

/** Returns the response handler held by this channel handler. */
public TransportResponseHandler getResponseHandler() {
  return this.responseHandler;
}

}