Closed
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
@@ -233,6 +233,7 @@ jobs:
!:trino-hive,!:trino-orc,!:trino-parquet,
!:trino-mongodb,!:trino-kafka,!:trino-elasticsearch,
!:trino-redis,
!:trino-pulsar,
!:trino-sqlserver,!:trino-postgresql,!:trino-mysql,!:trino-memsql,
!:trino-oracle,
!:trino-kudu,
@@ -258,6 +259,7 @@ jobs:
- ":trino-mongodb,:trino-kafka,:trino-elasticsearch"
- ":trino-elasticsearch -P test-stats"
- ":trino-redis"
- ":trino-pulsar"
- ":trino-sqlserver,:trino-postgresql,:trino-mysql"
- ":trino-oracle"
- ":trino-kudu"
@@ -25,13 +25,16 @@
import io.trino.spi.type.ArrayType;
Member

Any changes to existing code should go in a separate commit. See https://github.com/trinodb/trino/blob/master/DEVELOPMENT.md#git-merge-strategy

Also, is it possible to extract them into a separate PR? How does this affect existing connectors that use this code, such as Kafka?

Contributor Author

Actually, since we still have to duplicate some code in PulsarAvroColumnDecoder, I'll keep these additional types in PulsarAvroColumnDecoder so that existing connectors won't be affected. I will only make the methods protected so that they can be overridden.

Contributor Author

I will create a separate PR once the change is finalized.

import io.trino.spi.type.BigintType;
Member

Would it be possible to undo the changes in the Avro column decoder, and to use composition instead of inheritance, so that you can override the behaviour wherever you need something different?


I saw that the Kafka plugin writes its own AvroRowEncoder, and Pulsar currently writes its own AvroColumnDecoder in pulsar-sql. The main concern is that, since PulsarAvroColumnDecoder derives from AvroColumnDecoder, we may want to keep their logic in sync. All significant fields and methods in AvroColumnDecoder seem to be private, though.
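For reference, the composition approach suggested in this thread could look roughly like the sketch below. It is only an illustration: `ValueDecoder`, `BaseDecoder` and `FloatAwareDecoder` are hypothetical stand-ins, not Trino or Pulsar classes, and the real decoder interface has more methods than the single one shown here.

```java
import java.util.Objects;

// Hypothetical, simplified decoder interface (not the Trino decoder API).
interface ValueDecoder
{
    long decodeLong(Object value);
}

// Stand-in for the shared decoder: it only understands Long and Integer.
final class BaseDecoder
        implements ValueDecoder
{
    @Override
    public long decodeLong(Object value)
    {
        if (value instanceof Long || value instanceof Integer) {
            return ((Number) value).longValue();
        }
        throw new IllegalArgumentException("cannot decode " + value.getClass().getName());
    }
}

// Connector-specific decoder: handles Float itself and delegates everything else,
// so the shared decoder does not need to change or expose protected methods.
final class FloatAwareDecoder
        implements ValueDecoder
{
    private final ValueDecoder delegate;

    FloatAwareDecoder(ValueDecoder delegate)
    {
        this.delegate = Objects.requireNonNull(delegate, "delegate is null");
    }

    @Override
    public long decodeLong(Object value)
    {
        if (value instanceof Float) {
            return Float.floatToIntBits((Float) value);
        }
        return delegate.decodeLong(value);
    }
}
```

With this shape, `new FloatAwareDecoder(new BaseDecoder())` adds the extra behaviour without touching the base class, at the cost of having to forward every method of the interface.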

import io.trino.spi.type.BooleanType;
import io.trino.spi.type.DateType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.RowType.Field;
import io.trino.spi.type.SmallintType;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TinyintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
@@ -50,6 +53,8 @@
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.decoder.DecoderErrorCode.DECODER_CONVERSION_NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
import static io.trino.spi.type.TimeType.TIME_MILLIS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.Varchars.truncateToLength;
import static java.lang.Float.floatToIntBits;
import static java.lang.String.format;
@@ -65,6 +70,9 @@ public class AvroColumnDecoder
BigintType.BIGINT,
RealType.REAL,
DoubleType.DOUBLE,
TIMESTAMP_MILLIS,
DateType.DATE,
TIME_MILLIS,
VarbinaryType.VARBINARY);

private final Type columnType;
@@ -77,7 +85,6 @@ public AvroColumnDecoder(DecoderColumnHandle columnHandle)
requireNonNull(columnHandle, "columnHandle is null");
this.columnType = columnHandle.getType();
this.columnMapping = columnHandle.getMapping();

this.columnName = columnHandle.getName();
checkArgument(!columnHandle.isInternal(), "unexpected internal column '%s'", columnName);
checkArgument(columnHandle.getFormatHint() == null, "unexpected format hint '%s' defined for column '%s'", columnHandle.getFormatHint(), columnName);
@@ -91,7 +98,7 @@ public AvroColumnDecoder(DecoderColumnHandle columnHandle)
}
Member

Why not add support for short timestamp, date, time, and real to AvroColumnDecoder? Then we should be able to drop PulsarAvroColumnDecoder. WDYT?

Contributor Author

The blocker was converting the shaded version of the Avro record to the original Avro record so that the Pulsar connector can use AvroColumnDecoder, since the whole connector uses shaded versions of its dependencies.
I tried again and was able to make the conversion work this time.
I also moved support for the additional types into AvroColumnDecoder, but I'm not sure how this will affect existing connectors. My understanding is that it shouldn't be a problem, since the support is purely additional.


> Why not to add support for short timestamp, date, time and real support to AvroColumnDecoder? Then we should be able to drop PulsarAvroColumnDecoder. WDYT?

I like this idea. @kokosing, is there an existing issue for this? If not, I can try to drive one.


FYI #13069.

BTW pulsar-sql supports Decimal type in:

It's possible to add them to AvroColumnDecoder later.

Also, I don't think this is a blocker for adding the Pulsar plugin, as we can duplicate a bit of code for the Pulsar decoders, which have their own logic.

}

-private boolean isSupportedType(Type type)
+protected boolean isSupportedType(Type type)
{
if (isSupportedPrimitive(type)) {
return true;
@@ -120,7 +127,7 @@ private boolean isSupportedType(Type type)
return false;
}

-private boolean isSupportedPrimitive(Type type)
+protected boolean isSupportedPrimitive(Type type)
{
return type instanceof VarcharType || SUPPORTED_PRIMITIVE_TYPES.contains(type);
}
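Editorial note: widening the visibility from private to protected is what would let a connector-specific subclass extend the supported set. The sketch below illustrates that idea only; `BaseTypeSupport` and `ExtendedTypeSupport` are hypothetical classes, and type names are plain strings here rather than Trino `Type` instances.

```java
import java.util.Set;

// Hypothetical stand-in for the shared decoder's type check.
class BaseTypeSupport
{
    private static final Set<String> SUPPORTED = Set.of("boolean", "bigint", "double", "varbinary");

    // protected (rather than private) so that subclasses can widen the check
    protected boolean isSupportedPrimitive(String typeName)
    {
        return SUPPORTED.contains(typeName);
    }

    final boolean accepts(String typeName)
    {
        return isSupportedPrimitive(typeName);
    }
}

// Hypothetical connector-specific subclass that accepts a few more types.
class ExtendedTypeSupport
        extends BaseTypeSupport
{
    // static, so the override is safe even if the base constructor were to call it
    private static final Set<String> EXTRA = Set.of("real", "date");

    @Override
    protected boolean isSupportedPrimitive(String typeName)
    {
        return EXTRA.contains(typeName) || super.isSupportedPrimitive(typeName);
    }
}
```

One caveat with this design: if the base class calls the overridable check from its constructor, the override must not depend on subclass instance fields, which are not yet initialized at that point.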
@@ -187,6 +194,10 @@ public long getLong()
if (value instanceof Long || value instanceof Integer) {
return ((Number) value).longValue();
}

if (columnType instanceof RealType) {
return floatToIntBits((Float) value);
}
throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED, format("cannot decode object of '%s' as '%s' for column '%s'", value.getClass(), columnType, columnName));
}
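Editorial note: the added branch above returns a REAL value as the raw IEEE 754 bits of the float, widened to a long. The round trip can be sketched with the standard library alone; `RealEncodingSketch` and its method names are hypothetical and exist only for this illustration.

```java
final class RealEncodingSketch
{
    private RealEncodingSketch() {}

    // Encode a float as its raw IEEE 754 bit pattern, widened from int to long.
    static long encodeReal(float value)
    {
        return Float.floatToIntBits(value);
    }

    // Recover the float by narrowing back to int and reinterpreting the bits.
    static float decodeReal(long encoded)
    {
        return Float.intBitsToFloat((int) encoded);
    }

    public static void main(String[] args)
    {
        long encoded = encodeReal(1.5f);
        System.out.println(Long.toHexString(encoded)); // prints 3fc00000
        System.out.println(decodeReal(encoded)); // prints 1.5
    }
}
```

Note that the branch in the diff casts `value` to `Float` without an `instanceof` check, so a value of any other class would fail with a `ClassCastException` instead of reaching the `TrinoException` below it.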

@@ -203,7 +214,7 @@ public Block getBlock()
}
}

-private static Slice getSlice(Object value, Type type, String columnName)
+protected static Slice getSlice(Object value, Type type, String columnName)
{
if (type instanceof VarcharType && (value instanceof CharSequence || value instanceof GenericEnumSymbol)) {
return truncateToLength(utf8Slice(value.toString()), type);
@@ -221,7 +232,7 @@ else if (value instanceof GenericFixed) {
throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED, format("cannot decode object of '%s' as '%s' for column '%s'", value.getClass(), type, columnName));
}

-private static Block serializeObject(BlockBuilder builder, Object value, Type type, String columnName)
+protected static Block serializeObject(BlockBuilder builder, Object value, Type type, String columnName)
{
if (type instanceof ArrayType) {
return serializeList(builder, value, type, columnName);
@@ -236,7 +247,7 @@ private static Block serializeObject(BlockBuilder builder, Object value, Type ty
return null;
}

-private static Block serializeList(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName)
+protected static Block serializeList(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName)
{
if (value == null) {
checkState(parentBlockBuilder != null, "parentBlockBuilder is null");
@@ -258,7 +269,7 @@ private static Block serializeList(BlockBuilder parentBlockBuilder, Object value
return blockBuilder.build();
}

-private static void serializePrimitive(BlockBuilder blockBuilder, Object value, Type type, String columnName)
+protected static void serializePrimitive(BlockBuilder blockBuilder, Object value, Type type, String columnName)
{
requireNonNull(blockBuilder, "blockBuilder is null");

@@ -292,10 +303,15 @@ private static void serializePrimitive(BlockBuilder blockBuilder, Object value,
return;
}

if (type instanceof TimeType || type instanceof DateType || type instanceof TimestampType) {
type.writeLong(blockBuilder, (Long) value);
Member

Not every timestamp fits in a long; that depends on the precision. Please make sure the precision allows a long to be used.

Contributor Author (@MarvinCai, Aug 24, 2021)

This change will be removed from AvroColumnDecoder; for the Pulsar connector alone it should be safe.

return;
}

throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED, format("cannot decode object of '%s' as '%s' for column '%s'", value.getClass(), type, columnName));
}
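Editorial note on the precision concern raised in the review of this hunk: a guard of the following shape would make the assumption explicit before a timestamp is written as a long. This is a sketch under stated assumptions, not the PR's code. It assumes that Trino represents timestamps with precision up to 6 as a long holding microseconds since the epoch (the constant below is meant to mirror `TimestampType.MAX_SHORT_PRECISION`; please verify against the SPI version in use), and that the incoming Avro value is a `timestamp-millis` long. The class and method names are hypothetical.

```java
final class TimestampPrecisionSketch
{
    // Assumption: mirrors Trino's TimestampType.MAX_SHORT_PRECISION.
    static final int MAX_SHORT_PRECISION = 6;
    // Millisecond input needs at least precision 3 to be stored without rounding.
    static final int MILLIS_PRECISION = 3;
    static final long MICROS_PER_MILLI = 1_000L;

    private TimestampPrecisionSketch() {}

    // True when a timestamp of this precision can be carried in a single long.
    static boolean fitsInLong(int precision)
    {
        return precision <= MAX_SHORT_PRECISION;
    }

    // Assumption: the target representation is microseconds since the epoch.
    static long avroMillisToEpochMicros(long epochMillis, int precision)
    {
        if (precision < MILLIS_PRECISION || !fitsInLong(precision)) {
            throw new IllegalArgumentException("unsupported timestamp precision: " + precision);
        }
        // multiplyExact throws ArithmeticException on overflow instead of wrapping
        return Math.multiplyExact(epochMillis, MICROS_PER_MILLI);
    }
}
```

If the assumption about microsecond storage holds, writing the Avro millisecond value directly, as the hunk does, would be off by a factor of 1000; time and date values would need a similar check of their own units and boxed types.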

-private static Block serializeMap(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName)
+protected static Block serializeMap(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName)
{
if (value == null) {
checkState(parentBlockBuilder != null, "parentBlockBuilder is null");
@@ -331,7 +347,7 @@ private static Block serializeMap(BlockBuilder parentBlockBuilder, Object value,
return null;
}

-private static Block serializeRow(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName)
+protected static Block serializeRow(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName)
{
if (value == null) {
checkState(parentBlockBuilder != null, "parent block builder is null");
197 changes: 197 additions & 0 deletions plugin/trino-pulsar/pom.xml
@@ -0,0 +1,197 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>trino-root</artifactId>
<groupId>io.trino</groupId>
<version>362-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>trino-pulsar</artifactId>
<description>Trino - Pulsar Connector</description>
<packaging>trino-plugin</packaging>

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<pulsar.version>2.8.0</pulsar.version>
<bookkeeper.version>4.14.1</bookkeeper.version>
</properties>

<dependencies>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-plugin-toolkit</artifactId>
<exclusions>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-record-decoder</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>configuration</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>stats</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>

<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</dependency>

<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-mledger</artifactId>
<version>${pulsar.version}</version>
</dependency>

<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<version>2.1.2</version>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-main</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-tpch</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>