**Describe the bug**
- This is related / tangential to #9359 (Add spilling in SortMergeJoin)
My primary problem is that I am trying to sort 100 million strings and I always get errors like:
```
Resources exhausted: Failed to allocate additional 39698712 bytes for ExternalSorterMerge[0] with 4053021824 bytes already allocated - maximum available is 26915520
```
After reading through the sort impl a bit I have noticed a few concerns (recorded under Additional context below).
**To Reproduce**
I don't have a DataFusion-only reproduction, but this reproduces it for me in lance:
```python
import pyarrow as pa

import lance

# Once the dataset has been generated you can comment out this generation code
# and reproduce the issue very quickly
print("Generating data")
my_strings = [f"string-{i}" * 3 for i in range(100 * 1024 * 1024)]
my_table = pa.table({"my_strings": my_strings})
print("Writing dataset")
ds = lance.write_dataset(
    my_table, "/tmp/big_strings.lance", mode="overwrite", schema=my_table.schema
)
del my_table
# End of generation code

ds = lance.dataset("/tmp/big_strings.lance")
print("Training scalar index")
# To create a scalar index we must sort the column, this is where the error occurs
ds.create_scalar_index("my_strings", "BTREE")
```
**Expected behavior**
I should be able to sort any number of strings, as long as I don't run out of disk space.
**Additional context**
Here is how I understand memory accounting in the sort today (a sketch of the failing allocation pattern follows this list):
- As batches arrive, they are placed in an accumulation queue, and the size of the spillable reservation grows
- Once the pool is out of space we begin the spill process
- The first part of the spill process is to sort the accumulation queue (which, at this point, has many batches in it)
- Each batch becomes an input stream for a `SortPreservingMergeStream` (this is a LOT of inputs)
- When a batch input stream is polled, the batch is sorted, the unsorted batch is dropped (and removed from the spillable reservation), and the sorted batch is returned. The `SortPreservingMergeStream` then puts this batch in the batch builder (which adds it to the non-spillable reservation). This is a problem, as described below
- As the sort preserving merge stream is polled it polls the various inputs, fills up the batch builder, and then starts to emit output batches
- Back in the sort exec the sort preserving merge stream is fully drained (`try_collect`). This is a problem, as described below
- These collected and sorted batches are then written to the spill file
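
Here is a self-contained sketch of that failing allocation pattern using DataFusion's public memory pool API. The pool size and byte counts are lifted from the error above; the consumer names are just labels for illustration, not the real execution wiring:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() -> Result<()> {
    // Pool sized from the error above: 4053021824 allocated + 26915520 available
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(4_079_937_344));

    // Spillable reservation backing the accumulation queue; by the time the
    // spill starts it has grown to nearly the whole pool
    let mut queue = MemoryConsumer::new("ExternalSorter")
        .with_can_spill(true)
        .register(&pool);
    queue.try_grow(4_053_021_824)?;

    // The merge side now asks for the extra bytes the sorted copies need,
    // but the pool cannot cover the difference
    let mut merge = MemoryConsumer::new("ExternalSorterMerge[0]").register(&pool);
    let err = merge.try_grow(39_698_712).unwrap_err();
    println!("{err}"); // a "Resources exhausted" error like the one above
    Ok(())
}
```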
The first problem (and the one causing my error) is that a sorted batch of strings (the output of `sort_batch`) occupies ~25% more memory than the unsorted batch. I'm not sure if this is buffer alignment, padding, or some kind of 2x allocation strategy used by the sort, but it seems reasonable that something like this could happen. Unfortunately, this is a problem: we are spilling because we have used up the entire memory pool. We now take X bytes from the memory pool, convert them into 1.25 * X bytes, and try to put them back in the memory pool. This fails with the error listed above.
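
As far as I can tell, `sort_batch` boils down to sorting indices and then gathering with `take`, so the growth is easy to measure outside DataFusion. A minimal sketch using the arrow crate directly (the data shape mimics the repro above; the exact ratio will depend on the data, allocator, and arrow version):

```rust
use arrow::array::{Array, StringArray};
use arrow::compute::{sort_to_indices, take};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Mimic the repro data: many short unsorted strings
    let unsorted = StringArray::from_iter_values(
        (0..1_000_000).map(|i| format!("string-{i}").repeat(3)),
    );
    let before = unsorted.get_array_memory_size();

    // Sorting gathers the values into freshly allocated buffers, so the
    // sorted copy need not be the same size as the input
    let indices = sort_to_indices(&unsorted, None, None)?;
    let sorted = take(&unsorted, &indices, None)?;
    let after = sorted.get_array_memory_size();

    println!(
        "unsorted: {before} bytes, sorted: {after} bytes, ratio: {:.2}",
        after as f64 / before as f64
    );
    Ok(())
}
```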
The second problem is that we are not accounting for the output of the sort preserving merge stream. Each output batch from the merge is made up of rows drawn from the various input batches. In the degenerate case, where the input data is fully random, this means we will probably require 2 * X bytes: each output batch draws rows from the current batch of every input stream, so we can't release any of those input batches until the final output batch is emitted.
The solution to this second problem is that we should be streaming into the spill file (roughly like the sketch below): we should not collect from the sort preserving merge stream and then write the collected batches into the spill file. This problem is a bit less concerning to me at the moment because it is "datafusion uses more memory than it should" and not "datafusion is failing the plan with an error". We don't do a lot of sorting in lance, so we can work around it reasonably well by halving the size of the spill pool.
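
To illustrate the shape of that fix (not DataFusion's actual internals: the stream here is a canned stand-in for the `SortPreservingMergeStream`, and the spill path is made up), each batch is written as soon as it is produced, so it can be dropped, and its reservation released, before the next one is polled:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;
use futures::{executor::block_on, stream, StreamExt};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));

    // Canned stand-in for the merged, sorted output stream
    let mut merged = stream::iter((0..4).map(|i| {
        RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![i; 1024])) as ArrayRef],
        )
    }));

    let mut writer = FileWriter::try_new(File::create("/tmp/spill.arrow")?, &schema)?;
    block_on(async {
        // Stream batches straight into the spill file instead of try_collect-ing
        // the whole output first; each batch is dropped before the next poll
        while let Some(batch) = merged.next().await {
            writer.write(&batch?)?;
        }
        writer.finish()?;
        Ok::<_, Box<dyn std::error::Error>>(())
    })?;
    Ok(())
}
```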