Skip to content

Commit 52d3455

Browse files
authored
Fix negative iot queue size & missing search index for deletion & missed request when performing empty table deleting (#16022)
* Fix double memory free of IoTConsensus queue requests during region deletion.
* Fix missing searchIndex and lost deletion when no TsFile is involved.
1 parent 5b3cd27 commit 52d3455

File tree

13 files changed

+203
-20
lines changed

13 files changed

+203
-20
lines changed

integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444

4545
import org.apache.thrift.TException;
4646
import org.apache.tsfile.read.common.Field;
47+
import org.apache.tsfile.utils.Pair;
4748
import org.awaitility.Awaitility;
4849
import org.awaitility.core.ConditionTimeoutException;
4950
import org.junit.After;
@@ -307,7 +308,7 @@ public void generalTestWithAllOptions(
307308
}
308309
}
309310

310-
protected Set<Integer> getAllDataNodes(Statement statement) throws Exception {
311+
public static Set<Integer> getAllDataNodes(Statement statement) throws Exception {
311312
ResultSet result = statement.executeQuery(SHOW_DATANODES);
312313
Set<Integer> allDataNodeId = new HashSet<>();
313314
while (result.next()) {
@@ -444,6 +445,26 @@ public static Map<Integer, Set<Integer>> getDataRegionMap(Statement statement) t
444445
return regionMap;
445446
}
446447

448+
public static Map<Integer, Pair<Integer, Set<Integer>>> getDataRegionMapWithLeader(
449+
Statement statement) throws Exception {
450+
ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS);
451+
Map<Integer, Pair<Integer, Set<Integer>>> regionMap = new HashMap<>();
452+
while (showRegionsResult.next()) {
453+
if (String.valueOf(TConsensusGroupType.DataRegion)
454+
.equals(showRegionsResult.getString(ColumnHeaderConstant.TYPE))) {
455+
int regionId = showRegionsResult.getInt(ColumnHeaderConstant.REGION_ID);
456+
int dataNodeId = showRegionsResult.getInt(ColumnHeaderConstant.DATA_NODE_ID);
457+
Pair<Integer, Set<Integer>> leaderNodesPair =
458+
regionMap.computeIfAbsent(regionId, id -> new Pair<>(-1, new HashSet<>()));
459+
leaderNodesPair.getRight().add(dataNodeId);
460+
if (showRegionsResult.getString(ColumnHeaderConstant.ROLE).equals("Leader")) {
461+
leaderNodesPair.setLeft(dataNodeId);
462+
}
463+
}
464+
}
465+
return regionMap;
466+
}
467+
447468
public static Map<Integer, Set<Integer>> getAllRegionMap(Statement statement) throws Exception {
448469
ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS);
449470
Map<Integer, Set<Integer>> regionMap = new HashMap<>();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1;
21+
22+
import org.apache.iotdb.consensus.ConsensusFactory;
23+
import org.apache.iotdb.it.env.EnvFactory;
24+
25+
import org.apache.tsfile.utils.Pair;
26+
import org.awaitility.Awaitility;
27+
import org.junit.After;
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
31+
import java.sql.Connection;
32+
import java.sql.Statement;
33+
import java.util.Map;
34+
import java.util.Set;
35+
import java.util.concurrent.TimeUnit;
36+
37+
import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes;
38+
import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader;
39+
40+
public class IoTDBRegionMigrateWithLastEmptyDeletionIT {
41+
@Before
42+
public void setUp() throws Exception {
43+
EnvFactory.getEnv()
44+
.getConfig()
45+
.getCommonConfig()
46+
.setDataReplicationFactor(2)
47+
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
48+
EnvFactory.getEnv().initClusterEnvironment(1, 3);
49+
}
50+
51+
@After
52+
public void tearDown() throws Exception {
53+
EnvFactory.getEnv().cleanClusterEnvironment();
54+
}
55+
56+
@Test
57+
public void testWithLastEmptyDeletion() throws Exception {
58+
try (Connection connection = EnvFactory.getEnv().getTableConnection();
59+
Statement statement = connection.createStatement()) {
60+
statement.execute("CREATE DATABASE test");
61+
statement.execute("USE test");
62+
statement.execute("CREATE TABLE t1 (s1 INT64 FIELD)");
63+
statement.execute("INSERT INTO t1 (time, s1) VALUES (100, 100)");
64+
statement.execute("FLUSH");
65+
// the deletion does not involve any file
66+
statement.execute("DELETE FROM t1 WHERE time < 100");
67+
68+
Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader =
69+
getDataRegionMapWithLeader(statement);
70+
int dataRegionIdForTest =
71+
dataRegionMapWithLeader.keySet().stream().max(Integer::compare).get();
72+
Pair<Integer, Set<Integer>> leaderAndNodes = dataRegionMapWithLeader.get(dataRegionIdForTest);
73+
Set<Integer> allDataNodes = getAllDataNodes(statement);
74+
int leaderId = leaderAndNodes.getLeft();
75+
int followerId =
76+
leaderAndNodes.getRight().stream().filter(i -> i != leaderId).findAny().get();
77+
int newLeaderId =
78+
allDataNodes.stream().filter(i -> i != leaderId && i != followerId).findAny().get();
79+
80+
System.out.printf(
81+
"Old leader: %d, follower: %d, new leader: %d%n", leaderId, followerId, newLeaderId);
82+
83+
statement.execute(
84+
String.format(
85+
"migrate region %d from %d to %d", dataRegionIdForTest, leaderId, newLeaderId));
86+
87+
Awaitility.await()
88+
.atMost(10, TimeUnit.MINUTES)
89+
.pollDelay(1, TimeUnit.SECONDS)
90+
.until(
91+
() -> {
92+
Map<Integer, Pair<Integer, Set<Integer>>> regionMapWithLeader =
93+
getDataRegionMapWithLeader(statement);
94+
Pair<Integer, Set<Integer>> newLeaderAndNodes =
95+
regionMapWithLeader.get(dataRegionIdForTest);
96+
Set<Integer> nodes = newLeaderAndNodes.right;
97+
return nodes.size() == 2 && nodes.contains(newLeaderId);
98+
});
99+
}
100+
}
101+
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.ArrayList;
2424
import java.util.List;
2525
import java.util.Objects;
26+
import java.util.concurrent.atomic.AtomicLong;
2627

2728
/** only used for iot consensus. */
2829
public class IndexedConsensusRequest implements IConsensusRequest {
@@ -34,6 +35,7 @@ public class IndexedConsensusRequest implements IConsensusRequest {
3435
private final List<IConsensusRequest> requests;
3536
private final List<ByteBuffer> serializedRequests;
3637
private long memorySize = 0;
38+
private AtomicLong referenceCnt = new AtomicLong();
3739

3840
public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
3941
this.searchIndex = searchIndex;
@@ -100,4 +102,12 @@ public boolean equals(Object o) {
100102
public int hashCode() {
101103
return Objects.hash(searchIndex, requests);
102104
}
105+
106+
public long incRef() {
107+
return referenceCnt.getAndIncrement();
108+
}
109+
110+
public long decRef() {
111+
return referenceCnt.getAndDecrement();
112+
}
103113
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock;
2323
import org.apache.iotdb.commons.memory.IMemoryBlock;
2424
import org.apache.iotdb.commons.service.metric.MetricService;
25+
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
2526

2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
@@ -40,6 +41,17 @@ private IoTConsensusMemoryManager() {
4041
MetricService.getInstance().addMetricSet(new IoTConsensusMemoryManagerMetrics(this));
4142
}
4243

44+
public boolean reserve(IndexedConsensusRequest request, boolean fromQueue) {
45+
synchronized (request) {
46+
long prevRef = request.incRef();
47+
if (prevRef == 0) {
48+
return reserve(request.getMemorySize(), fromQueue);
49+
} else {
50+
return true;
51+
}
52+
}
53+
}
54+
4355
public boolean reserve(long size, boolean fromQueue) {
4456
boolean result =
4557
fromQueue
@@ -55,6 +67,15 @@ public boolean reserve(long size, boolean fromQueue) {
5567
return result;
5668
}
5769

70+
public void free(IndexedConsensusRequest request, boolean fromQueue) {
71+
synchronized (request) {
72+
long prevRef = request.decRef();
73+
if (prevRef == 0) {
74+
free(request.getMemorySize(), fromQueue);
75+
}
76+
}
77+
}
78+
5879
public void free(long size, boolean fromQueue) {
5980
long currentUsedMemory = memoryBlock.release(size);
6081
if (fromQueue) {

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -280,27 +280,27 @@ public int getBufferRequestSize() {
280280

281281
/** try to offer a request into queue with memory control. */
282282
public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
283-
if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getMemorySize(), true)) {
283+
if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest, true)) {
284284
return false;
285285
}
286286
boolean success;
287287
try {
288288
success = pendingEntries.offer(indexedConsensusRequest);
289289
} catch (Throwable t) {
290290
// If exception occurs during request offer, the reserved memory should be released
291-
iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true);
291+
iotConsensusMemoryManager.free(indexedConsensusRequest, true);
292292
throw t;
293293
}
294294
if (!success) {
295295
// If offer failed, the reserved memory should be released
296-
iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true);
296+
iotConsensusMemoryManager.free(indexedConsensusRequest, true);
297297
}
298298
return success;
299299
}
300300

301301
/** try to remove a request from queue with memory control. */
302302
private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) {
303-
iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true);
303+
iotConsensusMemoryManager.free(indexedConsensusRequest, true);
304304
}
305305

306306
public void stop() {
@@ -322,13 +322,23 @@ private void processStopped() {
322322
}
323323
long requestSize = 0;
324324
for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
325-
requestSize += indexedConsensusRequest.getMemorySize();
325+
synchronized (indexedConsensusRequest) {
326+
long prevRef = indexedConsensusRequest.decRef();
327+
if (prevRef == 1) {
328+
requestSize += indexedConsensusRequest.getMemorySize();
329+
}
330+
}
326331
}
327332
pendingEntries.clear();
328333
iotConsensusMemoryManager.free(requestSize, true);
329334
requestSize = 0;
330335
for (IndexedConsensusRequest indexedConsensusRequest : bufferedEntries) {
331-
requestSize += indexedConsensusRequest.getMemorySize();
336+
synchronized (indexedConsensusRequest) {
337+
long prevRef = indexedConsensusRequest.decRef();
338+
if (prevRef == 1) {
339+
requestSize += indexedConsensusRequest.getMemorySize();
340+
}
341+
}
332342
}
333343
iotConsensusMemoryManager.free(requestSize, true);
334344
syncStatus.free();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
3232
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
3333
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
34+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
3435
import org.apache.iotdb.db.trigger.executor.TriggerFireVisitor;
3536

3637
import org.apache.tsfile.enums.TSDataType;
@@ -225,8 +226,9 @@ public long getSearchIndex() {
225226
}
226227

227228
@Override
228-
public void setSearchIndex(final long searchIndex) {
229+
public SearchNode setSearchIndex(final long searchIndex) {
229230
insertNode.setSearchIndex(searchIndex);
231+
return this;
230232
}
231233

232234
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -394,9 +394,10 @@ public SearchNode merge(List<SearchNode> searchNodes) {
394394
.distinct()
395395
.collect(Collectors.toList());
396396
return new DeleteDataNode(
397-
firstOne.getPlanNodeId(),
398-
pathList,
399-
firstOne.getDeleteStartTime(),
400-
firstOne.getDeleteEndTime());
397+
firstOne.getPlanNodeId(),
398+
pathList,
399+
firstOne.getDeleteStartTime(),
400+
firstOne.getDeleteEndTime())
401+
.setSearchIndex(firstOne.searchIndex);
401402
}
402403
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,10 @@ public void addInsertTabletNode(InsertTabletNode node, Integer parentIndex) {
134134
}
135135

136136
@Override
137-
public void setSearchIndex(long index) {
137+
public SearchNode setSearchIndex(long index) {
138138
searchIndex = index;
139139
insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
140+
return this;
140141
}
141142

142143
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,10 @@ public void setMixingAlignment(boolean mixingAlignment) {
128128
}
129129

130130
@Override
131-
public void setSearchIndex(long index) {
131+
public SearchNode setSearchIndex(long index) {
132132
searchIndex = index;
133133
insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
134+
return this;
134135
}
135136

136137
public Map<Integer, TSStatus> getResults() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,10 @@ public void clearResults() {
9898
}
9999

100100
@Override
101-
public void setSearchIndex(long index) {
101+
public SearchNode setSearchIndex(long index) {
102102
searchIndex = index;
103103
insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
104+
return this;
104105
}
105106

106107
public TSStatus[] getFailingStatus() {

0 commit comments

Comments
 (0)