[To dev/1.3] Throw CANNOT_FETCH_FI_STATE(722) instead of 301/305 while DN restarting #15809

Merged 3 commits on Jun 25, 2025
@@ -118,6 +118,17 @@ public enum TSStatusCode {
QUERY_WAS_KILLED(715),
EXPLAIN_ANALYZE_FETCH_ERROR(716),
TOO_MANY_CONCURRENT_QUERIES_ERROR(717),
OPERATOR_NOT_FOUND(718),

QUERY_EXECUTION_MEMORY_NOT_ENOUGH(719),
QUERY_TIMEOUT(720),
PLAN_FAILED_NETWORK_PARTITION(721),
CANNOT_FETCH_FI_STATE(722),

// Arithmetic
NUMERIC_VALUE_OUT_OF_RANGE(750),
DIVISION_BY_ZERO(751),
DATE_OUT_OF_RANGE(752),

// Authentication
INIT_AUTH_ERROR(800),
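The new codes extend the query-execution range (718-722) and add a dedicated arithmetic block (750-752). As a rough illustration of how a caller might react to the new code, the sketch below retries a request while the server keeps answering with CANNOT_FETCH_FI_STATE(722); the IntSupplier wrapper and the retry loop are hypothetical, and only TSStatusCode.CANNOT_FETCH_FI_STATE with its getStatusCode() accessor comes from this change.

import java.util.function.IntSupplier;

import org.apache.iotdb.rpc.TSStatusCode;

// Sketch only: retry while the server reports CANNOT_FETCH_FI_STATE(722),
// i.e. a fragment instance's state could not be fetched, typically because
// the DataNode hosting it is restarting.
public final class RetryOnFiStateLoss {
  private static final int MAX_ATTEMPTS = 3;

  // executeAndGetCode stands in for any call that returns a TSStatus code.
  static int executeWithRetry(IntSupplier executeAndGetCode) throws InterruptedException {
    int code = executeAndGetCode.getAsInt();
    for (int attempt = 1;
        attempt < MAX_ATTEMPTS
            && code == TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode();
        attempt++) {
      Thread.sleep(1000L * attempt); // naive backoff before asking again
      code = executeAndGetCode.getAsInt();
    }
    return code;
  }
}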
@@ -26,7 +26,6 @@

import com.google.common.util.concurrent.ListenableFuture;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import static com.google.common.base.Preconditions.checkArgument;
@@ -48,19 +47,13 @@
public class QueryStateMachine {
private final StateMachine<QueryState> queryState;

// The executor will be used in all the state machines belonged to this query.
private Executor stateMachineExecutor;
private Throwable failureException;
private TSStatus failureStatus;

public QueryStateMachine(QueryId queryId, ExecutorService executor) {
this.stateMachineExecutor = executor;
this.queryState =
new StateMachine<>(
queryId.toString(),
this.stateMachineExecutor,
QUEUED,
QueryState.TERMINAL_INSTANCE_STATES);
queryId.toString(), executor, QUEUED, QueryState.TERMINAL_INSTANCE_STATES);
}

public void addStateChangeListener(
@@ -109,9 +102,10 @@ public void transitionToCanceled() {
}

public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) {
this.failureException = throwable;
this.failureStatus = failureStatus;
transitionToDoneState(CANCELED);
if (transitionToDoneState(CANCELED)) {
this.failureException = throwable;
this.failureStatus = failureStatus;
}
}

public void transitionToAborted() {
@@ -123,20 +117,22 @@ public void transitionToFailed() {
}

public void transitionToFailed(Throwable throwable) {
this.failureException = throwable;
transitionToDoneState(FAILED);
if (transitionToDoneState(FAILED)) {
this.failureException = throwable;
}
}

public void transitionToFailed(TSStatus failureStatus) {
this.failureStatus = failureStatus;
transitionToDoneState(FAILED);
if (transitionToDoneState(FAILED)) {
this.failureStatus = failureStatus;
}
}

private void transitionToDoneState(QueryState doneState) {
private boolean transitionToDoneState(QueryState doneState) {
requireNonNull(doneState, "doneState is null");
checkArgument(doneState.isDone(), "doneState %s is not a done state", doneState);

queryState.setIf(doneState, currentState -> !currentState.isDone());
return queryState.setIf(doneState, currentState -> !currentState.isDone());
}

public String getFailureMessage() {
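The key behavioral change in QueryStateMachine is that transitionToDoneState now reports whether this particular call performed the compare-and-set into a terminal state, and the failure details are recorded only when it did, so a later transition can no longer overwrite the cause captured by the first one. The same guard is applied to transitionToCanceled and to both transitionToFailed overloads. A minimal self-contained sketch of the pattern, with an AtomicReference standing in for the setIf call on IoTDB's StateMachine:

import java.util.concurrent.atomic.AtomicReference;

// Simplified sketch: record failure info only if this call performed the
// terminal transition. Not IoTDB's StateMachine; the AtomicReference plays
// the role of queryState.setIf(...).
final class MiniQueryStateMachine {
  enum State { QUEUED, RUNNING, FAILED, CANCELED }

  private final AtomicReference<State> state = new AtomicReference<>(State.QUEUED);
  private volatile Throwable failureException;

  void transitionToFailed(Throwable t) {
    if (transitionToDoneState(State.FAILED)) {
      // Only the first terminal transition records its cause.
      failureException = t;
    }
  }

  private boolean transitionToDoneState(State doneState) {
    State current;
    do {
      current = state.get();
      if (current == State.FAILED || current == State.CANCELED) {
        return false; // another transition already reached a terminal state
      }
    } while (!state.compareAndSet(current, doneState));
    return true;
  }

  Throwable getFailureException() {
    return failureException;
  }
}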
@@ -77,6 +77,10 @@ public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

public Optional<TSStatus> getErrorCode() {
return Optional.ofNullable(errorCode);
}
@@ -47,7 +47,7 @@ public enum FragmentInstanceState {
/** Instance execution failed. */
FAILED(true, true),
/** Instance is not found. */
NO_SUCH_INSTANCE(false, true);
NO_SUCH_INSTANCE(true, true);

public static final Set<FragmentInstanceState> TERMINAL_INSTANCE_STATES =
Stream.of(FragmentInstanceState.values())
@@ -605,8 +605,12 @@ private ExecutionResult getExecutionResult(QueryState state) {

// If RETRYING is triggered by this QueryExecution, the stateMachine.getFailureStatus() is also
// not null. We should only return the failure status when QueryExecution is in Done state.
if (state.isDone() && stateMachine.getFailureStatus() != null) {
tsstatus = stateMachine.getFailureStatus();
if (state.isDone()) {
if (analysis.getFailStatus() != null) {
tsstatus = analysis.getFailStatus();
} else if (stateMachine.getFailureStatus() != null) {
tsstatus = stateMachine.getFailureStatus();
}
}

// collect redirect info to client for writing
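The hunk above changes which failure status wins once the query reaches a done state: a status recorded on the Analysis now takes precedence over the one held by the state machine, and the incoming status is kept only if neither exists. A tiny generic sketch of that precedence rule (illustrative only, not the actual QueryExecution code):

// Illustrative only: once the query is done, prefer the analysis failure
// status, then the state machine's failure status, otherwise keep the
// status that was passed in.
final class FailureStatusPrecedence {
  static <T> T resolve(boolean done, T analysisFailStatus, T stateMachineFailureStatus, T current) {
    if (!done) {
      return current;
    }
    if (analysisFailStatus != null) {
      return analysisFailStatus;
    }
    return stateMachineFailureStatus != null ? stateMachineFailureStatus : current;
  }
}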
@@ -31,6 +31,7 @@
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -45,13 +46,15 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState.NO_SUCH_INSTANCE;

public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {

private static final Logger logger = LoggerFactory.getLogger(FixedRateFragInsStateTracker.class);

private static final long SAME_STATE_PRINT_RATE_IN_MS = 10L * 60 * 1000;

// TODO: (xingtanzjr) consider how much Interval is OK for state tracker
// consider how much Interval is OK for state tracker
private static final long STATE_FETCH_INTERVAL_IN_MS = 500;
private ScheduledFuture<?> trackTask;
private final Map<FragmentInstanceId, InstanceStateMetrics> instanceStateMap;
@@ -112,8 +115,8 @@ public synchronized void abort() {
aborted = true;
if (trackTask != null) {
boolean cancelResult = trackTask.cancel(true);
// TODO: (xingtanzjr) a strange case here is that sometimes
// the cancelResult is false but the trackTask is definitely cancelled
// a strange case here is that sometimes the cancelResult is false but the trackTask is
// definitely cancelled
if (!cancelResult) {
logger.debug("cancel state tracking task failed. {}", trackTask.isCancelled());
}
@@ -144,53 +147,71 @@ private void fetchStateAndUpdate() {
updateQueryState(instance.getId(), instanceInfo);
}
} catch (ClientManagerException | TException e) {
// TODO: do nothing ?
logger.warn("error happened while fetching query state", e);
// network exception, should retry
InstanceStateMetrics metrics =
instanceStateMap.computeIfAbsent(
instance.getId(), k -> new InstanceStateMetrics(instance.isRoot()));
if (metrics.reachMaxRetryCount()) {
// if reach max retry count, we think that the DN is down, and FI in that node won't
// exist
FragmentInstanceInfo instanceInfo = new FragmentInstanceInfo(NO_SUCH_INSTANCE);
instanceInfo.setMessage(
String.format(
"Failed to fetch state, has retried %s times",
InstanceStateMetrics.MAX_STATE_FETCH_RETRY_COUNT));
updateQueryState(instance.getId(), instanceInfo);
} else {
// if not reaching max retry count, add retry count, and wait for next fetching schedule
metrics.addRetryCount();
logger.warn("error happened while fetching query state", e);
}
}
}
}
}

private void updateQueryState(FragmentInstanceId instanceId, FragmentInstanceInfo instanceInfo) {
// no such instance may be caused by DN restarting
if (instanceInfo.getState() == FragmentInstanceState.NO_SUCH_INSTANCE) {
if (instanceInfo.getState() == NO_SUCH_INSTANCE) {
stateMachine.transitionToFailed(
new RuntimeException(
new IoTDBException(
String.format(
"FragmentInstance[%s] is failed. %s, may be caused by DN restarting.",
instanceId, instanceInfo.getMessage())));
}
if (instanceInfo.getState().isFailed()) {
if (instanceInfo.getFailureInfoList() == null
instanceId, instanceInfo.getMessage()),
TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode(),
true));
} else if (instanceInfo.getState().isFailed()) {
if (instanceInfo.getErrorCode().isPresent()) {
stateMachine.transitionToFailed(
new IoTDBException(
instanceInfo.getErrorCode().get().getMessage(),
instanceInfo.getErrorCode().get().getCode()));
} else if (instanceInfo.getFailureInfoList() == null
|| instanceInfo.getFailureInfoList().isEmpty()) {
stateMachine.transitionToFailed(
new RuntimeException(
String.format(
"FragmentInstance[%s] is failed. %s", instanceId, instanceInfo.getMessage())));
} else if (instanceInfo.getErrorCode().isPresent()) {
stateMachine.transitionToFailed(
new IoTDBException(
instanceInfo.getErrorCode().get().getMessage(),
instanceInfo.getErrorCode().get().getCode()));
} else {
stateMachine.transitionToFailed(instanceInfo.getFailureInfoList().get(0).toException());
}
}
boolean queryFinished = false;
List<InstanceStateMetrics> rootInstanceStateMetricsList =
instanceStateMap.values().stream()
.filter(instanceStateMetrics -> instanceStateMetrics.isRootInstance)
.collect(Collectors.toList());
if (!rootInstanceStateMetricsList.isEmpty()) {
queryFinished =
rootInstanceStateMetricsList.stream()
.allMatch(
instanceStateMetrics ->
instanceStateMetrics.lastState == FragmentInstanceState.FINISHED);
}
} else {
boolean queryFinished = false;
List<InstanceStateMetrics> rootInstanceStateMetricsList =
instanceStateMap.values().stream()
.filter(instanceStateMetrics -> instanceStateMetrics.isRootInstance)
.collect(Collectors.toList());
if (!rootInstanceStateMetricsList.isEmpty()) {
queryFinished =
rootInstanceStateMetricsList.stream()
.allMatch(
instanceStateMetrics ->
instanceStateMetrics.lastState == FragmentInstanceState.FINISHED);
}

if (queryFinished) {
stateMachine.transitionToFinished();
if (queryFinished) {
stateMachine.transitionToFinished();
}
}
}

@@ -203,23 +224,39 @@ private boolean needPrintState(
}

private static class InstanceStateMetrics {
private static final long MAX_STATE_FETCH_RETRY_COUNT = 5;
private final boolean isRootInstance;
private FragmentInstanceState lastState;
private long durationToLastPrintInMS;
// we only record the continuous retry count
private int retryCount;

private InstanceStateMetrics(boolean isRootInstance) {
this.isRootInstance = isRootInstance;
this.lastState = null;
this.durationToLastPrintInMS = 0L;
this.retryCount = 0;
}

private void reset(FragmentInstanceState newState) {
this.lastState = newState;
this.durationToLastPrintInMS = 0L;
// each successful fetch, we need to reset the retry count
this.retryCount = 0;
}

private void addRetryCount() {
this.retryCount++;
}

private boolean reachMaxRetryCount() {
return retryCount >= MAX_STATE_FETCH_RETRY_COUNT;
}

private void addDuration(long duration) {
durationToLastPrintInMS += duration;
// each successful fetch, we need to reset the retry count
this.retryCount = 0;
}
}
}
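The tracker now tolerates transient fetch failures instead of escalating them immediately: a ClientManagerException or TException increments a per-instance retry counter, any successful fetch resets it, and after MAX_STATE_FETCH_RETRY_COUNT (5) consecutive failures the instance is reported as NO_SUCH_INSTANCE, which updateQueryState then turns into a CANNOT_FETCH_FI_STATE(722) failure. With the 500 ms fetch interval that is roughly 2.5 seconds of unreachability before the query is failed rather than left hanging. A simplified sketch of the bookkeeping (illustrative names, not the actual InstanceStateMetrics class):

// Simplified sketch of the consecutive-failure bookkeeping: fetch errors
// increment a counter, a successful fetch resets it, and the caller
// escalates once the threshold is reached.
final class RetryBookkeeping {
  private static final int MAX_STATE_FETCH_RETRY_COUNT = 5;
  private int retryCount = 0;

  // Called after a state fetch succeeded; only consecutive failures count.
  void onFetchSuccess() {
    retryCount = 0;
  }

  // Called after a state fetch failed; returns true once the DataNode is presumed down.
  boolean onFetchFailure() {
    retryCount++;
    return retryCount >= MAX_STATE_FETCH_RETRY_COUNT;
  }
}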
@@ -104,8 +104,15 @@ public static TSStatus onQueryException(Exception e, String operation, TSStatusCode
if (status.getCode() == TSStatusCode.SQL_PARSE_ERROR.getStatusCode()
|| status.getCode() == TSStatusCode.SEMANTIC_ERROR.getStatusCode()
|| status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode()
|| status.getCode() == TSStatusCode.ILLEGAL_PATH.getStatusCode()) {
LOGGER.warn(message);
|| status.getCode() == TSStatusCode.ILLEGAL_PATH.getStatusCode()
|| status.getCode() == TSStatusCode.NUMERIC_VALUE_OUT_OF_RANGE.getStatusCode()
|| status.getCode() == TSStatusCode.DIVISION_BY_ZERO.getStatusCode()
|| status.getCode() == TSStatusCode.DATE_OUT_OF_RANGE.getStatusCode()
|| status.getCode() == TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode()
|| status.getCode() == TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()
|| status.getCode() == TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()
|| status.getCode() == TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode()) {
LOGGER.info(message);
} else {
LOGGER.warn(message, e);
}