Implemented solution for #20 and #34
forman committed Jun 4, 2021
1 parent f1c65e8 commit 0619d17
Showing 7 changed files with 223 additions and 15 deletions.
15 changes: 15 additions & 0 deletions CHANGES.md
@@ -2,6 +2,21 @@

### Version 1.1.2 (in development)

* Output metadata (global attributes) can now be adjusted
after the last write/append using the new
`output.adjust_metadata` setting, whose default is `false`.
If set to `true`, the following metadata attributes
are adjusted:
- "history"
- "source"
- "time_coverage_start" (and "start_time" if it existed before)
- "time_coverage_end" (and "stop_time" if it existed before)

In addition, extra metadata (global attributes) can now be added
after the last write/append using the new setting
`output.metadata`, whose value is a mapping from attribute
names to values; both settings are sketched below. (#20, #34)

* Fixed a problem that prevented appending datasets that contained variables
with `dtype = "|S1"` (ESA SST CCI). Such variables are written initially,
but are then no longer appended at all, given that they do not contain the
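
As a usage sketch, the two new settings can also be passed programmatically. This is a minimal, hypothetical example; the `output_path` keyword and the public `run()` entry point are assumed here and do not appear in this diff:

```python
from nc2zarr.converter import Converter

# Hypothetical example: append daily NetCDF inputs to a single Zarr
# target, then adjust and extend the global attributes once at the end.
converter = Converter(
    input_paths='inputs/*.nc',
    output_path='sst.zarr',        # assumed keyword, not shown in this diff
    output_append=True,
    output_adjust_metadata=True,   # adjusts "history", "source", and the
                                   # "time_coverage_start/_end" attributes
    output_metadata={'comment': 'Daily appends, December 2020.'},
)
converter.run()                    # assumed public wrapper around _run()
```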
22 changes: 20 additions & 2 deletions nc2zarr/converter.py
@@ -35,7 +35,7 @@

class Converter:
"""
-    TODO: describe me any py params.
+    TODO: describe me and my params.
:param input_paths:
:param input_sort_by:
@@ -58,6 +58,8 @@ class Converter:
If there is no existing dataset, one will be created regardless
of the value of this parameter.
:param output_append_dim:
:param output_adjust_metadata:
:param output_metadata:
:param output_s3:
:param output_custom_postprocessor:
:param dry_run:
@@ -84,6 +86,8 @@ def __init__(self, *,
output_overwrite: bool = False,
output_append: bool = False,
output_append_dim: str = None,
output_adjust_metadata: bool = False,
output_metadata: Dict[str, Any] = None,
output_s3: Dict[str, Any] = None,
output_retry: Dict[str, Any] = None,
output_custom_postprocessor: str = False,
@@ -107,7 +111,14 @@ def __init__(self, *,
output_append_dim = input_concat_dim or DEFAULT_OUTPUT_APPEND_DIM_NAME

if output_overwrite and output_append:
-    raise ConverterError('Output overwrite and append flags cannot be given both.')
+    raise ConverterError('Output overwrite and append flags '
+                         'cannot be given both.')

if output_metadata \
and (not isinstance(output_metadata, dict)
or any(not isinstance(k, str) for k in output_metadata)):
raise ConverterError('Output metadata must be a mapping '
'from attribute names to values.')

self.input_paths = input_paths
self.input_sort_by = input_sort_by
@@ -129,6 +140,8 @@ def __init__(self, *,
self.output_overwrite = output_overwrite
self.output_append = output_append
self.output_append_dim = output_append_dim
self.output_adjust_metadata = output_adjust_metadata
self.output_metadata = output_metadata
self.output_s3 = output_s3
self.output_retry = output_retry
self.dry_run = dry_run
@@ -168,9 +181,12 @@ def _run(self):
output_overwrite=self.output_overwrite,
output_append=self.output_append,
output_append_dim=self.output_append_dim,
output_adjust_metadata=self.output_adjust_metadata,
output_metadata=self.output_metadata,
output_s3_kwargs=self.output_s3,
output_retry_kwargs=self.output_retry,
input_decode_cf=self.input_decode_cf,
input_paths=self.input_paths,
dry_run=self.dry_run)

append = None
@@ -179,3 +195,5 @@ def _run(self):
writer.write_dataset(output_dataset, encoding=output_encoding, append=append)
input_dataset.close()
append = True

writer.finalize()
13 changes: 13 additions & 0 deletions nc2zarr/res/config-template.yml
@@ -168,6 +168,19 @@ output:
# Append dimension. Defaults to input/concat_dim or "time".
append_dim: false

# Whether to adjust output metadata (global attributes)
# after the last write/append. This will adjust the
# following global attributes:
# - "history"
# - "source"
# - "time_coverage_start" (and "start_time" if existed before)
# - "time_coverage_end" (and "stop_time" if existed before)
adjust_metadata: false

# Extra metadata (global attributes) used to update
# the output after the last write/append.
metadata: { }
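# For example (hypothetical attribute names and values):
# metadata:
#   title: "Daily SST cube"
#   institution: "My institute"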

# Object storage file system configuration.
# If given, the content is passed as keyword arguments to s3fs.S3FileSystem().
# See https://s3fs.readthedocs.io/en/latest/api.html#s3fs.core.S3FileSystem
92 changes: 84 additions & 8 deletions nc2zarr/writer.py
@@ -19,20 +19,24 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

+import datetime
import os.path
-from typing import Dict, Any
+from typing import Dict, Any, Sequence

import fsspec
import fsspec.implementations.local
+import numpy as np
+import pandas as pd
import retry.api
import xarray as xr
-import numpy as np
+import zarr

from .constants import DEFAULT_OUTPUT_APPEND_DIM_NAME
from .constants import DEFAULT_OUTPUT_RETRY_KWARGS
from .custom import load_custom_func
from .log import LOGGER
from .log import log_duration
from .version import version


class DatasetWriter:
@@ -45,9 +49,12 @@ def __init__(self,
output_overwrite: bool = False,
output_append: bool = False,
output_append_dim: str = None,
output_adjust_metadata: bool = False,
output_metadata: Dict[str, Any] = None,
output_s3_kwargs: Dict[str, Any] = None,
output_retry_kwargs: Dict[str, Any] = None,
input_decode_cf: bool = False,
input_paths: Sequence[str] = None,
dry_run: bool = False):
if not output_path:
raise ValueError('output_path must be given')
@@ -62,9 +69,12 @@ def __init__(self,
self._output_overwrite = output_overwrite
self._output_append = output_append
self._output_append_dim = output_append_dim or DEFAULT_OUTPUT_APPEND_DIM_NAME
self._output_adjust_metadata = output_adjust_metadata
self._output_metadata = output_metadata
self._output_s3_kwargs = output_s3_kwargs
self._output_retry_kwargs = output_retry_kwargs or DEFAULT_OUTPUT_RETRY_KWARGS
self._input_decode_cf = input_decode_cf
self._input_paths = input_paths
self._dry_run = dry_run
if output_s3_kwargs or output_path.startswith('s3://'):
self._fs = fsspec.filesystem('s3', **(output_s3_kwargs or {}))
@@ -86,6 +96,35 @@ def write_dataset(self,
logger=LOGGER,
**self._output_retry_kwargs)

def finalize(self):
adjusted_metadata = {}
if self._output_adjust_metadata:
# Get new attribute values
with xr.open_zarr(self._output_store, decode_cf=True) as dataset:
history = self._get_history_metadata(dataset)
source = self._get_source_metadata(dataset)
time_coverage_start, time_coverage_end = \
self._get_time_coverage_metadata(dataset)
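# Each entry: (attribute name, new value, only-if-attribute-already-present flag)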
adjusted_data = (
('history', history, False),
('source', source, False),
('time_coverage_start', time_coverage_start, False),
('time_coverage_end', time_coverage_end, False),
('start_time', time_coverage_start, True),
('stop_time', time_coverage_end, True),
)
adjusted_metadata = {k: v
                     for k, v, f in adjusted_data
                     if v is not None
                     and (not f or k in dataset.attrs)}
if self._output_metadata:
adjusted_metadata.update(self._output_metadata)

if adjusted_metadata:
# Modify the attributes directly in the underlying Zarr store
with zarr.open_group(self._output_store, cache_attrs=False) as group:
group.attrs.update(adjusted_metadata)

def _write_dataset(self,
ds: xr.Dataset,
encoding: Dict[str, Any] = None,
@@ -98,12 +137,6 @@ def _write_dataset(self,
else:
self._append_dataset(ds)

-# def close(self):
-#     if self._output_store is not None \
-#         and hasattr(self._output_store, 'close') \
-#         and callable(self._output_store.close):
-#         self._output_store.close()

def _ensure_store(self):
if self._output_store is None:
self._output_path_exists = self._fs.isdir(self._output_path)
@@ -172,3 +205,46 @@ def _remove_variable_attrs(cls, ds: xr.Dataset) -> xr.Dataset:
for k, v in ds.variables.items():
v.attrs = dict()
return ds

def _get_source_metadata(self, dataset: xr.Dataset):
source = None
if self._input_paths:
# Note: currently we only record root sources, i.e. our NetCDF input files.
nc_paths = ', '.join(path for path in self._input_paths if path.endswith('.nc'))
source = dataset.attrs.get('source')
source = ((source + ',\n') if source else '') + nc_paths
return source

@classmethod
def _get_history_metadata(cls, dataset: xr.Dataset):
now = _np_timestamp_to_str(np.array(datetime.datetime.utcnow(), dtype=np.datetime64))
present = f"{now}: converted by nc2zarr, version {version}"
history = dataset.attrs.get("history")
return ((history + '\n') if history else '') + present

@classmethod
def _get_time_coverage_metadata(cls, dataset: xr.Dataset):
time_coverage_start, time_coverage_end = None, None
if 'time' in dataset:
time = dataset['time']
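# CF conventions: a coordinate variable may point to a boundary variable
# via its "bounds" attribute; "time_bnds" is the customary default name.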
bounds = time.attrs.get('bounds', 'time_bnds')
if bounds in dataset \
and dataset[bounds].ndim == 2 \
and dataset[bounds].shape[1] == 2:
time_bnds = dataset[bounds]
time_coverage_start = _xr_timestamp_to_str(time_bnds[0, 0])
time_coverage_end = _xr_timestamp_to_str(time_bnds[-1, 1])
else:
time_coverage_start = _xr_timestamp_to_str(dataset.time[0])
time_coverage_end = _xr_timestamp_to_str(dataset.time[-1])
return time_coverage_start, time_coverage_end


def _xr_timestamp_to_str(time_scalar: xr.DataArray):
return _np_timestamp_to_str(time_scalar.values.item())


def _np_timestamp_to_str(time_scalar: np.ndarray):
# noinspection PyTypeChecker
return pd.to_datetime(time_scalar, utc=True) \
.strftime("%Y-%m-%d %H:%M:%S")
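
The design choice in `finalize()` above, updating attributes through `zarr.open_group` instead of rewriting the dataset with xarray, can be shown in isolation. Here is a minimal sketch under assumed paths and data, not part of the module:

```python
import xarray as xr
import zarr

# A tiny Zarr dataset with one global attribute.
ds = xr.Dataset({'x': ('t', [1, 2, 3])},
                attrs={'history': '2021-06-01: created'})
ds.to_zarr('demo.zarr', mode='w')

# Updating attributes through the zarr group touches only the .zattrs
# entry; no array data is read or rewritten, which is why finalize()
# avoids a full xarray round trip.
group = zarr.open_group('demo.zarr', cache_attrs=False)
group.attrs.update({'history': group.attrs['history']
                               + '\n2021-06-04: converted by nc2zarr'})

with xr.open_zarr('demo.zarr') as reopened:
    print(reopened.attrs['history'])
```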
17 changes: 15 additions & 2 deletions tests/helpers.py
@@ -103,7 +103,8 @@ def delete_path(path, ignore_errors=False):
def new_test_dataset(w: int = 36,
h: int = 18,
day: int = None,
chunked: bool = False) -> xr.Dataset:
chunked: bool = False,
add_time_bnds: bool = False) -> xr.Dataset:
res = 180 / h
lon = xr.DataArray(np.linspace(-180 + res, 180 - res, num=w), dims=('lon',))
lat = xr.DataArray(np.linspace(-90 + res, 90 - res, num=h), dims=('lat',))
@@ -122,7 +123,19 @@ def new_test_dataset(w: int = 36,
calendar="proleptic_gregorian",
units="seconds since 1970-01-01 00:00:00"
)
-coords = dict(lon=lon, lat=lat, time=time)
+if add_time_bnds:
+    time.attrs['bounds'] = 'time_bnds'
+    time_bnds = xr.DataArray(np.array([['2020-12-{:02d}T09:30:00'.format(day),
+                                        '2020-12-{:02d}T10:30:00'.format(day)]],
+                                      dtype='datetime64[s]'),
+                             dims=('time', 'bnds'))
+    time_bnds.encoding.update(
+        calendar="proleptic_gregorian",
+        units="seconds since 1970-01-01 00:00:00"
+    )
+    coords = dict(lon=lon, lat=lat, time=time, time_bnds=time_bnds)
+else:
+    coords = dict(lon=lon, lat=lat, time=time)

r_ui16 = xr.DataArray(
np.random.randint(0, 1000, size=var_shape).astype(dtype=np.uint16),
27 changes: 24 additions & 3 deletions tests/test_converter.py
@@ -174,7 +174,28 @@ def test_no_inputs(self):
Converter()
self.assertEqual('At least one input must be given.', f'{cm.exception}')

-def test_both_append_dim_and_concat_dim(self):
-    with self.assertRaises(ConverterError) as cm:
+def test_both_output_append_dim_and_overwrite(self):
+    with self.assertRaises(ConverterError) as e:
        Converter(input_paths='inputs/*.nc', output_overwrite=True, output_append=True)
-    self.assertEqual('Output overwrite and append flags cannot be given both.', f'{cm.exception}')
+    self.assertEqual(('Output overwrite and append '
+                      'flags cannot be given both.',),
+                     e.exception.args)

def test_invalid_output_metadata(self):
self.add_path('my.zarr')

with self.assertRaises(ConverterError) as e:
# noinspection PyTypeChecker
Converter(input_paths='inputs/*.nc',
output_metadata=[('comment', 'This dataset is crap.')])
self.assertEqual(('Output metadata must be a '
'mapping from attribute names to values.',),
e.exception.args)

with self.assertRaises(ConverterError) as e:
# noinspection PyTypeChecker
Converter(input_paths='inputs/*.nc',
output_metadata={12: 'This dataset is crap.'})
self.assertEqual(('Output metadata must be a '
'mapping from attribute names to values.',),
e.exception.args)
52 changes: 52 additions & 0 deletions tests/test_writer.py
@@ -64,6 +64,58 @@ def test_local_dry_run(self):
writer.write_dataset(ds)
self.assertFalse(os.path.isdir('out.zarr'))

def test_finalize_adjusts_metadata(self):
self.add_path('my.zarr')
writer = DatasetWriter('my.zarr',
output_append=True,
output_adjust_metadata=True,
input_paths=['a.nc', 'z.zarr', 'b.nc'])
for i in range(3):
ds = new_test_dataset(day=i + 1)
writer.write_dataset(ds)
with xr.open_zarr('my.zarr') as ds:
self.assertNotIn('history', ds.attrs)
self.assertNotIn('source', ds.attrs)
self.assertNotIn('time_coverage_start', ds.attrs)
self.assertNotIn('time_coverage_end', ds.attrs)
writer.finalize()
with xr.open_zarr('my.zarr') as ds:
self.assertIn('history', ds.attrs)
self.assertIn('source', ds.attrs)
self.assertEqual('a.nc, b.nc', ds.attrs['source'])
self.assertIn('time_coverage_start', ds.attrs)
self.assertEqual('2020-12-01 10:00:00', ds.attrs['time_coverage_start'])
self.assertIn('time_coverage_end', ds.attrs)
self.assertEqual('2020-12-03 10:00:00', ds.attrs['time_coverage_end'])

def test_finalize_adjusts_metadata_with_time_bnds(self):
self.add_path('my.zarr')
writer = DatasetWriter('my.zarr', output_append=True, output_adjust_metadata=True)
for i in range(3):
ds = new_test_dataset(day=i + 1, add_time_bnds=True)
writer.write_dataset(ds)
writer.finalize()
with xr.open_zarr('my.zarr') as ds:
self.assertIn('time_coverage_start', ds.attrs)
self.assertEqual('2020-12-01 09:30:00', ds.attrs['time_coverage_start'])
self.assertIn('time_coverage_end', ds.attrs)
self.assertEqual('2020-12-03 10:30:00', ds.attrs['time_coverage_end'])

def test_finalize_updates_metadata(self):
self.add_path('my.zarr')
writer = DatasetWriter('my.zarr',
output_append=True,
output_metadata=dict(comment='This dataset is crap.'))
for i in range(3):
ds = new_test_dataset(day=i + 1)
writer.write_dataset(ds)
with xr.open_zarr('my.zarr') as ds:
self.assertNotIn('comment', ds.attrs)
writer.finalize()
with xr.open_zarr('my.zarr') as ds:
self.assertIn('comment', ds.attrs)
self.assertEqual('This dataset is crap.', ds.attrs['comment'])

def test_local_dry_run_for_existing(self):
self.add_path('my.zarr')
ds = new_test_dataset(day=1)
