Surprisingly high Swap usage with read_parquet - much higher than eventual RAM usage #8925

Open
ianozsvald opened this issue May 19, 2023 · 7 comments
Labels
A-io Area: reading and writing data bug Something isn't working needs triage Awaiting prioritization by a maintainer python Related to Python Polars

Comments

@ianozsvald

Polars version checks

  • I have checked that this issue has not already been reported.

  • I have confirmed this bug exists on the latest version of Polars.

Issue description

I'm experimenting with materialising 100Ms of rows of parquet data using Polars in RAM (64GB, 8 physical cores, 1TB SSD), using a freshly built environment. I'll be using Polars for my PyDataLondon 2023 talk in 2 weeks to talk about the practical side of using Pandas 2 vs Polars (vs Dask) and I'd like to give a fair summary. If I should ask this in Discord instead, just say.

I'll explain the data load behaviour, I've got 1 question and a couple of observations (in case they're helpful).

I'm loading 300M rows; this costs 35GB of RAM after the load. If I try to load more rows of data then I exhaust the system resources, even though I'd still have circa 30GB of free RAM. I was hoping to load more rows before running out of resources.

I'm using variations of dfp = pl.read_parquet(parquet_test_result + "/*.parquet", n_rows=300_000_000, columns=cols, low_memory=True) to load a limited number of rows of data, starting at 100M and stopping at 300M. If I try to read more (e.g. 400M rows) then I run out of resources and the Jupyter kernel dies. I had hoped to load all 650M rows into RAM but this seems impossible (and I know about scan_parquet - here I'm exercising what I can easily get into RAM all in one go).

The Ubuntu system behaviour is a bit surprising, so I thought I'd log it here in case anyone has suggestions. I think I'm not RAM-limited but SWAP-limited: when I try to read above 300M rows I clearly exhaust the system's 100GB SWAP partition, and once the kernel crashes the SWAP is cleared back to 0.

Here's an example of the system's behaviour whilst successfully loading 300M rows. RAM quickly maxes out at 64GB and stays there. CPUs are all lightly busy (only 20-40% usage across the system). Swap steadily increases and for 300M rows it stops at around 65GB; once the data is fully loaded (130 seconds) the SWAP usage drops back to 0. This is using low_memory=True.

[Screenshot: polars swap usage, 2023-05-19 11-03-03]

QUESTION - am I right to expect that although my dataframe costs circa 35GB in RAM, I can't load more rows because SWAP is exhausted? Am I missing a trick for loading more rows into RAM?

If I use dstat to measure disk load I can see that this process is reading 0.5GB-1.4GB/second and writing 2.3GB/second (which presumably is the swap being written). I wasn't expecting a read_parquet to be writing to disk. Is there any documentation about what's happening here?

[Screenshot: polars disk read/write activity, 2023-05-19 11-03-34]

If I perform the above operation with low_memory=False then I get similar behaviour, but SWAP maxes at 80GB (not 65GB), RAM still maxes at 64GB, and it still loads in the same amount of time. I'm not sure this flag is changing the behaviour much.

If I try use_pyarrow then the data load fails - it refuses to see my parquet files - but Dask and Polars' rust pyarrow loader see the same path just fine. I'll continue to experiment with this.

After the load the dataframe consumes approximately 35GB of RAM. If I del dfp then RAM usage drops to 5-10GB and I can't free up the last 10GB (e.g. running gc.collect() doesn't clear any more RAM, %whos shows no other lurking results, and the df didn't get printed so it shouldn't be in the output history). There's nothing else in my Notebook. If I reset the kernel then I get back to circa 0GB allocated to the process. It feels like something is still allocated after deleting.
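
One way to watch this from inside the notebook is to check the process's resident memory before and after the del (a sketch using psutil, which is an assumption on my part and not something used in the report above):

import gc
import psutil

process = psutil.Process()  # the current Python/Jupyter kernel process

del dfp
gc.collect()

# resident set size of the process in GB after dropping the DataFrame
print(process.memory_info().rss / 1e9)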

Versions:

Polars 0.17.10, Python 3.11, Linux Cinnamon (Ubuntu), using Jupyter Labs.

Data

The data is UK vehicle test data ("MOT data") detailing annual vehicle safety and emissions tests, covering 20 years and circa 600M rows of data with a mix of datetime, string and integer columns. The original CSV files are parsed with Dask and output as pyarrow parquet files.

I have 855 Parquet files, 20MB each, SNAPPY-compressed, and I load the following columns. I load all but one column: the last column, path, is generated by Dask as a Categorical and Polars doesn't like loading it, so I exclude it:

cols = ['test_id', 'vehicle_id', 'test_date', 'test_class_id', 'test_type', 'test_result', 'test_mileage', 
        'postcode_area', 'make', 'model', 'colour', 'fuel_type', 'cylinder_capacity', 'first_use_date', ] 

The Parquet file schema is

parquet_test_result = "../test_result.parquet"
pl.read_parquet_schema(parquet_test_result + "/part.1.parquet")

{'test_id': Int64,
 'vehicle_id': Int64,
 'test_date': Datetime(time_unit='ns', time_zone=None),
 'test_class_id': Int64,
 'test_type': Utf8,
 'test_result': Utf8,
 'test_mileage': Int64,
 'postcode_area': Utf8,
 'make': Utf8,
 'model': Utf8,
 'colour': Utf8,
 'fuel_type': Utf8,
 'cylinder_capacity': Int64,
 'first_use_date': Datetime(time_unit='ns', time_zone=None),
 'path': Categorical}
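
For reference, the column list above could be derived from that schema rather than typed out, assuming path is the only column to exclude (a sketch using the same parquet_test_result path):

import polars as pl

# read the schema of one part file and keep every column except the
# Dask-generated categorical 'path' column
schema = pl.read_parquet_schema(parquet_test_result + "/part.1.parquet")
cols = [name for name in schema if name != "path"]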

Reproducible example

Described above, but the datafiles aren't easy to share (they could be shared if required).

Expected behavior

Detailed above

Installed versions

---Version info---
Polars: 0.17.10
Index type: UInt32
Platform: Linux-5.15.0-70-generic-x86_64-with-glibc2.35
Python: 3.11.3 (main, Apr 19 2023, 23:54:32) [GCC 11.2.0]
---Optional dependencies---
numpy: 1.23.5
pandas: 2.0.1
pyarrow: 12.0.0
connectorx: <not installed>
deltalake: <not installed>
fsspec: 2023.5.0
matplotlib: 3.6.3
xlsx2csv: <not installed>
xlsxwriter: <not installed>
@ianozsvald ianozsvald added bug Something isn't working python Related to Python Polars labels May 19, 2023
@ritchie46
Member

Hi Ian!

There's a lot to take in here, so let's see where we could start.

Scanning

First things first: read_parquet WILL load all the data into RAM and cannot apply any optimizations at the scan level. scan_parquet (and maybe the streaming engine, though it is still in alpha) is recommended when dealing with larger file sizes.
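
For context, a minimal sketch of the lazy route, assuming the parquet_test_result path and cols list from the issue (the streaming engine was still alpha at the time):

import polars as pl

# build a lazy query: scan the files, push the column selection into the scan,
# and defer all work until collect()
lazy = pl.scan_parquet(parquet_test_result + "/*.parquet").select(cols)

dfp = lazy.collect()                  # materialise eagerly
# dfp = lazy.collect(streaming=True)  # or via the (alpha) streaming engine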

Rechunk

Polars' read_parquet defaults to rechunk=True, so you are actually doing two things: 1) reading all the data, 2) reallocating all the data into a single chunk. This reallocation takes ~2x the data size, so you can try turning that kwarg off.
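
A sketch of the two steps made explicit (same path and columns as in the report); the second line is the reallocation that temporarily needs roughly 2x the data size:

import polars as pl

# 1: read the data without the post-read rechunk
dfp = pl.read_parquet(parquet_test_result + "/*.parquet", columns=cols, rechunk=False)

# 2: reallocate all chunks into a single contiguous chunk
dfp = dfp.rechunk()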

much higher than eventual RAM usage

During reading of parquet files, the data needs to be decompressed. So you have several buffers: the compressed data buffers, the decompression result buffers, and finally the columnar format of the read data. You will always have at least 2 of those buffers in memory, so it seems logical to me that the reading operation takes more memory than the final DataFrame.

File written

Which system has written the file? I'd recommend updating to the latest Polars for writing the parquet files; we recently improved the default row group size to get better parallelism.
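
As an illustration, rewriting a part file with Polars and an explicit row group size might look like this (the output name and row_group_size value are only examples, not tuned recommendations):

import polars as pl

df = pl.read_parquet(parquet_test_result + "/part.1.parquet")
df.write_parquet(
    "part.1.rewritten.parquet",  # hypothetical output name
    compression="snappy",
    row_group_size=512_000,      # example value
)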

If I try use_pyarrow then the data load fails - it refuses to see my parquet files - but Dask and Polars' rust pyarrow loader see the same path just fine. I'll continue to experiment with this.

That's because you have a globbing pattern *.parquet. Polars translates that glob to (pseudo code): pl.concat([pl.read_parquet(f) for f in glob]). We don't support that for the pyarrow reader.
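
If the pyarrow reader is wanted anyway, one possible workaround (an assumption, not an official recommendation) is to expand the glob manually and concatenate:

import glob
import polars as pl

# expand the glob ourselves, read each file with the pyarrow reader,
# then concatenate the pieces into one DataFrame
files = sorted(glob.glob(parquet_test_result + "/*.parquet"))
dfp = pl.concat([pl.read_parquet(f, columns=cols, use_pyarrow=True) for f in files])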

Again, I think the rechunk here is very expensive.

After load the dataframe in RAM consumes approximately 35GB. If I del dfp then RAM usage drops to 5-10GB and I can't free up the last 10GB (e.g. running gc.collect() doesn't clear the RAM any more and %whos shows no other lurking results

Yes, that's how memory allocators work. They tend to keep memory around without giving it to the OS. If a program does a large amount of heap allocations, especially smaller ones that fragment the heap, the memory allocator will hold on to it. This is not that bad as it sounds as the memory allocator will reuse that memory for the process.

TLDR

Try rechunk=False, and probably try scan_parquet (I don't think Dask materializes a parquet file into memory either, right?).

@ianozsvald
Author

Thanks for the feedback Ritchie.

Using dfp = pl.read_parquet(parquet_test_result + "/*.parquet", n_rows=300_000_000, columns=cols, rechunk=False) (the same 300M rows as above plus rechunk=False) has the same memory behaviour as above - it loads, but uses a lot of SWAP.

Using dfp = pl.read_parquet(parquet_test_result + "/*.parquet", n_rows=400_000_000, columns=cols, rechunk=False) fails with SWAP exhaustion, which is what happened with rechunk=True as the default before.

Re. Dask: if you use dd.read_parquet(...).compute() it'll read all the partial Parquet files and materialise them - I haven't tried that yet on this dataset. I'm assuming Dask will run out of RAM, as it'll have to run a sequence of pd.concat operations which are bound to use a lot of memory (in case e.g. categorical columns need reverting to NumPy strings and whatever other operations need handling; I guess indexes might get reset too).
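
For comparison, the Dask route described here would look roughly like this (a sketch; not timed on this dataset):

import dask.dataframe as dd

# lazily read all the part files, then materialise the whole result as a
# single pandas DataFrame in RAM
ddf = dd.read_parquet(parquet_test_result, columns=cols)
df = ddf.compute()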

The data was originally generated by Dask - directly reading 100s of CSVs and writing them out with the default to_parquet with PyArrow datatypes. Notably Dask then adds a path column which is categorical-encoded (as it is heavily duplicated - the same path for every row from the same CSV), which is probably a silly default if you're asking for an all-PyArrow export. I need to file this as an issue with Dask.

Trying dfp = pl.scan_parquet(parquet_test_result + "/*.parquet", n_rows=400_000_000, ).select(cols).collect() fails, and whilst it is running I observed the same SWAP exhaustion. I know that scan_... can be lazily chained, but my goal here is just to get the data into RAM to see how much fits with each tool (because normally it is nice to work directly with what's in RAM when exploring new datasets). I believe in this case it is just doing the same as read_parquet.

I was hoping that Polars might do something like pre-allocate the required DataFrame memory block (as you know the row count and dtype sizes), then materialise into that space, without many partial operations...but that was just a hope and this is a part of my learning curve. No worries.

Re. memory allocation behaviour after del dfp - cool. I wasn't sure if this was a Rust-related memory behaviour but I'm happy to accept this is probably an OS-level thing.

Routes forward could include:

  • accepting that I can materialise 300M rows from the current Parquet, and just work from those in RAM
  • use scan_parquet and materialise a subset result - not at all silly, but basically a mirror of what I'll have to do with Dask
  • re-export using Dask (csv->parquet) but disable the "write paths" (which adds a categorical Pandas string) option, so the file is all-PyArrow with no additional categorical - this opens up using use_pyarrow perhaps

Having the data in RAM isn't critical, but I'm hoping to do a side-by-side Polars vs Pandas 2 comparison (actually Pandas 2 PyArrow & Pandas 2 NumPy), which needs data in RAM, so I need to figure out a sensible dataset for that task. Hence my playing around and trying to understand the behaviours and limitations of the various approaches. Cheers for your feedback!

@ianozsvald
Author

Possibly the rechunk behaviour is related to #8932

@csubhodeep

csubhodeep commented May 20, 2023

@ianozsvald Thanks a lot for opening this super interesting thread.

Just a few (very stupid) suggestions, since I have very limited knowledge of this topic. Did you try the following (rough sketches of these are below the list):

  1. Use the pyarrow.dataset.Dataset API to load the same files. References here.

  2. I know you mentioned

If I try use_pyarrow then the data load fails - it refuses to see my parquet files - but Dask and Polars' rust pyarrow loader see the same path just fine. I'll continue to experiment with this.

But this time just point Polars to the directory path and pyarrow would grab the files.

  3. Set memory_map to False.

I am curious about the outcome.
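
Rough sketches of those suggestions, assuming the parquet_test_result path and cols list from the issue (untested here; pointing Polars at the directory relies on the pyarrow reader doing the file discovery):

import pyarrow.dataset as ds
import polars as pl

# suggestion 1: open the directory as a pyarrow Dataset and hand the Arrow
# table to Polars
dataset = ds.dataset(parquet_test_result, format="parquet")
dfp = pl.from_arrow(dataset.to_table(columns=cols))

# suggestions 2 and 3: let pyarrow discover the files in the directory and
# disable memory mapping
dfp = pl.read_parquet(parquet_test_result, columns=cols, use_pyarrow=True, memory_map=False)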

@ritchie46
Member

like pre-allocate the required DataFrame memory block (as you know the row count and dtype sizes), then materialise into that space, without many partial operations...but that was just a hope and this is a part of my learning curve

We do that with rechunk, but for nested types the size is only known after materialization, not before.

You can also set parallel=False to ensure only one file at a time may allocate.
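
A sketch of that suggestion; in the Python API the parallel option takes a strategy string, so "none" is assumed here to be the equivalent of parallel=False:

import polars as pl

dfp = pl.read_parquet(
    parquet_test_result + "/*.parquet",
    n_rows=300_000_000,
    columns=cols,
    rechunk=False,
    parallel="none",  # assumed Python-API spelling of "no parallel reading"
)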

Another question. Why do you want to materialize all the data in memory?

@ianozsvald
Author

@csubhodeep thanks for the reply. I've noted to try these, I've moved on with my queries now as I can get enough data into memory for exploration. I got a bit lost in the various options I could try, between the two Parquet engines, then waiting a few minutes for each to blow up before my machine would respond again :-) If I try your suggestions (I've noted them), I'll reply back here.

@ianozsvald
Author

@ritchie46 thanks for the suggestion, I've added it to my notes which I hope to come back to.

As for "why in RAM" - I guess because I'm used to doing that with 10 years of Pandas, also because in e.g. #8933 you say that some operations are only supported for Eager dataframes and I'm quite used to using value_counts to explore unknown data. Finding that a lazy query (e.g. trying to limit a data load - my colleague Giles hopefully will post an issue on this) can cause the machine to temporarily lock up in a similar way to this issue has made me a bit cautious.

For the conference talk I want to compare Pandas to Polars, so if I've got data in RAM for Pandas then that's the mode people are expecting, then I can look at the rows, plot, stick it in sklearn, check my schema is sane with pandera, maybe doing exploratory work with ydata profiling. It feels like I need to keep two slightly-different APIs in my head which slows down my learning speed. We'll also compare Dask vs Polars streaming which is much less frequently my use case, but definitely of interest. Does that make sense?

The fact that Polars is fast when reading Parquet is definitely cool and might make me reconsider my workflow, but so frequently my datasets are small (a few GB, maybe less - think of corporate Excel sheets that need some ML) that they just fit into RAM, and I just want the easy path to figuring out if my data is any good for my uses (before switching to plotting and building some models). Dask always felt clunkier for bigger datasets but we're getting nice speed timings with Dask for this exploration too.

@stinodego stinodego added needs triage Awaiting prioritization by a maintainer A-io Area: reading and writing data labels Jan 13, 2024