2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -70,6 +70,8 @@ environment.yml.lock
marimo/_static/
marimo/_lsp/
__marimo__/

data_processing/data
data_access/imerg_day/
data_access/test_imerg.h5
data_access/data/
81 changes: 81 additions & 0 deletions data_processing/daily_processing_era5.md
@@ -0,0 +1,81 @@
## Daily Loop

I prepared a script that can be called via the command line. All relevant parameters can be passed when calling the script. To retrieve the data required in the exercise, simply call:

`python daily_processing_era5.py --save_path <your_path> --start_date 2024-12-01 --end_date 2024-12-05`.

To specify the other parameters, use the `--levels`, `--params`, `--shortname` and `--nsides` parameters. For list parameters, supply a space-separated list.

While not explicitly mentioned in the exercise description, I added a start and end date for retrieving data within a specified interval, which is needed for the rest of the task. I thus implemented it in the following way:
- start and end date are optional arguments; if no start date is given, the oldest record is used; if no end date is given, the current date is used
- the function then checks the output zarr and determines which dates are missing between the start and end date. If the dates present differ between NSIDE values, these inconsistencies are resolved during the run, which also makes it possible to add further NSIDE values to the same zarr after its initial creation.

More formally:

Assume a zarr store already exists with NSIDE values A, B, C.

- nside A has data for dates [a, b, c, d, e]
- nside B has data for dates [a, b, c]
- nside C has data for dates [c, d, e]

The user requests the time interval a-g, so:
- for nside A, [f, g] need to be added
- for nside B, [d, e, f, g] need to be added
- for nside C, [a, b, f, g] need to be added

Thus, only date c is omitted from the processing loop entirely. After execution, all NSIDE groups will contain data for the whole time interval a-g.

- it is still possible to pass a single date (via `--date`) to retrieve data for a single day
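The date bookkeeping above can be sketched as a set computation (a minimal sketch; the `dates_by_nside` contents are illustrative, mapping a-g to 2024-12-01 through 2024-12-07):

```python
import numpy as np

def missing_dates(dates_by_nside, start, end):
    """Per nside, return the requested dates that are not yet stored."""
    requested = np.arange(np.datetime64(start, "D"), np.datetime64(end, "D") + 1)
    return {ns: sorted(set(requested) - set(stored))
            for ns, stored in dates_by_nside.items()}

D = np.datetime64
dates_by_nside = {
    "A": np.arange(D("2024-12-01"), D("2024-12-06")),  # a-e
    "B": np.arange(D("2024-12-01"), D("2024-12-04")),  # a-c
    "C": np.arange(D("2024-12-03"), D("2024-12-06")),  # c-e
}
todo = missing_dates(dates_by_nside, "2024-12-01", "2024-12-07")  # request a-g
# only c (2024-12-03) is present everywhere and skipped for every nside
```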


## CDS API Request

I used this [page](https://confluence.ecmwf.int/display/UDOC/ENS%3A+Atmospheric+%28enfo%29%2C+Perturbed+Forecast+%28pf%29+for+single+level+%28sfc%29%3A+Guidelines+to+write+efficient+MARS+requests) as a reference for constructing the required request to the CDS API:

- Contrary to the example notebook (load_ecmwf_era5.ipynb), we do not need forecast data, so we load analysis data (`"type": "an"`) from the ERA5 archive class (`"class": "ea"`).
- As required, we only load single days
- For 6-hourly intervals, we define the exact time points in the `"time"` parameter
- the pressure levels and variables are defined flexibly, so that any available configuration can be fetched.
**NOTE**: in the rest of the processing (from netCDF to zarr), I am using the parameter shortname, which can also be passed to the function. While the retrieval allows for multiple variables, only the variable passed as `param_shortname` will be saved in the .zarr.
- The table [here](https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation#ERA5:datadocumentation-Table1) lists the correct IDs and shortnames for the variables. For humidity, the identifier is `133.128` and the parameter shortname is `q`.
- I retrieve the data in netCDF format, which can then easily be loaded using `xarray`.


## Transforming map data to Healpix

To map the data to a healpix grid, I followed the official [tutorial](https://healpy.readthedocs.io/en/latest/tutorial.html). There is also [another tutorial](https://gist.github.com/zonca/680c68c3d60697eb0cb669cf1b41c324) linked from the official documentation page, specifically about transforming map data into healpix data. However, that toy example uses origin data at a much smaller scale, yielding a 1-to-1 mapping between map values and healpix grid indices.

In our case, it is not as simple: we have an N-to-1 relation between map values and grid indices, i.e. multiple map points fall into the same grid cell. To illustrate, suppose $N$ data values are assigned to a single healpix cell $c$. With the tutorial's approach, only one of the $N$ values would be assigned to $c$. This can be seen as random sampling, where a single value represents the whole group of $N$ values. A more reasonable approach is to assign the average of all $N$ values to $c$, which is what I chose to implement.

I thus create a single healpix map for every combination of pressure-level, time-step and NSIDE.
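The group-average assignment can be illustrated with `np.add.at` alone (a minimal sketch with made-up pixel indices, independent of healpy):

```python
import numpy as np

# six map values falling into four cells;
# cells 0 and 2 each receive two values (the N-to-1 case)
pix = np.array([0, 0, 1, 2, 2, 3])
vals = np.array([1.0, 3.0, 5.0, 2.0, 4.0, 7.0])

m = np.zeros(4)
counts = np.zeros(4)
np.add.at(m, pix, vals)    # sum of contributors per cell
np.add.at(counts, pix, 1)  # number of contributors per cell
mask = counts > 0
m[mask] /= counts[mask]    # cell average; empty cells stay 0
# m → [2.0, 5.0, 3.0, 7.0]
```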

## Saving in Zarr format

The Healpix data is written to one Zarr store, with **one group per spatial resolution**:

* `nside=8`
* `nside=16`

Different NSIDE values correspond to different grids and are therefore kept separate. Each group can be loaded independently.

Within each `nside` group, the data is stored as an `xarray.Dataset` with dimensions:

* `time` (6-hourly time steps),
* `level` (pressure levels),
* `pix` (Healpix pixel index).

The data variable (e.g. `q`) has shape `(time, level, pix)`.

**Chunking strategy**

I chose the chunking strategy to match typical access patterns:

* `time: 1` – enables efficient appending and time-based subsetting,
* `level: 1` – allows loading individual pressure levels,
* `pix: -1` – stores the full Healpix map in a single chunk.

This results in chunks of shape `(1, 1, Npix)`, which avoids fragmenting the spatial dimension while keeping temporal and vertical access efficient.
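For orientation, a quick back-of-the-envelope check of the resulting chunk sizes (assuming float64 data; `npix = 12 * nside**2` is the standard Healpix pixel count):

```python
# one chunk holds a full Healpix map for a single time step and level
for nside in (8, 16):
    npix = 12 * nside**2          # equals hp.nside2npix(nside)
    chunk_kib = npix * 8 / 1024   # bytes of one (1, 1, npix) float64 chunk
    print(f"nside={nside}: npix={npix}, chunk = {chunk_kib:.0f} KiB")
# nside=8: 768 pixels per map, 6 KiB per chunk
# nside=16: 3072 pixels per map, 24 KiB per chunk
```

Chunks this small keep per-day appends cheap while a full map still loads in one read.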

**Appending/Inserting data**

Data is appended to the dataset along the time dimension for each NSIDE value. For this to work correctly, the same level configuration must be used across runs. Note that if the dataset for some NSIDE value has gaps (i.e. a day is missing), the next execution of the script will process and append that day, but not _insert_ it in the traditional sense. This can leave the time coordinate unordered within the dataset, which is easily fixed by sorting on that coordinate when loading the data.
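The fix on load can be sketched on a toy in-memory dataset (a minimal sketch; on the real store the same idea is `xr.open_zarr(path, group="nside=8").sortby("time")`):

```python
import numpy as np
import xarray as xr

# mimic a store where 2024-12-02 was written first and the gap day appended later
times = np.array(["2024-12-02", "2024-12-01"], dtype="datetime64[ns]")
ds = xr.Dataset(
    {"q": (("time", "pix"), np.arange(8.0).reshape(2, 4))},
    coords={"time": times, "pix": np.arange(4)},
)

ds_sorted = ds.sortby("time")  # restore chronological order
```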
209 changes: 209 additions & 0 deletions data_processing/daily_processing_era5.py
@@ -0,0 +1,209 @@
import os
import datetime
import argparse
import numpy as np
import healpy as hp
import xarray as xr
import cdsapi


def already_processed(zarr_path, nside_list):
    nside_dict = {nside: np.array([], dtype="datetime64[D]") for nside in nside_list}

    if not os.path.exists(zarr_path):
        return np.array([], dtype="datetime64[D]"), nside_dict

    per_nside_sets = []

    for nside in nside_list:
        group = f"nside={nside}"
        try:
            hp_ds = xr.open_zarr(zarr_path, group=group)
        except (KeyError, FileNotFoundError):
            # missing group → no date is fully processed
            return np.array([], dtype="datetime64[D]"), nside_dict

        dates = np.unique(hp_ds.time.values.astype("datetime64[D]"))
        nside_dict[nside] = dates
        per_nside_sets.append(set(dates))

    # intersection: only dates present in ALL nsides
    processed_dates = sorted(set.intersection(*per_nside_sets))

    return np.array(processed_dates, dtype="datetime64[D]"), nside_dict


def process_data(save_path: str, start_date: str = "1940-01-01", end_date: str = None, date: str = None,
                 level_list: list = [975, 900, 800, 500, 300],
                 param_list: list = ["133.128"],  # specific humidity
                 param_shortname: str = "q",
                 nside_list: list = [8, 16]
                 ):

    zarr_file = f"era5_healpix_{param_shortname}.zarr"
    zarr_path = os.path.join(save_path, zarr_file)

    if not os.path.exists(save_path):
        os.makedirs(save_path)

    already_processed_dates, dates_by_nside = already_processed(zarr_path, nside_list)

    if date is not None:  # process single date
        dates_to_process = [date] if np.datetime64(date, "D") not in already_processed_dates else []
    else:
        if end_date is None:  # process until today
            end_date = datetime.datetime.now().strftime("%Y-%m-%d")
        start_dt = datetime.datetime.strptime(start_date, "%Y-%m-%d")
        end_dt = datetime.datetime.strptime(end_date, "%Y-%m-%d")

        # check that the start date is not after the end date
        assert start_dt <= end_dt, "Start date must not be after end date"

        delta = end_dt - start_dt
        dates_to_process = [(start_dt + datetime.timedelta(days=i)).strftime("%Y-%m-%d") for i in range(delta.days + 1)]
        dates_to_process = [d for d in dates_to_process if np.datetime64(d, "D") not in already_processed_dates]

    c = cdsapi.Client()
    for d in dates_to_process:

        # fetch from cds api
        file_pth = os.path.join(save_path, f"{d}.nc")
        c.retrieve("reanalysis-era5-complete", {
            "class": "ea",
            "type": "an",  # analysis data
            "stream": "oper",
            "expver": "1",
            "levtype": "pl",
            "date": d,  # date in YYYY-MM-DD
            "time": "00:00:00/06:00:00/12:00:00/18:00:00",  # 6-hourly
            "levelist": "/".join([str(lv) for lv in level_list]),
            "param": "/".join(param_list),
            "grid": "5.625/5.625",
            "format": "netcdf"
        }, file_pth)

        # load data
        ds = xr.open_dataset(file_pth)

        ## transform to healpix
        maps = {nside: [] for nside in nside_list}
        times = {nside: [] for nside in nside_list}
        levels = {nside: [] for nside in nside_list}

        pressure_levels = ds.pressure_level.values
        time_steps = ds.valid_time.values
        for t in time_steps:
            for p in pressure_levels:

                selected_data = ds[param_shortname].sel(
                    valid_time=t,
                    pressure_level=p
                )
                data = selected_data.values
                lon = selected_data.longitude.values
                lat = selected_data.latitude.values

                lon_grid, lat_grid = np.meshgrid(lon, lat)
                theta = np.deg2rad(90 - lat_grid)  # converts latitude to colatitude
                phi = np.deg2rad(lon_grid)

                for nside in nside_list:

                    if np.datetime64(t, "D") in dates_by_nside[nside]:
                        continue  # skip already processed dates for this nside

                    pixel_indices = hp.ang2pix(nside, theta, phi)
                    m = np.zeros(hp.nside2npix(nside))
                    counts = np.zeros(hp.nside2npix(nside))

                    # accumulate data assigned to every position and count occurrences
                    np.add.at(m, pixel_indices.ravel(), data.ravel())
                    np.add.at(counts, pixel_indices.ravel(), 1)

                    # average summed data by number of occurrences
                    mask = counts > 0
                    m[mask] /= counts[mask]

                    maps[nside].append(m)
                    times[nside].append(np.datetime64(t))
                    levels[nside].append(int(p))

        ## save as zarr
        for nside, data_list in maps.items():  # group by nside

            if len(data_list) == 0:
                continue  # nothing new for this nside

            data = np.stack(data_list, axis=0)  # stack pixmaps
            times_ = times[nside]
            levels_ = levels[nside]

            ds_hp = xr.Dataset(
                {
                    param_shortname: (("sample", "pix"), data)
                },
                coords={
                    "sample": np.arange(len(data_list)),
                    "time": ("sample", times_),
                    "level": ("sample", levels_),
                    "pix": np.arange(data.shape[1]),
                }
            )

            ds_hp = (
                ds_hp
                .set_index(sample=("time", "level"))
                .unstack("sample")
                .chunk(
                    {
                        "time": 1,
                        "level": 1,
                        "pix": -1
                    }
                )
            )

            group = f"nside={nside}"
            group_path = os.path.join(zarr_path, group)

            if not os.path.exists(group_path):
                # first write → create dataset
                ds_hp.to_zarr(
                    zarr_path,
                    group=group,
                    mode="w"
                )
            else:
                # subsequent days → append
                ds_hp.to_zarr(
                    zarr_path,
                    group=group,
                    mode="a",
                    append_dim="time"
                )

        ## remove temp file
        # os.remove(file_pth)


if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument('--save_path', dest='save_path', type=str, help='Path to save data', required=True)
    parser.add_argument('--start_date', dest='start_date', type=str, help='Add start date', default="1940-01-01")
    parser.add_argument('--end_date', dest='end_date', type=str, help='Add end date', default=None)
    parser.add_argument('--date', dest='date', type=str, help='Add specific date', default=None)
    parser.add_argument("--levels", nargs="+", type=int, help="Pressure levels", default=[975, 900, 800, 500, 300])
    parser.add_argument("--params", nargs="+", type=str, help="Parameter codes", default=["133.128"])  # specific humidity
    parser.add_argument("--shortname", type=str, help="Parameter shortname", default="q")
    parser.add_argument("--nsides", nargs="+", type=int, help="Healpix nside values", default=[8, 16])

    args = parser.parse_args()

    process_data(save_path=args.save_path,
                 start_date=args.start_date,
                 end_date=args.end_date,
                 date=args.date,
                 level_list=args.levels,
                 param_list=args.params,
                 param_shortname=args.shortname,
                 nside_list=args.nsides
                 )