From 405ebe6f8c4af2d0ee9aa4717c0da2d57bc2e42c Mon Sep 17 00:00:00 2001 From: Dharak Kharod Date: Wed, 26 Aug 2020 23:03:35 -0700 Subject: [PATCH] Adding multivalued column support in Pinot connector --- .../pinot/PinotBrokerPageSourceBase.java | 30 ++++++- .../pinot/PinotBrokerPageSourcePql.java | 4 +- .../presto/pinot/PinotColumnUtils.java | 4 +- .../presto/pinot/PinotSegmentPageSource.java | 81 ++++++++++++++++++- .../pinot/query/PinotQueryGenerator.java | 3 + .../facebook/presto/pinot/MetadataUtil.java | 24 ++---- .../pinot/TestPinotBrokerPageSourcePql.java | 15 +++- .../pinot/TestPinotBrokerPageSourceSql.java | 13 +++ .../presto/pinot/TestPinotColumnHandle.java | 10 +++ .../presto/pinot/TestPinotColumnMetadata.java | 25 +++--- .../presto/pinot/TestPinotQueryBase.java | 6 ++ .../pinot/TestPinotSegmentPageSource.java | 54 +++++++++++++ 12 files changed, 234 insertions(+), 35 deletions(-) diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotBrokerPageSourceBase.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotBrokerPageSourceBase.java index 9a4ff6e75fd3c..6e127973e6bde 100644 --- a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotBrokerPageSourceBase.java +++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotBrokerPageSourceBase.java @@ -17,6 +17,7 @@ import com.facebook.presto.common.Page; import com.facebook.presto.common.PageBuilder; import com.facebook.presto.common.block.BlockBuilder; +import com.facebook.presto.common.type.ArrayType; import com.facebook.presto.common.type.BigintType; import com.facebook.presto.common.type.BooleanType; import com.facebook.presto.common.type.DateType; @@ -35,6 +36,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.slice.Slice; @@ -110,9 +112,33 @@ private static Double parseDouble(String value) } } + protected void setValue(Type type, BlockBuilder blockBuilder, JsonNode value) + { + if (blockBuilder == null) { + return; + } + if (value == null) { + blockBuilder.appendNull(); + return; + } + if (type instanceof ArrayType) { + checkState(value.isArray()); + + BlockBuilder childBuilder = blockBuilder.beginBlockEntry(); + ArrayNode arrayNode = (ArrayNode) value; + for (int i = 0; i < arrayNode.size(); i++) { + setValue(((ArrayType) type).getElementType(), childBuilder, asText(arrayNode.get(i))); + } + blockBuilder.closeEntry(); + } + else { + setValue(type, blockBuilder, asText(value)); + } + } + protected void setValue(Type type, BlockBuilder blockBuilder, String value) { - if (type == null || blockBuilder == null) { + if (blockBuilder == null) { return; } if (value == null) { @@ -226,7 +252,7 @@ protected void setRows(String query, List blockBuilders, List javaType = columnType.getJavaType(); ColumnDataType pinotColumnType = currentDataTable.getDataTable().getDataSchema().getColumnDataType(columnIndex); - if (javaType.equals(boolean.class)) { + if (columnType instanceof ArrayType) { + writeArrayBlock(blockBuilder, columnType, columnIndex); + } + else if (javaType.equals(boolean.class)) { writeBooleanBlock(blockBuilder, columnType, columnIndex); } else if (javaType.equals(long.class)) { @@ -295,6 +300,80 @@ else if (javaType.equals(Slice.class)) { } } + private void writeArrayBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex) + { + for (int rowIndex = 0; rowIndex < currentDataTable.getDataTable().getNumberOfRows(); rowIndex++) { + ColumnDataType columnPinotType = currentDataTable.getDataTable().getDataSchema().getColumnDataType(columnIndex); + Type columnPrestoType = ((ArrayType) columnType).getElementType(); + BlockBuilder childBuilder = blockBuilder.beginBlockEntry(); + switch (columnPinotType) { + case INT_ARRAY: + int[] intArray = currentDataTable.getDataTable().getIntArray(rowIndex, columnIndex); + for (int i = 0; i < intArray.length; i++) { + // Both the numeric types implement a writeLong method which write if the bounds for + // the type allows else throw exception. + columnPrestoType.writeLong(childBuilder, intArray[i]); + completedBytes += Long.BYTES; + } + break; + case LONG_ARRAY: + long[] longArray = currentDataTable.getDataTable().getLongArray(rowIndex, columnIndex); + for (int i = 0; i < longArray.length; i++) { + columnPrestoType.writeLong(childBuilder, longArray[i]); + completedBytes += Long.BYTES; + } + break; + case FLOAT_ARRAY: + float[] floatArray = currentDataTable.getDataTable().getFloatArray(rowIndex, columnIndex); + if (columnPrestoType.getJavaType().equals(long.class)) { + for (int i = 0; i < floatArray.length; i++) { + columnPrestoType.writeLong(childBuilder, (long) floatArray[i]); + completedBytes += Long.BYTES; + } + } + else { + for (int i = 0; i < floatArray.length; i++) { + columnPrestoType.writeDouble(childBuilder, floatArray[i]); + completedBytes += Double.BYTES; + } + } + break; + case DOUBLE_ARRAY: + double[] doubleArray = currentDataTable.getDataTable().getDoubleArray(rowIndex, columnIndex); + if (columnPrestoType.getJavaType().equals(long.class)) { + for (int i = 0; i < doubleArray.length; i++) { + columnPrestoType.writeLong(childBuilder, (long) doubleArray[i]); + completedBytes += Long.BYTES; + } + } + else { + for (int i = 0; i < doubleArray.length; i++) { + columnPrestoType.writeDouble(childBuilder, doubleArray[i]); + completedBytes += Double.BYTES; + } + } + break; + case STRING_ARRAY: + String[] stringArray = currentDataTable.getDataTable().getStringArray(rowIndex, columnIndex); + for (int i = 0; i < stringArray.length; i++) { + Slice slice = Slices.utf8Slice(stringArray[i]); + childBuilder.writeBytes(slice, 0, slice.length()).closeEntry(); + completedBytes += slice.getBytes().length; + } + break; + default: + throw new PrestoException( + PINOT_UNSUPPORTED_COLUMN_TYPE, + String.format( + "Failed to write column %s. pinotColumnType %s, prestoType %s", + split.getExpectedColumnHandles().get(columnIndex).getColumnName(), + columnPinotType, + columnPrestoType)); + } + blockBuilder.closeEntry(); + } + } + private void writeBooleanBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex) { for (int i = 0; i < currentDataTable.getDataTable().getNumberOfRows(); i++) { diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGenerator.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGenerator.java index c5e49325ccbb4..d2ca84aa55f6a 100644 --- a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGenerator.java +++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGenerator.java @@ -15,7 +15,9 @@ import com.facebook.airlift.log.Logger; import com.facebook.presto.common.type.BigintType; +import com.facebook.presto.common.type.FixedWidthType; import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.common.type.VarcharType; import com.facebook.presto.pinot.PinotColumnHandle; import com.facebook.presto.pinot.PinotConfig; import com.facebook.presto.pinot.PinotException; @@ -422,6 +424,7 @@ public PinotQueryGeneratorContext visitAggregation(AggregationNode node, PinotQu case GROUP_BY: { GroupByColumnNode groupByColumn = (GroupByColumnNode) expression; VariableReferenceExpression groupByInputColumn = getVariableReference(groupByColumn.getInputColumn()); + checkState(groupByInputColumn.getType() instanceof FixedWidthType || groupByInputColumn.getType() instanceof VarcharType); variablesInAggregation.add(groupByInputColumn); break; } diff --git a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/MetadataUtil.java b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/MetadataUtil.java index e3255a3913e91..f7ccfaf462e43 100644 --- a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/MetadataUtil.java +++ b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/MetadataUtil.java @@ -16,8 +16,9 @@ import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.json.JsonCodecFactory; import com.facebook.airlift.json.ObjectMapperProvider; -import com.facebook.presto.common.type.StandardTypes; import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.type.TypeRegistry; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer; @@ -27,12 +28,7 @@ import java.util.Map; import static com.facebook.airlift.json.JsonCodec.listJsonCodec; -import static com.facebook.presto.common.type.BigintType.BIGINT; -import static com.facebook.presto.common.type.BooleanType.BOOLEAN; -import static com.facebook.presto.common.type.DoubleType.DOUBLE; -import static com.facebook.presto.common.type.IntegerType.INTEGER; -import static com.facebook.presto.common.type.VarcharType.VARCHAR; -import static java.util.Locale.ENGLISH; +import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature; import static java.util.Objects.requireNonNull; public final class MetadataUtil @@ -53,29 +49,25 @@ private MetadataUtil() public static final class TestingTypeDeserializer extends FromStringDeserializer { - private final Map types = ImmutableMap.of( - StandardTypes.BOOLEAN, BOOLEAN, - StandardTypes.BIGINT, BIGINT, - StandardTypes.INTEGER, INTEGER, - StandardTypes.DOUBLE, DOUBLE, - StandardTypes.VARCHAR, VARCHAR); + private final TypeManager typeManager; - public TestingTypeDeserializer() + public TestingTypeDeserializer(TypeManager typeManager) { super(Type.class); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); } @Override protected Type _deserialize(String value, DeserializationContext context) { - Type type = types.get(value.toLowerCase(ENGLISH)); + Type type = typeManager.getType(parseTypeSignature(value)); return requireNonNull(type, "Unknown type " + value); } } static { ObjectMapperProvider objectMapperProvider = new ObjectMapperProvider(); - objectMapperProvider.setJsonDeserializers(ImmutableMap., JsonDeserializer>of(Type.class, new TestingTypeDeserializer())); + objectMapperProvider.setJsonDeserializers(ImmutableMap., JsonDeserializer>of(Type.class, new TestingTypeDeserializer(new TypeRegistry()))); JsonCodecFactory codecFactory = new JsonCodecFactory(objectMapperProvider); CATALOG_CODEC = codecFactory.mapJsonCodec(String.class, listJsonCodec(PinotTable.class)); TABLE_CODEC = codecFactory.jsonCodec(PinotTable.class); diff --git a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotBrokerPageSourcePql.java b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotBrokerPageSourcePql.java index dff5adc2878c1..c74eac48ef529 100644 --- a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotBrokerPageSourcePql.java +++ b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotBrokerPageSourcePql.java @@ -33,6 +33,7 @@ import java.util.Optional; import java.util.Set; +import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -173,7 +174,19 @@ public static Object[][] pqlResponsesProvider() ImmutableList.of(bigint("activeTrips"), bigint("numDrivers"), varchar("region"), bigint("rowtime"), secondsSinceEpoch, fraction("utilization"), bigint("utilizedDrivers"), bigint("vehicleViewId"), bigint("windowEnd"), bigint("windowStart")), ImmutableList.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), ImmutableList.of(bigint("activeTrips"), bigint("numDrivers"), varchar("region"), bigint("rowtime"), secondsSinceEpoch, fraction("utilization"), bigint("utilizedDrivers"), bigint("vehicleViewId"), bigint("windowEnd"), bigint("windowStart")), - Optional.empty()} + Optional.empty()}, + {"SELECT tag_value FROM eats_job_state LIMIT 10", + "{\"selectionResults\":{\"columns\":[\"tag_value\"],\"results\":[[[\"tag1\"]],[[\"tag1\", \"tag2\"]],[[\"tag2\", \"tag3\"]],[[\"null\"]],[[\"tag4\", \"tag2\", \"tag1\", \"tag5\"]],[[\"tag2\"]],[[\"null\"]],[[\"null\"]],[[\"tag1\", \"tag6\"]],[[\"null\"]]]},\"exceptions\":[],\"numServersQueried\":7,\"numServersResponded\":7,\"numDocsScanned\":380,\"numEntriesScannedInFilter\":0,\"numEntriesScannedPostFilter\":760,\"totalDocs\":55988817,\"numGroupsLimitReached\":false,\"timeUsedMs\":2,\"segmentStatistics\":[],\"traceInfo\":{}}", + ImmutableList.of(array(VARCHAR, "tag_value")), + ImmutableList.of(0), + ImmutableList.of(array(VARCHAR, "tag_value")), + Optional.empty()}, + {"SELECT num_values FROM eats_job_state LIMIT 10", + "{\"selectionResults\":{\"columns\":[\"tag_value\"],\"results\":[[[\"123\"]],[[\"456\", \"567\"]],[[\"2345\", \"8907\"]],[[\"0\"]],[[\"123\", \"1234\", \"987\", \"1678\"]],[[\"98\"]],[[\"0\"]],[[\"0\"]],[[\"1\", \"0\"]],[[\"0\"]]]},\"exceptions\":[],\"numServersQueried\":7,\"numServersResponded\":7,\"numDocsScanned\":380,\"numEntriesScannedInFilter\":0,\"numEntriesScannedPostFilter\":760,\"totalDocs\":55988817,\"numGroupsLimitReached\":false,\"timeUsedMs\":2,\"segmentStatistics\":[],\"traceInfo\":{}}", + ImmutableList.of(array(BIGINT, "num_values")), + ImmutableList.of(0), + ImmutableList.of(array(BIGINT, "num_values")), + Optional.empty()}, }; } diff --git a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotBrokerPageSourceSql.java b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotBrokerPageSourceSql.java index d72be6b1e3f71..f2bb8db99e2e5 100644 --- a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotBrokerPageSourceSql.java +++ b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotBrokerPageSourceSql.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Optional; +import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty; import static org.testng.Assert.assertEquals; @@ -140,6 +141,18 @@ public static Object[][] sqlResponsesProvider() "{ \"resultTable\": { \"dataSchema\": { \"columnDataTypes\": [\"STRING\", \"STRING\", \"LONG\", \"STRING\", \"STRING\", \"LONG\", \"STRING\", \"LONG\", \"INT\", \"STRING\"], \"columnNames\": [\"event_id\", \"event_name\", \"event_time\", \"group_city\", \"group_country\", \"group_id\", \"group_name\", \"mtime\", \"rsvp_count\", \"venue_name\"] }, \"rows\": [ [\"271449847\", \"☀️Boat trip for expats ☀️Summer is back on Saturday 27☀️\", 1593253800000, \"Amsterdam\", \"nl\", 22032818, \"AIC: Amsterdam International Community for expats\", 1592908606814, 1, \"Boat Trip-Pick-up point \"], [\"271468544\", \"How To Start A Career In Privacy & Data Protection\", 1593266400000, \"Edison\", \"us\", 32769803, \"Cybersecurity Careers\", 1592908608026, 1, \"Online event\"], [\"270836483\", \"Episode #2 Power Platform Pub Quiz - Virtual #PPPQ\", 1594800000000, \"Sydney\", \"au\", 33746135, \"Power Platform Pub Quiz #PPPQ\", 1592908588000, 1, \"Online event\"], [\"271472391\", \"Ben Nevis , Three Sisters , Aonach Eagach (hiking, scrambling and bbq ing)\uD83E\uDD17\", 1594998000000, \"Loxley\", \"gb\", 33286887, \"⭐ \uD83D\uDC10Mountain Pervs\uD83D\uDC12 ⭐\", 1592908609397, 1, \"Online event\"], [\"mksgjrybcjbjc\", \"홍대 펍파티! HONGDAE International Pub Party! Meet local and foreign friends!\", 1593165600000, \"Seoul\", \"kr\", 26806362, \"HONGDAE LANGUAGE EXCHANGE CAFE & PUB: GSM TERRACE HONGDAE\", 1592908609962, 1, \"HONGDAE GSM Terrace\"], [\"271336915\", \"\uD83C\uDF34 Un día en Cadaqués / A day in Cadaqués \uD83D\uDC1F 20€\", 1593324000000, \"Barcelona\", \"es\", 15442262, \"Barcelona Language Exchange\", 1592908610993, 1, \"Barcelona\"], [\"271312704\", \"Create your startup: The Checklist (online Event)\", 1593012600000, \"Dublin\", \"ie\", 33262613, \"Startup advice: Build My Unicorn (Dublin - Online)\", 1592908611199, 1, \"Online event\"], [\"271351670\", \"[5] Planeamiento Ágil (Desarrollo de Software Ágil en 10Pines)\", 1593034200000, \"Buenos Aires\", \"ar\", 2811362, \"Ágiles Argentina\", 1592908611403, 1, \"Online event\"], [\"270213022\", \"MARATÓN VIRTUAL POWER PLATFORM\", 1594623600000, \"Madrid\", \"es\", 19418102, \"Power BI Spain Users Group\", 1592908611966, 1, \"Online event\"], [\"mksgjrybcjbjc\", \"홍대 펍파티! HONGDAE International Pub Party! Meet local and foreign friends!\", 1593165600000, \"Seoul\", \"kr\", 26806362, \"HONGDAE LANGUAGE EXCHANGE CAFE & PUB: GSM TERRACE HONGDAE\", 1592908612111, 1, \"HONGDAE GSM Terrace\"] ] }, \"exceptions\": [], \"numServersQueried\": 1, \"numServersResponded\": 1, \"numSegmentsQueried\": 1, \"numSegmentsProcessed\": 1, \"numSegmentsMatched\": 1, \"numConsumingSegmentsQueried\": 1, \"numDocsScanned\": 10, \"numEntriesScannedInFilter\": 0, \"numEntriesScannedPostFilter\": 100, \"numGroupsLimitReached\": false, \"totalDocs\": 1425, \"timeUsedMs\": 10, \"segmentStatistics\": [], \"traceInfo\": {}, \"minConsumingFreshnessTimeMs\": 1592910063563 }", ImmutableList.of(varchar("event_id"), varchar("event_name"), bigint("event_time"), groupCity, groupCountry, bigint("group_id"), varchar("group_name"), bigint("mtime"), integer("rsvp_count"), varchar("venue_name")), ImmutableList.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), + Optional.empty()}, + { + "select event_tags from meetupRsvp", + "{ \"resultTable\": { \"dataSchema\": { \"columnDataTypes\": [\"STRING\"], \"columnNames\": [\"event_tags\"] }, \"rows\": [ [[\"social\"]], [[\"social\", \"corporate\"]], [[\"cinema\", \"corporate\"]], [[\"null\"]], [[\"outdoor\", \"music\", \"large_gathering\", \"expensive\"]], [[\"null\"]] ] }, \"exceptions\": [], \"numServersQueried\": 1, \"numServersResponded\": 1, \"numSegmentsQueried\": 1, \"numSegmentsProcessed\": 1, \"numSegmentsMatched\": 1, \"numConsumingSegmentsQueried\": 1, \"numDocsScanned\": 10, \"numEntriesScannedInFilter\": 0, \"numEntriesScannedPostFilter\": 100, \"numGroupsLimitReached\": false, \"totalDocs\": 1425, \"timeUsedMs\": 10, \"segmentStatistics\": [], \"traceInfo\": {}, \"minConsumingFreshnessTimeMs\": 1592910063563 }", + ImmutableList.of(array(VARCHAR, "event_tags")), + ImmutableList.of(0), + Optional.empty()}, + { + "select tag_nums from meetupRsvp", + "{ \"resultTable\": { \"dataSchema\": { \"columnDataTypes\": [\"BIGINT\"], \"columnNames\": [\"tag_nums\"] }, \"rows\": [ [[\"1\"]], [[\"1\", \"2\"]], [[\"3\", \"2\"]], [[\"0\"]], [[\"4\", \"5\", \"6\", \"7\"]], [[\"0\"]] ] }, \"exceptions\": [], \"numServersQueried\": 1, \"numServersResponded\": 1, \"numSegmentsQueried\": 1, \"numSegmentsProcessed\": 1, \"numSegmentsMatched\": 1, \"numConsumingSegmentsQueried\": 1, \"numDocsScanned\": 10, \"numEntriesScannedInFilter\": 0, \"numEntriesScannedPostFilter\": 100, \"numGroupsLimitReached\": false, \"totalDocs\": 1425, \"timeUsedMs\": 10, \"segmentStatistics\": [], \"traceInfo\": {}, \"minConsumingFreshnessTimeMs\": 1592910063563 }", + ImmutableList.of(array(BIGINT, "tag_nums")), + ImmutableList.of(0), Optional.empty()} }; } diff --git a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotColumnHandle.java b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotColumnHandle.java index 47a2e4f26d061..6ea815348a9fd 100644 --- a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotColumnHandle.java +++ b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotColumnHandle.java @@ -14,6 +14,7 @@ package com.facebook.presto.pinot; import com.facebook.airlift.testing.EquivalenceTester; +import com.facebook.presto.common.type.ArrayType; import org.testng.annotations.Test; import static com.facebook.presto.common.type.BigintType.BIGINT; @@ -26,6 +27,7 @@ public class TestPinotColumnHandle { private final PinotColumnHandle columnHandle = new PinotColumnHandle("columnName", VARCHAR, REGULAR); + private final PinotColumnHandle arrayColumnHandle = new PinotColumnHandle("arrayColumn", new ArrayType(VARCHAR), REGULAR); @Test public void testJsonRoundTrip() @@ -35,6 +37,14 @@ public void testJsonRoundTrip() assertEquals(copy, columnHandle); } + @Test + public void testJsonRoundTripWithArrays() + { + String json = COLUMN_CODEC.toJson(arrayColumnHandle); + PinotColumnHandle copy = COLUMN_CODEC.fromJson(json); + assertEquals(copy, arrayColumnHandle); + } + @Test public void testEquivalence() { diff --git a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotColumnMetadata.java b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotColumnMetadata.java index 1303fcf4515fa..771942fc04665 100644 --- a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotColumnMetadata.java +++ b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotColumnMetadata.java @@ -13,8 +13,8 @@ */ package com.facebook.presto.pinot; +import com.facebook.presto.common.type.ArrayType; import com.facebook.presto.common.type.DateType; -import com.facebook.presto.common.type.DoubleType; import com.facebook.presto.common.type.TimestampType; import com.facebook.presto.common.type.Type; import com.google.common.collect.ImmutableMap; @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.DoubleType.DOUBLE; import static com.facebook.presto.common.type.IntegerType.INTEGER; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; import static com.facebook.presto.common.type.VarcharType.VARCHAR; @@ -72,22 +73,22 @@ public void testParsePinotSchemaToPinotColumns() Map expectedTypeMap = new ImmutableMap.Builder() .put("singleValueIntDimension", INTEGER) .put("singleValueLongDimension", BIGINT) - .put("singleValueFloatDimension", DoubleType.DOUBLE) - .put("singleValueDoubleDimension", DoubleType.DOUBLE) + .put("singleValueFloatDimension", DOUBLE) + .put("singleValueDoubleDimension", DOUBLE) .put("singleValueBytesDimension", VARBINARY) .put("singleValueBooleanDimension", VARCHAR) .put("singleValueStringDimension", VARCHAR) - .put("multiValueIntDimension", VARCHAR) - .put("multiValueLongDimension", VARCHAR) - .put("multiValueFloatDimension", VARCHAR) - .put("multiValueDoubleDimension", VARCHAR) - .put("multiValueBytesDimension", VARCHAR) - .put("multiValueBooleanDimension", VARCHAR) - .put("multiValueStringDimension", VARCHAR) + .put("multiValueIntDimension", new ArrayType(INTEGER)) + .put("multiValueLongDimension", new ArrayType(BIGINT)) + .put("multiValueFloatDimension", new ArrayType(DOUBLE)) + .put("multiValueDoubleDimension", new ArrayType(DOUBLE)) + .put("multiValueBytesDimension", new ArrayType(VARBINARY)) + .put("multiValueBooleanDimension", new ArrayType(VARCHAR)) + .put("multiValueStringDimension", new ArrayType(VARCHAR)) .put("intMetric", INTEGER) .put("longMetric", BIGINT) - .put("floatMetric", DoubleType.DOUBLE) - .put("doubleMetric", DoubleType.DOUBLE) + .put("floatMetric", DOUBLE) + .put("doubleMetric", DOUBLE) .put("bytesMetric", VARBINARY) .put("daysSinceEpoch", DateType.DATE) .put("epochDayDateTime", DateType.DATE) diff --git a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotQueryBase.java b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotQueryBase.java index 19f79f3bbb11f..ec1aed0dd1180 100644 --- a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotQueryBase.java +++ b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotQueryBase.java @@ -17,6 +17,7 @@ import com.facebook.presto.SystemSessionProperties; import com.facebook.presto.block.BlockEncodingManager; import com.facebook.presto.common.block.SortOrder; +import com.facebook.presto.common.type.ArrayType; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.metadata.FunctionManager; @@ -271,4 +272,9 @@ protected static PinotColumnHandle varchar(String name) { return new PinotColumnHandle(name, VARCHAR, PinotColumnHandle.PinotColumnType.REGULAR); } + + protected static PinotColumnHandle array(Type type, String name) + { + return new PinotColumnHandle(name, new ArrayType(type), PinotColumnHandle.PinotColumnType.REGULAR); + } } diff --git a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSegmentPageSource.java b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSegmentPageSource.java index 78df5f3498284..d01551d9f1b68 100644 --- a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSegmentPageSource.java +++ b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSegmentPageSource.java @@ -15,7 +15,11 @@ import com.facebook.presto.common.Page; import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.block.VariableWidthBlock; +import com.facebook.presto.common.type.ArrayType; +import com.facebook.presto.common.type.IntegerType; import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.VariableWidthType; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.testing.TestingConnectorSession; import com.facebook.presto.testing.assertions.Assert; @@ -397,4 +401,54 @@ public void testAllDataTypes() } } } + + @Test + public void testMultivaluedType() + { + String[] columnNames = {"col1", "col2"}; + DataSchema.ColumnDataType[] columnDataTypes = {DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.STRING_ARRAY}; + DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes); + SimpleDataTable dataTable = new SimpleDataTable(1, dataSchema); + + int numRows = 1; + String[] stringArray = {"stringVal1", "stringVal2"}; + int[] intArray = {10, 34, 67}; + dataTable.set(0, 0, intArray); + dataTable.set(0, 1, stringArray); + + PinotSessionProperties pinotSessionProperties = new PinotSessionProperties(pinotConfig); + ConnectorSession session = new TestingConnectorSession(pinotSessionProperties.getSessionProperties()); + PinotScatterGatherQueryClient mockPinotQueryClient = new MockPinotScatterGatherQueryClient(new PinotScatterGatherQueryClient.Config( + pinotConfig.getIdleTimeout().toMillis(), + pinotConfig.getThreadPoolSize(), + pinotConfig.getMinConnectionsPerServer(), + pinotConfig.getMaxBacklogPerServer(), + pinotConfig.getMaxConnectionsPerServer()), ImmutableList.of(dataTable)); + + List pinotColumnHandles = ImmutableList.of( + new PinotColumnHandle(columnNames[0], PinotColumnUtils.getPrestoTypeFromPinotType(getFieldSpec(columnNames[0], columnDataTypes[0]), false, false), PinotColumnHandle.PinotColumnType.REGULAR), + new PinotColumnHandle(columnNames[1], PinotColumnUtils.getPrestoTypeFromPinotType(getFieldSpec(columnNames[1], columnDataTypes[1]), false, false), PinotColumnHandle.PinotColumnType.REGULAR)); + PinotSplit mockPinotSplit = new PinotSplit(pinotConnectorId.toString(), PinotSplit.SplitType.SEGMENT, pinotColumnHandles, Optional.empty(), Optional.of("blah"), ImmutableList.of("seg"), Optional.of("host")); + PinotSegmentPageSource pinotSegmentPageSource = new PinotSegmentPageSource(session, pinotConfig, mockPinotQueryClient, mockPinotSplit, pinotColumnHandles); + + Page page = requireNonNull(pinotSegmentPageSource.getNextPage(), "Expected a valid page"); + + for (int i = 0; i < columnDataTypes.length; i++) { + Block block = page.getBlock(i); + Type type = PinotColumnUtils.getPrestoTypeFromPinotType(getFieldSpec(columnNames[i], columnDataTypes[i]), false, false); + Assert.assertTrue(type instanceof ArrayType, "presto type should be array"); + if (((ArrayType) type).getElementType() instanceof IntegerType) { + Assert.assertTrue(block.getBlock(0).getInt(0) == 10, "Array element not matching"); + Assert.assertTrue(block.getBlock(0).getInt(1) == 34, "Array element not matching"); + Assert.assertTrue(block.getBlock(0).getInt(2) == 67, "Array element not matching"); + } + else if (((ArrayType) type).getElementType() instanceof VariableWidthType) { + Type type1 = ((ArrayType) type).getElementType(); + Assert.assertTrue(block.getBlock(0) instanceof VariableWidthBlock); + VariableWidthBlock variableWidthBlock = (VariableWidthBlock) block.getBlock(0); + Assert.assertTrue("stringVal1".equals(new String(variableWidthBlock.getSlice(0, 0, variableWidthBlock.getSliceLength(0)).getBytes())), "Array element not matching"); + Assert.assertTrue("stringVal2".equals(new String(variableWidthBlock.getSlice(1, 0, variableWidthBlock.getSliceLength(1)).getBytes())), "Array element not matching"); + } + } + } }