Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
Expand All @@ -34,6 +37,7 @@
import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.consensus.pipe.PipeConsensus;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
Expand Down Expand Up @@ -304,7 +308,12 @@ public void customize(
pipeName = environment.getPipeName();
creationTime = environment.getCreationTime();
pipeTaskMeta = environment.getPipeTaskMeta();
startIndex = environment.getPipeTaskMeta().getProgressIndex();
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
startIndex =
tryToExtractLocalProgressIndexForIoTV2(environment.getPipeTaskMeta().getProgressIndex());
} else {
startIndex = environment.getPipeTaskMeta().getProgressIndex();
}

dataRegionId = environment.getRegionId();
synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
Expand Down Expand Up @@ -426,6 +435,61 @@ private void flushDataRegionAllTsFiles() {
}
}

/**
* IoTV2 will only resend event that contains un-replicated local write data. So we only extract
* ProgressIndex containing local writes for comparison to prevent misjudgment on whether
* high-level tsFiles with mixed progressIndexes need to be retransmitted
*
* @return recoverProgressIndex dedicated in local DataNodeId or origin for fallback.
*/
private ProgressIndex tryToExtractLocalProgressIndexForIoTV2(ProgressIndex origin) {
// There are only 2 cases:
// 1. origin is RecoverProgressIndex
if (origin instanceof RecoverProgressIndex) {
RecoverProgressIndex toBeTransformed = (RecoverProgressIndex) origin;
return extractRecoverProgressIndex(toBeTransformed);
}
// 2. origin is HybridProgressIndex
else if (origin instanceof HybridProgressIndex) {
HybridProgressIndex toBeTransformed = (HybridProgressIndex) origin;
// if hybridProgressIndex contains recoverProgressIndex, which is what we expected.
if (toBeTransformed
.getType2Index()
.containsKey(ProgressIndexType.RECOVER_PROGRESS_INDEX.getType())) {
// 2.1. transform recoverProgressIndex
RecoverProgressIndex specificToBeTransformed =
(RecoverProgressIndex)
toBeTransformed
.getType2Index()
.get(ProgressIndexType.RECOVER_PROGRESS_INDEX.getType());
return extractRecoverProgressIndex(specificToBeTransformed);
}
// if hybridProgressIndex doesn't contain recoverProgressIndex, which is not what we expected,
// fallback.
return origin;
} else {
// fallback
LOGGER.warn(
"Pipe {}@{}: unexpected ProgressIndex type {}, fallback to origin {}.",
pipeName,
dataRegionId,
origin.getType(),
origin);
return origin;
}
}

private ProgressIndex extractRecoverProgressIndex(RecoverProgressIndex toBeTransformed) {
return new RecoverProgressIndex(
toBeTransformed.getDataNodeId2LocalIndex().entrySet().stream()
.filter(
entry ->
entry
.getKey()
.equals(IoTDBDescriptor.getInstance().getConfig().getDataNodeId()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}

@Override
public synchronized void start() {
if (!shouldExtractInsertion) {
Expand Down Expand Up @@ -617,8 +681,19 @@ private boolean mayTsFileContainUnprocessedData(final TsFileResource resource) {
if (startIndex instanceof StateProgressIndex) {
startIndex = ((StateProgressIndex) startIndex).getInnerProgressIndex();
}
return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&& !startIndex.equals(resource.getMaxProgressIndexAfterClose());

if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
// For consensus pipe, we only focus on the progressIndex that is generated from local write
// instead of replication or something else.
ProgressIndex dedicatedProgressIndex =
tryToExtractLocalProgressIndexForIoTV2(resource.getMaxProgressIndexAfterClose());
return greaterThanStartIndex(dedicatedProgressIndex);
}
return greaterThanStartIndex(resource.getMaxProgressIndexAfterClose());
}

private boolean greaterThanStartIndex(ProgressIndex progressIndex) {
return !startIndex.isAfter(progressIndex) && !startIndex.equals(progressIndex);
}

private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource resource) {
Expand Down Expand Up @@ -712,12 +787,26 @@ private void extractDeletions(
// For deletions that are filtered and will not be sent, we should manually decrease its
// reference count. Because the initial value of referenceCount is `ReplicaNum - 1`
allDeletionResources.stream()
.filter(resource -> startIndex.isAfter(resource.getProgressIndex()))
.filter(
resource -> {
ProgressIndex toBeCompared = resource.getProgressIndex();
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
toBeCompared = tryToExtractLocalProgressIndexForIoTV2(toBeCompared);
}
return !greaterThanStartIndex(toBeCompared);
})
.forEach(DeletionResource::decreaseReference);
// Get deletions that should be sent.
allDeletionResources =
allDeletionResources.stream()
.filter(resource -> !startIndex.isAfter(resource.getProgressIndex()))
.filter(
resource -> {
ProgressIndex toBeCompared = resource.getProgressIndex();
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
toBeCompared = tryToExtractLocalProgressIndexForIoTV2(toBeCompared);
}
return greaterThanStartIndex(toBeCompared);
})
.collect(Collectors.toList());
resourceList.addAll(allDeletionResources);
LOGGER.info(
Expand Down
Loading