Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Pinot Connector Libraries to 0.8.0 #9098

Merged
merged 5 commits into from
Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove PinotColumn
Remove PinotColumn and moved used methods to PinotColumnHandle instead
  • Loading branch information
elonazoulay committed Nov 4, 2021
commit ad0a5d78f39e1a5f14ab2817bbfcc72757864364

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,26 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
import static java.util.Objects.requireNonNull;

public class PinotColumnHandle
Expand All @@ -47,6 +62,57 @@ public PinotColumnHandle(
this.returnNullOnEmptyGroup = returnNullOnEmptyGroup;
}

public static List<PinotColumnHandle> getPinotColumnsForPinotSchema(Schema pinotTableSchema)
{
return pinotTableSchema.getColumnNames().stream()
.filter(columnName -> !columnName.startsWith("$")) // Hidden columns starts with "$", ignore them as we can't use them in PQL
.map(columnName -> new PinotColumnHandle(columnName, getTrinoTypeFromPinotType(pinotTableSchema.getFieldSpecFor(columnName))))
.collect(toImmutableList());
}

public static Type getTrinoTypeFromPinotType(FieldSpec field)
{
Type type = getTrinoTypeFromPinotType(field.getDataType());
if (field.isSingleValueField()) {
return type;
}
else {
return new ArrayType(type);
}
}

public static Type getTrinoTypeFromPinotType(TransformResultMetadata transformResultMetadata)
{
Type type = getTrinoTypeFromPinotType(transformResultMetadata.getDataType());
if (transformResultMetadata.isSingleValue()) {
return type;
}
return new ArrayType(type);
}

public static Type getTrinoTypeFromPinotType(FieldSpec.DataType dataType)
{
switch (dataType) {
case BOOLEAN:
return BooleanType.BOOLEAN;
case FLOAT:
return RealType.REAL;
case DOUBLE:
return DoubleType.DOUBLE;
case INT:
return IntegerType.INTEGER;
case LONG:
return BigintType.BIGINT;
case STRING:
return VarcharType.VARCHAR;
case BYTES:
return VarbinaryType.VARBINARY;
default:
break;
}
throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported type conversion for pinot data type: " + dataType);
}

@JsonProperty
public String getColumnName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
import static com.google.common.cache.CacheLoader.asyncReloading;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.pinot.PinotColumn.getPinotColumnsForPinotSchema;
import static io.trino.plugin.pinot.PinotColumnHandle.getPinotColumnsForPinotSchema;
import static io.trino.plugin.pinot.PinotSessionProperties.isAggregationPushdownEnabled;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
Expand All @@ -88,7 +88,7 @@ public class PinotMetadata
private static final String SCHEMA_NAME = "default";
private static final String PINOT_COLUMN_NAME_PROPERTY = "pinotColumnName";

private final LoadingCache<String, List<PinotColumn>> pinotTableColumnCache;
private final LoadingCache<String, List<PinotColumnHandle>> pinotTableColumnCache;
private final LoadingCache<Object, List<String>> allTablesCache;
private final int maxRowsPerBrokerQuery;
private final AggregateFunctionRewriter aggregateFunctionRewriter;
Expand All @@ -111,7 +111,7 @@ public PinotMetadata(
.build(asyncReloading(new CacheLoader<>()
{
@Override
public List<PinotColumn> load(String tableName)
public List<PinotColumnHandle> load(String tableName)
throws Exception
{
Schema tablePinotSchema = pinotClient.getTableSchema(tableName);
Expand Down Expand Up @@ -442,7 +442,7 @@ public boolean usesLegacyTableLayouts()
}

@VisibleForTesting
public List<PinotColumn> getPinotColumns(String tableName)
public List<PinotColumnHandle> getPinotColumns(String tableName)
{
String pinotTableName = getPinotTableNameFromTrinoTableName(tableName);
return getFromCache(pinotTableColumnCache, pinotTableName);
Expand Down Expand Up @@ -518,19 +518,19 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName)

private List<ColumnMetadata> getColumnsMetadata(String tableName)
{
List<PinotColumn> columns = getPinotColumns(tableName);
List<PinotColumnHandle> columns = getPinotColumns(tableName);
return columns.stream()
.map(PinotMetadata::createPinotColumnMetadata)
.collect(toImmutableList());
}

private static ColumnMetadata createPinotColumnMetadata(PinotColumn pinotColumn)
private static ColumnMetadata createPinotColumnMetadata(PinotColumnHandle pinotColumn)
{
return ColumnMetadata.builder()
.setName(pinotColumn.getName().toLowerCase(ENGLISH))
.setType(pinotColumn.getType())
.setName(pinotColumn.getColumnName().toLowerCase(ENGLISH))
.setType(pinotColumn.getDataType())
.setProperties(ImmutableMap.<String, Object>builder()
.put(PINOT_COLUMN_NAME_PROPERTY, pinotColumn.getName())
.put(PINOT_COLUMN_NAME_PROPERTY, pinotColumn.getColumnName())
.build())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class TestPinotQueryBase
protected List<String> getColumnNames(String table)
{
return pinotMetadata.getPinotColumns(table).stream()
.map(PinotColumn::getName)
.map(PinotColumnHandle::getColumnName)
.collect(toImmutableList());
}

Expand Down