Skip to content

[To dev/1.3] Pipe: Fix connection leak caused by clients not closed after task dropped (2 situations) (#15910) #15929

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private void doTransferWrapper(

private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
throws PipeException {
final Pair<IoTDBSyncClient, Boolean> clientAndStatus = clientManager.getClient();
final Pair<IoTDBSyncClient, Boolean> clientAndStatus = getClientManager().getClient();

final TPipeTransferResp resp;
try {
Expand All @@ -164,7 +164,7 @@ private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWri
final TSStatus status = resp.getStatus();
// Send handshake req and then re-transfer the event
if (status.getCode() == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
clientManager.sendHandshakeReq(clientAndStatus);
getClientManager().sendHandshakeReq(clientAndStatus);
}
// Only handle the failed statuses to avoid string format performance overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
Expand Down Expand Up @@ -203,7 +203,7 @@ private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent)
final long creationTime = snapshotEvent.getCreationTime();
final File snapshotFile = snapshotEvent.getSnapshotFile();
final File templateFile = snapshotEvent.getTemplateFile();
final Pair<IoTDBSyncClient, Boolean> clientAndStatus = clientManager.getClient();
final Pair<IoTDBSyncClient, Boolean> clientAndStatus = getClientManager().getClient();

// 1. Transfer snapshotFile, and template File if exists
transferFilePieces(
Expand Down Expand Up @@ -250,7 +250,7 @@ private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent)
final TSStatus status = resp.getStatus();
// Send handshake req and then re-transfer the event
if (status.getCode() == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
clientManager.sendHandshakeReq(clientAndStatus);
getClientManager().sendHandshakeReq(clientAndStatus);
}
// Only handle the failed statuses to avoid string format performance overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ protected boolean executeOnce() {
}

private void transferHeartbeatEvent(final PipeHeartbeatEvent event) {
// DO NOT call heartbeat or transfer after closed, or will cause connection leak
if (isClosed.get()) {
return;
}

try {
outputPipeConnector.heartbeat();
outputPipeConnector.transfer(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,18 @@ public void onError(final Exception e) {
client.resetMethodStateIfStopped();
throw e;
} finally {
if (isClosed) {
try {
client.close();
client.invalidateAll();
} catch (final Exception e) {
LOGGER.warn(
"Failed to close client {}:{} after handshake failure when the manager is closed.",
targetNodeUrl.getIp(),
targetNodeUrl.getPort(),
e);
}
}
client.setShouldReturnSelf(true);
client.returnSelf();
}
Expand Down Expand Up @@ -348,8 +360,14 @@ public void close() {
if (clientManager != null) {
try {
clientManager.close();
LOGGER.info(
"Closed AsyncPipeDataTransferServiceClientManager for receiver attributes: {}",
receiverAttributes);
} catch (final Exception e) {
LOGGER.warn("Failed to close client manager.", e);
LOGGER.warn(
"Failed to close AsyncPipeDataTransferServiceClientManager for receiver attributes: {}",
receiverAttributes,
e);
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ public synchronized void handshake() throws Exception {

@Override
public void heartbeat() throws Exception {
syncConnector.heartbeat();
if (!isClosed()) {
syncConnector.heartbeat();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ protected boolean tryTransfer(
if (connector.isClosed()) {
clearEventsReferenceCount();
connector.eliminateHandler(this);
client.setShouldReturnSelf(true);
client.returnSelf();
return false;
}
doTransfer(client, req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,25 @@ protected void onErrorInternal(final Exception exception) {
}

private void returnClientIfNecessary() {
if (client != null) {
client.setShouldReturnSelf(true);
client.returnSelf();
client = null;
if (client == null) {
return;
}

if (connector.isClosed()) {
try {
client.close();
client.invalidateAll();
} catch (final Exception e) {
LOGGER.warn(
"Failed to close or invalidate client when connector is closed. Client: {}, Exception: {}",
client,
e.getMessage(),
e);
}
}
client.setShouldReturnSelf(true);
client.returnSelf();
client = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,19 @@ public V borrowClient(K node) throws ClientManagerException {
* return of a client is automatic whenever a particular client is used.
*/
public void returnClient(K node, V client) {
Optional.ofNullable(node)
.ifPresent(
x -> {
try {
pool.returnObject(node, client);
} catch (Exception e) {
LOGGER.warn("Return client {} for node {} to pool failed.", client, node, e);
}
});
if (node != null) {
try {
pool.returnObject(node, client);
} catch (Exception e) {
LOGGER.warn("Return client {} for node {} to pool failed.", client, node, e);
}
} else if (client instanceof ThriftClient) {
((ThriftClient) client).invalidateAll();
LOGGER.warn(
"Return client {} to pool failed because the node is null. "
+ "This may cause resource leak, please check your code.",
client);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void setTimeoutDynamically(final int timeout) {
}
}

private void close() {
public void close() {
___transport.close();
___currentMethod = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector {

private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSslSyncConnector.class);

protected IoTDBSyncClientManager clientManager;
private volatile IoTDBSyncClientManager clientManager;

protected IoTDBSyncClientManager getClientManager() {
if (clientManager == null) {
throw new IllegalStateException("IoTDB sync client manager has been closed");
}
return clientManager;
}

@Override
public void validate(final PipeParameterValidator validator) throws Exception {
Expand Down Expand Up @@ -147,7 +154,7 @@ protected abstract IoTDBSyncClientManager constructClient(

@Override
public void handshake() throws Exception {
clientManager.checkClientStatusAndTryReconstructIfNecessary();
getClientManager().checkClientStatusAndTryReconstructIfNecessary();
}

@Override
Expand Down Expand Up @@ -222,7 +229,7 @@ protected void transferFilePieces(
// Send handshake req and then re-transfer the event
if (status.getCode()
== TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
clientManager.sendHandshakeReq(clientAndStatus);
getClientManager().sendHandshakeReq(clientAndStatus);
}
// Only handle the failed statuses to avoid string format performance overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
Expand All @@ -246,6 +253,7 @@ protected abstract PipeTransferFilePieceReq getTransferMultiFilePieceReq(
public void close() {
if (clientManager != null) {
clientManager.close();
clientManager = null;
}

super.close();
Expand Down
Loading