Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
import org.apache.iotdb.db.exception.load.LoadReadOnlyException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
Expand Down Expand Up @@ -259,16 +258,14 @@ private boolean checkBeforeAnalyzeFileByFile(IAnalysis analysis) {
analysis.setFailStatus(
RpcUtils.getStatus(
TSStatusCode.LOAD_FILE_ERROR,
"TSFile encryption is enabled, and the Load TSFile function is disabled"));
"TsFile encryption is enabled, and the Load TsFile function is disabled"));
return false;
}

// check if the system is read only
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(
RpcUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY, LoadReadOnlyException.MESSAGE));
return false;
LOGGER.info(
"LoadTsFileAnalyzer: Current datanode is read only, will try to convert to tablets and insert later.");
}

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ public void start() {
} catch (Exception e) {
isLoadSuccess = false;
failedTsFileNodeIndexes.add(i);
stateMachine.transitionToFailed(e);
LOGGER.warn("LoadTsFileScheduler loads TsFile {} error", filePath, e);
} finally {
if (shouldRemoveFileFromLoadingSet) {
Expand Down Expand Up @@ -319,19 +318,16 @@ private boolean firstPhase(LoadSingleTsFileNode node) {
node.getTsFileResource().getTsFile(), tsFileDataManager::addOrSendTsFileData)
.splitTsFileByDataPartition();
if (!tsFileDataManager.sendAllTsFileData()) {
stateMachine.transitionToFailed(new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
return false;
}
} catch (IllegalStateException e) {
stateMachine.transitionToFailed(e);
LOGGER.warn(
String.format(
"Dispatch TsFileData error when parsing TsFile %s.",
node.getTsFileResource().getTsFile()),
e);
return false;
} catch (Exception e) {
stateMachine.transitionToFailed(e);
LOGGER.warn(
String.format("Parse or send TsFile %s error.", node.getTsFileResource().getTsFile()), e);
return false;
Expand Down Expand Up @@ -361,7 +357,6 @@ private boolean dispatchOnePieceNode(
dispatchResultFuture.get(
CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds(), TimeUnit.SECONDS);
if (!result.isSuccessful()) {
// TODO: retry.
LOGGER.warn(
"Dispatch one piece to ReplicaSet {} error. Result status code {}. "
+ "Result status message {}. Dispatch piece node error:%n{}",
Expand All @@ -381,21 +376,18 @@ private boolean dispatchOnePieceNode(
status.setMessage(
String.format("Load %s piece error in 1st phase. Because ", pieceNode.getTsFile())
+ status.getMessage());
stateMachine.transitionToFailed(status); // TODO: record more status
return false;
}
} catch (InterruptedException | ExecutionException | CancellationException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
LOGGER.warn("Interrupt or Execution error.", e);
stateMachine.transitionToFailed(e);
return false;
} catch (TimeoutException e) {
dispatchResultFuture.cancel(true);
LOGGER.warn(
String.format("Wait for loading %s time out.", LoadTsFilePieceNode.class.getName()), e);
stateMachine.transitionToFailed(e);
return false;
}
return true;
Expand Down Expand Up @@ -436,7 +428,6 @@ private boolean secondPhase(

FragInstanceDispatchResult result = dispatchResultFuture.get();
if (!result.isSuccessful()) {
// TODO: retry.
LOGGER.warn(
"Dispatch load command {} of TsFile {} error to replicaSets {} error. "
+ "Result status code {}. Result status message {}.",
Expand All @@ -450,19 +441,16 @@ private boolean secondPhase(
String.format(
"Load %s error in second phase. Because %s, first phase is %s",
tsFile, status.getMessage(), isFirstPhaseSuccess ? "success" : "failed"));
stateMachine.transitionToFailed(status);
return false;
}
} catch (InterruptedException | ExecutionException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
LOGGER.warn("Interrupt or Execution error.", e);
stateMachine.transitionToFailed(e);
return false;
} catch (Exception e) {
LOGGER.warn("Exception occurred during second phase of loading TsFile {}.", tsFile, e);
stateMachine.transitionToFailed(e);
return false;
}
return true;
Expand Down Expand Up @@ -523,7 +511,6 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
node.getTsFileResource().getTsFile(),
TSStatusCode.representOf(e.getFailureStatus().getCode()).name(),
e.getFailureStatus().getMessage()));
stateMachine.transitionToFailed(e.getFailureStatus());
return false;
}

Expand Down Expand Up @@ -627,14 +614,16 @@ private void convertFailedTsFilesToTabletsAndRetry() {
// If all failed TsFiles are converted into tablets and inserted,
// we can consider the load process as successful.
if (failedTsFileNodeIndexes.isEmpty()) {
LOGGER.info("Load: all failed TsFiles are converted to tablets and inserted.");
stateMachine.transitionToFinished();
} else {
stateMachine.transitionToFailed(
new LoadFileException(
"Failed to load some TsFiles by converting them into tablets. Failed TsFiles: "
+ failedTsFileNodeIndexes.stream()
.map(i -> tsFileNodeList.get(i).getTsFileResource().getTsFilePath())
.collect(Collectors.joining(", "))));
final String errorMsg =
"Load: failed to load some TsFiles by converting them into tablets. Failed TsFiles: "
+ failedTsFileNodeIndexes.stream()
.map(i -> tsFileNodeList.get(i).getTsFileResource().getTsFilePath())
.collect(Collectors.joining(", "));
LOGGER.warn(errorMsg);
stateMachine.transitionToFailed(new LoadFileException(errorMsg));
}
}

Expand Down
Loading