Skip to content

NullPointerException when there are some null columns in the schema #62

Closed
@nicolae-gorgias

Description

@nicolae-gorgias

Describe the bug

Clickhouse-kafka-connect fails with NullPointerException when there are some null columns in the schema.

Full traceback:

java.lang.NullPointerException
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWritePrimitive(ClickHouseWriter.java:174)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWriteCol(ClickHouseWriter.java:210)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:291)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:128)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:45)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:102)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
java.lang.RuntimeException
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:307)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:128)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:45)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:102)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
[2023-02-24 15:52:59,493] ERROR [tracking-events-clickhouse-sink|task-2]  WorkerSinkTask{id=tracking-events-clickhouse-sink-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: null (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
java.lang.RuntimeException
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:312)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:128)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:45)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:102)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
[2023-02-24 15:52:59,497] ERROR [tracking-events-clickhouse-sink|task-2]  WorkerSinkTask{id=tracking-events-clickhouse-sink-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:312)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:128)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:45)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:102)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	... 10 more

Steps to reproduce

  1. Create a table with Nullable columns
  2. Declare a schema that supports null values
  3. Produce a message with null value

Expected behavior

Insert null values if the table and schema support null values.

The solution should be similar to #55 for schemaless.

Workaround

Don't use Nullable columns according to best practices

Metadata

Metadata

Assignees

Labels

bugSomething isn't working

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions