Skip to content

Commit a8c80ec

Browse files
Load: Batched tablet insertion during conversion (#15125)
1 parent 680026e commit a8c80ec

File tree

10 files changed

+186
-75
lines changed

10 files changed

+186
-75
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1084,6 +1084,8 @@ public class IoTDBConfig {
10841084
private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
10851085
0L; // 0 means that the decision will be adaptive based on the number of sequences
10861086

1087+
private long loadTsFileTabletConversionBatchMemorySizeInBytes = 4096 * 1024;
1088+
10871089
private int loadTsFileMaxDeviceCountToUseDeviceTimeIndex = 10000;
10881090

10891091
private long loadChunkMetadataMemorySizeInBytes = 33554432; // 32MB
@@ -3772,6 +3774,16 @@ public void setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
37723774
this.loadTsFileAnalyzeSchemaMemorySizeInBytes = loadTsFileAnalyzeSchemaMemorySizeInBytes;
37733775
}
37743776

3777+
public long getLoadTsFileTabletConversionBatchMemorySizeInBytes() {
3778+
return loadTsFileTabletConversionBatchMemorySizeInBytes;
3779+
}
3780+
3781+
public void setLoadTsFileTabletConversionBatchMemorySizeInBytes(
3782+
long loadTsFileTabletConversionBatchMemorySizeInBytes) {
3783+
this.loadTsFileTabletConversionBatchMemorySizeInBytes =
3784+
loadTsFileTabletConversionBatchMemorySizeInBytes;
3785+
}
3786+
37753787
public int getLoadTsFileMaxDeviceCountToUseDeviceTimeIndex() {
37763788
return loadTsFileMaxDeviceCountToUseDeviceTimeIndex;
37773789
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2172,6 +2172,11 @@ private void loadLoadTsFileProps(TrimProperties properties) throws IOException {
21722172
properties.getProperty(
21732173
"load_tsfile_analyze_schema_memory_size_in_bytes",
21742174
String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes()))));
2175+
conf.setLoadTsFileTabletConversionBatchMemorySizeInBytes(
2176+
Long.parseLong(
2177+
properties.getProperty(
2178+
"load_tsfile_tablet_conversion_batch_memory_size_in_bytes",
2179+
String.valueOf(conf.getLoadTsFileTabletConversionBatchMemorySizeInBytes()))));
21752180
conf.setLoadTsFileMaxDeviceCountToUseDeviceTimeIndex(
21762181
Integer.parseInt(
21772182
properties.getProperty(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
3838
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
3939
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
40-
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock;
40+
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock;
4141
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
4242
import org.apache.iotdb.db.utils.ModificationUtils;
4343

@@ -79,7 +79,7 @@ public class LoadTsFileTableSchemaCache {
7979
: CONFIG.getLoadTsFileAnalyzeSchemaMemorySizeInBytes();
8080
}
8181

82-
private final LoadTsFileAnalyzeSchemaMemoryBlock block;
82+
private final LoadTsFileMemoryBlock block;
8383

8484
private String database;
8585
private final Metadata metadata;
@@ -104,7 +104,7 @@ public LoadTsFileTableSchemaCache(Metadata metadata, MPPQueryContext context)
104104
throws LoadRuntimeOutOfMemoryException {
105105
this.block =
106106
LoadTsFileMemoryManager.getInstance()
107-
.allocateAnalyzeSchemaMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES);
107+
.allocateMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES);
108108
this.metadata = metadata;
109109
this.context = context;
110110
this.currentBatchTable2Devices = new HashMap<>();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTreeSchemaCache.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
3030
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
3131
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
32-
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock;
32+
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock;
3333
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
3434
import org.apache.iotdb.db.utils.ModificationUtils;
3535

@@ -70,7 +70,7 @@ public class LoadTsFileTreeSchemaCache {
7070
FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES = ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES >> 1;
7171
}
7272

73-
private final LoadTsFileAnalyzeSchemaMemoryBlock block;
73+
private final LoadTsFileMemoryBlock block;
7474

7575
private Map<IDeviceID, Set<MeasurementSchema>> currentBatchDevice2TimeSeriesSchemas;
7676
private Map<IDeviceID, Boolean> tsFileDevice2IsAligned;
@@ -90,7 +90,7 @@ public class LoadTsFileTreeSchemaCache {
9090
public LoadTsFileTreeSchemaCache() throws LoadRuntimeOutOfMemoryException {
9191
this.block =
9292
LoadTsFileMemoryManager.getInstance()
93-
.allocateAnalyzeSchemaMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES);
93+
.allocateMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES);
9494
this.currentBatchDevice2TimeSeriesSchemas = new HashMap<>();
9595
this.tsFileDevice2IsAligned = new HashMap<>();
9696
this.alreadySetDatabases = new HashSet<>();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
2727
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
2828
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
29+
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
2930
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
3031
import org.apache.iotdb.rpc.TSStatusCode;
3132

@@ -45,6 +46,12 @@ public TSStatus visitInsertTablet(
4546
return visitInsertBase(insertTabletStatement, context);
4647
}
4748

49+
@Override
50+
public TSStatus visitInsertMultiTablets(
51+
final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus context) {
52+
return visitInsertBase(insertMultiTabletsStatement, context);
53+
}
54+
4855
private TSStatus visitInsertBase(
4956
final InsertBaseStatement insertBaseStatement, final TSStatus context) {
5057
if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public Optional<TSStatus> visitLoadTsFile(
7171

7272
LOGGER.info("Start data type conversion for LoadTsFileStatement: {}.", loadTsFileStatement);
7373

74+
// TODO: Use batch insert after Table model supports insertMultiTablets
7475
for (final File file : loadTsFileStatement.getTsFiles()) {
7576
try (final TsFileInsertionEventTableParser parser =
7677
new TsFileInsertionEventTableParser(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java

Lines changed: 137 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,19 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2323
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
24+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2425
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
2526
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
2627
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
2728
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
2829
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
30+
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
2931
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
3032
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
3133
import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
3234
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
35+
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock;
36+
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
3337
import org.apache.iotdb.rpc.TSStatusCode;
3438

3539
import org.apache.commons.io.FileUtils;
@@ -39,16 +43,25 @@
3943
import org.slf4j.LoggerFactory;
4044

4145
import java.io.File;
46+
import java.util.ArrayList;
47+
import java.util.List;
4248
import java.util.Optional;
49+
import java.util.stream.Collectors;
50+
51+
import static org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil.calculateTabletSizeInBytes;
4352

4453
public class LoadTreeStatementDataTypeConvertExecutionVisitor
4554
extends StatementVisitor<Optional<TSStatus>, Void> {
46-
47-
private final StatementExecutor statementExecutor;
48-
4955
private static final Logger LOGGER =
5056
LoggerFactory.getLogger(LoadTreeStatementDataTypeConvertExecutionVisitor.class);
5157

58+
private static final long TABLET_BATCH_MEMORY_SIZE_IN_BYTES =
59+
IoTDBDescriptor.getInstance()
60+
.getConfig()
61+
.getLoadTsFileTabletConversionBatchMemorySizeInBytes();
62+
63+
private final StatementExecutor statementExecutor;
64+
5265
@FunctionalInterface
5366
public interface StatementExecutor {
5467
TSStatus execute(final Statement statement);
@@ -70,60 +83,82 @@ public Optional<TSStatus> visitLoadFile(
7083

7184
LOGGER.info("Start data type conversion for LoadTsFileStatement: {}", loadTsFileStatement);
7285

73-
for (final File file : loadTsFileStatement.getTsFiles()) {
74-
try (final TsFileInsertionEventScanParser parser =
75-
new TsFileInsertionEventScanParser(
76-
file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
77-
for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) {
78-
final LoadConvertedInsertTabletStatement statement =
79-
new LoadConvertedInsertTabletStatement(
80-
PipeTransferTabletRawReq.toTPipeTransferRawReq(
81-
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
82-
.constructStatement(),
83-
loadTsFileStatement.isConvertOnTypeMismatch());
84-
85-
TSStatus result;
86-
try {
87-
result =
88-
statement.accept(
89-
LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
90-
statementExecutor.execute(statement));
91-
92-
// Retry max 5 times if the write process is rejected
93-
for (int i = 0;
94-
i < 5
95-
&& result.getCode()
96-
== TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
97-
i++) {
98-
Thread.sleep(100L * (i + 1));
99-
result =
100-
statement.accept(
101-
LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
102-
statementExecutor.execute(statement));
86+
final LoadTsFileMemoryBlock block =
87+
LoadTsFileMemoryManager.getInstance()
88+
.allocateMemoryBlock(TABLET_BATCH_MEMORY_SIZE_IN_BYTES);
89+
final List<PipeTransferTabletRawReq> tabletRawReqs = new ArrayList<>();
90+
final List<Long> tabletRawReqSizes = new ArrayList<>();
91+
92+
try {
93+
for (final File file : loadTsFileStatement.getTsFiles()) {
94+
try (final TsFileInsertionEventScanParser parser =
95+
new TsFileInsertionEventScanParser(
96+
file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
97+
for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) {
98+
final PipeTransferTabletRawReq tabletRawReq =
99+
PipeTransferTabletRawReq.toTPipeTransferRawReq(
100+
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight());
101+
final long curMemory = calculateTabletSizeInBytes(tabletWithIsAligned.getLeft()) + 1;
102+
if (block.hasEnoughMemory(curMemory)) {
103+
tabletRawReqs.add(tabletRawReq);
104+
tabletRawReqSizes.add(curMemory);
105+
block.addMemoryUsage(curMemory);
106+
continue;
103107
}
104-
} catch (final Exception e) {
105-
if (e instanceof InterruptedException) {
106-
Thread.currentThread().interrupt();
108+
109+
final TSStatus result =
110+
executeInsertMultiTabletsWithRetry(
111+
tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch());
112+
113+
for (final long memoryCost : tabletRawReqSizes) {
114+
block.reduceMemoryUsage(memoryCost);
115+
}
116+
tabletRawReqs.clear();
117+
tabletRawReqSizes.clear();
118+
119+
if (!handleTSStatus(result, loadTsFileStatement)) {
120+
return Optional.empty();
107121
}
108-
result = statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
122+
123+
tabletRawReqs.add(tabletRawReq);
124+
tabletRawReqSizes.add(curMemory);
125+
block.addMemoryUsage(curMemory);
126+
}
127+
} catch (final Exception e) {
128+
LOGGER.warn(
129+
"Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
130+
return Optional.empty();
131+
}
132+
}
133+
134+
if (!tabletRawReqs.isEmpty()) {
135+
try {
136+
final TSStatus result =
137+
executeInsertMultiTabletsWithRetry(
138+
tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch());
139+
140+
for (final long memoryCost : tabletRawReqSizes) {
141+
block.reduceMemoryUsage(memoryCost);
109142
}
143+
tabletRawReqs.clear();
144+
tabletRawReqSizes.clear();
110145

111-
if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
112-
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
113-
|| result.getCode()
114-
== TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) {
115-
LOGGER.warn(
116-
"Failed to convert data type for LoadTsFileStatement: {}, status code is {}.",
117-
loadTsFileStatement,
118-
result.getCode());
146+
if (!handleTSStatus(result, loadTsFileStatement)) {
119147
return Optional.empty();
120148
}
149+
} catch (final Exception e) {
150+
LOGGER.warn(
151+
"Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
152+
return Optional.empty();
121153
}
122-
} catch (final Exception e) {
123-
LOGGER.warn(
124-
"Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
125-
return Optional.empty();
126154
}
155+
} finally {
156+
for (final long memoryCost : tabletRawReqSizes) {
157+
block.reduceMemoryUsage(memoryCost);
158+
}
159+
tabletRawReqs.clear();
160+
tabletRawReqSizes.clear();
161+
block.close();
127162
}
128163

129164
if (loadTsFileStatement.isDeleteAfterLoad()) {
@@ -144,4 +179,57 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
144179

145180
return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
146181
}
182+
183+
private TSStatus executeInsertMultiTabletsWithRetry(
184+
final List<PipeTransferTabletRawReq> tabletRawReqs, boolean isConvertOnTypeMismatch) {
185+
final InsertMultiTabletsStatement batchStatement = new InsertMultiTabletsStatement();
186+
batchStatement.setInsertTabletStatementList(
187+
tabletRawReqs.stream()
188+
.map(
189+
req ->
190+
new LoadConvertedInsertTabletStatement(
191+
req.constructStatement(), isConvertOnTypeMismatch))
192+
.collect(Collectors.toList()));
193+
194+
TSStatus result;
195+
try {
196+
result =
197+
batchStatement.accept(
198+
LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
199+
statementExecutor.execute(batchStatement));
200+
201+
// Retry max 5 times if the write process is rejected
202+
for (int i = 0;
203+
i < 5
204+
&& result.getCode()
205+
== TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
206+
i++) {
207+
Thread.sleep(100L * (i + 1));
208+
result =
209+
batchStatement.accept(
210+
LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
211+
statementExecutor.execute(batchStatement));
212+
}
213+
} catch (final Exception e) {
214+
if (e instanceof InterruptedException) {
215+
Thread.currentThread().interrupt();
216+
}
217+
result = batchStatement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
218+
}
219+
return result;
220+
}
221+
222+
private static boolean handleTSStatus(
223+
final TSStatus result, final LoadTsFileStatement loadTsFileStatement) {
224+
if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
225+
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
226+
|| result.getCode() == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) {
227+
LOGGER.warn(
228+
"Failed to convert data type for LoadTsFileStatement: {}, status code is {}.",
229+
loadTsFileStatement,
230+
result.getCode());
231+
return false;
232+
}
233+
return true;
234+
}
147235
}

0 commit comments

Comments
 (0)