ParallelIterable may cause memory pressure for presto coordinator #3179
Description
-
No limit for split queue
Presto iceberg connector useBaseTableScan#planTasks()
to generate splits which is processed by aParallelIterable
to plan parallelly. The base logic ofParallelIterable
is that: whenhasNext()
is called, plan tasks(each task for each manifest) are submitted to a thread pool to plan data files from each manifest, and the generatedFileScanTask
are stored in a queue and then are consumed bynext()
. However there is no limit size or target size for the queue. That is to say when a task is submitted, all data files of the manifest in the task will be added to the queue continuously unitl finished. So in extreme cases, if a manifest has a huge amount of a datafiles and the consume speed is slow meanwhile, the size of the queue will grow quickly and cause memory pressure of the presto coordinator , especially when theFileScanTask
size is big, eg. planning table with extremelly complicated schema. -
Split queue is not cleared after iterator closed
Another problem is that when theParallelIterable
is closed, the queue is not cleared as the code shows below.
Since presto keeps history queries for a while, theFileScanTask
remaining in the queue are referenced by scheduler and then reference by a presto query which can not collected by gc. This may cause the coordinator OOM in extremelly cases.
In our test, we set up a presto coordinator (worker node as well) of -Xmx=40GB, and query a sql like select col from table limit 1
in which the table
has a verg complicated schema. When this sql is submitted continuously 8-10 times, the coordinator crashed caused of a oom error. We found that the FileScanTask
s in the ParallelIterator#queue
reached 2.5GB per query in the heap dump, and filled the heap of jvm quickly.
Of course, we can set null
explicitly to the iterator reference of combinedScanIterable
in presto iceberg connector to let gc collect the splits after query completed, but I think it's resonable to clear the queue when closing the ParallelIterator
since the iterator can not be accessed anymore after closed.
@Override
public void close() {
// cancel background tasks
for (int i = 0; i < taskFutures.length; i += 1) {
if (taskFutures[i] != null && !taskFutures[i].isDone()) {
taskFutures[i].cancel(true);
}
}
this.closed = true;
}
Maybe we can add some improvement for the ParallelIterable
eg. add limit size and clear the queue after closed.