Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: using clusterId to judge whether the target cluster is source cluster #11994

Merged
merged 24 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor
  • Loading branch information
SteveYurongSu committed Feb 2, 2024
commit cb63937b2c880289ea09a77e4a287e8655e13138
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,21 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class PipeRuntimeAgent implements IService {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeAgent.class);
private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
private static volatile String clusterId = null;

private final AtomicBoolean isShutdown = new AtomicBoolean(false);

private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
new PipePeriodicalJobExecutor();
private final AtomicReference<String> clusterId = new AtomicReference<>(null);

private final SimpleConsensusProgressIndexAssigner simpleConsensusProgressIndexAssigner =
new SimpleConsensusProgressIndexAssigner();

//////////////////////////// Getter ////////////////////////////

public static synchronized String getClusterId() {
if (clusterId == null) {
try (ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
clusterId = configNodeClient.getClusterId().clusterId;
} catch (Exception e) {
LOGGER.warn("Unable to get clusterId, because: {}", e.getMessage(), e);
}
}

return clusterId;
}
private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
new PipePeriodicalJobExecutor();

//////////////////////////// System Service Interface ////////////////////////////

Expand Down Expand Up @@ -122,6 +108,22 @@ public ServiceType getID() {
return ServiceType.PIPE_RUNTIME_AGENT;
}

public String getClusterIdIfPossible() {
if (clusterId.get() == null) {
synchronized (clusterId) {
if (clusterId.get() == null) {
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
clusterId.set(configNodeClient.getClusterId().getClusterId());
} catch (Exception e) {
LOGGER.warn("Unable to get clusterId, because: {}", e.getMessage(), e);
}
}
}
}
return clusterId.get();
}

////////////////////// SimpleConsensus ProgressIndex Assigner //////////////////////

public void assignSimpleProgressIndexIfNeeded(InsertNode insertNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapELanguageConstant;
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeConnectorConstant;
Expand Down Expand Up @@ -200,7 +200,9 @@ public void handshake() throws Exception {

// Try to handshake by PipeTransferHandshakeV2Req.
HashMap<String, String> params = new HashMap<>();
params.put(PipeConnectorConstant.HANDSHAKE_KEY_CLUSTER_ID, PipeRuntimeAgent.getClusterId());
params.put(
PipeConnectorConstant.HANDSHAKE_KEY_CLUSTER_ID,
PipeAgent.runtime().getClusterIdIfPossible());
params.put(
PipeConnectorConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeConnectorConstant;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
Expand Down Expand Up @@ -175,7 +175,9 @@ public void onError(Exception e) {

// Try to handshake by PipeTransferHandshakeV2Req.
HashMap<String, String> params = new HashMap<>();
params.put(PipeConnectorConstant.HANDSHAKE_KEY_CLUSTER_ID, PipeRuntimeAgent.getClusterId());
params.put(
PipeConnectorConstant.HANDSHAKE_KEY_CLUSTER_ID,
PipeAgent.runtime().getClusterIdIfPossible());
params.put(
PipeConnectorConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBThriftSyncConnectorClient;
import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeConnectorConstant;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
Expand Down Expand Up @@ -152,7 +152,9 @@ public void sendHandshakeReq(
params.put(
PipeConnectorConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
params.put(PipeConnectorConstant.HANDSHAKE_KEY_CLUSTER_ID, PipeRuntimeAgent.getClusterId());
params.put(
PipeConnectorConstant.HANDSHAKE_KEY_CLUSTER_ID,
PipeAgent.runtime().getClusterIdIfPossible());

TPipeTransferResp resp =
clientAndStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeConnectorConstant;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
Expand Down Expand Up @@ -254,7 +254,7 @@ private TPipeTransferResp handleTransferHandshakeV1(PipeTransferHandshakeV1Req r
private TPipeTransferResp handleTransferHandshakeV2(PipeTransferHandshakeV2Req req)
throws IOException {
// Reject to handshake if the receiver can not take clusterId from configNode.
String clusterId = PipeRuntimeAgent.getClusterId();
String clusterId = PipeAgent.runtime().getClusterIdIfPossible();
if (clusterId == null) {
final TSStatus status =
RpcUtils.getStatus(
Expand Down
Loading