Skip to content

Commit

Permalink
feat(datastore): add write ahead log for data store (#935)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuchuan authored Aug 23, 2022
1 parent 69bb612 commit 2a9424d
Show file tree
Hide file tree
Showing 38 changed files with 2,058 additions and 237 deletions.
26 changes: 26 additions & 0 deletions server/controller/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.8.4</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
Expand Down Expand Up @@ -216,6 +226,22 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
</plugin>
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<optimizeCodegen>false</optimizeCodegen>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
@ToString
@EqualsAndHashCode
public class ColumnSchema {
private String name;
private ColumnType type;
private final String name;
private final ColumnType type;
private final int index;

public ColumnSchema(@NonNull ColumnSchemaDesc schema) {
public ColumnSchema(@NonNull ColumnSchemaDesc schema, int index) {
if (schema.getName() == null) {
throw new SWValidationException(SWValidationException.ValidSubject.DATASTORE).tip(
"column name should not be null");
Expand All @@ -44,5 +45,6 @@ public ColumnSchema(@NonNull ColumnSchemaDesc schema) {
throw new SWValidationException(SWValidationException.ValidSubject.DATASTORE).tip(
"invalid column type " + schema.getType());
}
this.index = index;
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2022 Starwhale, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.starwhale.mlops.datastore;

import ai.starwhale.mlops.exception.SWValidationException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import ai.starwhale.mlops.datastore.impl.MemoryTableImpl;
import ai.starwhale.mlops.exception.SWValidationException;
import org.springframework.stereotype.Service;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -26,14 +26,31 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
@Component
public class DataStore {
private final WalManager walManager;

private final Map<String, MemoryTable> tables = new ConcurrentHashMap<>();

public DataStore(WalManager walManager) {
this.walManager = walManager;
var it = this.walManager.readAll();
while (it.hasNext()) {
var entry = it.next();
var tableName = entry.getTableName();
var table = this.tables.computeIfAbsent(tableName, k -> new MemoryTableImpl(tableName, this.walManager));
table.updateFromWal(entry);
}
}

public void terminate() {
this.walManager.terminate();
}

public void update(String tableName,
TableSchemaDesc schema,
List<Map<String, String>> records) {
var table = this.tables.computeIfAbsent(tableName, k -> new MemoryTableImpl());
var table = this.tables.computeIfAbsent(tableName, k -> new MemoryTableImpl(tableName, this.walManager));
table.update(schema, records);
}

Expand Down Expand Up @@ -75,8 +92,9 @@ public RecordList scan(DataStoreScanRequest req) {
List<Map<String, String>> ret = new ArrayList<>();
while (!iters.isEmpty() && (req.getLimit() < 0 || ret.size() < req.getLimit())) {
lastKey = Collections.min(iters, (a, b) -> {
var x = (Comparable) a.getKey();
var y = (Comparable) b.getKey();
@SuppressWarnings("rawtypes") var x = (Comparable) a.getKey();
@SuppressWarnings("rawtypes") var y = (Comparable) b.getKey();
//noinspection unchecked
return x.compareTo(y);
}).getKey();
var record = new HashMap<String, String>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
public interface MemoryTable {
TableSchema getSchema();

void updateFromWal(Wal.WalEntry entry);

void update(TableSchemaDesc schema, List<Map<String, String>> records);

RecordList query(Map<String, String> columns,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2022 Starwhale, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.starwhale.mlops.datastore;

import lombok.AllArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class TableSchema {
@Getter
private final ColumnType keyColumnType;
private final Map<String, ColumnSchema> columnSchemaMap;
private int maxColumnIndex;

public TableSchema(@NonNull TableSchemaDesc schema) {
this.keyColumn = schema.getKeyColumn();
Expand All @@ -48,7 +49,7 @@ public TableSchema(@NonNull TableSchemaDesc schema) {
this.columnSchemaMap = new HashMap<>();
if (schema.getColumnSchemaList() != null) {
for (var col : schema.getColumnSchemaList()) {
var colSchema = new ColumnSchema(col);
var colSchema = new ColumnSchema(col, this.maxColumnIndex++);
if (!TableSchema.COLUMN_NAME_PATTERN.matcher(col.getName()).matches()) {
throw new SWValidationException(SWValidationException.ValidSubject.DATASTORE).tip(
"invalid column name " + col.getName());
Expand All @@ -75,6 +76,7 @@ public TableSchema(@NonNull TableSchema schema) {
this.keyColumn = schema.keyColumn;
this.keyColumnType = schema.keyColumnType;
this.columnSchemaMap = new HashMap<>(schema.columnSchemaMap);
this.maxColumnIndex = schema.maxColumnIndex;
}

public ColumnSchema getColumnSchemaByName(@NonNull String name) {
Expand All @@ -85,7 +87,7 @@ public List<ColumnSchema> getColumnSchemas() {
return List.copyOf(this.columnSchemaMap.values());
}

public void merge(@NonNull TableSchemaDesc schema) {
public List<ColumnSchema> merge(@NonNull TableSchemaDesc schema) {
if (schema.getKeyColumn() != null && !this.keyColumn.equals(schema.getKeyColumn())) {
throw new SWValidationException(SWValidationException.ValidSubject.DATASTORE).tip(
MessageFormat.format(
Expand All @@ -94,9 +96,10 @@ public void merge(@NonNull TableSchemaDesc schema) {
schema.getKeyColumn()));
}
var columnSchemaMap = new HashMap<String, ColumnSchema>();
var columnIndex = this.maxColumnIndex;
for (var col : schema.getColumnSchemaList()) {
var current = this.columnSchemaMap.get(col.getName());
var colSchema = new ColumnSchema(col);
var colSchema = new ColumnSchema(col, current == null ? columnIndex++ : current.getIndex());
if (current != null
&& current.getType() != ColumnType.UNKNOWN
&& colSchema.getType() != ColumnType.UNKNOWN
Expand All @@ -105,11 +108,14 @@ public void merge(@NonNull TableSchemaDesc schema) {
MessageFormat.format("conflicting type for column {0}, expected {1}, actual {2}",
col.getName(), current.getType(), col.getType()));
}
if (current == null || colSchema.getType() != ColumnType.UNKNOWN) {
if (current == null
|| current.getType() != colSchema.getType() && colSchema.getType() != ColumnType.UNKNOWN) {
columnSchemaMap.put(col.getName(), colSchema);
}
}
this.columnSchemaMap.putAll(columnSchemaMap);
this.maxColumnIndex = columnIndex;
return List.copyOf(columnSchemaMap.values());
}

public Map<String, ColumnType> getColumnTypeMapping() {
Expand Down
Loading

0 comments on commit 2a9424d

Please sign in to comment.