-
Notifications
You must be signed in to change notification settings - Fork 3.4k
[Connector]Add plugin for Apache Pulsar. #8020
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,13 +25,16 @@ | |
| import io.trino.spi.type.ArrayType; | ||
|
||
| import io.trino.spi.type.BigintType; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would it be possilbe to undo changes in avro column decoder? And instead of inheritance to use composition so whenever you want different behaviour you can override it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I saw that Kafka plugin writes its own |
||
| import io.trino.spi.type.BooleanType; | ||
| import io.trino.spi.type.DateType; | ||
| import io.trino.spi.type.DoubleType; | ||
| import io.trino.spi.type.IntegerType; | ||
| import io.trino.spi.type.MapType; | ||
| import io.trino.spi.type.RealType; | ||
| import io.trino.spi.type.RowType; | ||
| import io.trino.spi.type.RowType.Field; | ||
| import io.trino.spi.type.SmallintType; | ||
| import io.trino.spi.type.TimeType; | ||
| import io.trino.spi.type.TimestampType; | ||
| import io.trino.spi.type.TinyintType; | ||
| import io.trino.spi.type.Type; | ||
| import io.trino.spi.type.VarbinaryType; | ||
|
|
@@ -50,6 +53,8 @@ | |
| import static io.airlift.slice.Slices.utf8Slice; | ||
| import static io.trino.decoder.DecoderErrorCode.DECODER_CONVERSION_NOT_SUPPORTED; | ||
| import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; | ||
| import static io.trino.spi.type.TimeType.TIME_MILLIS; | ||
| import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS; | ||
| import static io.trino.spi.type.Varchars.truncateToLength; | ||
| import static java.lang.Float.floatToIntBits; | ||
| import static java.lang.String.format; | ||
|
|
@@ -65,6 +70,9 @@ public class AvroColumnDecoder | |
| BigintType.BIGINT, | ||
| RealType.REAL, | ||
| DoubleType.DOUBLE, | ||
| TIMESTAMP_MILLIS, | ||
| DateType.DATE, | ||
| TIME_MILLIS, | ||
| VarbinaryType.VARBINARY); | ||
|
|
||
| private final Type columnType; | ||
|
|
@@ -77,7 +85,6 @@ public AvroColumnDecoder(DecoderColumnHandle columnHandle) | |
| requireNonNull(columnHandle, "columnHandle is null"); | ||
| this.columnType = columnHandle.getType(); | ||
| this.columnMapping = columnHandle.getMapping(); | ||
|
|
||
| this.columnName = columnHandle.getName(); | ||
| checkArgument(!columnHandle.isInternal(), "unexpected internal column '%s'", columnName); | ||
| checkArgument(columnHandle.getFormatHint() == null, "unexpected format hint '%s' defined for column '%s'", columnHandle.getFormatHint(), columnName); | ||
|
|
@@ -91,7 +98,7 @@ public AvroColumnDecoder(DecoderColumnHandle columnHandle) | |
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not to add support for short timestamp, date, time and real support to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The blocker is conversion from shaded version of avro record to original avro record so pulsar connector can utilize There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I like this idea. @kokosing is there any existing issue? Or I can try to drive one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI #13069. BTW pulsar-sql supports Decimal type in: It's possible to add them to AvroColumnDecoder later. Also I think that this is not a blocker to add pulsar plugin as we can duplicate code a bit for Pulsar decoders - they have their own logics. |
||
| } | ||
|
|
||
| private boolean isSupportedType(Type type) | ||
| protected boolean isSupportedType(Type type) | ||
| { | ||
| if (isSupportedPrimitive(type)) { | ||
| return true; | ||
|
|
@@ -120,7 +127,7 @@ private boolean isSupportedType(Type type) | |
| return false; | ||
| } | ||
|
|
||
| private boolean isSupportedPrimitive(Type type) | ||
| protected boolean isSupportedPrimitive(Type type) | ||
| { | ||
| return type instanceof VarcharType || SUPPORTED_PRIMITIVE_TYPES.contains(type); | ||
| } | ||
|
|
@@ -187,6 +194,10 @@ public long getLong() | |
| if (value instanceof Long || value instanceof Integer) { | ||
| return ((Number) value).longValue(); | ||
| } | ||
|
|
||
| if (columnType instanceof RealType) { | ||
| return floatToIntBits((Float) value); | ||
| } | ||
| throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED, format("cannot decode object of '%s' as '%s' for column '%s'", value.getClass(), columnType, columnName)); | ||
| } | ||
|
|
||
|
|
@@ -203,7 +214,7 @@ public Block getBlock() | |
| } | ||
| } | ||
|
|
||
| private static Slice getSlice(Object value, Type type, String columnName) | ||
| protected static Slice getSlice(Object value, Type type, String columnName) | ||
| { | ||
| if (type instanceof VarcharType && (value instanceof CharSequence || value instanceof GenericEnumSymbol)) { | ||
| return truncateToLength(utf8Slice(value.toString()), type); | ||
|
|
@@ -221,7 +232,7 @@ else if (value instanceof GenericFixed) { | |
| throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED, format("cannot decode object of '%s' as '%s' for column '%s'", value.getClass(), type, columnName)); | ||
| } | ||
|
|
||
| private static Block serializeObject(BlockBuilder builder, Object value, Type type, String columnName) | ||
| protected static Block serializeObject(BlockBuilder builder, Object value, Type type, String columnName) | ||
| { | ||
| if (type instanceof ArrayType) { | ||
| return serializeList(builder, value, type, columnName); | ||
|
|
@@ -236,7 +247,7 @@ private static Block serializeObject(BlockBuilder builder, Object value, Type ty | |
| return null; | ||
| } | ||
|
|
||
| private static Block serializeList(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName) | ||
| protected static Block serializeList(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName) | ||
| { | ||
| if (value == null) { | ||
| checkState(parentBlockBuilder != null, "parentBlockBuilder is null"); | ||
|
|
@@ -258,7 +269,7 @@ private static Block serializeList(BlockBuilder parentBlockBuilder, Object value | |
| return blockBuilder.build(); | ||
| } | ||
|
|
||
| private static void serializePrimitive(BlockBuilder blockBuilder, Object value, Type type, String columnName) | ||
| protected static void serializePrimitive(BlockBuilder blockBuilder, Object value, Type type, String columnName) | ||
| { | ||
| requireNonNull(blockBuilder, "blockBuilder is null"); | ||
|
|
||
|
|
@@ -292,10 +303,15 @@ private static void serializePrimitive(BlockBuilder blockBuilder, Object value, | |
| return; | ||
| } | ||
|
|
||
| if (type instanceof TimeType || type instanceof DateType || type instanceof TimestampType) { | ||
| type.writeLong(blockBuilder, (Long) value); | ||
|
||
| return; | ||
| } | ||
|
|
||
| throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED, format("cannot decode object of '%s' as '%s' for column '%s'", value.getClass(), type, columnName)); | ||
| } | ||
|
|
||
| private static Block serializeMap(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName) | ||
| protected static Block serializeMap(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName) | ||
| { | ||
| if (value == null) { | ||
| checkState(parentBlockBuilder != null, "parentBlockBuilder is null"); | ||
|
|
@@ -331,7 +347,7 @@ private static Block serializeMap(BlockBuilder parentBlockBuilder, Object value, | |
| return null; | ||
| } | ||
|
|
||
| private static Block serializeRow(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName) | ||
| protected static Block serializeRow(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName) | ||
| { | ||
| if (value == null) { | ||
| checkState(parentBlockBuilder != null, "parent block builder is null"); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,197 @@ | ||
| <?xml version="1.0" encoding="UTF-8"?> | ||
| <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
| <modelVersion>4.0.0</modelVersion> | ||
|
|
||
| <parent> | ||
| <artifactId>trino-root</artifactId> | ||
| <groupId>io.trino</groupId> | ||
| <version>362-SNAPSHOT</version> | ||
| <relativePath>../../pom.xml</relativePath> | ||
| </parent> | ||
|
|
||
| <artifactId>trino-pulsar</artifactId> | ||
| <description>Trino - Pulsar Connector</description> | ||
| <packaging>trino-plugin</packaging> | ||
|
|
||
| <properties> | ||
| <air.main.basedir>${project.parent.basedir}</air.main.basedir> | ||
| <pulsar.version>2.8.0</pulsar.version> | ||
| <bookkeeer.version>4.14.1</bookkeeer.version> | ||
| </properties> | ||
|
|
||
| <dependencies> | ||
| <dependency> | ||
| <groupId>io.trino</groupId> | ||
| <artifactId>trino-plugin-toolkit</artifactId> | ||
| <exclusions> | ||
| <exclusion> | ||
| <groupId>jakarta.ws.rs</groupId> | ||
| <artifactId>jakarta.ws.rs-api</artifactId> | ||
| </exclusion> | ||
| </exclusions> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.trino</groupId> | ||
| <artifactId>trino-record-decoder</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.airlift</groupId> | ||
| <artifactId>bootstrap</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.airlift</groupId> | ||
| <artifactId>configuration</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.airlift</groupId> | ||
| <artifactId>json</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.airlift</groupId> | ||
| <artifactId>log</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.airlift</groupId> | ||
| <artifactId>log-manager</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.airlift</groupId> | ||
| <artifactId>stats</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.airlift</groupId> | ||
| <artifactId>units</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>com.fasterxml.jackson.core</groupId> | ||
| <artifactId>jackson-core</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>com.fasterxml.jackson.core</groupId> | ||
| <artifactId>jackson-databind</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>com.google.guava</groupId> | ||
| <artifactId>guava</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>com.google.inject</groupId> | ||
| <artifactId>guice</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>javax.inject</groupId> | ||
| <artifactId>javax.inject</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>javax.validation</groupId> | ||
| <artifactId>validation-api</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>javax.ws.rs</groupId> | ||
| <artifactId>javax.ws.rs-api</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>org.apache.avro</groupId> | ||
| <artifactId>avro</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>org.apache.pulsar</groupId> | ||
| <artifactId>pulsar-client-mledger</artifactId> | ||
| <version>${pulsar.version}</version> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>org.jctools</groupId> | ||
| <artifactId>jctools-core</artifactId> | ||
| <version>2.1.2</version> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>org.weakref</groupId> | ||
| <artifactId>jmxutils</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.trino</groupId> | ||
| <artifactId>trino-spi</artifactId> | ||
| <scope>provided</scope> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.airlift</groupId> | ||
| <artifactId>slice</artifactId> | ||
| <scope>provided</scope> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>com.fasterxml.jackson.core</groupId> | ||
| <artifactId>jackson-annotations</artifactId> | ||
| <scope>provided</scope> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>org.openjdk.jol</groupId> | ||
| <artifactId>jol-core</artifactId> | ||
| <scope>provided</scope> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.trino</groupId> | ||
| <artifactId>trino-main</artifactId> | ||
| <scope>test</scope> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.trino</groupId> | ||
| <artifactId>trino-testing</artifactId> | ||
| <scope>test</scope> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.trino</groupId> | ||
| <artifactId>trino-tpch</artifactId> | ||
| <scope>test</scope> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.airlift</groupId> | ||
| <artifactId>testing</artifactId> | ||
| <scope>test</scope> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>org.assertj</groupId> | ||
| <artifactId>assertj-core</artifactId> | ||
| <scope>test</scope> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>org.testcontainers</groupId> | ||
| <artifactId>testcontainers</artifactId> | ||
| <scope>test</scope> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>org.testng</groupId> | ||
| <artifactId>testng</artifactId> | ||
| <scope>test</scope> | ||
| </dependency> | ||
| </dependencies> | ||
| </project> |
Uh oh!
There was an error while loading. Please reload this page.