Skip to content

Commit

Permalink
[Improve] Speed up ClickhouseFile Local generate a mmap object (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
viverlxl authored Nov 16, 2023
1 parent 2233b3a commit cf39e29
Showing 1 changed file with 27 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class ClickhouseFileSinkWriter
private final ClickhouseTable clickhouseTable;
private final Map<Shard, List<String>> shardLocalDataPaths;
private final Map<Shard, FileChannel> rowCache;
private final Map<Shard, MappedByteBuffer> bufferCache;
private final Integer bufferSize = 1024 * 128;

private final Map<Shard, String> shardTempFile;

Expand All @@ -91,6 +93,7 @@ public ClickhouseFileSinkWriter(FileReaderOption readerOption, SinkWriter.Contex
this.readerOption.getShardMetadata().getDatabase(),
this.readerOption.getShardMetadata().getTable());
rowCache = new HashMap<>(Common.COLLECTION_SIZE);
bufferCache = new HashMap<>(Common.COLLECTION_SIZE);
shardTempFile = new HashMap<>();
nodePasswordCheck();

Expand Down Expand Up @@ -141,7 +144,7 @@ public void write(SeaTunnelRow element) throws IOException {
e);
}
});
saveDataToFile(channel, element);
saveDataToFile(channel, element, shard);
}

private void nodePasswordCheck() {
Expand Down Expand Up @@ -209,7 +212,8 @@ public void close() throws IOException {
}
}

private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row) throws IOException {
private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row, Shard shard)
throws IOException {
String data =
this.readerOption.getFields().stream()
.map(
Expand All @@ -227,12 +231,28 @@ private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row) throws IO
})
.collect(Collectors.joining(readerOption.getFileFieldsDelimiter()))
+ "\n";

MappedByteBuffer buffer =
fileChannel.map(
FileChannel.MapMode.READ_WRITE,
fileChannel.size(),
data.getBytes(StandardCharsets.UTF_8).length);
buffer.put(data.getBytes(StandardCharsets.UTF_8));
bufferCache.computeIfAbsent(
shard,
k -> {
try {
return fileChannel.map(
FileChannel.MapMode.READ_WRITE, 0, bufferSize);
} catch (IOException e) {
throw new ClickhouseConnectorException(
CommonErrorCodeDeprecated.FILE_OPERATION_FAILED,
"data_local file write failed",
e);
}
});
byte[] byteData = data.getBytes(StandardCharsets.UTF_8);
if (buffer.position() + byteData.length > buffer.capacity()) {
buffer =
fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(), bufferSize);
bufferCache.put(shard, buffer);
}
buffer.put(byteData);
}

private List<String> generateClickhouseLocalFiles(String clickhouseLocalFileTmpFile)
Expand Down

0 comments on commit cf39e29

Please sign in to comment.