
@erikvansebille
Member

@erikvansebille commented Jul 17, 2023

This PR implements a new way of writing, using the Parquet format developed by Apache and used in many big data projects. It was suggested by @oj-tooth, who converts the zarr-output of parcels v2.4 to Parquet for more powerful analysis using database-concepts.

This PR implements direct writing of Parquet files by parcels. Advantages are

  1. The files are typically 10x smaller than zarr-output

  2. No more need for chunk parameters in ParticleFile

  3. No slowdown expected for many repeatdt calls on small initial sets (so no need for Removing repeatdt #1387)

  4. However, the main reason to implement this now, is that it might be possible to write directly in the JIT-loop, which is required for a clean implementation of [OUTDATED; DO NOT MERGE] Vectorial summing of particle displacement at end of kernel loop #1388. This is because Parquet also has a well-established C-library. But before we can explore moving the pfile.write() into the kernel-execution-loop (instead of the pset-execution-loop), we first have to implement it as cleanly as possible here.

One of the (dis?)advantages of using Parquet is that to_write='once' cannot be supported anymore, because Parquet is a tabular format (essentially, each observation of a particle is a row in a table). This means that immutable Variables (like trajectory) will be written at each time step. However, the storage overhead of these Variables will be small, because they compress very well; and removing support for to_write='once' does make part of the parcels code quite a bit cleaner and simpler.

There are a few open issues that need to be fixed before this PR can be merged

  • Check how to add metadata to the Parquet format (perhaps with a separate json-file in the Parquet-directory?)
  • Check how to keep the np.datetime64 dtype in the Parquet file (the pyarrow sketch after this list touches on both of these points)
  • Check whether there is a need also for supporting Memorystores (like Allow for Zarr stores being passed to ParticleFile #1303; @willirath could you weigh in?)
  • Check whether and/or how to align with the GeoParquet standard format
  • Add support for MPI
  • Decide on whether to use the pyarrow or the fastparquet package (or another one)
  • Check performance in real-world simulations (not only unit-tests and examples; @michaeldenes could you help out?)
  • Update the documentation and tutorials to show how Parquet works.
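
As a point of reference for the first two items, below is a minimal pyarrow sketch (not the Parcels implementation, and assuming pyarrow ends up being the chosen backend) of attaching custom key/value metadata to the schema and checking that an np.datetime64 column round-trips as a Parquet timestamp; the file name and metadata values are made up.

# Minimal sketch: schema-level metadata plus np.datetime64 round-trip with pyarrow.
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({
    "trajectory": np.array([0, 0, 1], dtype=np.int64),
    "time": np.array(["2023-07-17", "2023-07-18", "2023-07-17"], dtype="datetime64[ns]"),
    "lon": [0.1, 0.2, 0.3],
})

table = pa.Table.from_pandas(df, preserve_index=False)
# Merge custom metadata into whatever schema metadata pyarrow already stores:
meta = dict(table.schema.metadata or {})
meta[b"parcels_version"] = b"x.y.z"   # illustrative value
table = table.replace_schema_metadata(meta)
pq.write_table(table, "out.parquet")

read_back = pq.read_table("out.parquet")
print(read_back.schema.metadata[b"parcels_version"])   # b'x.y.z'
print(read_back.schema.field("time").type)             # timestamp[ns]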

@willirath
Collaborator

I'd very strongly advocate for making sure there is a way of "storing" the particle data in a way working for minimal infrastructure (ie no disk) and with minimal software dependencies. In a way, #1303 did a lot more than enabling just a specific type of output store: As it is very easy to create a valid Zarr store (e.g. as an interface to a database etc), allowing for an arbitrary Zarr store opened up an easy way of hacking arbitrary output strategies into Parcels without touching the Parcels code.

@erikvansebille
Member Author

I'd very strongly advocate for making sure there is a way of "storing" the particle data in a way working for minimal infrastructure (ie no disk) and with minimal software dependencies. In a way, #1303 did a lot more than enabling just a specific type of output store: As it is very easy to create a valid Zarr store (e.g. as an interface to a database etc), allowing for an arbitrary Zarr store opened up an easy way of hacking arbitrary output strategies into Parcels without touching the Parcels code.

OK, good point @willirath. I agree that we should maintain a way to write to memory (instead of to disk) in Parcels. But I'd be reluctant for now to keep both zarr and Parquet support, because it bloats the code considerably and makes maintenance more challenging.

I'm assuming there's also a way to create and update a pyarrow table in memory. Does anyone have experience? An alternative is to write into a pandas DataFrame, but this would not work well with advantage number 4 above (writing from within the JIT loop). Since pyarrow has a C implementation, it might be possible to write to that directly? A sketch of the in-memory option is below.
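
For what it's worth, here is a minimal sketch (assuming pyarrow, with no Parcels API implied) of building up a pyarrow Table in memory and writing the Parquet bytes to an in-memory sink rather than to disk.

import pyarrow as pa
import pyarrow.parquet as pq

# Tables are immutable, so one table per output step is built and the pieces concatenated:
step1 = pa.table({"trajectory": [0, 1], "lon": [0.1, 0.2]})
step2 = pa.table({"trajectory": [0, 1], "lon": [0.3, 0.4]})
table = pa.concat_tables([step1, step2])

# Write the Parquet bytes to memory instead of to a file...
sink = pa.BufferOutputStream()
pq.write_table(table, sink)
buf = sink.getvalue()

# ...and read them back without touching disk:
round_trip = pq.read_table(pa.BufferReader(buf))
print(round_trip.num_rows)  # 4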

@JamiePringle
Collaborator

It is worth thinking about how the data will be written and accessed as storage formats are considered. There are a number of basic things to keep in mind.

  1. For small datasets, say less than a few gigabytes, it really does not matter what you use as long as it is more efficient than CSV, because you can just read the data into memory and sort on the fly. This covers almost all of the test cases, which means the test cases are not that useful for understanding IO performance.
  2. For large cases (for example, my typical zarr files are between (11393700, 130) and (666361041, 33) in (trajectories x observations)), managing the data efficiently is very important. Individual datasets are often larger than machine memory.
  3. The data are typically written out as many different trajectories at different observations and potentially different model times. This writing must be efficient, or the Ocean Parcels run time will suffer. Most often, the order of efficient writing is very different from the order of efficient reading.
  • The current zarr code handles this by having each MPI process write to its own output, and some fairly complicated bookkeeping to handle writing different observations at the same write step (at least when particles start at different times).
  • The prior *.npz output was essentially a journal file: it wrote out the data as they were ready and then organized them in a subsequent in-memory organization step, which was a severe bottleneck for larger runs. This post-processing step limited the size of the output to the size of the machine memory, which was a common problem.
  • It is worth noting that all modern big-data formats, including parquet, have some chunking mechanism to allow random seeks and compression. This will tend to slow down writing and reading of many small records to some extent (how important this is also depends on the underlying file system, which often has to read in chunks anyway, but can mitigate this with some sort of caching); see the sketch after this list.
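
For illustration, this is where those chunking knobs live in the two formats (a sketch with made-up sizes): Parquet groups rows into row groups at write time, while zarr splits arrays into chunks at creation time; both trade the cost of many small writes and reads against random-access granularity.

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import zarr

# Parquet: rows are grouped into row groups when the table is written.
table = pa.table({"trajectory": np.arange(100_000), "lon": np.random.rand(100_000)})
pq.write_table(table, "chunked.parquet", row_group_size=20_000)   # -> 5 row groups

# zarr: arrays are split into chunks when the array is created.
z = zarr.open("chunked.zarr", mode="w", shape=(100_000, 130),
              chunks=(50_000, 10), dtype="f4")
z[:] = 0.0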

It is unlikely that a single data layout will be optimal for all problems. But in general, I can imagine several common data access patterns (in the following, I am assuming the parquet is written out as rows, each containing a single observation of a single trajectory, with all the data for that observation (e.g. lat, lon, age, etc.); if this is wrong, let me know and I will update). I am also assuming that there is no regular order in the parquet data set, so you can't easily calculate which row a given trajectory and observation is in. This is in general true, since there is no guarantee that all particles will have the same number of valid observations.

  • All observations of some set of particles. This is easy in zarr now, since it is organized as a (trajectory,obs) matrix and one could read, e.g., lon[someIndex,:]. For parquet, one would have to scan the entire data set.
  • A particular time since release for some set of particles. This is easy in zarr now, since it is organized as a (trajectory,obs) matrix, and one can read lon[someIndex,someObs]. For parquet, one would have to scan the entire data set.
  • A particular model time for all drifters in the ocean at that time. In zarr, if you launch particles at different times you need to find the launching time of all the drifters (time[:,0]) and then use this, and your writing interval, to find which obs to access. (and deal with some edge cases when particles stop between output times). If you launch all the particles at once, it is easy and the same as the "time since release" data access. For parquet, one would have to scan the entire data set. In neither case is access necessarily efficient.

If you are willing to spend some extra disk space, you can make a flat file like parquet or an SQL database much faster by calculating indices into the data set (see SQLite, or perhaps the parquet page indices described in https://blog.cloudera.com/speeding-up-select-queries-with-parquet-page-indexes/ ; I have no experience with parquet indices). These indices trade off disk space for access speed. (Note that SQLite has a severe storage penalty because it defaults to double-precision floats and long integers; other databases do not have this issue.)

But without these indices, I think that for many common access patterns parquet would require you to read the entire time or observation column of the data set to find the data you are looking for. For example, if you want to find all drifters that start at a given time, you would have to read the full time and observation columns in parquet, but only time[:,0] in zarr or some other matrix-oriented data format. I think this would often be a severe performance penalty.
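
For reference, a small sketch of the per-row-group min/max statistics that Parquet files already store and that readers use for predicate pushdown; whether this actually avoids a full column scan depends entirely on how the rows happen to be ordered, which is the point above. The file name and column name are illustrative.

import pyarrow.parquet as pq

pf = pq.ParquetFile("trajectories.parquet")
md = pf.metadata
for rg in range(md.num_row_groups):
    for col in range(md.row_group(rg).num_columns):
        chunk = md.row_group(rg).column(col)
        if chunk.path_in_schema == "time" and chunk.statistics is not None:
            # Row groups whose [min, max] range excludes the target time could be skipped:
            print(rg, chunk.statistics.min, chunk.statistics.max)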

If parquet ends up being the default output format, it would be very useful for someone (perhaps me) to write a post-processing step that is 1) memory efficient and 2) writes to a matrix oriented format.
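
A naive sketch of what such a post-processing step could look like: pivot the tabular output back into a (trajectory, obs) matrix and write it as zarr. This version loads everything into memory, so it does not yet meet requirement 1; the column names are assumptions.

import pandas as pd
import xarray as xr

df = pd.read_parquet("trajectories.parquet")
# Recover an observation counter per trajectory if the file only stores time:
df["obs"] = df.groupby("trajectory").cumcount()
ds = df.set_index(["trajectory", "obs"]).to_xarray()   # variables become (trajectory, obs) arrays
ds.to_zarr("trajectories.zarr", mode="w")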

One note on polars -- it appears to be in-memory only. Again, that will limit its use on small laptops or for big problems, and so is sub-optimal (or am I wrong about polars?).

Jamie

@oj-tooth

Hi Jamie,

I’ll focus on accessing data output in .parquet format given that I know much less about the details of writing trajectory data in Parcels.

Firstly, I completely agree with your point that a single data structure will never be optimal for all users, and therefore would definitely support a post-processing option to output in the traditional matrix format as you suggested. With that said, I would like to tackle your primary concern regarding larger-than-memory trajectory .parquet files and the need to eagerly read an entire trajectory dataset to access individual trajectories or time-slices.

To do this, I'll consider the example of a collection of Lagrangian trajectories released across 24 months in 1990-1991 in the 1/12 degree ORCA0083-N006 ocean-sea ice hindcast simulation. The trajectories were evaluated using TRACMASS rather than Parcels, and the output data is formatted as you mentioned, with each row corresponding to one observation of multiple variables (columns) of a single trajectory identified with a unique ID. Since TRACMASS writes to our output file in the main loop of the Fortran code, you are also correct that the tabular output data structure does not store trajectories in a regular order; rather, they are stacked on top of one another during runtime. As with many Lagrangian water mass transformation studies, I output the trajectories at every model grid cell crossing rather than at a regular time interval (e.g., every 5 days), since this allows us to calculate Lagrangian stream functions - I'm not sure that this is a feature in Parcels, but I'm just highlighting it for clarity.

In our single .parquet file combining all seeding steps, we have a total of 272,454,859 observations of 10 variables along 834,668 trajectories. To access our trajectories, we’ll start by using the Polars DataFrame library out of the box, and crucially make use of the Lazy API (which is not subject to in-memory constraints) using the example code below:

Importing Example Lagrangian Trajectory LazyFrame

# Define Lagrangian trajectories .parquet file path:
fpath = ".../ORCA12-N006/tracking/eSPNA/run/ORCA0083-N006_eSPNA_1990_run.parquet"

# Scan .parquet file to create a LazyFrame:
df_traj = pl.scan_parquet(fpath, rechunk=False)

# Return the schema of the LazyFrame:
print(df_traj.schema)

{'id': Int64, 'x': Float64, 'y': Float64, 'z': Float64, 'subvol': Float64, 'time': Float64, 'boxface': Int64, 'temp': Float64, 'sal': Float64, 'mld': Float64}

Processing our LazyFrame

Next, we will filter the LazyFrame for a single trajectory and extract its temperature. To do this, we'll first need to write a query, which will then be automatically optimised by Polars.

# Defining naive query:
query = df_traj.filter(pl.col('id') == 7948694).select(pl.col('temp'))

# Return optimised query:
print(query.describe_optimized_plan())

FAST_PROJECT: [temp]
    PARQUET SCAN [.../ORCA12-N006/tracking/eSPNA/run/ORCA0083-N006_eSPNA_1990_run.parquet](.../ORCA12-N006/tracking/eSPNA/run/ORCA0083-N006_eSPNA_1990_run.parquet)
    PROJECT 2/10 COLUMNS
    SELECTION: [(col("id")) == (7948694i64)]

To obtain the result as an eager DataFrame we execute the query using the collect() method. Here, we use streaming to improve performance by executing the query in batches - this is an especially good option for larger-than-memory datasets.

%time print(query.collect(streaming=True))

shape: (1459, 1)
┌──────┐
│ temp │
│ ---  │
│ f64  │
╞══════╡
│ 4.81 │
│ 4.93 │
│ 4.87 │
│ 4.79 │
│ ...  │
│ 5.08 │
│ 5.06 │
│ 5.04 │
│ 5.1  │
└──────┘
CPU times: user 11.1 s, sys: 4.35 s, total: 15.4 s
Wall time: 2.29 s

A more common task might be to calculate some aggregated statistic along each Lagrangian trajectory. For example, if we want to find the minimum temperature along each trajectory we can create a query using the .groupby() and .agg() LazyFrame methods:

# Define naive query to calculate minimum temperature along each trajectory:
%time print(df_traj.groupby(pl.col('id')).agg(pl.col('temp').min()).collect(streaming=True))

shape: (834668, 2)
┌─────────┬───────┐
│ id      ┆ temp  │
│ ---     ┆ ---   │
│ i64     ┆ f64   │
╞═════════╪═══════╡
│ 8683520 ┆ 9.92  │
│ 8749056 ┆ 3.97  │
│ 8454144 ┆ 2.71  │
│ 8257536 ┆ 3.07  │
│ ...     ┆ ...   │
│ 8672703 ┆ 4.21  │
│ 8476095 ┆ 10.03 │
│ 8738239 ┆ 5.22  │
│ 8279487 ┆ 5.67  │
└─────────┴───────┘
CPU times: user 25.5 s, sys: 7.64 s, total: 33.1 s
Wall time: 5.4 s

The Lagrangian Trajectories Toolbox

Naturally, users may feel that learning Polars syntax is a significant barrier to undertaking Lagrangian ocean analysis, and so I have been developing a Lagrangian Trajectories Toolbox (LT_Toolbox) that centres around the TrajectoryStore (TrajStore) object. The TrajStore is designed to be a container for Polars Data/LazyFrames and also includes common methods useful for Lagrangian ocean analysis. This is still in very early development, but below is a quick example where we create a TrajectoryStore object, which consists of a TrajFrame (a Data/LazyFrame with at least one observation per trajectory) and a SummaryFrame (a Data/LazyFrame with a single row per trajectory, designed to store aggregated statistics), and calculate the minimum temperature along a single trajectory:

# Importing LT_Toolbox:
import lt_toolbox as ltt

# Define Lagrangian trajectories .parquet file path:
fpath = ".../ORCA12-N006/tracking/eSPNA/run/ORCA0083-N006_eSPNA_1990_run.parquet"

# Create TrajectoryStore object by reading trajectories from filepath as LazyFrame:
traj = ltt.TrajStore(source=fpath, read_mode='lazy', summary_source=None, rename_cols=None)

# Filter for a single trajectory and get the minimum temperature along stream.
print(traj.filter('id == 7948694').get_min(variable='temp').collect(streaming=True))

<TrajStore object>

----- Trajectory DataFrame -----
Observations: 1459
Variables: ['id', 'x', 'y', 'z', 'subvol', 'time', 'boxface', 'temp', 'sal', 'mld']
<bound method DataFrame.glimpse of shape: (1459, 10)
┌─────────┬─────────┬─────────┬───────┬─────┬─────────┬──────┬───────┬────────┐
│ id      ┆ x       ┆ y       ┆ z     ┆ ... ┆ boxface ┆ temp ┆ sal   ┆ mld    │
│ ---     ┆ ---     ┆ ---     ┆ ---   ┆     ┆ ---     ┆ ---  ┆ ---   ┆ ---    │
│ i64     ┆ f64     ┆ f64     ┆ f64   ┆     ┆ i64     ┆ f64  ┆ f64   ┆ f64    │
╞═════════╪═════════╪═════════╪═══════╪═════╪═════════╪══════╪═══════╪════════╡
│ 7948694 ┆ 3145.25 ┆ 2325.0  ┆ 47.5  ┆ ... ┆ 3       ┆ 4.81 ┆ 35.08 ┆ 244.31 │
│ 7948694 ┆ 3146.0  ┆ 2325.26 ┆ 47.68 ┆ ... ┆ 1       ┆ 4.93 ┆ 35.09 ┆ 307.69 │
│ 7948694 ┆ 3146.71 ┆ 2326.0  ┆ 47.95 ┆ ... ┆ 3       ┆ 4.87 ┆ 35.08 ┆ 267.94 │
│ 7948694 ┆ 3146.73 ┆ 2326.28 ┆ 48.0  ┆ ... ┆ 5       ┆ 4.79 ┆ 35.08 ┆ 303.75 │
│ ...     ┆ ...     ┆ ...     ┆ ...   ┆ ... ┆ ...     ┆ ...  ┆ ...   ┆ ...    │
│ 7948694 ┆ 3151.0  ┆ 2365.62 ┆ 46.53 ┆ ... ┆ 1       ┆ 5.08 ┆ 35.11 ┆ 158.35 │
│ 7948694 ┆ 3151.61 ┆ 2365.0  ┆ 46.45 ┆ ... ┆ 4       ┆ 5.06 ┆ 35.11 ┆ 160.04 │
│ 7948694 ┆ 3151.0  ┆ 2364.3  ┆ 46.35 ┆ ... ┆ 2       ┆ 5.04 ┆ 35.11 ┆ 149.71 │
│ 7948694 ┆ 3150.12 ┆ 2365.0  ┆ 46.43 ┆ ... ┆ 3       ┆ 5.1  ┆ 35.11 ┆ 240.62 │
└─────────┴─────────┴─────────┴───────┴─────┴─────────┴──────┴───────┴────────┘>

----- Summary DataFrame -----
Observations: 1
Variables: ['id', 'subvol', 'temp_min']
<bound method DataFrame.glimpse of shape: (1, 3)
┌─────────┬─────────┬──────────┐
│ id      ┆ subvol  ┆ temp_min │
│ ---     ┆ ---     ┆ ---      │
│ i64     ┆ f64     ┆ f64      │
╞═════════╪═════════╪══════════╡
│ 7948694 ┆ 4989.11 ┆ 4.59     │
└─────────┴─────────┴──────────┘>

Undoubtedly I'll have missed something, but I hope this at least addresses some of the concerns you outlined above. I'd be especially keen to get any feedback on the LT Toolbox as a developing library for Lagrangian ocean analysis - it's worth saying that I originally wrote this library to work with xarray for .nc or .zarr outputs from Parcels, but have since found myself on an all-consuming journey into learning Polars instead!

Ollie

@JamiePringle
Collaborator

JamiePringle commented Jul 20, 2023

I wonder if it makes sense to write out a big test dataset to play with? It is easier to optimize output code without having to get parcels perfect first.

If I were to do so by going from the existing zarr files I have to parquet, it would be nice to have answers to the following questions:

  • To replicate something close to what parcels would do, I guess I should write out the data by observation, then trajectory? I could try to fake the parcels output order by writing out roughly in order of model time.
  • @erikvansebille , you are planning to write all MPI processes to a single parquet file?

If it is easy enough, I might try to make a big parquet file this afternoon. No promises. If I make a big file, where would be a good place to share it? I could perhaps throw it on a web server.

Jamie

OK, I have decided to do this with fastparquet. This will also allow easy experimentation with small numbers of initial particle writes. This can simulate the small-chunk-size problem with zarr. I am not sure, but this might have similar issues with small files in parquet. Anyway, it will be easier to play with code that easily allows going from zarr to parquet than dealing with the whole parcels framework; a rough sketch is below.
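
A rough sketch of this kind of zarr-to-parquet conversion, using fastparquet's append mode to write one model-time slab at a time (paths, column names and the NaN handling are illustrative only):

import numpy as np
import pandas as pd
import zarr
import fastparquet

z = zarr.open("2008.zarr", mode="r")
out = "2008.pq"
ntraj, nobs = z["lon"].shape
for obs in range(nobs):                      # roughly "in order of model time"
    df = pd.DataFrame({
        "trajectory": np.arange(ntraj, dtype=np.int64),
        "time": z["time"][:, obs],
        "lon": z["lon"][:, obs],
        "lat": z["lat"][:, obs],
    }).dropna(subset=["time"])               # drop particles not (yet) active at this obs
    fastparquet.write(out, df, append=(obs > 0))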

@erikvansebille
Member Author

erikvansebille commented Jul 20, 2023

Thanks so much for your elaborate thoughts, @JamiePringle and @oj-tooth! Really good to have this open discussion about the (dis)advantages of Parquet before we implement it.

Some quick responses to your specific questions, Jamie:

To replicate something close to what parcels would do, I guess I should write out the data by observation, then trajectory? I could try to fake the parcels output order by writing out roughly in order of model time.

Actually, with bde07a3 the MultiIndex is (trajectory, time). The advantage is that we don't need an obs column anymore (so a slightly smaller storage footprint) and that it should(?) be easier to collect all particles at a specific time. The disadvantage is that if the Parquet file is converted to an array, this could become a much bigger (but sparse) matrix in cases with many short trajectories released at different times. I'm not sure what the memory impact of that is.
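
A small sketch of that "converted to array" concern: pivoting a (trajectory, time)-indexed table back to a matrix pads every trajectory out to the union of all output times, so staggered releases give a mostly-empty array (the DataFrame below is made up).

import pandas as pd

df = pd.DataFrame({
    "trajectory": [0, 0, 1, 1],
    "time":       [0.0, 1.0, 5.0, 6.0],   # second particle released later
    "lon":        [0.1, 0.2, 0.3, 0.4],
})
lon = df.set_index(["trajectory", "time"])["lon"].unstack("time")
print(lon.shape)   # (2, 4): half of the entries are NaN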

@erikvansebille , you are planning to write all MPI processes to a single parquet file?

I haven't touched MPI yet; I will hopefully get to that next week. I first want to see whether we can use Parquet to write during the Kernel-loop (point 4 of the list at the top). Is it OK if I answer this later?

About the Parquet library: I'm now using pyarrow for the Parcels implementation, but I will check fastparquet once the code is consolidated.

Note that @michaeldenes is also doing some comparisons between zarr and Parquet output now; he could maybe share one of his large output files for us all to explore?

@erikvansebille marked this pull request as draft July 20, 2023 09:51
@JamiePringle
Collaborator

@erikvansebille, of course MPI can wait...

On indexing by time instead of observation: that will be great for some things but bad for others. However, if the output timing is not really strange, it should be relatively easy to deal with this if you need to. For example, I write out a particle age. This can be used to index on.

Jamie

@erikvansebille
Member Author

On indexing by time instead of observation: that will be great for some things but bad for others. However, if the output timing is not really strange, it should be relatively easy to deal with this if you need to. For example, I write out a particle age. This can be used to index on.

Yep, you're right. Users can always reindex based on what fits their workflow best, using e.g. pandas.DataFrame.reindex() (or the polars equivalent?)
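
For example, a small pandas sketch of deriving an age from (trajectory, time) and reindexing on it, along the lines Jamie suggests (the column names are assumptions):

import pandas as pd

df = pd.read_parquet("trajectories.parquet")
df["age"] = df["time"] - df.groupby("trajectory")["time"].transform("min")   # time since release
df = df.set_index(["trajectory", "age"]).sort_index()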

@erikvansebille
Member Author

Just a quick update on the Parquet-process: I realised yesterday that Parquet probably won't work for #1388, because we would then need to create one Parquet file for each particle at each timestep. That could be millions of files!

I'm now looking into SQLite as a format, since that is optimised for appending rows (particle positions at times). A basic version works in a047dca, where particles are added to the SQLite database in the JIT-Kernel too.

There's still a lot to do in terms of optimisation and testing efficiency, and it might be that this is not the right solution either; but at least we now have a version where we can write output in the Kernels directly (and that is probably better than writing in csv format like most other Lagrangian frameworks do).

So stay tuned!

@JamiePringle
Collaborator

JamiePringle commented Jul 21, 2023

Some thoughts.

  • Can't fastparquet append to a file? (And pyarrow can, if you never close the writer.) I infer the former from what I have read, but I have done (and emailed you) code for the latter.
  • sqlite floats are limited to double precision, which is an issue for large datasets. Its integer storage is supposed to be efficient, however: https://www.sqlite.org/datatype3.html
  • sqlite file locking becomes a real pain point with multiple processes writing to a file -- but it works IF YOU DON'T PUT THE FILE ON A NFS FILESYSTEM OR SIMILAR.
  • only add indices if desired after writing all data, or everything gets much slower.
  • if you are going to go down the generic SQL database route, perhaps use the generic database interface?
  • you will hit severe performance issues if you update one row at a time. You will want to cache 1000s of updates and then write them at once, or wrap many inserts in BEGIN and END blocks (but note that this will block writes from other processes). In general, writing a single record at a time to an SQL database (including sqlite) is much slower than updating in chunks. This is the same as making the inserts inside a single transaction -- perhaps you can begin and end a transaction every 1 or N time steps? (See the sketch after this list.)
  • sqlite does not compress output. This can suck for large projects.
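
A minimal sketch of the batched-insert advice above, using the standard-library sqlite3 module: buffer many rows and insert them inside a single transaction with executemany (the table layout is illustrative, and a final flush plus close would be needed at the end of the run):

import sqlite3

conn = sqlite3.connect("trajectories.sqlite")
conn.execute("CREATE TABLE IF NOT EXISTS obs (trajectory INTEGER, time REAL, lon REAL, lat REAL)")

buffer = []
def write_particle(trajectory, time, lon, lat, flush_every=10000):
    buffer.append((trajectory, time, lon, lat))
    if len(buffer) >= flush_every:
        with conn:   # one transaction for the whole batch
            conn.executemany("INSERT INTO obs VALUES (?, ?, ?, ?)", buffer)
        buffer.clear()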

Having said all this, I have found SQLite to be robust and mature. It can't do everything, but it does what it says on the tin, and most problems have already been figured out by someone else.

Also, CSV sucks at everything, longevity included, and should be avoided.

I hate to mention this... but would all your problems go away if you append each row to a plain binary format like Fortran unformatted, and then just post-process this journal file at the end? If you have the urge to throw something at me now, I forgive you...

Jamie

@JamiePringle
Collaborator

This train may have left the station, but I have done some benchmarking of parquet written as a single file with pyarrow, against zarr, on a file that has 666,361,041 trajectories and 33 observations. The chunking in zarr is (50000, 10) and the row-grouping in parquet is 23359; I have no idea if the latter is optimal. All reading of parquet is with Polars, as Ollie suggests. All tests were made on a warm cache, but similar patterns hold on a cold cache (it just takes somewhat longer).

Summary: Parquet & Polars is surprisingly performant, but for my work patterns, Zarr is still much faster.

Task 1: read all starting times (the first observation of every trajectory). Zarr takes 20 seconds, Parquet/Polars takes 50 seconds.

Task 2: find all trajectories whose starting times fall between months 3 and 4 (about 1/7th of the data set). For zarr, I use the starting times read above. This might seem like cheating, but this is the pattern I would use. Zarr 5.4 seconds, Parquet/Polars 75 seconds.

Similar patterns hold for other common tasks, like finding all particle positions 10 days after release for the months 3 and 4 releases.

It might be that the data access patterns that spring to my mind are the ones that are easy to do in Zarr, since I am used to it. But zarr's performance is significantly, if not overwhelmingly, better than parquet's in these cases.

Jamie

@oj-tooth

Hi Jamie,

Thanks for undertaking an initial benchmarking exercise; it sounds like you've done a fantastic job of optimising your work with Zarr outputs for your research! Is there any chance you could post the Polars queries you used for Tasks 1 & 2? I'd be interested to see if there are any alternative and more performant options (Polars queries can be quite an art form, given the lack of clarity in some of the docs). It's also worth saying that I don't anticipate Polars to be more performant on all tasks (especially those making use of the matrix structure of Zarr), but there are many cases when working with fewer (of order millions) but much more diverse trajectories (a large range of observations per trajectory), as I do in the subpolar North Atlantic, where Polars has worked very well.

Thanks again for your update.

Ollie

@JamiePringle
Collaborator

Ollie -- of course -- I am sure these are not optimal...

from pylab import *
from numpy import *
import zarr
import fastparquet as fp
import pyarrow as pa
import pyarrow.parquet as papq
import pandas as ps
import polars as pl
import time 


#benchmark some zarr and polars data access

outFile='parquetTest/2008.pq'

#get data
df_traj=pl.scan_parquet(source=outFile,rechunk=False)
print(df_traj.schema)

#get zarr data
inZarrFile=('../EZfate_productionCode/dataPaths/'+
    'westernHemisphere_wholeGlobe_range100km_depthFrom_1m_to_500m_habitatTree_months01_to_12_starts_1m/2008.zarr')
zarrIn=zarr.open(inZarrFile,'r')

#lets do a query, and get age (so we can see if complete)
if True:
    print('get all start times with age==0 from parquet')
    # Defining naive query:
    query = df_traj.filter(pl.col('age') == 0).select(pl.col('time'))
    #execute plan
    tic=time.time()
    print('   starting to execute query')
    dataCollected=query.collect(streaming=True)
    print('   done in',time.time()-tic, 'returned',dataCollected.shape,'data')

if True:
    print('get all start times with age==0 from zarr')
    tic=time.time()
    dataZarr=zarrIn['time'][:,0]
    print('   done in',time.time()-tic, 'returned',dataZarr.shape,'data\n')
    timeAge0=dataZarr

#ok, now test just getting some times (both in seconds)
timeMin=90.0*86400
timeMax=120.0*86400

if True:
    print('get all trajectories with starts between timeMin and timeMax from parquet')
    # Defining naive query:
    query = df_traj.filter(pl.col('age') == 0).filter(pl.col('time')>=timeMin).filter(pl.col('time')<=timeMax).select(pl.col('trajectory'))
    #execute plan
    tic=time.time()
    print('   starting to execute query')
    dataCollected=query.collect(streaming=True)
    print('   done in',time.time()-tic, 'returned',dataCollected.shape,'data\n')

if True:
    print('get all trajectories with starts between timeMin and timeMax from zarr, CHEATING and using existing start times')
    tic=time.time()
    dataZarr=zarrIn['trajectory'].oindex[logical_and(timeAge0>=timeMin,timeAge0<=timeMax)]
    print('   done in',time.time()-tic, 'returned',dataZarr.shape,'data\n')

@oj-tooth

Hi Jamie,

Your tasks definitely appear to be well formulated for working in Zarr! Given that your examples are focused on filtering and selecting data, I've included an alternative approach using Pyarrow directly below. I've found that for filtering large .parquet files, using Pyarrow's read_table() function and including a custom filter can yield significant performance improvements over the Polars query engine. In the example below, I output to a Pandas DataFrame, but you could easily transform this into a Polars LazyFrame if you then wanted to take advantage of their efficient expressions and groupby operations.

Could you let me know if there's an improvement with the Pyarrow approach? I can't imagine it will reach Zarr's level of performance, but I'd be interested to know the difference from your original Polars query.

Ollie

import time
import pandas as pd
import pyarrow.parquet as pq

# Defining output .parquet file and time window (same values as in the script above):
outFile='parquetTest/2008.pq'
timeMin=90.0*86400
timeMax=120.0*86400

# Task 1:
if True:
    # Apply filter using pyarrow:
    tic=time.time()
    print('   starting to execute query')
    dataCollected=pq.read_table(outFile, columns=['age', 'trajectory'], filters=[('age', '==', 0)]).to_pandas()
    print('   done in',time.time()-tic, 'returned',dataCollected.shape,'data\n')

# Task 2:
if True:
    # Apply filter using pyarrow:
    tic=time.time()
    print('   starting to execute query')
    dataCollected=pq.read_table(outFile, columns=['age', 'time', 'trajectory'], filters=[('age', '==', 0), ('time', '>=', timeMin), ('time', '<=', timeMax)]).to_pandas()
    print('   done in',time.time()-tic, 'returned',dataCollected.shape,'data\n')

@erikvansebille
Member Author

erikvansebille commented Jul 27, 2023

OK, so I've done some more testing of the zarr (current master; bf98c97) versus Parquet (23483c5) versions of Parcels.

I used the code below to randomly move 100,000 particles for 100 time steps.

from parcels import FieldSet, ParticleSet, JITParticle, ParcelsRandom

def Moveonly(fieldset, particle, time):
    particle.lon += ParcelsRandom.random()
    particle.lat += ParcelsRandom.random()

fieldset = FieldSet.from_data({'U': 0., 'V': 0.}, {'lat': 0., 'lon': 0.})
npart = int(1e5)
pset = ParticleSet(fieldset, pclass=JITParticle, lon=[0] * npart, lat=[0] * npart)
output_file = pset.ParticleFile(fname, outputdt=1)  # fname is the zarr or Parquet output path
pset.execute(Moveonly, runtime=100, dt=1, output_file=output_file)

The results were (on my MacBook Pro)

Output format    Execution time    Size of output
No output        1.6 s             -
zarr             9.2 s             61 MB
Parquet          47.5 s            138 MB

So I'm a bit disappointed by Parquet for more complex simulations. For #1388, I've thus decided to go back to zarr format (now #1402), and we'll have to work on another solution for the slowdown for many repeatdt calls.

Note that users can still convert zarr output to Parquet for their own analysis after executing parcels, if that improves performance for them; this PR was really about the native format for writing.
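
For completeness, a sketch of that post-hoc conversion, assuming the whole dataset fits in memory (larger runs would need to go chunk by chunk, e.g. along the lines of the fastparquet sketch earlier in this thread):

import xarray as xr

ds = xr.open_zarr("output.zarr")
ds.to_dataframe().reset_index().to_parquet("output.parquet")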

@erikvansebille
Member Author

Closing this PR as #1402 is the chosen implementation
