Writing parcels output in Parquet format #1399
Conversation
Having to remove 'once' because this does not work in parquet's column-based format
And fixing flake8 error because of merge with plotting-removal PR
I'd very strongly advocate for making sure there is a way of "storing" the particle data that works with minimal infrastructure (i.e. no disk) and with minimal software dependencies. In a way, #1303 did a lot more than enabling just a specific type of output store: as it is very easy to create a valid Zarr store (e.g. as an interface to a database etc.), allowing an arbitrary Zarr store opened up an easy way of hacking arbitrary output strategies into Parcels without touching the Parcels code.
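As a concrete illustration of that point (not code from this PR, and assuming the store-passing interface that #1303 added): a minimal in-memory Zarr store that could stand in for a path on disk.

```python
import zarr

# Any object implementing the zarr store interface can receive the output (per #1303):
# here an in-memory store, but it could equally be a shim around a database.
store = zarr.storage.MemoryStore()
root = zarr.group(store=store)
root.create_dataset("lon", shape=(1000, 10), chunks=(1000, 10), dtype="f4")
# `store` could then be handed to ParticleFile in place of a path, and read back
# later without anything ever touching the filesystem.
```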
OK, good point @willirath. I agree that we should maintain a way to write to memory (instead of to disk) in Parcels. But I'd be reluctant for now to keep both zarr and Parquet support, because it bloats the code considerably and makes maintenance more challenging. I'm assuming there's also a way to create and update a pyarrow table in memory. Does anyone have experience? An alternative is to write into a pandas DataFrame, but this would not work well with advantage number 4 above (writing from within the JIT loop). Since pyarrow has a C implementation, that might be possible to write to?
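For reference (not from this PR), one way to accumulate Parquet-style data purely in memory with pyarrow is to build a small Table per output step and concatenate, or to write the Parquet bytes into an in-memory buffer; the column names below are placeholders.

```python
import pyarrow as pa
import pyarrow.parquet as pq

# One Table per output step, concatenated without copying the underlying buffers.
step1 = pa.table({"trajectory": [0, 1], "time": [0.0, 0.0], "lon": [0.1, 0.2], "lat": [0.0, 0.0]})
step2 = pa.table({"trajectory": [0, 1], "time": [1.0, 1.0], "lon": [0.3, 0.4], "lat": [0.1, 0.1]})
combined = pa.concat_tables([step1, step2])

# "No disk" option: serialise the Parquet bytes into an in-memory buffer.
buf = pa.BufferOutputStream()
pq.write_table(combined, buf)
parquet_bytes = buf.getvalue()  # a pyarrow Buffer that could be handed to e.g. a database
```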
Also cleaning path creation for Parquet file
It is worth thinking about how the data will be written and accessed as storage formats are considered. In doing so, there are a number of basic things to consider.

It is unlikely that a single data layout will be optimal for all problems. But in general, I can imagine several common data access patterns. (In the following, I am assuming the parquet is written out as rows containing a single observation for a single trajectory, and that the row contains all the data for that observation (e.g. lat, lon, age, etc.); if this is wrong, let me know and I will update.) I am also assuming that there is no regular order in the parquet data set, so you can't easily calculate which row a given trajectory and observation are at. This is in general true, since there is no guarantee that all particles will have the same number of valid observations.

If you are willing to spend some extra disk space, you can make a flat file like parquet, or an SQL database, much faster by calculating indices into the data set (see SQLite, or perhaps parquet indices as described in https://blog.cloudera.com/speeding-up-select-queries-with-parquet-page-indexes/ (I have no experience with parquet indices)). These indices trade off disk space for access speed. (Note that SQLite has a severe storage penalty because it defaults to double-precision floats and long integers; other databases do not have this issue.) But without these indices, I think that for many common access patterns parquet would require you to read the entire time or observation column of the data set to find the data you are looking for. For example, if you want to find all drifters that start at a given time, you would have to read all of the time and observation columns in parquet, but only time[:,0] in zarr or some other matrix-oriented data format. I think this would often be a severe performance penalty.

If parquet ends up being the default output format, it would be very useful for someone (perhaps me) to write a post-processing step that is 1) memory efficient and 2) writes to a matrix-oriented format.

One note on polars -- it appears to be in-memory only. Again, that will limit the use on small laptops or for big problems, and so is sub-optimal (or am I wrong about polars?).

Jamie
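To make the access-pattern point concrete, here is a rough sketch (file and column names are assumptions, not from the PR) of reading "all release times" from the two layouts:

```python
import zarr
import polars as pl

# Matrix-oriented zarr: the release times are simply the first observation of every
# trajectory, so only one slice of the chunked 2-D time array needs to be read.
ds = zarr.open("trajectories.zarr", mode="r")
release_times_zarr = ds["time"][:, 0]

# Row-oriented parquet: without an index there is no way to know which rows hold the
# first observations, so the whole time (and observation) column has to be scanned.
release_times_pq = (
    pl.scan_parquet("trajectories.parquet")
      .filter(pl.col("obs") == 0)      # assumes an observation-counter column was written
      .select("time")
      .collect(streaming=True)
)
```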
…rning: In recent versions of numpy, we see a "RuntimeWarning: invalid value encountered in cast". This commit fixes that.
Fixing a bug where writing could be done multiple times for very small dt
Hi Jamie, I'll focus on accessing data output in .parquet format, given that I know much less about the details of writing trajectory data in Parcels. Firstly, I completely agree with your point that a single data structure will never be optimal for all users, and therefore I would definitely support a post-processing option to output in the traditional matrix format, as you suggested.

With that said, I would like to tackle your primary concern regarding larger-than-memory trajectory .parquet files and the need to eagerly read an entire trajectory dataset to access individual trajectories or time-slices. To do this, I'll consider the example of a collection of Lagrangian trajectories released across 24 months in 1990-1991 in the 1/12 degree ORCA0083-N006 ocean sea ice hindcast simulation. The trajectories were evaluated using TRACMASS rather than Parcels, and the output data is formatted as you mentioned, with each row corresponding to one observation of multiple variables (columns) of a single trajectory identified by a unique ID. Since TRACMASS writes to our output file in the main loop of the Fortran code, you are also correct that the tabular output data structure does not store trajectories in a regular order; rather, they are stacked on top of one another during runtime. As with many Lagrangian water mass transformation studies, I output the trajectories at every model grid cell crossing rather than at a regular time interval (e.g. every 5 days), since this allows us to calculate Lagrangian stream functions. I'm not sure this is a feature in Parcels, but I'm just highlighting it for clarity. In our single .parquet file combining all seeding steps, we have a total of 272,454,859 observations of 10 variables along 834,668 trajectories.

Importing an example Lagrangian trajectory LazyFrame

To access our trajectories, we'll start by using the Polars DataFrame library out of the box, and crucially make use of the Lazy API (which is not subject to in-memory constraints).

Processing our LazyFrame

Next, we will filter the LazyFrame for a single trajectory and extract its temperature. To do this, we first need to write a query, which will then be automatically optimised by Polars. To obtain the result as an eager DataFrame, we execute the query using the collect() method. Here, we use streaming to improve performance by executing the query in batches; this is an especially good option for larger-than-memory datasets. A more common task might be to calculate some aggregated statistic along each Lagrangian trajectory. For example, if we want to find the minimum temperature along each trajectory, we can create a query using the .groupby() and .agg() LazyFrame methods (see the sketch below).
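A sketch of the kind of queries described above (illustrative only, not the original code; column names such as "id" and "temp" are assumptions):

```python
import polars as pl

# Importing: scan_parquet builds a LazyFrame without reading the ~272 million rows.
lf = pl.scan_parquet("ORCA0083-N006_trajectories.parquet")

# Filter a single trajectory and extract its temperature; collect() executes the
# optimised query, and streaming runs it in batches for larger-than-memory data.
single = (
    lf.filter(pl.col("id") == 12345)
      .select(["time", "temp"])
      .collect(streaming=True)
)

# Aggregated statistic along each trajectory: minimum temperature per trajectory id.
min_temp = (
    lf.groupby("id")                       # .group_by() in newer Polars releases
      .agg(pl.col("temp").min())
      .collect(streaming=True)
)
```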
The Lagrangian Trajectories Toolbox

Naturally, users may feel that learning Polars syntax is a significant barrier to undertaking Lagrangian ocean analysis, and so I have been developing a Lagrangian Trajectories Toolbox (LT_Toolbox) that centres around the TrajectoryStore (TrajStore) object. The TrajStore is designed to be both a container for Polars Data/LazyFrames and to include common methods which would be useful for Lagrangian ocean analysis. This is still in very early development, but as a quick example one can create a TrajectoryStore object, which consists of a TrajFrame (a Data/LazyFrame with at least one observation per trajectory) and a SummaryFrame (a Data/LazyFrame with a single row per trajectory, designed to store aggregated statistics), and calculate the minimum temperature along a single trajectory.

Undoubtedly, I'll have missed something above, but I hope this at least addresses some of the concerns you outlined. I'd be especially keen to get any feedback on the LT Toolbox as a developing library for Lagrangian ocean analysis. It's worth saying that I originally wrote this library to work with xarray for .nc or .zarr outputs from Parcels, but I have since found myself on an all-consuming journey into learning Polars instead!

Ollie
I wonder if it makes sense to write out a big test dataset to play with? It is easier to optimize output code without having to get Parcels perfect first. If I were to do so by converting the existing zarr files I have to parquet, it would be nice to have answers to the following questions:

If it is easy enough, I might try to make a big parquet file this afternoon. No promises. If I make a big file, where would be a good place to share it? I could perhaps throw it on a web server. Jamie

OK, I have decided to do this with fastparquet. This will also allow easy experimentation with small numbers of initial particle writes, which can simulate the small-chunk-size problem with zarr. I am not sure, but parquet might have similar issues with small files. Anyway, it will be easier to play with code that easily allows going from zarr to parquet than to deal with the whole Parcels framework.
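A rough sketch of that zarr-to-parquet conversion (paths, block size and variable names are assumptions); fastparquet's append mode lets the file be built up block by block without holding the whole dataset in memory:

```python
import xarray as xr
from fastparquet import write

ds = xr.open_zarr("trajectories.zarr")
ntraj = ds.sizes["trajectory"]
block_size = 50_000

for start in range(0, ntraj, block_size):
    # Stream one block of trajectories at a time into long (row-per-observation) form.
    block = ds.isel(trajectory=slice(start, start + block_size))
    df = block[["time", "lon", "lat"]].to_dataframe().reset_index()
    df = df.dropna(subset=["time"])        # drop padded, never-written observations
    write("trajectories.parquet", df, append=(start > 0))
```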
Thanks so much for your elaborate thoughts, @JamiePringle and @oj-tooth! Really good to have this open discussion about the (dis)advantages of Parquet before we implement it. Some quick responses to your specific questions, Jamie:
Actually, with bde07a3 the MultiIndex is
I haven't touched MPI yet; will hopefully get to that next week. I first want to see whether we can use Parquet to write during the Kernel-loop (point 4 of the list at the top). Is it OK if I answer this later? About the Parquet library: I'm now using

Note that @michaeldenes is also doing some comparisons between zarr and Parquet output now; he could maybe share one of his large output files for us all to explore?
@erikvansebille, of course MPI can wait... On indexing by time instead of observation: that will be great for some things but bad for others. However, if the output timing is not really strange, it should be relatively easy to deal with this if you need to. For example, I write out a particle age; this can be used to index on. Jamie
Yep, you're right. Users can always reindex based on what fits their workflow best, using e.g. |
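A sketch of the kind of reindexing this could mean (assuming pandas and an "age" variable written by the user's kernel; not necessarily the snippet Erik had in mind):

```python
import pandas as pd

df = pd.read_parquet("trajectories.parquet")
by_age = df.set_index(["trajectory", "age"]).sort_index()
positions_day10 = by_age.xs(10.0, level="age")   # e.g. all particles at age 10 days
```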
Just a quick update on the Parquet process: I realised yesterday that Parquet probably won't work for #1388, because we would then need to create one Parquet file for each particle at each timestep. That could be millions of files! I'm now looking into SQLite as a format, since that is optimised for appending rows (particle positions at times). A basic version works in a047dca, where particles are added to the SQLite database in the JIT-Kernel too. There's still a lot to do in terms of optimisation and testing efficiency, and it might be that this is not the right solution either; but at least we now have a version where we can write output in the Kernels directly (and that is probably better than writing in csv format, like most other Lagrangian frameworks do). So stay tuned!
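This is not the a047dca implementation, just a minimal Python-side sketch of why SQLite suits this write pattern: each output step is a cheap batch of row inserts (column names are placeholders).

```python
import sqlite3

con = sqlite3.connect("trajectories.db")
con.execute(
    "CREATE TABLE IF NOT EXISTS obs (trajectory INTEGER, time REAL, lon REAL, lat REAL)"
)
rows = [(0, 0.0, 0.1, 0.0), (1, 0.0, 0.2, 0.0)]   # one tuple per particle at this time step
con.executemany("INSERT INTO obs VALUES (?, ?, ?, ?)", rows)
con.commit()
con.close()
```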
Some thoughts.
Having said all this, I have found SQLite to be robust and mature. It can't do everything, but it does what it says on the tin, and most problems have already been figured out by someone else. Also, CSV sucks at everything but longevity, and should be avoided.

I hate to mention this... but would all your problems go away if you append each row to a plain binary format, like Fortran unformatted output, and then just post-process this journal file at the end? If you have the urge to throw something at me now, I forgive you... Jamie
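A sketch of the "plain binary journal plus post-processing" idea (the record layout is an assumption): append fixed-size records during the run, read them back once at the end.

```python
import numpy as np

rec = np.dtype([("trajectory", "i4"), ("time", "f8"), ("lon", "f4"), ("lat", "f4")])

# During the run: append one fixed-size record per particle per output time.
with open("journal.bin", "ab") as f:
    step = np.array([(0, 0.0, 0.1, 0.0), (1, 0.0, 0.2, 0.0)], dtype=rec)
    step.tofile(f)

# After the run: read the journal back and convert to whatever format analysis needs.
journal = np.fromfile("journal.bin", dtype=rec)
```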
This train may have left the station, but I have done some benchmarking of parquet (written as a single file with pyarrow) against zarr, on a file that has 666,361,041 trajectories and 33 observations. The chunking in zarr is (50000, 10) and the row-grouping in parquet is 23359; I have no idea if the latter is optimal. All reading of parquet is with Polars, as Ollie suggests. All tests were made on a warm cache, but similar patterns hold on a cold cache (it just takes somewhat longer).

Summary: Parquet & Polars is surprisingly performant, but for my work patterns, zarr is still much faster.

Task 1: read all starting times (the first observation of each trajectory). Zarr takes 20 seconds, Parquet/Polars takes 50 seconds.

Task 2: find the starting positions of all trajectories with starting times between months 3 and 4 (about 1/7th of the data set). For zarr, I use the starting times read above. This might seem like cheating, but this is the pattern I would use. Zarr 5.4 seconds, Parquet/Polars 75 seconds.

Similar patterns hold for other common tasks, like finding all particle positions 10 days after release for months 3 and 4. It might be that the data access patterns that spring to my mind are the ones that are easy to do in zarr, since I am used to it. But zarr's performance is significantly, if not overwhelmingly, better than parquet's in these cases. Jamie
Hi Jamie, Thanks for undertaking an initial benchmarking exercise; it sounds like you've done a fantastic job of optimising your work with zarr outputs for your research! Is there any chance you could post the Polars queries you used for Tasks 1 & 2? I'd be interested to see if there are any alternative and more performant options (Polars queries can be quite an art form, given the lack of clarity in some of the docs). It's also worth saying that I don't expect Polars to be more performant on all tasks (especially those making use of the matrix structure of zarr), but there are many cases, when working with fewer (order of millions) but much more diverse trajectories (a large range of observations per trajectory), as I do in the subpolar North Atlantic, where Polars has worked very well. Thanks again for your update. Ollie
Ollie -- of course -- I am sure these are not optimal...
Hi Jamie, Your tasks definitely appear to be well formulated for working in zarr! Given that your examples are focused on filtering and selecting data, I've included an alternative approach using PyArrow directly below. I've found that for filtering large .parquet files, using PyArrow's read_table() function and including a custom filter can yield significant performance improvements over the Polars query engine. In the example below, I output to a pandas DataFrame, but you could easily transform this into a Polars LazyFrame if you then wanted to take advantage of its efficient expressions and groupby operations. Could you let me know if there's an improvement with the PyArrow approach? I can't imagine it will reach zarr's level of performance, but I'd be interested to know the difference from your original Polars query. Ollie
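A sketch of the PyArrow approach described above (column names and the time window are placeholders): read_table applies the filter and column selection while reading, and the result can then be converted to pandas or Polars for further analysis.

```python
import pyarrow.parquet as pq
import polars as pl

table = pq.read_table(
    "trajectories.parquet",
    columns=["trajectory", "time", "lon", "lat"],
    filters=[("time", ">=", 5184000.0), ("time", "<", 7776000.0)],  # e.g. months 3-4 in seconds
)
df = table.to_pandas()            # eager pandas DataFrame
lf = pl.from_arrow(table).lazy()  # ...or continue with Polars expressions
```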
OK, so I've done some more testing of the zarr (current master; bf98c97) versus Parquet (23483c5) versions of Parcels. I used the code below to randomly move 100,000 particles for 100 time steps.

```python
from parcels import FieldSet, ParticleSet, JITParticle, ParcelsRandom

def Moveonly(fieldset, particle, time):
    particle.lon += ParcelsRandom.random()
    particle.lat += ParcelsRandom.random()

fieldset = FieldSet.from_data({'U': 0., 'V': 0.}, {'lat': 0., 'lon': 0.})
npart = int(1e5)
pset = ParticleSet(fieldset, pclass=JITParticle, lon=[0] * npart, lat=[0] * npart)

fname = "moveonly_output"  # output name (zarr store or Parquet file, depending on the version under test)
output_file = pset.ParticleFile(fname, outputdt=1)
pset.execute(Moveonly, runtime=100, dt=1, output_file=output_file)
```

The results were (on my MacBook Pro):
So I'm a bit disappointed by Parquet for more complex simulations. For #1388, I've thus decided to go back to the zarr format (now #1402), and we'll have to work on another solution for the slowdown for many repeatdt calls.

Note that users can still convert zarr output to Parquet for their own analysis after executing parcels, if that improves performance for them; this PR was really about the native format for writing.
Closing this PR as #1402 is the chosen implementation |
This PR implements a new way of writing output, using the Parquet format developed by Apache and used in many big data projects. It was suggested by @oj-tooth, who converts the zarr output of parcels v2.4 to Parquet for more powerful analysis using database concepts.

This PR implements direct writing of Parquet files by parcels. Advantages are:

1. The files are typically 10x smaller than zarr output
2. No more need for `chunk` parameters in `ParticleFile`
3. No slowdown expected for many repeatdt calls on small initial sets (so no need for Removing repeatdt #1387)
4. It might be possible to write directly in the JIT-loop, since Parquet also has a well-established C library

The fourth advantage is the main reason to implement this now, because writing in the JIT-loop is required for a clean implementation of [OUTDATED; DO NOT MERGE] Vectorial summing of particle displacement at end of kernel loop #1388. But before we can explore moving the `pfile.write()` into the kernel-execution-loop (instead of the pset-execution-loop), we first have to implement it as cleanly as possible here.

One of the (dis?)advantages of using Parquet is that `to_write='once'` cannot be supported anymore, because Parquet is a tabular format (essentially, each observation of a particle is a row in a table). This means that immutable Variables (like `trajectory`) will be written at each time step. However, the overhead of these Variables will be small, because they compress easily; and removing support for `to_write='once'` makes part of the parcels code quite a bit cleaner and simpler.

There are a few open issues that need to be fixed before this PR can be merged:

- The `np.datetime64` dtype in the Parquet file
- Memory stores (like Allow for Zarr stores being passed to ParticleFile #1303; @willirath could you weigh in?)
- Whether to use the `pyarrow` or the `fastparquet` package (or another one)
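For illustration only (column names are assumptions, not the schema this PR writes): the row-per-observation layout described above, with a repeated trajectory column in place of `to_write='once'` variables and an `np.datetime64` time column, one of the open issues listed.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "trajectory": [0, 0, 1, 1],                      # repeated on every row; no 'once' variables
    "time": np.array(["2000-01-01", "2000-01-02",
                      "2000-01-01", "2000-01-02"], dtype="datetime64[ns]"),
    "lon": [0.0, 0.1, 1.0, 1.1],
    "lat": [0.0, 0.2, 1.0, 1.2],
})
df.to_parquet("example.parquet")                     # requires pyarrow or fastparquet installed
```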