Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -100,8 +101,9 @@ private void scan() throws IOException {

final boolean isGeneratedByPipe =
listeningDir.equals(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
final File listeningDirFile = new File(listeningDir);
try (final Stream<File> fileStream =
FileUtils.streamFiles(new File(listeningDir), true, (String[]) null)) {
FileUtils.streamFiles(listeningDirFile, true, (String[]) null)) {
try {
fileStream
.filter(file -> !activeLoadTsFileLoader.isFilePendingOrLoading(file))
Expand All @@ -114,7 +116,15 @@ private void scan() throws IOException {
.filter(this::isTsFileCompleted)
.limit(currentAllowedPendingSize)
.forEach(
file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file, isGeneratedByPipe));
file -> {
final File parentFile = new File(file).getParentFile();
activeLoadTsFileLoader.tryTriggerTsFileLoad(
file,
parentFile != null
&& !Objects.equals(
parentFile.getAbsoluteFile(), listeningDirFile.getAbsoluteFile()),
isGeneratedByPipe);
});
} catch (final Exception e) {
LOGGER.warn("Exception occurred during scanning dir: {}", listeningDir, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.iotdb.commons.conf.CommonDescriptor;

import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,41 +43,41 @@ public class ActiveLoadFailedMessageHandler {
filePair ->
LOGGER.info(
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to memory constraints, will retry later.",
filePair.getLeft(),
filePair.getRight()));
filePair.getFile(),
filePair.isGeneratedByPipe()));
// system is read only
put(
"read only",
filePair ->
LOGGER.info(
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to the system is read only, will retry later.",
filePair.getLeft(),
filePair.getRight()));
filePair.getFile(),
filePair.isGeneratedByPipe()));
// Timed out to wait for procedure return. The procedure is still running.
put(
"procedure return",
filePair ->
LOGGER.info(
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to time out to wait for procedure return, will retry later.",
filePair.getLeft(),
filePair.getRight()));
filePair.getFile(),
filePair.isGeneratedByPipe()));
// DataNode is not enough, please register more.
put(
"not enough",
filePair ->
LOGGER.info(
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to the datanode is not enough, will retry later.",
filePair.getLeft(),
filePair.getRight()));
filePair.getFile(),
filePair.isGeneratedByPipe()));
// Fail to connect to any config node. Please check status of ConfigNodes or logs of
// connected DataNode.
put(
"any config node",
filePair ->
LOGGER.info(
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to fail to connect to any config node, will retry later.",
filePair.getLeft(),
filePair.getRight()));
filePair.getFile(),
filePair.isGeneratedByPipe()));
// Current query is time out, query start time is 1729653161797, ddl is
// -3046040214706, current time is 1729653184210, please check your statement or
// modify timeout parameter
Expand All @@ -87,26 +86,26 @@ public class ActiveLoadFailedMessageHandler {
filePair ->
LOGGER.info(
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to current query is time out, will retry later.",
filePair.getLeft(),
filePair.getRight()));
filePair.getFile(),
filePair.isGeneratedByPipe()));
}
});

@FunctionalInterface
private interface ExceptionMessageHandler {
void handle(final Pair<String, Boolean> filePair);
void handle(final ActiveLoadPendingQueue.ActiveLoadEntry entry);
}

public static boolean isExceptionMessageShouldRetry(
final Pair<String, Boolean> filePair, final String message) {
final ActiveLoadPendingQueue.ActiveLoadEntry entry, final String message) {
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
EXCEPTION_MESSAGE_HANDLER_MAP.get("read only").handle(filePair);
EXCEPTION_MESSAGE_HANDLER_MAP.get("read only").handle(entry);
return true;
}

for (String key : EXCEPTION_MESSAGE_HANDLER_MAP.keySet()) {
if (message != null && message.contains(key)) {
EXCEPTION_MESSAGE_HANDLER_MAP.get(key).handle(filePair);
EXCEPTION_MESSAGE_HANDLER_MAP.get(key).handle(entry);
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;

import org.apache.tsfile.utils.Pair;

import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
Expand All @@ -31,30 +29,31 @@
public class ActiveLoadPendingQueue {

private final Set<String> pendingFileSet = new HashSet<>();
private final Queue<Pair<String, Boolean>> pendingFileQueue = new ConcurrentLinkedQueue<>();
private final Queue<ActiveLoadEntry> pendingFileQueue = new ConcurrentLinkedQueue<>();

private final Set<String> loadingFileSet = new HashSet<>();

public synchronized boolean enqueue(final String file, final boolean isGeneratedByPipe) {
public synchronized boolean enqueue(
final String file, final boolean isGeneratedByPipe, final boolean isTableModel) {
if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) {
pendingFileQueue.offer(new Pair<>(file, isGeneratedByPipe));
pendingFileQueue.offer(new ActiveLoadEntry(file, isGeneratedByPipe, isTableModel));

ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(1);
return true;
}
return false;
}

public synchronized Pair<String, Boolean> dequeueFromPending() {
final Pair<String, Boolean> pair = pendingFileQueue.poll();
if (pair != null) {
pendingFileSet.remove(pair.left);
loadingFileSet.add(pair.left);
public synchronized ActiveLoadEntry dequeueFromPending() {
final ActiveLoadEntry entry = pendingFileQueue.poll();
if (entry != null) {
pendingFileSet.remove(entry.getFile());
loadingFileSet.add(entry.getFile());

ActiveLoadingFilesNumberMetricsSet.getInstance().increaseLoadingFileCounter(1);
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(-1);
}
return pair;
return entry;
}

public synchronized void removeFromLoading(final String file) {
Expand All @@ -74,4 +73,28 @@ public int size() {
public boolean isEmpty() {
return pendingFileQueue.isEmpty() && loadingFileSet.isEmpty();
}

public static class ActiveLoadEntry {
private final String file;
private final boolean isGeneratedByPipe;
private final boolean isTableModel;

public ActiveLoadEntry(String file, boolean isGeneratedByPipe, boolean isTableModel) {
this.file = file;
this.isGeneratedByPipe = isGeneratedByPipe;
this.isTableModel = isTableModel;
}

public String getFile() {
return file;
}

public boolean isGeneratedByPipe() {
return isGeneratedByPipe;
}

public boolean isTableModel() {
return isTableModel;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.commons.io.FileUtils;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -84,12 +83,13 @@ public int getCurrentAllowedPendingSize() {
return MAX_PENDING_SIZE - pendingQueue.size();
}

public void tryTriggerTsFileLoad(String absolutePath, boolean isGeneratedByPipe) {
public void tryTriggerTsFileLoad(
String absolutePath, boolean isTabletMode, boolean isGeneratedByPipe) {
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
return;
}

if (pendingQueue.enqueue(absolutePath, isGeneratedByPipe)) {
if (pendingQueue.enqueue(absolutePath, isTabletMode, isGeneratedByPipe)) {
initFailDirIfNecessary();
adjustExecutorIfNecessary();
}
Expand Down Expand Up @@ -165,44 +165,44 @@ private void tryLoadPendingTsFiles() {

try {
while (true) {
final Optional<Pair<String, Boolean>> filePair = tryGetNextPendingFile();
if (!filePair.isPresent()) {
final Optional<ActiveLoadPendingQueue.ActiveLoadEntry> loadEntry = tryGetNextPendingFile();
if (!loadEntry.isPresent()) {
return;
}

try {
final TSStatus result = loadTsFile(filePair.get(), session);
final TSStatus result = loadTsFile(loadEntry.get(), session);
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
LOGGER.info(
"Successfully auto load tsfile {} (isGeneratedByPipe = {})",
filePair.get().getLeft(),
filePair.get().getRight());
loadEntry.get().getFile(),
loadEntry.get().isGeneratedByPipe());
} else {
handleLoadFailure(filePair.get(), result);
handleLoadFailure(loadEntry.get(), result);
}
} catch (final FileNotFoundException e) {
handleFileNotFoundException(filePair.get());
handleFileNotFoundException(loadEntry.get());
} catch (final Exception e) {
handleOtherException(filePair.get(), e);
handleOtherException(loadEntry.get(), e);
} finally {
pendingQueue.removeFromLoading(filePair.get().getLeft());
pendingQueue.removeFromLoading(loadEntry.get().getFile());
}
}
} finally {
SESSION_MANAGER.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
}
}

private Optional<Pair<String, Boolean>> tryGetNextPendingFile() {
private Optional<ActiveLoadPendingQueue.ActiveLoadEntry> tryGetNextPendingFile() {
final long maxRetryTimes =
Math.max(1, IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds() << 1);
long currentRetryTimes = 0;

while (true) {
final Pair<String, Boolean> filePair = pendingQueue.dequeueFromPending();
if (Objects.nonNull(filePair)) {
return Optional.of(filePair);
final ActiveLoadPendingQueue.ActiveLoadEntry entry = pendingQueue.dequeueFromPending();
if (Objects.nonNull(entry)) {
return Optional.of(entry);
}

LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
Expand All @@ -213,17 +213,20 @@ private Optional<Pair<String, Boolean>> tryGetNextPendingFile() {
}
}

private TSStatus loadTsFile(final Pair<String, Boolean> filePair, final IClientSession session)
private TSStatus loadTsFile(
final ActiveLoadPendingQueue.ActiveLoadEntry entry, final IClientSession session)
throws FileNotFoundException {
final LoadTsFileStatement statement = new LoadTsFileStatement(filePair.getLeft());
final LoadTsFileStatement statement = new LoadTsFileStatement(entry.getFile());
final List<File> files = statement.getTsFiles();

// It should be noted here that the instructions in this code block do not need to use the
// DataBase, so the DataBase is assigned a value of null. If the DataBase is used later, an
// exception will be thrown.
final File parentFile;
statement.setDatabase(
files.isEmpty() || (parentFile = files.get(0).getParentFile()) == null
files.isEmpty()
|| !entry.isTableModel()
|| (parentFile = files.get(0).getParentFile()) == null
? null
: parentFile.getName());
statement.setDeleteAfterLoad(true);
Expand All @@ -232,7 +235,7 @@ private TSStatus loadTsFile(final Pair<String, Boolean> filePair, final IClientS
statement.setAutoCreateDatabase(
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());
return executeStatement(
filePair.getRight() ? new PipeEnrichedStatement(statement) : statement, session);
entry.isGeneratedByPipe() ? new PipeEnrichedStatement(statement) : statement, session);
}

private TSStatus executeStatement(final Statement statement, final IClientSession session) {
Expand All @@ -254,34 +257,35 @@ private TSStatus executeStatement(final Statement statement, final IClientSessio
}
}

private void handleLoadFailure(final Pair<String, Boolean> filePair, final TSStatus status) {
if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(
filePair, status.getMessage())) {
private void handleLoadFailure(
final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus status) {
if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(entry, status.getMessage())) {
LOGGER.warn(
"Failed to auto load tsfile {} (isGeneratedByPipe = {}), status: {}. File will be moved to fail directory.",
filePair.getLeft(),
filePair.getRight(),
entry.getFile(),
entry.isGeneratedByPipe(),
status);
removeFileAndResourceAndModsToFailDir(filePair.getLeft());
removeFileAndResourceAndModsToFailDir(entry.getFile());
}
}

private void handleFileNotFoundException(final Pair<String, Boolean> filePair) {
private void handleFileNotFoundException(final ActiveLoadPendingQueue.ActiveLoadEntry entry) {
LOGGER.warn(
"Failed to auto load tsfile {} (isGeneratedByPipe = {}) due to file not found, will skip this file.",
filePair.getLeft(),
filePair.getRight());
removeFileAndResourceAndModsToFailDir(filePair.getLeft());
entry.getFile(),
entry.isGeneratedByPipe());
removeFileAndResourceAndModsToFailDir(entry.getFile());
}

private void handleOtherException(final Pair<String, Boolean> filePair, final Exception e) {
if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(filePair, e.getMessage())) {
private void handleOtherException(
final ActiveLoadPendingQueue.ActiveLoadEntry entry, final Exception e) {
if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(entry, e.getMessage())) {
LOGGER.warn(
"Failed to auto load tsfile {} (isGeneratedByPipe = {}) because of an unexpected exception. File will be moved to fail directory.",
filePair.getLeft(),
filePair.getRight(),
entry.getFile(),
entry.isGeneratedByPipe(),
e);
removeFileAndResourceAndModsToFailDir(filePair.getLeft());
removeFileAndResourceAndModsToFailDir(entry.getFile());
}
}

Expand Down
Loading