Skip to content

Commit b3a2142

Browse files
Pipe: Support syncing table model data between clusters (airgap) (#13828)
1 parent 48593d7 commit b3a2142

File tree

1 file changed

+8
-0
lines changed

1 file changed

+8
-0
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
2828
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
2929
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
30+
import org.apache.iotdb.db.protocol.session.ClientSession;
31+
import org.apache.iotdb.db.protocol.session.SessionManager;
32+
import org.apache.iotdb.db.queryengine.plan.Coordinator;
3033
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
3134
import org.apache.iotdb.rpc.TSStatusCode;
3235
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -73,6 +76,9 @@ public void runMayThrow() throws Throwable {
7376

7477
LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, socket);
7578

79+
final ClientSession session = new ClientSession(socket);
80+
SessionManager.getInstance().registerSession(session);
81+
7682
try {
7783
while (!socket.isClosed()) {
7884
isELanguagePayload = false;
@@ -91,6 +97,8 @@ public void runMayThrow() throws Throwable {
9197
throw e;
9298
} finally {
9399
PipeDataNodeAgent.receiver().thrift().handleClientExit();
100+
SessionManager.getInstance()
101+
.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
94102
socket.close();
95103
}
96104
}

0 commit comments

Comments
 (0)