Kafka Connector AvroEncoder doesn't use "mappings" #5791
Description
From the docs at https://prestosql.io/docs/current/connector/kafka.html#avro-encoder
mapping - slash-separated list of field names to select a field from the Avro schema. If the field specified in mapping does not exist in the original Avro schema, then a write operation fails.
This means that given a table definition like:
{
    "tableName": "write_all_datatypes_avro",
    "schemaName": "product_tests",
    "topicName": "write_all_datatypes_avro",
    "message": {
        "dataFormat": "avro",
        "dataSchema": "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/all_datatypes_avro_schema.avsc",
        "fields": [
            {
                "name": "c_varchar",
                "type": "VARCHAR",
                "mapping": "a_varchar"
            },
            {
                "name": "c_bigint",
                "type": "BIGINT",
                "mapping": "a_bigint"
            },
            {
                "name": "c_double",
                "type": "DOUBLE",
                "mapping": "a_double"
            },
            {
                "name": "c_boolean",
                "type": "BOOLEAN",
                "mapping": "a_boolean"
            }
        ]
    },
    "key": {
        "dataFormat": "raw",
        "fields": []
    }
}
and an INSERT query like INSERT INTO product_tests.write_all_datatypes_avro (c_varchar) VALUES ('foobar'), the query should work, because the value of c_varchar should be written to the Avro record under the field a_varchar.
But the query instead fails with an error:
2020-11-03 15:50:33 INFO: FAILURE / io.prestosql.tests.kafka.TestKafkaAvroWritesSmokeTest.testInsertPrimitiveDataType (Groups: profile_specific_tests, kafka) took 1.6 seconds
2020-11-03 15:50:33 SEVERE: Failure cause:
io.prestosql.tempto.query.QueryExecutionException: java.sql.SQLException: Query failed (#20201103_100533_00001_fh59i): Not a valid schema field: c_varchar
at io.prestosql.tempto.query.JdbcQueryExecutor.execute(JdbcQueryExecutor.java:114)
at io.prestosql.tempto.query.JdbcQueryExecutor.executeQuery(JdbcQueryExecutor.java:82)
at io.prestosql.tempto.query.QueryExecutor.query(QueryExecutor.java:57)
at io.prestosql.tests.kafka.TestKafkaAvroWritesSmokeTest.testInsertPrimitiveDataType(TestKafkaAvroWritesSmokeTest.java:70)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:104)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:645)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:851)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1177)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: Query failed (#20201103_100533_00001_fh59i): Not a valid schema field: c_varchar
at io.prestosql.jdbc.AbstractPrestoResultSet.resultsException(AbstractPrestoResultSet.java:1778)
at io.prestosql.jdbc.PrestoResultSet$ResultsPageIterator.computeNext(PrestoResultSet.java:218)
at io.prestosql.jdbc.PrestoResultSet$ResultsPageIterator.computeNext(PrestoResultSet.java:178)
at io.prestosql.jdbc.$internal.guava.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
at io.prestosql.jdbc.$internal.guava.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
at java.base/java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
at java.base/java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
at io.prestosql.jdbc.PrestoResultSet$AsyncIterator.lambda$new$0(PrestoResultSet.java:124)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
... 3 more
Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: c_varchar
at org.apache.avro.generic.GenericData$Record.put(GenericData.java:241)
at io.prestosql.plugin.kafka.encoder.avro.AvroRowEncoder.appendString(AvroRowEncoder.java:127)
at io.prestosql.plugin.kafka.encoder.AbstractRowEncoder.appendColumnValue(AbstractRowEncoder.java:101)
at io.prestosql.plugin.kafka.KafkaPageSink.appendPage(KafkaPageSink.java:119)
at io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSink.appendPage(ClassLoaderSafeConnectorPageSink.java:69)
at io.prestosql.operator.TableWriterOperator.addInput(TableWriterOperator.java:257)
at io.prestosql.operator.Driver.processInternal(Driver.java:384)
at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
at io.prestosql.operator.Driver.processFor(Driver.java:276)
at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1076)
at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
at io.prestosql.$gen.Presto_345_79_gc367874____20201103_100440_2.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
The problematic code is in AvroRowEncoder, which uses getName instead of getMapping here: https://github.com/prestosql/presto/blob/22717c61419c851b1c3814b60a3453b22d4a45ef/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/avro/AvroRowEncoder.java#L91
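To illustrate the difference, here is a minimal, self-contained sketch of the two behaviours. It does not use the real Presto or Avro classes: Column, FakeRecord, appendByName and appendByMapping are hypothetical stand-ins invented for this example, and FakeRecord only imitates the way a record rejects keys that are not in its schema. It also ignores slash-separated (nested) mappings. The actual fix in AvroRowEncoder may look different.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class MappingEncoderSketch {
    // Hypothetical stand-in for a column handle: Presto column name plus its Avro mapping.
    static final class Column {
        final String name;
        final String mapping;

        Column(String name, String mapping) {
            this.name = name;
            this.mapping = mapping;
        }
    }

    // Hypothetical stand-in for an Avro record: put() rejects keys that are not schema fields.
    static final class FakeRecord {
        private final Set<String> schemaFields;
        private final Map<String, Object> values = new LinkedHashMap<>();

        FakeRecord(Set<String> schemaFields) {
            this.schemaFields = schemaFields;
        }

        void put(String field, Object value) {
            if (!schemaFields.contains(field)) {
                throw new IllegalArgumentException("Not a valid schema field: " + field);
            }
            values.put(field, value);
        }

        Object get(String field) {
            return values.get(field);
        }
    }

    // Behaviour described in this issue: the record is keyed by the Presto column name.
    static void appendByName(FakeRecord record, Column column, Object value) {
        record.put(column.name, value);
    }

    // Behaviour the docs describe: the record is keyed by the configured mapping.
    static void appendByMapping(FakeRecord record, Column column, Object value) {
        record.put(column.mapping, value);
    }

    public static void main(String[] args) {
        FakeRecord record = new FakeRecord(Set.of("a_varchar", "a_bigint", "a_double", "a_boolean"));
        Column column = new Column("c_varchar", "a_varchar");

        appendByMapping(record, column, "foobar");
        System.out.println(record.get("a_varchar")); // prints "foobar"

        try {
            appendByName(record, column, "foobar");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "Not a valid schema field: c_varchar"
        }
    }
}
```

With the table definition above, only the mapping-keyed variant can succeed, because the Avro schema contains a_varchar but not c_varchar.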