Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
0.3 (unreleased)
================

- Add `add_aggregation` and `add_variable_agg` to `Dataset` class. By @huard
- Add `add_scan` to `Dataset` class. By @huard
- Closing the dataset returned by `open_ncml` will close the underlying opened files. By @huard


Expand Down
41 changes: 41 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,47 @@ def test_remove_variable():
assert expected == res


def test_add_aggregation():
nc = xncml.Dataset(input_file)
nc.add_aggregation('new_dim', 'joinNew')
nc.add_variable_agg('new_dim', 'newVar')

expected = [
OrderedDict(
[
('@dimName', 'new_dim'),
('@type', 'joinNew'),
('variableAgg', [OrderedDict([('@name', 'newVar')])]),
]
)
]
res = nc.ncroot['netcdf']['aggregation']

assert expected == res


def test_add_scan():
nc = xncml.Dataset(input_file)
nc.add_aggregation('new_dim', 'joinExisting')
nc.add_scan('new_dim', location='foo', suffix='.nc')

expected = [
OrderedDict(
[
('@dimName', 'new_dim'),
('@type', 'joinExisting'),
(
'scan',
[OrderedDict([('@location', 'foo'), ('@subdirs', 'true'), ('@suffix', '.nc')])],
),
]
)
]

res = nc.ncroot['netcdf']['aggregation']
assert expected == res


def test_to_ncml():
nc = xncml.Dataset(input_file)
with tempfile.NamedTemporaryFile(suffix='.ncml') as t:
Expand Down
141 changes: 140 additions & 1 deletion xncml/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections import OrderedDict
from enum import Enum
from pathlib import Path
from typing import Any
from warnings import warn
Expand Down Expand Up @@ -52,8 +53,124 @@ def __init__(self, filepath: str = None, location: str = None):
def __repr__(self):
return xmltodict.unparse(self.ncroot, pretty=True)

# Variable
# Aggregations and scans
def add_aggregation(
self, dim_name: str, type_: str, recheck_every: str = None, time_units_change: bool = None
):
"""Add aggregation.

Parameters
----------
dim_name : str
Dimension name.
type_ : str
Aggregation type.
recheck_every : str
Time interval for rechecking the aggregation. Only used if `type_` is `AggregationType.scan`.
time_units_change : bool
Whether the time units change. Only used if `type_` is `AggregationType.scan`.
"""
at = AggregationType(type_)
item = OrderedDict(
{
'@dimName': dim_name,
'@type': at.value,
'@recheckEvery': recheck_every,
'@timeUnitsChange': time_units_change,
}
)
item = preparse(item)

aggregations = self.ncroot['netcdf'].get('aggregation', [])
for agg in aggregations:
if agg['@dimName'] == dim_name:
agg.update(item)
break
else:
aggregations.append(item)
self.ncroot['netcdf']['aggregation'] = aggregations

def add_variable_agg(self, dim_name: str, name: str):
"""Add variable aggregation.

Parameters
----------
dim_name: str
Dimension name for the aggregation.
name : str
Variable name.
"""
item = OrderedDict({'@name': name})
aggregations = self.ncroot['netcdf'].get('aggregation')
for agg in aggregations:
if agg['@dimName'] == dim_name:
variables = agg.get('variableAgg', [])
for var in variables:
if var['@name'] == name:
var.update(item)
break
else:
variables.append(item)
agg['variableAgg'] = variables

def add_scan(
self,
dim_name: str,
location: str,
reg_exp: str = None,
suffix: str = None,
subdirs: bool = True,
older_than: str = None,
date_format_mark: str = None,
enhance: bool = None,
):
"""
Add scan element.

Parameters
----------
dim_name : str
Dimension name.
location : str
Location of the files to scan.
reg_exp : str
Regular expression to match the full pathname of files.
suffix : str
File suffix.
subdirs : bool
Whether to scan subdirectories.
older_than : str
Older than time interval.
date_format_mark : str
Date format mark.
enhance : bool
Whether to enhance the scan.
"""
item = OrderedDict(
{
'@location': location,
'@regExp': reg_exp,
'@suffix': suffix,
'@subdirs': subdirs,
'@olderThan': older_than,
'@dateFormatMark': date_format_mark,
'@enhance': enhance,
}
)

item = preparse(item)

# An aggregation must exist for the scan to be added.
for agg in self.ncroot['netcdf'].get('aggregation'):
if agg['@dimName'] == dim_name:
scan = agg.get('scan', [])
scan.append(item)
agg['scan'] = scan
break
else:
raise ValueError(f'No aggregation found for dimension {dim_name}.')

# Variable
def add_variable_attribute(self, variable, key, value, type_='String'):
"""Add variable attribute.

Expand Down Expand Up @@ -443,3 +560,25 @@ def _is_coordinate(var):
return True

return False


def preparse(obj: dict) -> dict:
"""
- Remove None values from dictionary.
- Convert booleans to strings.
"""
for k, v in obj.items():
if isinstance(v, bool):
obj[k] = str(v).lower()
return {k: v for k, v in obj.items() if v is not None}


class AggregationType(Enum):
"""Type of aggregation."""

FORECAST_MODEL_RUN_COLLECTION = 'forecastModelRunCollection'
FORECAST_MODEL_RUN_SINGLE_COLLECTION = 'forecastModelRunSingleCollection'
JOIN_EXISTING = 'joinExisting'
JOIN_NEW = 'joinNew'
TILED = 'tiled'
UNION = 'union'