Skip to content

Commit

Permalink
chore(datastore): use StorageAccessService directly instead of ObjectS…
Browse files Browse the repository at this point in the history
…tore (#1483)
  • Loading branch information
xuchuan authored Nov 11, 2022
1 parent cad9298 commit c172676
Show file tree
Hide file tree
Showing 17 changed files with 114 additions and 810 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import ai.starwhale.mlops.exception.SwProcessException;
import ai.starwhale.mlops.exception.SwProcessException.ErrorType;
import ai.starwhale.mlops.exception.SwValidationException;
import ai.starwhale.mlops.memory.SwBufferManager;
import ai.starwhale.mlops.storage.StorageAccessService;
import java.io.IOException;
import java.lang.ref.SoftReference;
Expand Down Expand Up @@ -62,7 +61,6 @@ public class DataStore {
private final DumpThread dumpThread;

public DataStore(StorageAccessService storageAccessService,
SwBufferManager swBufferManager,
@Value("${sw.datastore.walFileSize}") int walFileSize,
@Value("${sw.datastore.walMaxFileSize}") int walMaxFileSize,
@Value("${sw.datastore.walWaitIntervalMillis}") int walWaitIntervalMillis,
Expand All @@ -79,8 +77,7 @@ public DataStore(StorageAccessService storageAccessService,
dataRootPath += "/";
}
this.snapshotRootPath = dataRootPath + "snapshot/";
this.walManager = new WalManager(new ObjectStore(swBufferManager, this.storageAccessService),
swBufferManager,
this.walManager = new WalManager(this.storageAccessService,
walFileSize,
walMaxFileSize,
dataRootPath + "wal/",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
package ai.starwhale.mlops.datastore;

import ai.starwhale.mlops.exception.SwProcessException;
import ai.starwhale.mlops.exception.SwProcessException.ErrorType;
import ai.starwhale.mlops.exception.SwValidationException;
import ai.starwhale.mlops.memory.SwBuffer;
import ai.starwhale.mlops.memory.SwBufferInputStream;
import ai.starwhale.mlops.memory.SwBufferManager;
import ai.starwhale.mlops.memory.SwBufferOutputStream;
import ai.starwhale.mlops.storage.StorageAccessService;
import com.google.protobuf.CodedOutputStream;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Comparator;
Expand All @@ -39,14 +39,13 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.FastByteArrayOutputStream;
import org.xerial.snappy.Snappy;

@Slf4j
public class WalManager extends Thread {

private final ObjectStore objectStore;

private final SwBufferManager bufferManager;
private final StorageAccessService storageAccessService;

private final int walFileSize;

Expand All @@ -60,11 +59,9 @@ public class WalManager extends Thread {

private final LinkedList<Wal.WalEntry> entriesToWrite = new LinkedList<>();

private SwBufferOutputStream outputStream;

private final SwBuffer outputBuffer;
private final FastByteArrayOutputStream outputStream;

private final SwBuffer compressedBuffer;
private final byte[] compressedBuffer;

private static final byte NO_COMPRESSION = 0;

Expand All @@ -84,24 +81,22 @@ public class WalManager extends Thread {

private final Map<String, Long> walLogFileMap = new ConcurrentHashMap<>();

public WalManager(ObjectStore objectStore,
SwBufferManager bufferManager,
public WalManager(StorageAccessService storageAccessService,
int walFileSize,
int walMaxFileSize,
String walPrefix,
int walWaitIntervalMillis,
int ossMaxAttempts) {
this.objectStore = objectStore;
this.bufferManager = bufferManager;
this.storageAccessService = storageAccessService;
this.walFileSize = walFileSize;
this.walMaxFileSize = walMaxFileSize;
this.walMaxFileSizeNoHeader = this.walMaxFileSize - this.header.length;
this.logFilePrefix = walPrefix + "wal.log.";
this.walWaitIntervalMillis = walWaitIntervalMillis;
this.ossMaxAttempts = ossMaxAttempts;
this.outputBuffer = this.bufferManager.allocate(this.walMaxFileSizeNoHeader);
this.compressedBuffer = this.bufferManager.allocate(this.walMaxFileSize);
this.outputStream = new SwBufferOutputStream(this.outputBuffer);
this.compressedBuffer = new byte[this.walMaxFileSize];
System.arraycopy(this.header, 0, this.compressedBuffer, 0, this.header.length);
this.outputStream = new FastByteArrayOutputStream(this.walMaxFileSize);
var walMap = new TreeMap<Integer, String>();
Iterator<String> it;
try {
Expand All @@ -111,7 +106,7 @@ public WalManager(ObjectStore objectStore,
.intervalFunction(IntervalFunction.ofExponentialRandomBackoff(100, 2.0, 0.5, 10000))
.retryOnException(e -> !terminated)
.build()),
() -> this.objectStore.list(this.logFilePrefix))
() -> this.storageAccessService.list(this.logFilePrefix).iterator())
.apply();
} catch (Throwable e) {
throw new SwProcessException(SwProcessException.ErrorType.DATASTORE, "fail to read WAL", e);
Expand Down Expand Up @@ -139,8 +134,8 @@ public Iterator<Wal.WalEntry> readAll() {
private final List<String> files = WalManager.this.walLogFileMap.keySet().stream()
.sorted(Comparator.comparingInt(fn -> Integer.parseInt(fn.substring(logFilePrefix.length()))))
.collect(Collectors.toList());
private final SwBuffer buf = WalManager.this.bufferManager.allocate(WalManager.this.walMaxFileSize);
private SwBufferInputStream inputStream;
private final byte[] buf = new byte[WalManager.this.walMaxFileSize];
private InputStream inputStream;
private String currentFile;

@Override
Expand All @@ -159,7 +154,7 @@ public Wal.WalEntry next() {
}
try {
var ret = Wal.WalEntry.parseDelimitedFrom(this.inputStream);
if (this.inputStream.remaining() == 0) {
if (this.inputStream.available() == 0) {
this.inputStream = null;
}
maxEntryId = ret.getId();
Expand All @@ -174,50 +169,58 @@ public Wal.WalEntry next() {
private void getNext() {
this.currentFile = this.files.get(0);
this.files.remove(0);
SwBuffer data;
byte[] data;
try {
data = Retry.decorateCheckedSupplier(
Retry.of("get", RetryConfig.custom()
.maxAttempts(WalManager.this.ossMaxAttempts)
.intervalFunction(IntervalFunction.ofExponentialRandomBackoff(
100, 2.0, 0.5, 10000))
.build()),
() -> objectStore.get(this.currentFile))
() -> {
var input = WalManager.this.storageAccessService.get(this.currentFile);
int len = Math.toIntExact(input.getSize());
var ret = new byte[len];
var n = input.readNBytes(ret, 0, len);
if (n != len) {
throw new RuntimeException(
MessageFormat.format("expected size {0}, actual {1}", len, n));
}
return ret;
})
.apply();
} catch (Throwable e) {
throw new SwProcessException(SwProcessException.ErrorType.DATASTORE,
"fail to read from object store", e);
throw new SwProcessException(ErrorType.DATASTORE, "fail to read from storage", e);
}
if (data.capacity() < WalManager.this.header.length) {
throw new SwProcessException(SwProcessException.ErrorType.DATASTORE,
MessageFormat.format("corrupted file, size={0}", data.capacity()));
if (data.length < WalManager.this.header.length) {
throw new SwProcessException(ErrorType.DATASTORE,
MessageFormat.format("corrupted file, size={0}", data.length));
}
if (data[0] != WalManager.this.header[0]
|| data[1] != WalManager.this.header[1]
|| data[2] != WalManager.this.header[2]) {
throw new SwProcessException(ErrorType.DATASTORE, "invalid wal log file header");
}
int uncompressedSize;
var h = new byte[4];
data.getBytes(0, h, 0, h.length);
var compressed = data.slice(h.length, data.capacity() - h.length);
if (h[3] == NO_COMPRESSION) {
compressed.copyTo(this.buf);
uncompressedSize = compressed.capacity();
} else {
if (data[3] == NO_COMPRESSION) {
this.inputStream = new ByteArrayInputStream(data,
WalManager.this.header.length,
data.length - WalManager.this.header.length);
} else if (data[3] == SNAPPY) {
try {
var inBuf = compressed.asByteBuffer();
var outBuf = this.buf.asByteBuffer();
if (inBuf.hasArray()) {
uncompressedSize = Snappy.uncompress(inBuf.array(),
inBuf.arrayOffset(),
inBuf.capacity(),
outBuf.array(),
outBuf.arrayOffset());
} else {
uncompressedSize = Snappy.uncompress(inBuf, outBuf);
}
uncompressedSize = Snappy.uncompress(data,
WalManager.this.header.length,
data.length - WalManager.this.header.length,
this.buf,
0);
this.inputStream = new ByteArrayInputStream(this.buf, 0, uncompressedSize);
} catch (IOException e) {
throw new SwProcessException(SwProcessException.ErrorType.DATASTORE, "fail to uncompress", e);
}
} else {
throw new SwProcessException(SwProcessException.ErrorType.DATASTORE,
"unknown compression code" + data[3]);
}
WalManager.this.bufferManager.release(data);
this.inputStream = new SwBufferInputStream(this.buf.slice(0, uncompressedSize));
}
};
}
Expand Down Expand Up @@ -261,7 +264,7 @@ public void terminate() {
/**
* Remove any WAL log files that contain no entry IDs greater than or equal to minWalLogIdToRetain, except the
* latest one.
*
* <p>
* The latest WAL log file should always be kept so that maxEntryId can be recovered from WAL log files when the
* data store starts up.
*
Expand All @@ -275,7 +278,7 @@ public void removeWalLogFiles(long minWalLogIdToRetain) throws IOException {
.filter(entry -> entry.getValue() < minValue)
.map(Entry::getKey)
.collect(Collectors.toList())) {
this.objectStore.delete(logFile);
this.storageAccessService.delete(logFile);
this.walLogFileMap.remove(logFile);
}
}
Expand All @@ -288,7 +291,7 @@ public void run() {
if (status == PopulationStatus.TERMINATED) {
return;
}
this.writeToObjectStore(status == PopulationStatus.BUFFER_FULL);
this.writeToStorage(status == PopulationStatus.BUFFER_FULL);
} catch (Throwable e) {
log.error("unexpected exception", e);
try {
Expand Down Expand Up @@ -394,9 +397,9 @@ private PopulationStatus populateOutput() {
}
entry = this.entriesToWrite.getFirst();
}
if (CodedOutputStream.computeMessageSizeNoTag(entry) + this.outputStream.getOffset()
if (CodedOutputStream.computeMessageSizeNoTag(entry) + this.outputStream.size()
> this.walMaxFileSizeNoHeader) {
if (this.outputStream.getOffset() == 0) {
if (this.outputStream.size() == 0) {
// huge single entry
log.error(
"data loss: discard unexpected huge entry. size={} table={} schema={} records count={}",
Expand All @@ -419,35 +422,30 @@ private PopulationStatus populateOutput() {
}
this.maxEntryIdInOutputBuffer = entry.getId();
}
if (this.outputStream.getOffset() >= this.walFileSize) {
if (this.outputStream.size() >= this.walFileSize) {
return PopulationStatus.BUFFER_FULL;
}
return PopulationStatus.NO_MORE_ENTRIES;
}

private void writeToObjectStore(boolean clearOutput) {
private void writeToStorage(boolean clearOutput) {
int compressedSize;
try {
var inBuf = this.outputBuffer.asByteBuffer();
var outBuf = this.compressedBuffer.asByteBuffer();
this.header[3] = WalManager.SNAPPY;
outBuf.put(header);
if (inBuf.hasArray()) {
compressedSize = Snappy.compress(inBuf.array(),
inBuf.arrayOffset(),
this.outputStream.getOffset(),
outBuf.array(),
outBuf.arrayOffset() + outBuf.position());
} else {
inBuf.limit(this.outputStream.getOffset());
compressedSize = Snappy.compress(inBuf, outBuf);
}
this.compressedBuffer[3] = WalManager.SNAPPY;
compressedSize = Snappy.compress(this.outputStream.toByteArrayUnsafe(),
0,
this.outputStream.size(),
this.compressedBuffer,
this.header.length);
} catch (IOException e) {
log.warn("failed to compress", e);
this.header[3] = WalManager.NO_COMPRESSION;
this.compressedBuffer.setBytes(0, this.header, 0, this.header.length);
this.outputBuffer.copyTo(this.compressedBuffer.slice(4, this.outputStream.getOffset()));
compressedSize = this.outputStream.getOffset();
this.compressedBuffer[3] = WalManager.NO_COMPRESSION;
compressedSize = this.outputStream.size();
System.arraycopy(this.outputStream.toByteArrayUnsafe(),
0,
this.compressedBuffer,
this.header.length,
compressedSize);
}
var key = this.logFilePrefix + this.logFileIndex;
try {
Expand All @@ -457,15 +455,17 @@ private void writeToObjectStore(boolean clearOutput) {
.maxAttempts(this.ossMaxAttempts)
.intervalFunction(IntervalFunction.ofExponentialRandomBackoff(100, 2.0, 0.5, 10000))
.build()),
() -> this.objectStore.put(key, this.compressedBuffer.slice(0, compressedBufferSize)))
() -> this.storageAccessService.put(key,
new ByteArrayInputStream(this.compressedBuffer),
compressedBufferSize))
.run();
} catch (Throwable e) {
log.error("data loss: failed to write wal log", e);
}
this.walLogFileMap.put(key, this.maxEntryIdInOutputBuffer);
if (clearOutput) {
++this.logFileIndex;
this.outputStream = new SwBufferOutputStream(this.outputBuffer);
this.outputStream.reset();
}
}

Expand Down
Loading

0 comments on commit c172676

Please sign in to comment.