Creating this as a high-level ticket to hopefully get consensus on the approach before potentially creating lower-level tickets.
**Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
Currently `ObjectStore::file_reader` returns an `Arc<dyn ObjectReader>`, which in turn has a method `ObjectReader::sync_chunk_reader` that takes a byte range.
In the case of Parquet, a `ChunkObjectReader` wraps this `ObjectReader` and adapts it to the `parquet::ChunkReader` trait. The result is that the parquet reader calls `ObjectReader::sync_chunk_reader` for the byte range of each column chunk, of which there is one per column per row group, and each such call performs a range request to object storage to fetch the bytes.
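For reference, here is a simplified sketch of the abstractions described above. The signatures are paraphrased for illustration (the real trait differs in details such as error types and additional methods), so treat this as a sketch rather than the actual definitions:

```rust
use std::io::Read;
use std::sync::Arc;

/// Simplified sketch of the reader abstraction; signatures are paraphrased.
pub trait ObjectReader: Send + Sync {
    /// Returns a synchronous reader over `length` bytes starting at `start`.
    /// For object stores this typically translates into one HTTP range request.
    fn sync_chunk_reader(
        &self,
        start: u64,
        length: usize,
    ) -> std::io::Result<Box<dyn Read + Send + Sync>>;

    /// Equivalent to reading the whole file, i.e. `sync_chunk_reader(0, length)`.
    fn sync_reader(&self) -> std::io::Result<Box<dyn Read + Send + Sync>> {
        self.sync_chunk_reader(0, self.length() as usize)
    }

    /// Total length of the object in bytes.
    fn length(&self) -> u64;
}

/// Adapter that lets the parquet reader fetch column chunks through an
/// `ObjectReader`; each column chunk read maps to one `sync_chunk_reader`
/// call, and therefore one range request.
pub struct ChunkObjectReader(pub Arc<dyn ObjectReader>);
```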
As pointed out by @mateuszkj on datafusion-contrib/datafusion-objectstore-s3#53, this unfortunately results in a large number of small requests to S3 (there are also metadata requests, which I will cover in a separate ticket concerning catalogs).
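For a sense of scale, a Parquet file with, say, 100 row groups and 20 projected columns would issue on the order of 100 × 20 = 2,000 separate range requests just to read the column data.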
In the case of CSV, JSON, etc., `ObjectReader::sync_reader` is used, which is equivalent to calling `sync_chunk_reader` with the length of the file and will therefore buffer the entire file in memory.
This approach therefore has two aspects that could be improved:
- Potentially large numbers of very small requests to object storage, adding latency and cost
- Potentially large amounts of data buffered in memory
**Describe the solution you'd like**
The simplest solution is to download the entire file to temporary local storage. This is what IOx currently does and it works well.
The next obvious improvement would then be to use the `MemoryManager` and `DiskManager` functionality added by @yjshen in #1526 to buffer in memory initially and only spill to disk under memory pressure.
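A rough sketch of the behaviour this would give is below. The names here (`buffer_object`, `memory_budget`, the use of the `tempfile` crate) are hypothetical stand-ins; a real implementation would obtain its budget and temp files from the `MemoryManager` and `DiskManager` introduced in #1526.

```rust
use std::fs::File;
use std::io::{Cursor, Read, Seek};

/// Where a downloaded object ended up: fully in memory, or spilled to a
/// temporary file because it exceeded the memory budget.
enum BufferedObject {
    Memory(Cursor<Vec<u8>>),
    Spilled(File),
}

/// Download an object through `source`, keeping it in memory while it fits
/// within `memory_budget` bytes and spilling it to a temp file otherwise.
/// `source` and `object_len` stand in for whatever the object store API
/// exposes for a single object.
fn buffer_object(
    mut source: impl Read,
    object_len: u64,
    memory_budget: u64,
) -> std::io::Result<BufferedObject> {
    if object_len <= memory_budget {
        let mut buf = Vec::with_capacity(object_len as usize);
        source.read_to_end(&mut buf)?;
        Ok(BufferedObject::Memory(Cursor::new(buf)))
    } else {
        // In DataFusion this temp file would come from the DiskManager.
        let mut file = tempfile::tempfile()?;
        std::io::copy(&mut source, &mut file)?;
        file.rewind()?;
        Ok(BufferedObject::Spilled(file))
    }
}
```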
I suspect that for many use cases this will perform very well; the key observations are:
- Data stored in non-archival object store tiers is billed per request, not by the amount of data transferred
- Data transfer from object storage within the same region is extremely fast (10+ Gbps)
A final extension might be to add functionality to fetch smaller byte ranges based on projection and predicate pushdown. I started experimenting with an API for what this might look like here, but I don't have a good handle on how to balance the trade-off between making too many requests and requesting data we don't need, so I'm personally inclined to punt on this, at least initially...
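For concreteness, one common way to handle that trade-off (not necessarily what the linked experiment does) is to coalesce nearby byte ranges into a single request when the gap between them is small, accepting a bounded amount of over-read in exchange for fewer round trips. A minimal sketch, with `max_gap` as the tuning knob:

```rust
use std::ops::Range;

/// Merge byte ranges separated by at most `max_gap` bytes, trading a bounded
/// amount of over-read for fewer object store requests. Larger `max_gap`
/// values mean fewer requests but more unwanted bytes fetched.
fn coalesce_ranges(mut ranges: Vec<Range<u64>>, max_gap: u64) -> Vec<Range<u64>> {
    ranges.sort_by_key(|r| r.start);
    let mut merged: Vec<Range<u64>> = Vec::new();
    for range in ranges {
        if let Some(last) = merged.last_mut() {
            if range.start <= last.end + max_gap {
                last.end = last.end.max(range.end);
                continue;
            }
        }
        merged.push(range);
    }
    merged
}

fn main() {
    // Two column chunks 100 bytes apart collapse into one request when the
    // allowed gap is 1 KiB; the distant chunk stays a separate request.
    let ranges = vec![0..4_096, 4_196..8_192, 1_000_000..1_004_096];
    let expected: Vec<Range<u64>> = vec![0..8_192, 1_000_000..1_004_096];
    assert_eq!(coalesce_ranges(ranges, 1024), expected);
}
```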
I'm not very familiar with how Spark, etc. solve this problem; this is just based on my intuition, so perhaps @sunchao or someone with more familiarity with that ecosystem might be able to provide some insight here.
**Describe alternatives you've considered**
One option we are likely to implement for IOx is a shared, instance-local, read-through, disk-based object storage cache. The idea is to use the ephemeral NVMe disk available on cloud provider VMs as a shared cache for one or more query engines running on that instance. This effectively works around the problem by having the query engine do all of its IO against very fast local disk, with a separate process handling interaction with object storage as required. It will also accelerate repeated queries against the same "hot" dataset. I would be very happy to write up some tickets if someone wanted to take this on.
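A minimal sketch of the read-through idea, assuming a per-instance cache directory (all names here are hypothetical, and a real implementation would also need eviction, concurrency control, and a protocol shared between the query engines and the cache process):

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical read-through cache: serve `key` from the local cache
/// directory if present, otherwise fetch it via `fetch` (standing in for the
/// object store client) and persist it for subsequent readers.
fn read_through(
    cache_dir: &Path,
    key: &str,
    fetch: impl FnOnce() -> io::Result<Vec<u8>>,
) -> io::Result<PathBuf> {
    let path = cache_dir.join(key.replace('/', "_"));
    if !path.exists() {
        let bytes = fetch()?;
        // Write to a temporary name and rename so concurrent readers never
        // observe a partially written file.
        let tmp = path.with_extension("part");
        fs::write(&tmp, &bytes)?;
        fs::rename(&tmp, &path)?;
    }
    // The query engine then does plain local-disk IO against this path.
    Ok(path)
}
```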
This blog post written by @jorgecarleitao proposes streaming files block-wise (thank you @xudong963 for the link). This is close to what the current implementation does; however, it comes with the drawbacks listed above. FWIW, I have not found this approach to perform especially well on local files either (see here), but I could have been doing something wrong.
**Additional context**