Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,17 @@ private String generatePseudoQueryId() {

@Override
protected String getClusterId() {
return ConfigNode.getInstance().getConfigManager().getClusterManager().getClusterId();
return configManager.getClusterManager().getClusterId();
}

@Override
protected TSStatus tryLogin() {
// Do nothing. Login check will be done in the data node receiver.
return StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
protected boolean shouldLogin() {
return lastSuccessfulLoginTime == Long.MIN_VALUE || super.shouldLogin();
}

@Override
protected TSStatus login() {
return configManager.login(username, password).getStatus();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
Expand Down Expand Up @@ -147,8 +146,6 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {

private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();

private long lastSuccessfulLoginTime = Long.MIN_VALUE;

private PipeMemoryBlock allocatedMemoryBlock;

static {
Expand Down Expand Up @@ -415,29 +412,10 @@ protected String getClusterId() {
}

@Override
protected TSStatus tryLogin() {
final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
final long loginPeriodicVerificationIntervalMs =
PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
if (clientSession == null
|| !clientSession.isLogin()
|| (loginPeriodicVerificationIntervalMs >= 0
&& lastSuccessfulLoginTime
< System.currentTimeMillis() - loginPeriodicVerificationIntervalMs)) {
final TSStatus status =
SESSION_MANAGER.login(
SESSION_MANAGER.getCurrSession(),
username,
password,
ZoneId.systemDefault().toString(),
SessionManager.CURRENT_RPC_VERSION,
IoTDBConstant.ClientVersion.V_1_0);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
lastSuccessfulLoginTime = System.currentTimeMillis();
}
return status;
}
return StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
protected boolean shouldLogin() {
// The idle time is updated per request
final IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
return clientSession == null || !clientSession.isLogin() || super.shouldLogin();
}

@Override
Expand Down Expand Up @@ -715,6 +693,9 @@ private TSStatus executeStatementWithRetryOnDataTypeMismatch(final Statement sta
}

final TSStatus status = executeStatement(statement);

// Try to convert data type if the statement is a tree model statement
// and the status code is not success
return shouldConvertDataTypeOnTypeMismatch
&& ((statement instanceof InsertBaseStatement
&& ((InsertBaseStatement) statement).hasFailedMeasurements())
Expand All @@ -725,34 +706,14 @@ private TSStatus executeStatementWithRetryOnDataTypeMismatch(final Statement sta
}

private TSStatus executeStatement(final Statement statement) {
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
final long loginPeriodicVerificationIntervalMs =
PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
if (clientSession == null
|| !clientSession.isLogin()
|| (loginPeriodicVerificationIntervalMs >= 0
&& lastSuccessfulLoginTime
< System.currentTimeMillis() - loginPeriodicVerificationIntervalMs)) {
final BasicOpenSessionResp openSessionResp =
SESSION_MANAGER.login(
SESSION_MANAGER.getCurrSession(),
username,
password,
ZoneId.systemDefault().toString(),
SessionManager.CURRENT_RPC_VERSION,
IoTDBConstant.ClientVersion.V_1_0);
if (openSessionResp.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
"Receiver id = {}: Failed to open session, username = {}, response = {}.",
receiverId.get(),
username,
openSessionResp);
return RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
}
lastSuccessfulLoginTime = System.currentTimeMillis();
clientSession = SESSION_MANAGER.getCurrSession();
// Permission check
final TSStatus loginStatus = loginIfNecessary();
if (loginStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return loginStatus;
}

final IClientSession clientSession = SESSION_MANAGER.getCurrSession();

final TSStatus status = AuthorityChecker.checkAuthority(statement, clientSession);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
Expand All @@ -777,6 +738,19 @@ private TSStatus executeStatement(final Statement statement) {
.status;
}

@Override
protected TSStatus login() {
final BasicOpenSessionResp openSessionResp =
SESSION_MANAGER.login(
SESSION_MANAGER.getCurrSession(),
username,
password,
ZoneId.systemDefault().toString(),
SessionManager.CURRENT_RPC_VERSION,
IoTDBConstant.ClientVersion.V_1_0);
return RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
}

@Override
public synchronized void handleExit() {
if (Objects.nonNull(configReceiverId.get())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;

private static final long LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS =
PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
protected long lastSuccessfulLoginTime = Long.MIN_VALUE;

private File writingFile;
private RandomAccessFile writingFileWriter;

Expand Down Expand Up @@ -260,7 +264,7 @@ protected TPipeTransferResp handleTransferHandshakeV2(final PipeTransferHandshak
if (passwordString != null) {
password = passwordString;
}
final TSStatus status = tryLogin();
final TSStatus status = loginIfNecessary();
if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
"Receiver id = {}: Handshake failed because login failed, response status = {}.",
Expand Down Expand Up @@ -313,7 +317,30 @@ protected PipeRequestType getPlanType() {

protected abstract String getClusterId();

protected abstract TSStatus tryLogin();
protected boolean shouldLogin() {
return LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS >= 0
&& lastSuccessfulLoginTime
< System.currentTimeMillis() - LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS;
}

protected TSStatus loginIfNecessary() {
if (shouldLogin()) {
final TSStatus permissionCheckStatus = login();
if (permissionCheckStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
"Receiver id = {}: Failed to login, username = {}, response = {}.",
receiverId.get(),
username,
permissionCheckStatus);
return permissionCheckStatus;
} else {
lastSuccessfulLoginTime = System.currentTimeMillis();
}
}
return StatusUtils.OK;
}

protected abstract TSStatus login();

protected final TPipeTransferResp handleTransferFilePiece(
final PipeTransferFilePieceReq req,
Expand Down
Loading