Skip to content

Commit

Permalink
enhance(datastore): add parquet read/write support for memory table
Browse files Browse the repository at this point in the history
  • Loading branch information
xuchuan committed Nov 8, 2022
1 parent e88ba85 commit 215f900
Show file tree
Hide file tree
Showing 20 changed files with 1,299 additions and 70 deletions.
31 changes: 31 additions & 0 deletions server/controller/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,37 @@
<version>${mockito.inline.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,7 @@ public ColumnSchema(@NonNull ColumnSchemaDesc schema, int index) {
this.index = index;
}

// Reconstructs a ColumnSchema from its persisted WAL (write-ahead log) form:
// the schema descriptor is re-parsed via WalManager and the recorded column
// index is reused unchanged.
public ColumnSchema(Wal.ColumnSchema schema) {
this(WalManager.parseColumnSchema(schema), schema.getColumnIndex());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,22 @@

package ai.starwhale.mlops.datastore;

import ai.starwhale.mlops.datastore.parquet.ValueSetter;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import lombok.NonNull;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Type.Repetition;
import org.apache.parquet.schema.Types;

@Getter
public abstract class ColumnType {
Expand Down Expand Up @@ -174,4 +184,59 @@ public static int compare(ColumnType type1, Object value1, ColumnType type2, Obj
public abstract Object fromWal(Wal.Column col);

public abstract Wal.Column.Builder toWal(int columnIndex, Object value);

/**
 * Maps this column type to its Parquet representation: an optional group named
 * {@code name} holding a required boolean "null" marker plus the actual payload
 * under "value". The marker lets a stored null be distinguished from an absent
 * field when records are read back.
 *
 * @param name field name to give the resulting group type
 * @return the Parquet group type for this column
 */
public Type toParquetType(String name) {
    return Types.optionalGroup()
            .addField(Types.primitive(PrimitiveTypeName.BOOLEAN, Repetition.REQUIRED).named("null"))
            .addField(this.buildParquetType().named("value"))
            .named(name);
}

// Builds the Parquet type of this column's payload only; toParquetType() wraps
// it together with the "null" marker field.
protected abstract Types.Builder<?, ? extends Type> buildParquetType();

/**
 * Writes {@code value} as the {null, value} group declared by toParquetType().
 * The boolean "null" marker (field 0) is always emitted; the "value" payload
 * (field 1) is emitted only when the value is non-null.
 *
 * @param recordConsumer Parquet record sink, positioned inside the parent field
 * @param value          the column value, possibly null
 */
public void writeParquetValue(RecordConsumer recordConsumer, Object value) {
    var isNull = value == null;
    recordConsumer.startGroup();
    recordConsumer.startField("null", 0);
    recordConsumer.addBoolean(isNull);
    recordConsumer.endField("null", 0);
    if (!isNull) {
        recordConsumer.startField("value", 1);
        this.writeNonNullParquetValue(recordConsumer, value);
        recordConsumer.endField("value", 1);
    }
    recordConsumer.endGroup();
}

// Writes the payload of a non-null value into the already-started "value"
// field; the null marker and surrounding group are handled by writeParquetValue().
protected abstract void writeNonNullParquetValue(RecordConsumer recordConsumer, @NonNull Object value);

/**
 * Builds the read-side counterpart of writeParquetValue(): a converter for the
 * {null, value} group. When the "null" marker is true the row's value is
 * reported as null; otherwise the type-specific payload converter delivers the
 * decoded value through {@code valueSetter}.
 *
 * @param valueSetter callback receiving the decoded value (or null)
 * @return converter for one {null, value} group
 */
public Converter getParquetConverter(ValueSetter valueSetter) {
    // Field 0: the "null" marker. A true marker means the stored value is
    // null, so null is delivered immediately (the payload field is absent).
    var nullMarkerConverter = new PrimitiveConverter() {
        @Override
        public void addBoolean(boolean isNull) {
            if (isNull) {
                valueSetter.setValue(null);
            }
        }
    };
    // Field 1: the payload, decoded by the concrete column type.
    var payloadConverter = getParquetValueConverter(valueSetter);
    return new GroupConverter() {
        @Override
        public Converter getConverter(int index) {
            return index == 0 ? nullMarkerConverter : payloadConverter;
        }

        @Override
        public void start() {
            // no per-record state to initialize
        }

        @Override
        public void end() {
            // the value was already delivered by a child converter
        }
    };
}

// Returns the converter for the non-null payload ("value" field); invoked by
// getParquetConverter(), which supplies the null-marker handling.
protected abstract Converter getParquetValueConverter(ValueSetter valueSetter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,22 @@

package ai.starwhale.mlops.datastore;

import ai.starwhale.mlops.datastore.parquet.ValueSetter;
import ai.starwhale.mlops.exception.SwValidationException;
import ai.starwhale.mlops.exception.SwValidationException.ValidSubject;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

@Getter
@EqualsAndHashCode(callSuper = false)
Expand Down Expand Up @@ -106,4 +116,62 @@ public Wal.Column.Builder toWal(int columnIndex, Object value) {
.map(element -> this.elementType.toWal(0, element).build())
.collect(Collectors.toList()));
}

/**
 * Parquet payload for a list column: an optional group wrapping a repeated
 * group "list", each repetition carrying one "element" (itself a nullable
 * {null, value} group produced by the element type).
 */
@Override
public Types.Builder<?, ? extends Type> buildParquetType() {
    var elementField = elementType.toParquetType("element");
    var listField = Types.repeatedGroup().addField(elementField).named("list");
    return Types.optionalGroup().addField(listField);
}

/**
 * Writes a non-null list payload. The payload declared by buildParquetType()
 * is a group wrapping the repeated "list" field, so a surrounding group must
 * be opened here — mirroring ColumnTypeObject#writeNonNullParquetValue, and
 * matching the converter chain in getParquetValueConverter(), which expects
 * value group → list group → element.
 *
 * @param recordConsumer Parquet record sink, positioned inside the "value" field
 * @param value          the list to write; must be a List
 */
@Override
public void writeNonNullParquetValue(RecordConsumer recordConsumer, @NonNull Object value) {
    var listValue = (List<?>) value;
    recordConsumer.startGroup();
    // A repeated field with zero occurrences must not be started at all:
    // Parquet's record assembly rejects an empty startField/endField pair.
    if (!listValue.isEmpty()) {
        recordConsumer.startField("list", 0);
        for (var element : listValue) {
            recordConsumer.startGroup();
            recordConsumer.startField("element", 0);
            this.elementType.writeParquetValue(recordConsumer, element);
            recordConsumer.endField("element", 0);
            recordConsumer.endGroup();
        }
        recordConsumer.endField("list", 0);
    }
    recordConsumer.endGroup();
}

/**
 * Builds the converter chain for a list payload: the outer converter handles
 * the wrapping value group (allocating a fresh list on start, publishing it on
 * end), the middle converter handles each repetition of the "list" group, and
 * the element type's converter appends decoded elements.
 */
@Override
protected Converter getParquetValueConverter(ValueSetter valueSetter) {
    // List being accumulated for the current record; boxed in an
    // AtomicReference so the anonymous converters below can reassign it.
    var collected = new AtomicReference<List<Object>>();
    var elementConverter = elementType.getParquetConverter(element -> collected.get().add(element));
    // One repetition of the repeated "list" group, carrying a single "element".
    var repeatedListConverter = new GroupConverter() {
        @Override
        public Converter getConverter(int fieldIndex) {
            return elementConverter;
        }

        @Override
        public void start() {
            // nothing to prepare per repetition
        }

        @Override
        public void end() {
            // the element was already appended by elementConverter
        }
    };
    return new GroupConverter() {
        @Override
        public Converter getConverter(int fieldIndex) {
            return repeatedListConverter;
        }

        @Override
        public void start() {
            collected.set(new ArrayList<>());
        }

        @Override
        public void end() {
            valueSetter.setValue(collected.get());
        }
    };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,27 @@

package ai.starwhale.mlops.datastore;

import ai.starwhale.mlops.datastore.parquet.ValueSetter;
import ai.starwhale.mlops.exception.SwValidationException;
import ai.starwhale.mlops.exception.SwValidationException.ValidSubject;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

@Getter
@EqualsAndHashCode(callSuper = false)
Expand Down Expand Up @@ -155,4 +166,67 @@ public Wal.Column.Builder toWal(int columnIndex, Object value) {
.toWal(0, entry.getValue())
.build()))));
}

/**
 * Parquet payload for an object column: an optional group with one field per
 * attribute. Fields are added in sorted key order so the schema's field
 * positions match the TreeMap iteration used by writeMapValue() and
 * getObjectConverter(); iterating the raw attribute map here would write and
 * read fields at the wrong indexes whenever that map is not sorted. (If
 * {@code attributes} is already a sorted map this is a no-op change.)
 */
@Override
public Types.Builder<?, ? extends Type> buildParquetType() {
    var builder = Types.optionalGroup();
    for (var entry : new TreeMap<>(this.attributes).entrySet()) {
        builder.addField(entry.getValue().toParquetType(entry.getKey()));
    }
    return builder;
}

/**
 * Writes a non-null object payload as a group of attribute fields, delegating
 * the per-attribute encoding to the shared writeMapValue helper.
 */
@Override
public void writeNonNullParquetValue(RecordConsumer recordConsumer, @NonNull Object value) {
    var mapValue = (Map<?, ?>) value;
    recordConsumer.startGroup();
    ColumnTypeObject.writeMapValue(recordConsumer, this.attributes, mapValue);
    recordConsumer.endGroup();
}

/**
 * Writes the attribute fields of an object/map value. Field indexes follow the
 * sorted (TreeMap) order of the schema keys. Attributes absent from the value
 * map are skipped entirely, while attributes present with a null value are
 * still written (their null marker is set by writeParquetValue).
 *
 * @param recordConsumer Parquet record sink, positioned inside the object group
 * @param schema         attribute name → column type
 * @param value          attribute name → attribute value
 */
public static void writeMapValue(RecordConsumer recordConsumer,
        Map<String, ColumnType> schema,
        Map<?, ?> value) {
    int fieldIndex = 0;
    for (var entry : new TreeMap<>(schema).entrySet()) {
        var attrName = entry.getKey();
        if (value.containsKey(attrName)) {
            recordConsumer.startField(attrName, fieldIndex);
            entry.getValue().writeParquetValue(recordConsumer, value.get(attrName));
            recordConsumer.endField(attrName, fieldIndex);
        }
        ++fieldIndex;
    }
}

// Delegates to the shared object converter; attribute converter order is the
// sorted key order, the same order used on the write side by writeMapValue.
@Override
protected Converter getParquetValueConverter(ValueSetter valueSetter) {
return ColumnTypeObject.getObjectConverter(valueSetter, this.attributes);
}

/**
 * Builds a GroupConverter that reassembles an object value into a
 * {@code Map<String, Object>}. One child converter per attribute, ordered by
 * sorted key to match writeMapValue(); a fresh map is allocated when the group
 * starts and published through {@code valueSetter} when it ends.
 *
 * @param valueSetter callback receiving the reassembled attribute map
 * @param schema      attribute name → column type
 * @return converter for one object group
 */
public static GroupConverter getObjectConverter(ValueSetter valueSetter, Map<String, ColumnType> schema) {
    // Map under construction for the current record; boxed in an
    // AtomicReference so the lambdas and anonymous class below can swap it.
    var currentMap = new AtomicReference<Map<String, Object>>();
    var fieldConverters = new ArrayList<Converter>();
    new TreeMap<>(schema).forEach((attrName, attrType) ->
            fieldConverters.add(attrType.getParquetConverter(v -> currentMap.get().put(attrName, v))));
    return new GroupConverter() {
        @Override
        public Converter getConverter(int fieldIndex) {
            return fieldConverters.get(fieldIndex);
        }

        @Override
        public void start() {
            currentMap.set(new HashMap<>());
        }

        @Override
        public void end() {
            valueSetter.setValue(currentMap.get());
        }
    };
}
}
Loading

0 comments on commit 215f900

Please sign in to comment.