Skip to content

feat(db/trans-cache): verify cached files with crc32c #5523

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 70 additions & 16 deletions chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.common.io.ByteSource;
import com.google.common.primitives.Longs;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
Expand All @@ -18,6 +22,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -79,6 +84,8 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {

private final Path cacheFile0;
private final Path cacheFile1;
private String crc32c0;
private String crc32c1;
private final Path cacheProperties;
private final Path cacheDir;
private AtomicBoolean isValid = new AtomicBoolean(false);
Expand Down Expand Up @@ -281,13 +288,18 @@ private boolean recovery() {
CompletableFuture<Boolean> tk1 = loadProperties.thenApplyAsync(
v -> recovery(1, this.cacheFile1));

return CompletableFuture.allOf(tk0, tk1).thenApply(v -> {
logger.info("recovery bloomFilters success.");
return true;
}).exceptionally(this::handleException).join();
try {
return CompletableFuture.allOf(tk0, tk1).thenApply(v -> {
logger.info("recovery bloomFilters success.");
return true;
}).exceptionally(this::handleException).join();
} finally {
clearCrc32c();
}
}

private boolean recovery(int index, Path file) {
checkCrc32c(index, file);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has this error happened in tests? Why will it happen? Will some error info printed when this kind of errors be found?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To catch accidents like disk write errors, service crash etc.

Will some error info printed when this kind of errors be found?

Yes, there will be an exception.

try (InputStream in = new BufferedInputStream(Files.newInputStream(file,
StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE))) {
logger.info("recovery bloomFilter[{}] from file.", index);
Expand Down Expand Up @@ -329,24 +341,38 @@ private void dump() {
() -> dump(0, this.cacheFile0));
CompletableFuture<Void> task1 = CompletableFuture.runAsync(
() -> dump(1, this.cacheFile1));
CompletableFuture.allOf(task0, task1).thenRun(() -> {
writeProperties();
logger.info("dump bloomFilters done.");

}).exceptionally(e -> {
logger.info("dump bloomFilters to file failed. {}", e.getMessage());
return null;
}).join();
try {
CompletableFuture.allOf(task0, task1).thenRun(() -> {
writeProperties();
logger.info("dump bloomFilters done.");
}).exceptionally(e -> {
logger.info("dump bloomFilters to file failed. {}", e.getMessage());
return null;
}).join();
} finally {
clearCrc32c();
}
}

private void dump(int index, Path file) {
logger.info("dump bloomFilters[{}] to file.", index);
long start = System.currentTimeMillis();
try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(file))) {
logger.info("dump bloomFilters[{}] to file.", index);
long start = System.currentTimeMillis();
bloomFilters[index].writeTo(out);
logger.info("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, cost {} ms.",
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
String crc32c = getCrc32c(file);
if (index == 0) {
this.crc32c0 = crc32c;
} else {
this.crc32c1 = crc32c;
}
logger.info("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, "
+ "crc32c: {}, cost {} ms.",
index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(),
System.currentTimeMillis() - start);
crc32c, System.currentTimeMillis() - start);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -367,6 +393,8 @@ private boolean loadProperties() {
String.format("currentBlockNum not match. filter: %d, db: %d",
currentBlockNum, currentBlockNumFromDB));
}
this.crc32c0 = properties.getProperty("crc32c0");
this.crc32c1 = properties.getProperty("crc32c1");
logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, load done.",
filterStartBlock, currentBlockNum, currentFilterIndex);
return true;
Expand All @@ -382,6 +410,8 @@ private void writeProperties() {
properties.setProperty("filterStartBlock", String.valueOf(filterStartBlock));
properties.setProperty("currentBlockNum", String.valueOf(currentBlockNum));
properties.setProperty("currentFilterIndex", String.valueOf(currentFilterIndex));
properties.setProperty("crc32c0", this.crc32c0);
properties.setProperty("crc32c1", this.crc32c1);
properties.store(w, "Generated by the application. PLEASE DO NOT EDIT! ");
logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, write done.",
filterStartBlock, currentBlockNum, currentFilterIndex);
Expand All @@ -390,6 +420,30 @@ private void writeProperties() {
}
}

private String getCrc32c(Path file) throws IOException {
ByteSource byteSource = com.google.common.io.Files.asByteSource(file.toFile());
HashCode hc = byteSource.hash(Hashing.crc32c());
return hc.toString();
}

private void checkCrc32c(int index, Path file) {
try {
String actual = getCrc32c(file);
String expect = index == 0 ? this.crc32c0 : this.crc32c1;
if (!Objects.equals(actual, expect)) {
throw new IllegalStateException("crc32c not match. index: " + index + ", expect: " + expect
+ ", actual: " + actual);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void clearCrc32c() {
this.crc32c0 = null;
this.crc32c1 = null;
}

@Override
public TxCacheDB newInstance() {
return new TxCacheDB(name, recentTransactionStore, dynamicPropertiesStore);
Expand Down