Kafka Connector AvroEncoder doesn't use "mappings" #5791

Closed
@hashhar

Description

From the docs at https://prestosql.io/docs/current/connector/kafka.html#avro-encoder

mapping - slash-separated list of field names to select a field from the Avro schema. If the field specified in mapping does not exist in the original Avro schema, then a write operation fails.

This means that given a table definition like:

{
  "tableName": "write_all_datatypes_avro",
  "schemaName": "product_tests",
  "topicName": "write_all_datatypes_avro",
  "message": {
    "dataFormat": "avro",
    "dataSchema": "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/all_datatypes_avro_schema.avsc",
    "fields": [
      {
        "name": "c_varchar",
        "type": "VARCHAR",
        "mapping": "a_varchar"
      },
      {
        "name": "c_bigint",
        "type": "BIGINT",
        "mapping": "a_bigint"
      },
      {
        "name": "c_double",
        "type": "DOUBLE",
        "mapping": "a_double"
      },
      {
        "name": "c_boolean",
        "type": "BOOLEAN",
        "mapping": "a_boolean"
      }
    ]
  },
  "key": {
    "dataFormat": "raw",
    "fields": []
  }
}

an INSERT query such as INSERT INTO product_tests.write_all_datatypes_avro (c_varchar) VALUES ('foobar') should work, because the value of c_varchar should be written to the Avro record under the field a_varchar.

But the query instead fails with an error:

2020-11-03 15:50:33 INFO: FAILURE     /    io.prestosql.tests.kafka.TestKafkaAvroWritesSmokeTest.testInsertPrimitiveDataType (Groups: profile_specific_tests, kafka) took 1.6 seconds
2020-11-03 15:50:33 SEVERE: Failure cause:
io.prestosql.tempto.query.QueryExecutionException: java.sql.SQLException: Query failed (#20201103_100533_00001_fh59i): Not a valid schema field: c_varchar
	at io.prestosql.tempto.query.JdbcQueryExecutor.execute(JdbcQueryExecutor.java:114)
	at io.prestosql.tempto.query.JdbcQueryExecutor.executeQuery(JdbcQueryExecutor.java:82)
	at io.prestosql.tempto.query.QueryExecutor.query(QueryExecutor.java:57)
	at io.prestosql.tests.kafka.TestKafkaAvroWritesSmokeTest.testInsertPrimitiveDataType(TestKafkaAvroWritesSmokeTest.java:70)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:104)
	at org.testng.internal.Invoker.invokeMethod(Invoker.java:645)
	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:851)
	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1177)
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: Query failed (#20201103_100533_00001_fh59i): Not a valid schema field: c_varchar
	at io.prestosql.jdbc.AbstractPrestoResultSet.resultsException(AbstractPrestoResultSet.java:1778)
	at io.prestosql.jdbc.PrestoResultSet$ResultsPageIterator.computeNext(PrestoResultSet.java:218)
	at io.prestosql.jdbc.PrestoResultSet$ResultsPageIterator.computeNext(PrestoResultSet.java:178)
	at io.prestosql.jdbc.$internal.guava.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
	at io.prestosql.jdbc.$internal.guava.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
	at java.base/java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
	at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
	at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
	at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
	at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
	at java.base/java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
	at io.prestosql.jdbc.PrestoResultSet$AsyncIterator.lambda$new$0(PrestoResultSet.java:124)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
	... 3 more
Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: c_varchar
	at org.apache.avro.generic.GenericData$Record.put(GenericData.java:241)
	at io.prestosql.plugin.kafka.encoder.avro.AvroRowEncoder.appendString(AvroRowEncoder.java:127)
	at io.prestosql.plugin.kafka.encoder.AbstractRowEncoder.appendColumnValue(AbstractRowEncoder.java:101)
	at io.prestosql.plugin.kafka.KafkaPageSink.appendPage(KafkaPageSink.java:119)
	at io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSink.appendPage(ClassLoaderSafeConnectorPageSink.java:69)
	at io.prestosql.operator.TableWriterOperator.addInput(TableWriterOperator.java:257)
	at io.prestosql.operator.Driver.processInternal(Driver.java:384)
	at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
	at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
	at io.prestosql.operator.Driver.processFor(Driver.java:276)
	at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1076)
	at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
	at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
	at io.prestosql.$gen.Presto_345_79_gc367874____20201103_100440_2.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

The problem is in AvroRowEncoder, which uses getName instead of getMapping here: https://github.com/prestosql/presto/blob/22717c61419c851b1c3814b60a3453b22d4a45ef/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/avro/AvroRowEncoder.java#L91
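To illustrate the difference, here is a minimal sketch with no Avro or Presto dependency. Everything in it is hypothetical and for illustration only: `Column` stands in for a column from the table definition, `SchemaRecord` stands in for Avro's `GenericData.Record` (it rejects unknown field names on `put`, but throws `IllegalArgumentException` rather than `AvroRuntimeException`), and `encodeByName` / `encodeByMapping` are not the connector's actual methods.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class MappingSketch {
    // Hypothetical stand-in for one entry of "fields" in the table definition.
    static final class Column {
        final String name;     // Presto column name, e.g. c_varchar
        final String mapping;  // Avro field name, e.g. a_varchar

        Column(String name, String mapping) {
            this.name = name;
            this.mapping = mapping;
        }
    }

    // Hypothetical stand-in for GenericData.Record: put() rejects names not in the schema.
    static final class SchemaRecord {
        private final Set<String> schemaFields;
        private final Map<String, Object> values = new LinkedHashMap<>();

        SchemaRecord(Set<String> schemaFields) {
            this.schemaFields = schemaFields;
        }

        void put(String field, Object value) {
            if (!schemaFields.contains(field)) {
                throw new IllegalArgumentException("Not a valid schema field: " + field);
            }
            values.put(field, value);
        }

        Map<String, Object> values() {
            return values;
        }
    }

    // Behaviour reported in this issue: the column name is used as the Avro field name.
    static Map<String, Object> encodeByName(Set<String> schema, Column column, Object value) {
        SchemaRecord record = new SchemaRecord(schema);
        record.put(column.name, value);
        return record.values();
    }

    // Behaviour the docs describe: the mapping selects the Avro field.
    static Map<String, Object> encodeByMapping(Set<String> schema, Column column, Object value) {
        SchemaRecord record = new SchemaRecord(schema);
        record.put(column.mapping, value);
        return record.values();
    }

    public static void main(String[] args) {
        Set<String> schema = Set.of("a_varchar", "a_bigint", "a_double", "a_boolean");
        Column column = new Column("c_varchar", "a_varchar");

        System.out.println(encodeByMapping(schema, column, "foobar")); // {a_varchar=foobar}
        try {
            encodeByName(schema, column, "foobar");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Not a valid schema field: c_varchar
        }
    }
}
```

The name-based lookup fails with the same message as the stack trace above, while the mapping-based lookup writes the value under a_varchar.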


Labels: bug
