Skip to content

Commit e8d07c1

Browse files
Load: detect region migration during second phase (#15005)
1 parent 1adc74d commit e8d07c1

File tree

8 files changed

+63
-0
lines changed

8 files changed

+63
-0
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,11 @@ public boolean updateRegionCache(final TRegionRouteReq req) {
292292
return partitionCache.updateGroupIdToReplicaSetMap(req.getTimestamp(), req.getRegionRouteMap());
293293
}
294294

295+
/** Returns whatever {@code partitionCache.getRegionReplicaSet(id)} yields for the given id. */
@Override
296+
public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
297+
return partitionCache.getRegionReplicaSet(id);
298+
}
299+
295300
@Override
296301
public void invalidAllCache() {
297302
partitionCache.invalidAllCache();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.analyze;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23+
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
2224
import org.apache.iotdb.commons.partition.DataPartition;
2325
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
2426
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
@@ -92,6 +94,8 @@ SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
9294
/** Update the region cache in the partition cache when a request is received from the config node */
9395
boolean updateRegionCache(TRegionRouteReq req);
9496

97+
/** Get the replica set this fetcher currently holds for the given consensus group id */
TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id);
98+
9599
/** Invalidate the entire partition cache */
96100
void invalidAllCache();
97101

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,8 @@ private boolean secondPhase(
418418
stateMachine.transitionToFailed(status);
419419
return false;
420420
}
421+
422+
checkAllReplicaSetsConsistency();
421423
} catch (IOException e) {
422424
LOGGER.warn(
423425
"Serialize Progress Index error, isFirstPhaseSuccess: {}, uuid: {}, tsFile: {}",
@@ -433,6 +435,11 @@ private boolean secondPhase(
433435
LOGGER.warn("Interrupt or Execution error.", e);
434436
stateMachine.transitionToFailed(e);
435437
return false;
438+
} catch (Exception e) {
439+
LOGGER.warn(
440+
String.format("Exception occurred during second phase of loading TsFile %s.", tsFile), e);
441+
stateMachine.transitionToFailed(e);
442+
return false;
436443
}
437444
return true;
438445
}
@@ -447,6 +454,24 @@ private ByteBuffer assignProgressIndex(TsFileResource tsFileResource) throws IOE
447454
}
448455
}
449456

457+
/**
 * Checks that every replica set recorded in {@code allReplicaSets} still equals the replica set
 * that the partition fetcher currently returns for the same region id.
 *
 * <p>Entries whose region id is {@code null} are logged at INFO level and skipped. The
 * comparison uses {@link Objects#equals(Object, Object)}, so a {@code null} lookup result for a
 * recorded replica set is treated as a change.
 *
 * @throws RegionReplicaSetChangedException if the currently fetched replica set differs from the
 *     recorded one; the exception is constructed with both the recorded and the current value
 */
public void checkAllReplicaSetsConsistency() throws RegionReplicaSetChangedException {
458+
for (final TRegionReplicaSet replicaSet : allReplicaSets) {
459+
final TConsensusGroupId regionId = replicaSet.getRegionId();
460+
// Without a region id there is nothing to look up, so this entry is not verified.
if (regionId == null) {
461+
LOGGER.info(
462+
"region id is null during region consistency check, will skip this region: {}",
463+
replicaSet);
464+
continue;
465+
}
466+
467+
// Compare the recorded replica set against what the fetcher reports right now.
final TRegionReplicaSet currentReplicaSet =
468+
partitionFetcher.fetcher.getRegionReplicaSet(regionId);
469+
// The first mismatch aborts the check; remaining replica sets are not examined.
if (!Objects.equals(replicaSet, currentReplicaSet)) {
470+
throw new RegionReplicaSetChangedException(replicaSet, currentReplicaSet);
471+
}
472+
}
473+
}
474+
450475
private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
451476
LOGGER.info("Start load TsFile {} locally.", node.getTsFileResource().getTsFile().getPath());
452477

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,11 @@ public boolean updateRegionCache(TRegionRouteReq req) {
298298
return true;
299299
}
300300

301+
/** Stub for tests: always returns {@code null}, regardless of the id passed in. */
@Override
302+
public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
303+
return null;
304+
}
305+
301306
@Override
302307
public void invalidAllCache() {}
303308

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,11 @@ public boolean updateRegionCache(TRegionRouteReq req) {
418418
return false;
419419
}
420420

421+
/** Stub for tests: always returns {@code null}, regardless of the id passed in. */
@Override
422+
public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
423+
return null;
424+
}
425+
421426
@Override
422427
public void invalidAllCache() {}
423428

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,11 @@ public boolean updateRegionCache(TRegionRouteReq req) {
311311
return false;
312312
}
313313

314+
/** Stub for tests: always returns {@code null}, regardless of the id passed in. */
@Override
315+
public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
316+
return null;
317+
}
318+
314319
@Override
315320
public void invalidAllCache() {}
316321

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TSBSMetadata.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23+
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
2224
import org.apache.iotdb.commons.partition.DataPartition;
2325
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
2426
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
@@ -431,6 +433,11 @@ public boolean updateRegionCache(TRegionRouteReq req) {
431433
return false;
432434
}
433435

436+
/** Stub for tests: always returns {@code null}, regardless of the id passed in. */
@Override
437+
public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
438+
return null;
439+
}
440+
434441
@Override
435442
public void invalidAllCache() {}
436443

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23+
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
2224
import org.apache.iotdb.commons.partition.DataPartition;
2325
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
2426
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
@@ -505,6 +507,11 @@ public boolean updateRegionCache(TRegionRouteReq req) {
505507
return false;
506508
}
507509

510+
/** Stub for tests: always returns {@code null}, regardless of the id passed in. */
@Override
511+
public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
512+
return null;
513+
}
514+
508515
@Override
509516
public void invalidAllCache() {}
510517

0 commit comments

Comments
 (0)