Delta Lake: Trino fails to write checkpoint on tables containing Infinity
#24029
Description
Hi folks,
We've recently noticed an issue where the jobs optimizing our Delta Lake tables were silently failing to write checkpoints.
The query appears to complete successfully:
$ ALTER TABLE foo EXECUTE optimize;
ALTER TABLE EXECUTE
rows
------
(0 rows)
Query 20241025_143716_00027_7d36b, FINISHED, 2 nodes
Splits: 8 total, 8 done (100.00%)
0.22 [0 rows, 0B] [0 rows/s, 0B/s]
However, when we inspected the Delta table, no new checkpoint had been created. This eventually led to very long analysis times once a few thousand log entries had piled up without a checkpoint 😬.
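To spot this condition, you can compare the newest commit version with the newest checkpoint version in the table's _delta_log directory. Below is a minimal sketch. It assumes the log is reachable as a local path (for S3 or MinIO you would list object keys instead) and relies on the Delta Lake protocol's 20-digit zero-padded file names. The class DeltaLogLag and its methods are hypothetical helpers, not part of Trino.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

// Hypothetical helper: how many commits have accumulated since the last checkpoint?
final class DeltaLogLag {
    // Commit files: <20-digit version>.json
    private static final Pattern COMMIT = Pattern.compile("(\\d{20})\\.json");
    // Checkpoints: <version>.checkpoint.parquet, or multi-part
    // <version>.checkpoint.<part>.<parts>.parquet
    private static final Pattern CHECKPOINT =
            Pattern.compile("(\\d{20})\\.checkpoint(\\..+)?\\.parquet");

    // Highest version among file names matching the pattern, if any.
    static OptionalLong latestVersion(Path deltaLog, Pattern pattern) throws IOException {
        try (Stream<Path> files = Files.list(deltaLog)) {
            return files
                    .map(p -> pattern.matcher(p.getFileName().toString()))
                    .filter(Matcher::matches)
                    .mapToLong(m -> Long.parseLong(m.group(1)))
                    .max();
        }
    }

    // Number of commits written after the most recent checkpoint (-1 means "none").
    static long commitsSinceCheckpoint(Path deltaLog) throws IOException {
        long latestCommit = latestVersion(deltaLog, COMMIT).orElse(-1);
        long latestCheckpoint = latestVersion(deltaLog, CHECKPOINT).orElse(-1);
        return latestCommit - latestCheckpoint;
    }
}
```

A number that keeps growing across optimize runs is exactly the symptom described here.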
Looking into it, we noticed the following error log in the coordinator:
2024-11-04T12:39:37.774Z ERROR 20241104_123936_00273_m45u9.0.0.0-16-159 io.trino.plugin.deltalake.DeltaLakeMetadata Failed to write checkpoint for table foo for version 1234
And the full stacktrace:
java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Double (java.lang.String and java.lang.Double are in module java.base of loader 'bootstrap')
at io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonValueToTrinoValue(DeltaLakeParquetStatisticsUtils.java:131)
at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.lambda$preprocessMinMaxValues$15(CheckpointWriter.java:434)
at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180)
at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:1024)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:556)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:546)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:702)
at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.lambda$preprocessMinMaxValues$16(CheckpointWriter.java:428)
at java.base/java.util.Optional.map(Optional.java:260)
at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.preprocessMinMaxValues(CheckpointWriter.java:420)
at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.writeMinMaxMapAsFields(CheckpointWriter.java:391)
at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.lambda$writeParsedStats$12(CheckpointWriter.java:375)
at io.trino.spi.block.RowBlockBuilder.buildEntry(RowBlockBuilder.java:111)
at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.writeParsedStats(CheckpointWriter.java:361)
at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.lambda$writeAddFileEntry$5(CheckpointWriter.java:279)
at io.trino.spi.block.RowBlockBuilder.buildEntry(RowBlockBuilder.java:111)
at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.writeAddFileEntry(CheckpointWriter.java:251)
at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.write(CheckpointWriter.java:165)
at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager.writeCheckpoint(CheckpointWriterManager.java:158)
at io.trino.plugin.deltalake.DeltaLakeMetadata.writeCheckpointIfNeeded(DeltaLakeMetadata.java:2766)
at io.trino.plugin.deltalake.DeltaLakeMetadata.finishOptimize(DeltaLakeMetadata.java:2614)
at io.trino.plugin.deltalake.DeltaLakeMetadata.finishTableExecute(DeltaLakeMetadata.java:2555)
at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.finishTableExecute(ClassLoaderSafeConnectorMetadata.java:239)
at io.trino.tracing.TracingConnectorMetadata.finishTableExecute(TracingConnectorMetadata.java:177)
at io.trino.metadata.MetadataManager.finishTableExecute(MetadataManager.java:354)
at io.trino.tracing.TracingMetadata.finishTableExecute(TracingMetadata.java:227)
at io.trino.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$4(LocalExecutionPlanner.java:4121)
at io.trino.operator.TableFinishOperator.getOutput(TableFinishOperator.java:319)
at io.trino.operator.Driver.processInternal(Driver.java:403)
at io.trino.operator.Driver.lambda$process$8(Driver.java:306)
at io.trino.operator.Driver.tryWithLock(Driver.java:709)
at io.trino.operator.Driver.process(Driver.java:298)
at io.trino.operator.Driver.processForDuration(Driver.java:269)
at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:890)
at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:77)
at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:191)
at io.trino.$gen.Trino_testversion____20241104_190209_71.run(Unknown Source)
at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:192)
at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:168)
at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:155)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1570)
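The top frame suggests that the min/max statistic parsed from the JSON stats was a String rather than a Double. Presumably this is because JSON has no numeric literal for Infinity, so the value is carried as the string "Infinity". That is an inference from the exception, not something we confirmed in the code. The sketch below reproduces the failing blind cast and shows one tolerant conversion. DoubleStats and its methods are hypothetical illustrations, not Trino's implementation or its fix.

```java
// Hypothetical illustration of the failure mode seen in jsonValueToTrinoValue.
final class DoubleStats {
    // Blind cast of a parsed JSON value: throws ClassCastException for "Infinity".
    static double naiveCast(Object jsonValue) {
        return (Double) jsonValue;
    }

    // Tolerant variant: accepts any JSON number, plus the string spellings of
    // the non-finite doubles that JSON cannot represent as numbers.
    static double toDouble(Object jsonValue) {
        if (jsonValue instanceof Number) {
            return ((Number) jsonValue).doubleValue();
        }
        if (jsonValue instanceof String) {
            switch ((String) jsonValue) {
                case "Infinity":
                    return Double.POSITIVE_INFINITY;
                case "-Infinity":
                    return Double.NEGATIVE_INFINITY;
                case "NaN":
                    return Double.NaN;
                default:
                    break;
            }
        }
        throw new IllegalArgumentException("Unsupported double statistic: " + jsonValue);
    }
}
```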
Expected outcome
Table is optimized and a new checkpoint is written.
Actual outcome
A failure that is silent from the user's point of view: the checkpoint fails to write, yet the query completes successfully.
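Given the ERROR log line and the FINISHED query, the checkpoint step appears to be best-effort: the exception is caught and logged, and it is not propagated to the query. This is our inference from the logs, not a reading of Trino's source. The sketch below shows that pattern using java.util.logging for self-containment (Trino uses its own logging library). The class name and method are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch of a best-effort post-commit step.
final class BestEffortCheckpoint {
    private static final Logger LOG = Logger.getLogger(BestEffortCheckpoint.class.getName());

    // Runs the checkpoint writer; a runtime failure is logged and swallowed,
    // so the caller (and the user's query) still sees success.
    static boolean runBestEffort(Runnable checkpointWriter, String table, long version) {
        try {
            checkpointWriter.run();
            return true;
        }
        catch (RuntimeException e) {
            LOG.log(Level.SEVERE,
                    "Failed to write checkpoint for table " + table + " for version " + version, e);
            return false;
        }
    }
}
```

Surfacing the returned flag, or a metric, somewhere visible is one way to make such failures noticeable without failing the committed operation.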
Reproduction steps
Assuming a Delta Lake catalog called delta and a schema called schema:
CREATE OR REPLACE TABLE delta.schema.test_optimize (id int, foo double);
INSERT INTO delta.schema.test_optimize VALUES (1, cast('Infinity' as double));
ALTER TABLE delta.schema.test_optimize EXECUTE optimize;
Confirmed with both S3 and MinIO as storage backends.
Detected in Trino 453 (with a few patches) and confirmed on vanilla upstream Trino 463.