Skip to content

Commit

Permalink
[core] Improve RocksDBStateFactory bulkLoad exception (apache#2356)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Nov 21, 2023
1 parent c391db0 commit 116789c
Showing 1 changed file with 33 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,42 +82,45 @@ public RocksDBStateFactory(
}

public void bulkLoad(RocksDBState<?, ?, ?> state, KeyValueIterator<byte[], byte[]> iterator)
throws IOException {
try {
long targetFileSize = options.targetFileSizeBase();

List<String> files = new ArrayList<>();
SstFileWriter writer = null;
long recordNum = 0;
while (iterator.advanceNext()) {
byte[] key = iterator.getKey();
byte[] value = iterator.getValue();

if (writer == null) {
writer = new SstFileWriter(new EnvOptions(), options);
String path = new File(this.path, "sst-" + (sstIndex++)).getPath();
writer.open(path);
files.add(path);
}
throws IOException, RocksDBException {
long targetFileSize = options.targetFileSizeBase();

List<String> files = new ArrayList<>();
SstFileWriter writer = null;
long recordNum = 0;
while (iterator.advanceNext()) {
byte[] key = iterator.getKey();
byte[] value = iterator.getValue();

if (writer == null) {
writer = new SstFileWriter(new EnvOptions(), options);
String path = new File(this.path, "sst-" + (sstIndex++)).getPath();
writer.open(path);
files.add(path);
}

try {
writer.put(key, value);
recordNum++;
if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) {
writer.finish();
writer = null;
recordNum = 0;
}
} catch (RocksDBException e) {
throw new RuntimeException(
"Exception in bulkLoad, the most suspicious reason is that "
+ "your data contains duplicates, please check your sink table.",
e);
}

if (writer != null) {
recordNum++;
if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) {
writer.finish();
writer = null;
recordNum = 0;
}
}

if (files.size() > 0) {
db.ingestExternalFile(state.columnFamily, files, new IngestExternalFileOptions());
}
} catch (Exception e) {
throw new IOException(e);
if (writer != null) {
writer.finish();
}

if (files.size() > 0) {
db.ingestExternalFile(state.columnFamily, files, new IngestExternalFileOptions());
}
}

Expand Down

0 comments on commit 116789c

Please sign in to comment.