Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(datastore): do not compress write ahead log file in case compression fails #992

Merged
merged 1 commit into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class WalManager extends Thread {

private final int walMaxFileSize;

private final int walMaxFileSizeNoHeader;

private final String logFilePrefix;

private final int walWaitIntervalMillis;
Expand All @@ -61,6 +63,12 @@ public class WalManager extends Thread {

private final SwBuffer compressedBuffer;

private static final byte NO_COMPRESSION = 0;

private static final byte SNAPPY = 1;

private final byte[] header = new byte[]{'s', 'w', 'l', 0};

private boolean terminated;

private int logFileIndex;
Expand All @@ -77,9 +85,10 @@ public WalManager(ObjectStore objectStore,
this.bufferManager = bufferManager;
this.walFileSize = walFileSize;
this.walMaxFileSize = walMaxFileSize;
this.walMaxFileSizeNoHeader = this.walMaxFileSize - this.header.length;
this.logFilePrefix = walPrefix + "wal.log.";
this.walWaitIntervalMillis = walWaitIntervalMillis;
this.outputBuffer = this.bufferManager.allocate(this.walMaxFileSize);
this.outputBuffer = this.bufferManager.allocate(this.walMaxFileSizeNoHeader);
this.compressedBuffer = this.bufferManager.allocate(this.walMaxFileSize);
this.outputStream = new SwBufferOutputStream(this.outputBuffer);
var walMap = new TreeMap<Integer, String>();
Expand Down Expand Up @@ -143,18 +152,34 @@ private void getNext() {
log.error("fail to read from object store", e);
throw new SWProcessException(SWProcessException.ErrorType.DATASTORE);
}
if (data.capacity() < WalManager.this.header.length) {
log.error("corrupted file, size={}", data.capacity());
throw new SWProcessException(SWProcessException.ErrorType.DATASTORE);
}
int uncompressedSize;
try {
var inBuf = data.asByteBuffer();
var outBuf = this.buf.asByteBuffer();
if (inBuf.hasArray()) {
uncompressedSize = Snappy.uncompress(inBuf.array(), 0, inBuf.capacity(), outBuf.array(), 0);
} else {
uncompressedSize = Snappy.uncompress(inBuf, outBuf);
var h = new byte[4];
goldenxinxing marked this conversation as resolved.
Show resolved Hide resolved
data.getBytes(0, h, 0, h.length);
var compressed = data.slice(h.length, data.capacity() - h.length);
if (h[3] == NO_COMPRESSION) {
compressed.copyTo(this.buf);
uncompressedSize = compressed.capacity();
} else {
try {
var inBuf = compressed.asByteBuffer();
var outBuf = this.buf.asByteBuffer();
if (inBuf.hasArray()) {
uncompressedSize = Snappy.uncompress(inBuf.array(),
inBuf.arrayOffset(),
inBuf.capacity(),
outBuf.array(),
outBuf.arrayOffset());
} else {
uncompressedSize = Snappy.uncompress(inBuf, outBuf);
}
} catch (IOException e) {
log.error("fail to uncompress", e);
throw new SWProcessException(SWProcessException.ErrorType.DATASTORE);
}
} catch (IOException e) {
log.error("fail to uncompress", e);
throw new SWProcessException(SWProcessException.ErrorType.DATASTORE);
}
WalManager.this.bufferManager.release(data);
this.inputStream = new SwBufferInputStream(this.buf.slice(0, uncompressedSize));
Expand All @@ -163,7 +188,7 @@ private void getNext() {
}

public void append(Wal.WalEntry entry) {
if (entry.getSerializedSize() > this.walMaxFileSize) {
if (entry.getSerializedSize() > this.walMaxFileSizeNoHeader) {
for (var e : this.splitEntry(entry)) {
this.append(e);
}
Expand Down Expand Up @@ -241,7 +266,7 @@ private PopulationStatus populateOutput() {
entry = this.entries.getFirst();
}
if (CodedOutputStream.computeMessageSizeNoTag(entry) + this.outputStream.getOffset()
> this.walMaxFileSize) {
> this.walMaxFileSizeNoHeader) {
if (this.outputStream.getOffset() == 0) {
// huge single entry
log.error(
Expand Down Expand Up @@ -275,25 +300,35 @@ private void writeToObjectStore(boolean clearOutput) {
try {
var inBuf = this.outputBuffer.asByteBuffer();
var outBuf = this.compressedBuffer.asByteBuffer();
this.header[3] = WalManager.SNAPPY;
outBuf.put(header);
if (inBuf.hasArray()) {
compressedSize = Snappy.compress(inBuf.array(), 0, this.outputStream.getOffset(), outBuf.array(), 0);
compressedSize = Snappy.compress(inBuf.array(),
inBuf.arrayOffset(),
this.outputStream.getOffset(),
outBuf.array(),
outBuf.arrayOffset() + outBuf.position());
} else {
inBuf.limit(this.outputStream.getOffset());
compressedSize = Snappy.compress(inBuf, outBuf);
}
} catch (IOException e) {
log.error("data loss: failed to compress", e);
return;
log.warn("failed to compress", e);
this.header[3] = WalManager.NO_COMPRESSION;
this.compressedBuffer.setBytes(0, this.header, 0, this.header.length);
this.outputBuffer.copyTo(this.compressedBuffer.slice(4, this.outputStream.getOffset()));
goldenxinxing marked this conversation as resolved.
Show resolved Hide resolved
compressedSize = this.outputStream.getOffset();
}
try {
int compressedBufferSize = compressedSize + this.header.length;
Retry.decorateCheckedRunnable(
Retry.of("put", RetryConfig.custom()
.maxAttempts(10000)
.intervalFunction(IntervalFunction.ofExponentialRandomBackoff(100, 2.0, 0.5, 10000))
.retryOnException(e -> !terminated)
.build()),
() -> this.objectStore.put(this.logFilePrefix + this.logFileIndex,
this.compressedBuffer.slice(0, compressedSize)))
this.compressedBuffer.slice(0, compressedBufferSize)))
.run();
} catch (Throwable e) {
log.error("data loss: failed to write wal log", e);
Expand All @@ -314,22 +349,23 @@ private List<Wal.WalEntry> splitEntry(Wal.WalEntry entry) {
builder.setTableSchema(entry.getTableSchema());
}
int currentEntrySize = builder.build().getSerializedSize();
if (currentEntrySize > this.walMaxFileSize) {
if (currentEntrySize > this.walMaxFileSizeNoHeader) {
throw new SWValidationException(SWValidationException.ValidSubject.DATASTORE,
"schema is too large or walMaxFileSize is too small. size=" + currentEntrySize
+ " walMaxFileSize=" + this.walMaxFileSize);
+ " walMaxFileSizeNoHeader=" + this.walMaxFileSizeNoHeader);
}
for (var record : entry.getRecordsList()) {
// field number is less than 128, so simply use 1 instead
var recordSize = CodedOutputStream.computeMessageSize(1, record);
currentEntrySize += recordSize;
if (currentEntrySize + CodedOutputStream.computeUInt32SizeNoTag(currentEntrySize) > this.walMaxFileSize) {
if (currentEntrySize + CodedOutputStream.computeUInt32SizeNoTag(currentEntrySize)
> this.walMaxFileSizeNoHeader) {
ret.add(builder.build());
builder.clearTableSchema();
builder.clearRecords();
currentEntrySize = headerSize + recordSize;
if (currentEntrySize + CodedOutputStream.computeUInt32SizeNoTag(currentEntrySize)
> this.walMaxFileSize) {
> this.walMaxFileSizeNoHeader) {
throw new SWValidationException(SWValidationException.ValidSubject.DATASTORE,
"huge single record. size=" + currentEntrySize);
}
Expand All @@ -340,7 +376,7 @@ private List<Wal.WalEntry> splitEntry(Wal.WalEntry entry) {
ret.add(builder.build());
}
for (var e : ret) {
if (e.getSerializedSize() > this.walMaxFileSize) {
if (e.getSerializedSize() > this.walMaxFileSizeNoHeader) {
throw new SWProcessException(SWProcessException.ErrorType.DATASTORE,
"invalid entry size " + e.getSerializedSize());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void put(String name, SwBuffer buf) throws IOException {
var temp = File.createTempFile("sw_tmp", null);
try (var channel = new FileOutputStream(temp).getChannel()) {
channel.write(buf.asByteBuffer());
channel.close();
Files.move(temp.toPath(), f.toPath(), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
} finally {
//noinspection ResultOfMethodCallIgnored
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public void testAppendSplitSizeCalculation() throws IOException {
this.walManager = new WalManager(this.objectStore,
this.bufferManager,
256,
entry1.getSerializedSize() + CodedOutputStream.computeUInt32SizeNoTag(entry1.getSerializedSize()),
entry1.getSerializedSize() + CodedOutputStream.computeUInt32SizeNoTag(entry1.getSerializedSize()) + 4,
"test/",
10);
builder.addAllRecords(entry2.getRecordsList());
Expand Down