Skip to content

Commit

Permalink
Adding multivalued column support in Pinot connector
Browse files Browse the repository at this point in the history
  • Loading branch information
dharakk authored and zhenxiao committed Sep 12, 2020
1 parent ec9b5cf commit 405ebe6
Show file tree
Hide file tree
Showing 12 changed files with 234 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.BooleanType;
import com.facebook.presto.common.type.DateType;
Expand All @@ -35,6 +36,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
Expand Down Expand Up @@ -110,9 +112,33 @@ private static Double parseDouble(String value)
}
}

protected void setValue(Type type, BlockBuilder blockBuilder, JsonNode value)
{
if (blockBuilder == null) {
return;
}
if (value == null) {
blockBuilder.appendNull();
return;
}
if (type instanceof ArrayType) {
checkState(value.isArray());

BlockBuilder childBuilder = blockBuilder.beginBlockEntry();
ArrayNode arrayNode = (ArrayNode) value;
for (int i = 0; i < arrayNode.size(); i++) {
setValue(((ArrayType) type).getElementType(), childBuilder, asText(arrayNode.get(i)));
}
blockBuilder.closeEntry();
}
else {
setValue(type, blockBuilder, asText(value));
}
}

protected void setValue(Type type, BlockBuilder blockBuilder, String value)
{
if (type == null || blockBuilder == null) {
if (blockBuilder == null) {
return;
}
if (value == null) {
Expand Down Expand Up @@ -226,7 +252,7 @@ protected void setRows(String query, List<BlockBuilder> blockBuilders, List<Type
String.format("Expected row of %d columns", blockBuilders.size()));
}
for (int columnNumber = 0; columnNumber < blockBuilders.size(); columnNumber++) {
setValue(types.get(columnNumber), blockBuilders.get(columnNumber), asText(result.get(columnNumber)));
setValue(types.get(columnNumber), blockBuilders.get(columnNumber), result.get(columnNumber));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private void setValuesForGroupby(
group.size(),
values.length));
for (int i = 0; i < group.size(); i++) {
setValue(types.get(i), blockBuilders.get(i), asText(group.get(i)));
setValue(types.get(i), blockBuilders.get(i), group.get(i));
}
for (int i = 0; i < values.length; i++) {
int metricColumnIndex = i + numGroupByClause;
Expand Down Expand Up @@ -164,7 +164,7 @@ public int populateFromQueryResults(
// simple aggregation
// TODO: Validate that this is expected semantically
checkState(numGroupByClause == 0, "Expected no group by columns in pinot");
setValue(types.get(aggregationIndex), blockBuilders.get(aggregationIndex), asText(result.get("value")));
setValue(types.get(aggregationIndex), blockBuilders.get(aggregationIndex), result.get("value"));
rowCount = 1;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.pinot;

import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.BooleanType;
import com.facebook.presto.common.type.DateType;
Expand Down Expand Up @@ -107,7 +108,8 @@ public static Type getPrestoTypeFromPinotType(FieldSpec field, boolean inferDate
}
return getPrestoTypeFromPinotType(field.getDataType());
}
return VarcharType.VARCHAR;

return new ArrayType(getPrestoTypeFromPinotType(field.getDataType()));
}

public static Type getPrestoTypeFromPinotType(DataType dataType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.pinot.PinotScatterGatherQueryClient.ErrorCode;
import com.facebook.presto.spi.ConnectorPageSource;
Expand Down Expand Up @@ -49,6 +50,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.Slices.utf8Slice;
import static java.util.Objects.requireNonNull;
import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.LONG_ARRAY;

/**
* This class retrieves Pinot data from a Pinot client, and re-constructs the data into Presto Pages.
Expand Down Expand Up @@ -272,7 +274,10 @@ private void writeBlock(BlockBuilder blockBuilder, Type columnType, int columnIn
{
Class<?> javaType = columnType.getJavaType();
ColumnDataType pinotColumnType = currentDataTable.getDataTable().getDataSchema().getColumnDataType(columnIndex);
if (javaType.equals(boolean.class)) {
if (columnType instanceof ArrayType) {
writeArrayBlock(blockBuilder, columnType, columnIndex);
}
else if (javaType.equals(boolean.class)) {
writeBooleanBlock(blockBuilder, columnType, columnIndex);
}
else if (javaType.equals(long.class)) {
Expand All @@ -295,6 +300,80 @@ else if (javaType.equals(Slice.class)) {
}
}

private void writeArrayBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
{
for (int rowIndex = 0; rowIndex < currentDataTable.getDataTable().getNumberOfRows(); rowIndex++) {
ColumnDataType columnPinotType = currentDataTable.getDataTable().getDataSchema().getColumnDataType(columnIndex);
Type columnPrestoType = ((ArrayType) columnType).getElementType();
BlockBuilder childBuilder = blockBuilder.beginBlockEntry();
switch (columnPinotType) {
case INT_ARRAY:
int[] intArray = currentDataTable.getDataTable().getIntArray(rowIndex, columnIndex);
for (int i = 0; i < intArray.length; i++) {
// Both the numeric types implement a writeLong method which write if the bounds for
// the type allows else throw exception.
columnPrestoType.writeLong(childBuilder, intArray[i]);
completedBytes += Long.BYTES;
}
break;
case LONG_ARRAY:
long[] longArray = currentDataTable.getDataTable().getLongArray(rowIndex, columnIndex);
for (int i = 0; i < longArray.length; i++) {
columnPrestoType.writeLong(childBuilder, longArray[i]);
completedBytes += Long.BYTES;
}
break;
case FLOAT_ARRAY:
float[] floatArray = currentDataTable.getDataTable().getFloatArray(rowIndex, columnIndex);
if (columnPrestoType.getJavaType().equals(long.class)) {
for (int i = 0; i < floatArray.length; i++) {
columnPrestoType.writeLong(childBuilder, (long) floatArray[i]);
completedBytes += Long.BYTES;
}
}
else {
for (int i = 0; i < floatArray.length; i++) {
columnPrestoType.writeDouble(childBuilder, floatArray[i]);
completedBytes += Double.BYTES;
}
}
break;
case DOUBLE_ARRAY:
double[] doubleArray = currentDataTable.getDataTable().getDoubleArray(rowIndex, columnIndex);
if (columnPrestoType.getJavaType().equals(long.class)) {
for (int i = 0; i < doubleArray.length; i++) {
columnPrestoType.writeLong(childBuilder, (long) doubleArray[i]);
completedBytes += Long.BYTES;
}
}
else {
for (int i = 0; i < doubleArray.length; i++) {
columnPrestoType.writeDouble(childBuilder, doubleArray[i]);
completedBytes += Double.BYTES;
}
}
break;
case STRING_ARRAY:
String[] stringArray = currentDataTable.getDataTable().getStringArray(rowIndex, columnIndex);
for (int i = 0; i < stringArray.length; i++) {
Slice slice = Slices.utf8Slice(stringArray[i]);
childBuilder.writeBytes(slice, 0, slice.length()).closeEntry();
completedBytes += slice.getBytes().length;
}
break;
default:
throw new PrestoException(
PINOT_UNSUPPORTED_COLUMN_TYPE,
String.format(
"Failed to write column %s. pinotColumnType %s, prestoType %s",
split.getExpectedColumnHandles().get(columnIndex).getColumnName(),
columnPinotType,
columnPrestoType));
}
blockBuilder.closeEntry();
}
}

private void writeBooleanBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
{
for (int i = 0; i < currentDataTable.getDataTable().getNumberOfRows(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.FixedWidthType;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.pinot.PinotColumnHandle;
import com.facebook.presto.pinot.PinotConfig;
import com.facebook.presto.pinot.PinotException;
Expand Down Expand Up @@ -422,6 +424,7 @@ public PinotQueryGeneratorContext visitAggregation(AggregationNode node, PinotQu
case GROUP_BY: {
GroupByColumnNode groupByColumn = (GroupByColumnNode) expression;
VariableReferenceExpression groupByInputColumn = getVariableReference(groupByColumn.getInputColumn());
checkState(groupByInputColumn.getType() instanceof FixedWidthType || groupByInputColumn.getType() instanceof VarcharType);
variablesInAggregation.add(groupByInputColumn);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.json.JsonCodecFactory;
import com.facebook.airlift.json.ObjectMapperProvider;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.type.TypeRegistry;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
Expand All @@ -27,12 +28,7 @@
import java.util.Map;

import static com.facebook.airlift.json.JsonCodec.listJsonCodec;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static java.util.Locale.ENGLISH;
import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature;
import static java.util.Objects.requireNonNull;

public final class MetadataUtil
Expand All @@ -53,29 +49,25 @@ private MetadataUtil()
public static final class TestingTypeDeserializer
extends FromStringDeserializer<Type>
{
private final Map<String, Type> types = ImmutableMap.of(
StandardTypes.BOOLEAN, BOOLEAN,
StandardTypes.BIGINT, BIGINT,
StandardTypes.INTEGER, INTEGER,
StandardTypes.DOUBLE, DOUBLE,
StandardTypes.VARCHAR, VARCHAR);
private final TypeManager typeManager;

public TestingTypeDeserializer()
public TestingTypeDeserializer(TypeManager typeManager)
{
super(Type.class);
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

@Override
protected Type _deserialize(String value, DeserializationContext context)
{
Type type = types.get(value.toLowerCase(ENGLISH));
Type type = typeManager.getType(parseTypeSignature(value));
return requireNonNull(type, "Unknown type " + value);
}
}

static {
ObjectMapperProvider objectMapperProvider = new ObjectMapperProvider();
objectMapperProvider.setJsonDeserializers(ImmutableMap.<Class<?>, JsonDeserializer<?>>of(Type.class, new TestingTypeDeserializer()));
objectMapperProvider.setJsonDeserializers(ImmutableMap.<Class<?>, JsonDeserializer<?>>of(Type.class, new TestingTypeDeserializer(new TypeRegistry())));
JsonCodecFactory codecFactory = new JsonCodecFactory(objectMapperProvider);
CATALOG_CODEC = codecFactory.mapJsonCodec(String.class, listJsonCodec(PinotTable.class));
TABLE_CODEC = codecFactory.jsonCodec(PinotTable.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand Down Expand Up @@ -173,7 +174,19 @@ public static Object[][] pqlResponsesProvider()
ImmutableList.of(bigint("activeTrips"), bigint("numDrivers"), varchar("region"), bigint("rowtime"), secondsSinceEpoch, fraction("utilization"), bigint("utilizedDrivers"), bigint("vehicleViewId"), bigint("windowEnd"), bigint("windowStart")),
ImmutableList.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
ImmutableList.of(bigint("activeTrips"), bigint("numDrivers"), varchar("region"), bigint("rowtime"), secondsSinceEpoch, fraction("utilization"), bigint("utilizedDrivers"), bigint("vehicleViewId"), bigint("windowEnd"), bigint("windowStart")),
Optional.empty()}
Optional.empty()},
{"SELECT tag_value FROM eats_job_state LIMIT 10",
"{\"selectionResults\":{\"columns\":[\"tag_value\"],\"results\":[[[\"tag1\"]],[[\"tag1\", \"tag2\"]],[[\"tag2\", \"tag3\"]],[[\"null\"]],[[\"tag4\", \"tag2\", \"tag1\", \"tag5\"]],[[\"tag2\"]],[[\"null\"]],[[\"null\"]],[[\"tag1\", \"tag6\"]],[[\"null\"]]]},\"exceptions\":[],\"numServersQueried\":7,\"numServersResponded\":7,\"numDocsScanned\":380,\"numEntriesScannedInFilter\":0,\"numEntriesScannedPostFilter\":760,\"totalDocs\":55988817,\"numGroupsLimitReached\":false,\"timeUsedMs\":2,\"segmentStatistics\":[],\"traceInfo\":{}}",
ImmutableList.of(array(VARCHAR, "tag_value")),
ImmutableList.of(0),
ImmutableList.of(array(VARCHAR, "tag_value")),
Optional.empty()},
{"SELECT num_values FROM eats_job_state LIMIT 10",
"{\"selectionResults\":{\"columns\":[\"tag_value\"],\"results\":[[[\"123\"]],[[\"456\", \"567\"]],[[\"2345\", \"8907\"]],[[\"0\"]],[[\"123\", \"1234\", \"987\", \"1678\"]],[[\"98\"]],[[\"0\"]],[[\"0\"]],[[\"1\", \"0\"]],[[\"0\"]]]},\"exceptions\":[],\"numServersQueried\":7,\"numServersResponded\":7,\"numDocsScanned\":380,\"numEntriesScannedInFilter\":0,\"numEntriesScannedPostFilter\":760,\"totalDocs\":55988817,\"numGroupsLimitReached\":false,\"timeUsedMs\":2,\"segmentStatistics\":[],\"traceInfo\":{}}",
ImmutableList.of(array(BIGINT, "num_values")),
ImmutableList.of(0),
ImmutableList.of(array(BIGINT, "num_values")),
Optional.empty()},
};
}

Expand Down
Loading

0 comments on commit 405ebe6

Please sign in to comment.