Closed
Description
Currently, I use Flink JDBC + CK Driver to write data to clickhouse in real time, and there is a high probability of encountering the following errors , So maybe we can support writing empty batch ?
Caused by: java.lang.RuntimeException: Writing records to JDBC failed.
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:154)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:160)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:73)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:113)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:94)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:40)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:163)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.sql.SQLException: Please call addBatch method at least once before batch execution
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.sql.SQLException: Please call addBatch method at least once before batch execution
at com.clickhouse.jdbc.SqlExceptionUtils.clientError(SqlExceptionUtils.java:43)
at com.clickhouse.jdbc.SqlExceptionUtils.emptyBatchError(SqlExceptionUtils.java:111)
at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.executeAny(InputBasedPreparedStatement.java:95)
at com.clickhouse.jdbc.internal.AbstractPreparedStatement.executeLargeBatch(AbstractPreparedStatement.java:85)
at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.executeBatch(ClickHouseStatementImpl.java:568)
at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
at org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor.executeBatch(TableBufferedStatementExecutor.java:64)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
... 8 more