Skip to content

Commit

Permalink
Merge pull request #8 from norlandrhagen/debug
Browse files Browse the repository at this point in the history
eNATL feedstock
  • Loading branch information
jbusecke authored Jul 12, 2024
2 parents f738543 + 2da0dce commit 97b8b78
Showing 1 changed file with 27 additions and 34 deletions.
61 changes: 27 additions & 34 deletions feedstock/eNATL60.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,58 @@
"""
Pangeo-Forge feedstock recipe for the eNATL60-BLBT02 ocean simulation output.

Builds a daily file pattern over one month of Zenodo-hosted NetCDF files and
defines the Beam pipeline pieces used below to write a consolidated Zarr store.
"""
import logging

import apache_beam as beam
import xarray as xr
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
    ConsolidateDimensionCoordinates,
    ConsolidateMetadata,
    OpenURLWithFSSpec,
    OpenWithXarray,
    StoreToZarr,
)

logger = logging.getLogger(__name__)

# Common Parameters
# Days of the month covered by the source files (July 2009, days 01-31).
days = range(1, 32)
dataset_url = "https://zenodo.org/records/10513552/files"

## Monthly version
input_urls = [
    f"{dataset_url}/eNATL60-BLBT02_y2009m07d{d:02d}.1d_TSWm_60m.nc" for d in days
]
# One file per day, concatenated along "time" when combined.
pattern = pattern_from_file_sequence(input_urls, concat_dim="time")


# does this succeed with all coords stripped?
class Preprocess(beam.PTransform):
    """Custom transform to fix the invalid time dimension in the source files.

    The source NetCDF files carry a ``time_counter`` variable instead of a
    proper ``time`` coordinate; this transform rebuilds ``time`` from it and
    promotes the remaining grid variables to coordinates.
    """

    @staticmethod
    def _set_coords(ds: xr.Dataset) -> xr.Dataset:
        """Return *ds* with a ``time`` coordinate replacing ``time_counter``.

        Also marks the static grid fields (depths, lon/lat, mask) as
        coordinates so they are stored once rather than as data variables.
        """
        # Re-home the timestamps from "time_counter" onto a "time" dimension.
        t_new = xr.DataArray(ds.time_counter.data, dims=["time"])
        ds = ds.assign_coords(time=t_new)
        # drop_vars is the non-deprecated spelling of Dataset.drop for variables.
        ds = ds.drop_vars(["time_counter"])
        ds = ds.set_coords(["deptht", "depthw", "nav_lon", "nav_lat", "tmask"])
        return ds

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        # Elements are (index, dataset) tuples; keep the key, fix the dataset.
        return pcoll | "Fixes time coord" >> beam.MapTuple(
            lambda k, v: (k, self._set_coords(v))
        )


# Recipe pipeline: fetch each daily NetCDF, open it fully into memory
# (load=True, copy_to_local=True works around remote-read issues with these
# files), repair the time coordinate, then write a single chunked Zarr store.
eNATL60_BLBT02 = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray(
        xarray_open_kwargs={"use_cftime": True, "engine": "netcdf4"},
        load=True,
        copy_to_local=True,
    )
    | Preprocess()
    | StoreToZarr(
        store_name="eNATL60_BLBT02.zarr",
        combine_dims=pattern.combine_dim_keys,
        target_chunks={"x": 2000, "y": 2000, "time": 2},
    )
    | ConsolidateDimensionCoordinates()
    | ConsolidateMetadata()
)

0 comments on commit 97b8b78

Please sign in to comment.