Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
2dc5a5c
Sorter fix
Caideyipi May 12, 2025
e05fb5b
Merge branch 'master' of https://github.com/apache/iotdb into sorter-fix
Caideyipi May 12, 2025
a9d59fe
partial
Caideyipi May 12, 2025
452a8e2
Update PipeTableModelTabletEventSorter.java
Caideyipi May 12, 2025
dbe421e
Update PipeTableModelTabletEventSorter.java
Caideyipi May 12, 2025
dedf3cd
half
Caideyipi May 12, 2025
dd59159
Refactor
Caideyipi May 12, 2025
008c008
partial
Caideyipi May 12, 2025
a61bab5
May final
Caideyipi May 12, 2025
73cbf99
Merge branch 'master' of https://github.com/apache/iotdb into sorter-fix
Caideyipi May 12, 2025
e372a16
Fix
Caideyipi May 12, 2025
2672a58
partial
Caideyipi May 12, 2025
00e2012
Update PipeTransferTabletRawReqV2.java
Caideyipi May 12, 2025
2164f00
Update PipeTabletEventTsFileBatch.java
Caideyipi May 12, 2025
3f37b0e
Update PipeTransferTabletRawReq.java
Caideyipi May 12, 2025
93c6199
Update OpcDaServerHandle.java
Caideyipi May 12, 2025
696c192
Update OpcUaNameSpace.java
Caideyipi May 12, 2025
11b213d
partial fix
Caideyipi May 12, 2025
88087e7
Update PipeTabletEventSorterTest.java
Caideyipi May 12, 2025
93acc8f
Merge branch 'master' of https://github.com/apache/iotdb into sorter-fix
Caideyipi May 13, 2025
4381da8
Merge branch 'master' of https://github.com/apache/iotdb into sorter-fix
Caideyipi May 13, 2025
319b6e6
Update PipeTabletEventSorterTest.java
Caideyipi May 13, 2025
416c24a
Merge branch 'master' of https://github.com/apache/iotdb into sorter-fix
Caideyipi May 13, 2025
104a7f3
Update PipeTableModelTabletEventSorter.java
Caideyipi May 13, 2025
bf49672
Refactor
Caideyipi May 14, 2025
b72ab38
refactor
Caideyipi May 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public InsertTabletStatement constructStatement() {
if (Objects.isNull(dataBaseName)) {
new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
} else {
new PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByTimestampIfNecessary();
new PipeTableModelTabletEventSorter(tablet).sortByTimestampIfNecessary();
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole
transferTabletRowForClientServerModel(
tablet.getDeviceId().split("\\."), newSchemas, timestamps, values);
} else {
new PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByTimestampIfNecessary();
new PipeTableModelTabletEventSorter(tablet).sortByTimestampIfNecessary();

final List<Integer> columnIndexes = new ArrayList<>();
for (int i = 0; i < schemas.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,20 @@
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PipeTableModelTabletEventSorter {

private final Tablet tablet;

private Integer[] index;
private boolean isUnSorted = false;
private boolean hasDuplicates = false;
private int deduplicatedSize;
public class PipeTableModelTabletEventSorter extends PipeTabletEventSorter {
private int initIndexSize;

public PipeTableModelTabletEventSorter(final Tablet tablet) {
this.tablet = tablet;
deduplicatedSize = tablet == null ? 0 : tablet.getRowSize();
super(tablet);
deDuplicatedSize = tablet == null ? 0 : tablet.getRowSize();
}

/**
Expand All @@ -72,24 +62,24 @@ public void sortAndDeduplicateByDevIdTimestamp() {
final int deviceComparison = deviceID.compareTo(lastDevice);
if (deviceComparison == 0) {
if (previousTimestamp == currentTimestamp) {
hasDuplicates = true;
isDeDuplicated = false;
continue;
}
if (previousTimestamp > currentTimestamp) {
isUnSorted = true;
isSorted = false;
}
previousTimestamp = currentTimestamp;
continue;
}
if (deviceComparison < 0) {
isUnSorted = true;
isSorted = false;
}

final List<Pair<Integer, Integer>> list =
deviceIDToIndexMap.computeIfAbsent(lastDevice, k -> new ArrayList<>());

if (!list.isEmpty()) {
isUnSorted = true;
isSorted = false;
}
list.add(new Pair<>(lasIndex, i));
lastDevice = deviceID;
Expand All @@ -100,85 +90,69 @@ public void sortAndDeduplicateByDevIdTimestamp() {
final List<Pair<Integer, Integer>> list =
deviceIDToIndexMap.computeIfAbsent(lastDevice, k -> new ArrayList<>());
if (!list.isEmpty()) {
isUnSorted = true;
isSorted = false;
}
list.add(new Pair<>(lasIndex, tablet.getRowSize()));

if (!isUnSorted && !hasDuplicates) {
if (isSorted && isDeDuplicated) {
return;
}

initIndexSize = 0;
deduplicatedSize = 0;
deDuplicatedSize = 0;
index = new Integer[tablet.getRowSize()];
deDuplicatedIndex = new int[tablet.getRowSize()];
deviceIDToIndexMap.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.forEach(
entry -> {
final int start = initIndexSize;
int i = initIndexSize;
for (Pair<Integer, Integer> pair : entry.getValue()) {
for (int j = pair.left; j < pair.right; j++) {
index[i++] = j;
}
}
if (isUnSorted) {
sortTimestamps(start, i);
deduplicateTimestamps(start, i);
if (!isSorted) {
sortTimestamps(initIndexSize, i);
deDuplicateTimestamps(initIndexSize, i);
initIndexSize = i;
return;
}

if (hasDuplicates) {
deduplicateTimestamps(start, i);
if (!isDeDuplicated) {
deDuplicateTimestamps(initIndexSize, i);
}
initIndexSize = i;
});

sortAndDeduplicateValuesAndBitMaps();
sortAndDeduplicateValuesAndBitMapsWithTimestamp();
}

private void sortAndDeduplicateValuesAndBitMaps() {
int columnIndex = 0;
private void sortAndDeduplicateValuesAndBitMapsWithTimestamp() {
tablet.setTimestamps(
(long[])
PipeTabletEventSorter.reorderValueList(
deduplicatedSize, tablet.getTimestamps(), TSDataType.TIMESTAMP, index));
for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) {
final IMeasurementSchema schema = tablet.getSchemas().get(i);
if (schema != null) {
tablet.getValues()[columnIndex] =
PipeTabletEventSorter.reorderValueList(
deduplicatedSize, tablet.getValues()[columnIndex], schema.getType(), index);
if (tablet.getBitMaps() != null && tablet.getBitMaps()[columnIndex] != null) {
tablet.getBitMaps()[columnIndex] =
PipeTabletEventSorter.reorderBitMap(
deduplicatedSize, tablet.getBitMaps()[columnIndex], index);
}
columnIndex++;
}
}

tablet.setRowSize(deduplicatedSize);
reorderValueListAndBitMap(tablet.getTimestamps(), TSDataType.TIMESTAMP, null, null));
sortAndMayDeduplicateValuesAndBitMaps();
tablet.setRowSize(deDuplicatedSize);
}

/**
 * Sorts the slice {@code [startIndex, endIndex)} of {@code index} in ascending order of the
 * timestamp that the tablet reports for each stored row number.
 *
 * @param startIndex first position of {@code index} to sort (inclusive)
 * @param endIndex position of {@code index} at which sorting stops (exclusive)
 */
private void sortTimestamps(final int startIndex, final int endIndex) {
  // The entries of index are row numbers; order them by the timestamp of the row they refer to.
  final Comparator<Integer> byRowTimestamp = Comparator.comparingLong(tablet::getTimestamp);
  Arrays.sort(this.index, startIndex, endIndex, byRowTimestamp);
}

private void deduplicateTimestamps(final int startIndex, final int endIndex) {
private void deDuplicateTimestamps(final int startIndex, final int endIndex) {
final long[] timestamps = tablet.getTimestamps();
long lastTime = timestamps[index[startIndex]];
index[deduplicatedSize++] = index[startIndex];
for (int i = startIndex + 1; i < endIndex; i++) {
if (lastTime != (lastTime = timestamps[index[i]])) {
index[deduplicatedSize++] = index[i];
deDuplicatedIndex[deDuplicatedSize++] = i - 1;
}
}
deDuplicatedIndex[deDuplicatedSize++] = endIndex - 1;
}

/** Sort by time only, and remove only rows with the same DeviceID and time. */
public void sortAndDeduplicateByTimestampIfNecessary() {
/** Sort by time only. */
public void sortByTimestampIfNecessary() {
if (tablet == null || tablet.getRowSize() == 0) {
return;
}
Expand All @@ -189,15 +163,12 @@ public void sortAndDeduplicateByTimestampIfNecessary() {
final long previousTimestamp = timestamps[i - 1];

if (currentTimestamp < previousTimestamp) {
isUnSorted = true;
isSorted = false;
break;
}
if (currentTimestamp == previousTimestamp) {
hasDuplicates = true;
}
}

if (!isUnSorted && !hasDuplicates) {
if (isSorted) {
return;
}

Expand All @@ -206,68 +177,15 @@ public void sortAndDeduplicateByTimestampIfNecessary() {
index[i] = i;
}

if (isUnSorted) {
if (!isSorted) {
sortTimestamps();

// Do deduplicate anyway.
// isDeduplicated may be false positive when isUnSorted is true.
deduplicateTimestamps();
hasDuplicates = false;
}

if (hasDuplicates) {
deduplicateTimestamps();
}

sortAndDeduplicateValuesAndBitMapsIgnoreTimestamp();
sortAndMayDeduplicateValuesAndBitMaps();
}

/**
 * Sorts the whole {@code index} array by the timestamp of the row each entry refers to, then
 * sorts the first {@code tablet.getRowSize()} timestamps of the tablet in place.
 */
private void sortTimestamps() {
  // Order matters: the comparator reads timestamps through the tablet, so the row-number
  // permutation has to be computed before the timestamp array itself is rearranged.
  final Comparator<Integer> byRowTimestamp = Comparator.comparingLong(tablet::getTimestamp);
  Arrays.sort(this.index, byRowTimestamp);

  final int rowCount = tablet.getRowSize();
  Arrays.sort(tablet.getTimestamps(), 0, rowCount);
}

/**
 * Compacts {@code timestamps} and {@code index} in place so that, within each run of equal
 * adjacent timestamps, only the first row of every device ID is kept. On return {@code
 * deduplicatedSize} holds the number of rows kept.
 *
 * <p>NOTE(review): timestamps are read by position while device IDs are read through {@code
 * index}, so this presumably expects the timestamp array to be sorted already and {@code index}
 * to hold the matching row permutation; confirm against the caller.
 */
private void deduplicateTimestamps() {
  final long[] timestamps = tablet.getTimestamps();
  final int rowCount = tablet.getRowSize();

  // Device IDs already kept for the timestamp currently being scanned.
  final Set<IDeviceID> seenDevices = new HashSet<>();
  seenDevices.add(tablet.getDeviceID(index[0]));

  // The first row is always kept, so the write cursor starts at 1.
  deduplicatedSize = 1;
  long lastTime = timestamps[0];

  for (int i = 1; i < rowCount; i++) {
    final IDeviceID deviceID = tablet.getDeviceID(index[i]);
    final long currentTime = timestamps[i];
    final boolean timestampChanged = currentTime != lastTime;
    lastTime = currentTime;

    if (timestampChanged) {
      // A new timestamp starts a fresh group of devices.
      seenDevices.clear();
    }

    // Set.add returns true only when the device was not yet recorded for this timestamp.
    if (seenDevices.add(deviceID)) {
      timestamps[deduplicatedSize] = currentTime;
      index[deduplicatedSize++] = index[i];
    }
  }
}

/**
 * Rebuilds every value column, and its bitmap when one exists, through {@code
 * PipeTabletEventSorter.reorderValueList} / {@code reorderBitMap} using {@code index} and {@code
 * deduplicatedSize}, then sets the tablet's row size to {@code deduplicatedSize}. The timestamp
 * column is not touched here.
 */
private void sortAndDeduplicateValuesAndBitMapsIgnoreTimestamp() {
  // Null schemas do not consume a slot in the value/bitmap arrays, so the column cursor only
  // advances for non-null schemas.
  int column = 0;
  for (final IMeasurementSchema schema : tablet.getSchemas()) {
    if (schema == null) {
      continue;
    }

    tablet.getValues()[column] =
        PipeTabletEventSorter.reorderValueList(
            deduplicatedSize, tablet.getValues()[column], schema.getType(), index);

    final boolean hasBitMap =
        tablet.getBitMaps() != null && tablet.getBitMaps()[column] != null;
    if (hasBitMap) {
      tablet.getBitMaps()[column] =
          PipeTabletEventSorter.reorderBitMap(
              deduplicatedSize, tablet.getBitMaps()[column], index);
    }

    column++;
  }

  tablet.setRowSize(deduplicatedSize);
}
}
Loading
Loading