diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java index 1bc1edbcd531..5ebdd2f92827 100644 --- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java @@ -82,42 +82,45 @@ public RocksDBStateFactory( } public void bulkLoad(RocksDBState state, KeyValueIterator iterator) - throws IOException { - try { - long targetFileSize = options.targetFileSizeBase(); - - List files = new ArrayList<>(); - SstFileWriter writer = null; - long recordNum = 0; - while (iterator.advanceNext()) { - byte[] key = iterator.getKey(); - byte[] value = iterator.getValue(); - - if (writer == null) { - writer = new SstFileWriter(new EnvOptions(), options); - String path = new File(this.path, "sst-" + (sstIndex++)).getPath(); - writer.open(path); - files.add(path); - } + throws IOException, RocksDBException { + long targetFileSize = options.targetFileSizeBase(); + + List files = new ArrayList<>(); + SstFileWriter writer = null; + long recordNum = 0; + while (iterator.advanceNext()) { + byte[] key = iterator.getKey(); + byte[] value = iterator.getValue(); + + if (writer == null) { + writer = new SstFileWriter(new EnvOptions(), options); + String path = new File(this.path, "sst-" + (sstIndex++)).getPath(); + writer.open(path); + files.add(path); + } + try { writer.put(key, value); - recordNum++; - if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) { - writer.finish(); - writer = null; - recordNum = 0; - } + } catch (RocksDBException e) { + throw new RuntimeException( + "Exception in bulkLoad, the most suspicious reason is that " + + "your data contains duplicates, please check your sink table.", + e); } - - if (writer != null) { + recordNum++; + if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) { writer.finish(); + writer = null; + recordNum = 0; } + } - if (files.size() > 0) { - db.ingestExternalFile(state.columnFamily, files, new IngestExternalFileOptions()); - } - } catch (Exception e) { - throw new IOException(e); + if (writer != null) { + writer.finish(); + } + + if (files.size() > 0) { + db.ingestExternalFile(state.columnFamily, files, new IngestExternalFileOptions()); } }