-
Notifications
You must be signed in to change notification settings - Fork 5
PR suggestions for from_znap_dataset #106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 84-support-snap
Are you sure you want to change the base?
Changes from all commits
badd8af
a4cf66b
daa1a7b
4da10dd
53b1eb7
62b5fbb
af78740
b62dcb6
04a3baa
873d7c5
01d09f9
c4ab050
07e278d
b851c51
837a5af
121252c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,10 +25,12 @@ | |
| RE_PATTERNS_DORIS5, | ||
| RE_PATTERNS_DORIS5_IFG, | ||
| RE_PATTERNS_SNAP, | ||
| RE_PATTERNS_SNAP_DATALAYER, | ||
| TIME_FORMAT_DORIS4, | ||
| TIME_FORMAT_DORIS5, | ||
| TIME_FORMAT_SNAP, | ||
| TIME_STAMP_KEY, | ||
| ZNAP_DATA_VAR_MOTHER, | ||
| _dtypes, | ||
| _memsize_chunk_mb, | ||
| ) | ||
|
|
@@ -228,7 +230,7 @@ def from_snap_dataset(snap_znap_archives: list[str, Path]) -> xr.Dataset: | |
| """ | ||
| # Check if snap_znap_archives is a non empty Iterable and not a string | ||
| if not hasattr(snap_znap_archives, "__iter__") or isinstance( | ||
| snap_znap_archives, str | ||
| snap_znap_archives, str | ||
| ): | ||
| raise ValueError( | ||
| "snap_znap_archives should be a non-empty Iterable and not a string." | ||
|
|
@@ -238,176 +240,125 @@ def from_snap_dataset(snap_znap_archives: list[str, Path]) -> xr.Dataset: | |
| if len(snap_znap_archives) == 0: | ||
| raise ValueError("snap_znap_archives should be a non-empty Iterable.") | ||
|
|
||
| # in the .znaps, the dimension x is range, y is azimuth | ||
| # xt and yt are interpolated values at coarse resolution | ||
| full_data = {} | ||
| mother_epoch = None | ||
|
|
||
| # Loop over all ZNAP archives and read into ds_stack | ||
| ds_stack = None # Stack of all epochs as xr.Dataset | ||
| data_mother = None # Mother epoch specific xr.Dataset | ||
| metadata_file = None # Metadata file, read from mother epoch | ||
| epoch_file_dict = {} # Map of epoch to file. snap_znap_archives may not be sorted | ||
| for file in snap_znap_archives: | ||
| data = xr.open_zarr(file, consolidated=False) | ||
| data_layers = data.original_raster_data_node_order | ||
| cur_epoch = data_layers[0].split("_")[-1] | ||
| epoch = datetime.strptime(cur_epoch, "%d%b%Y").strftime("%Y%m%d") | ||
|
|
||
| # there are two types of layers: full data layers [x, y], and tiepoint [xt, yt] | ||
| # we only preserve the full data layers, as the others cannot be placed | ||
| cleaned_data_layer = {} | ||
| for layer in data_layers: | ||
| dims = data[layer].dims | ||
| if "x" in dims and "y" in dims and len(dims) == 2: | ||
| cleaned_name = layer | ||
| cleaned_name = cleaned_name.replace(f"_{cur_epoch}", "") | ||
| for pol in ["VV", "VH", "HH", "HV"]: | ||
| cleaned_name = cleaned_name.replace(f"_{pol}", "") | ||
|
|
||
| cleaned_data_layer[cleaned_name] = layer | ||
| if cleaned_name in [ | ||
| "latitude", "longitude", "elevation" | ||
| ] and mother_epoch is None: # these are expected only at the mother | ||
| mother_epoch = epoch | ||
|
|
||
| full_data[epoch] = { | ||
| "data": data, | ||
| "cleaned_data_layers": cleaned_data_layer, | ||
| "filename": file | ||
| } | ||
|
|
||
| # sort the provided files chronologically | ||
| sorted_epochs = list(sorted(list(full_data.keys()))) | ||
|
|
||
| # fallback for when mother epoch is not in the stack | ||
| if mother_epoch is None: | ||
| warning_msg = ( | ||
| f"Mother has not been identified. Using first epoch {sorted_epochs[0]} " | ||
| "for metadata instead." | ||
| ) | ||
| logger.warning(warning_msg) | ||
| mother_epoch = sorted_epochs[0] | ||
| daughter_epochs = [epoch for epoch in sorted_epochs if epoch != mother_epoch] | ||
|
|
||
| # read metadata to find the azimuth and range coordinate offsets | ||
| metadata_file = f"{full_data[mother_epoch]['filename']}/SNAP/product_metadata.json" | ||
| metadata = read_metadata(metadata_file, driver="snap") | ||
|
|
||
| # Initialize stack as a Dataset | ||
| coords = { | ||
| "azimuth": range( | ||
| metadata["first_line_number"], | ||
| metadata["first_line_number"] + metadata["number_of_lines"] | ||
| ), | ||
| "range": range( | ||
| metadata["first_pixel_number"], | ||
| metadata["first_pixel_number"] + metadata["number_of_pixels"] | ||
| ), | ||
| "time": sorted_epochs, | ||
| } | ||
| ds_stack = xr.Dataset(coords=coords) | ||
|
|
||
| # handle the complex data, i = real, q = complex | ||
| slcs = None | ||
| for epoch in sorted_epochs: | ||
| cplx = ( | ||
| full_data[epoch]["data"][full_data[epoch]["cleaned_data_layers"]["i"]].data | ||
| + 1j * | ||
| full_data[epoch]["data"][full_data[epoch]["cleaned_data_layers"]["q"]].data | ||
| ) | ||
| # Read the ZNAP archive and check if it is a mother epoch | ||
| data, is_mother = _read_one_znap_archive(file) | ||
|
|
||
| # Get current epoch | ||
| cur_epoch = data.attrs["time_coverage_start"] | ||
| time_stamp = datetime.strptime(cur_epoch, "%Y-%m-%dT%H:%M:%S.%fZ") | ||
| epoch = time_stamp.strftime("%Y%m%d") | ||
|
|
||
| # register the epoch and file in a dictionary for later use | ||
| epoch_file_dict[epoch] = file | ||
|
|
||
| # If mother epoch | ||
| # keep variables ZNAP_DATA_VAR_MOTHER separately | ||
| # Assign an all zero h2ph variable | ||
| if is_mother: | ||
| # check if mother epoch has already been found | ||
| if data_mother is not None: | ||
| raise ValueError( | ||
| "Multiple mother epochs found. " | ||
| "Please check the ZNAP archives." | ||
| "Only one mother epoch can have the following variables: " | ||
| f"{ZNAP_DATA_VAR_MOTHER}. " | ||
| ) | ||
|
|
||
| if slcs is None: | ||
| slc = cplx.reshape( | ||
| (metadata["number_of_lines"], metadata["number_of_pixels"], 1) | ||
| data_mother = data[ZNAP_DATA_VAR_MOTHER] | ||
| data = data.drop_vars(ZNAP_DATA_VAR_MOTHER) | ||
| data = data.assign( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be generic, not just |
||
| {"h2ph": (("y", "x"), np.zeros((data.sizes["y"], data.sizes["x"])))} | ||
| ) | ||
| slcs = slc | ||
| metadata_file = f"{file}/SNAP/product_metadata.json" | ||
|
|
||
| # Assign to ds_stack along the time dimension | ||
| if ds_stack is None: # first epoch, initialize ds_stack | ||
| ds_stack = data.expand_dims(time=[time_stamp]) | ||
| # Drop attrs inherited from the first epoch | ||
|
rogerkuou marked this conversation as resolved.
|
||
| ds_stack.attrs = {} | ||
| # Always initialize mother_epoch to None | ||
| # If first epoch is mother, it will be set to epoch below | ||
| ds_stack = ds_stack.assign_attrs({"mother_epoch": None}) | ||
| else: | ||
| slc = cplx.reshape( | ||
| (metadata["number_of_lines"], metadata["number_of_pixels"], 1) | ||
| ds_stack = xr.concat( | ||
| [ds_stack, data.expand_dims(time=[time_stamp])], | ||
| dim="time", | ||
| combine_attrs="override", # override the attrs of the first epoch | ||
| ) | ||
| slcs = da.concatenate([slcs, slc], axis=2) | ||
|
|
||
| ds_stack = ds_stack.assign({"complex": (("azimuth", "range", "time"), slcs)}) | ||
|
|
||
| # retain the additional layers in the daughter znaps (which are time-variant) | ||
| if len(daughter_epochs) > 0: | ||
| for layer in full_data[daughter_epochs[0]]["cleaned_data_layers"].keys(): | ||
| if layer in ["i", "q"]: | ||
| continue # these are handled separately above | ||
| layer_data = None | ||
| for epoch in sorted_epochs: | ||
| if layer_data is None: | ||
| if layer not in full_data[epoch]["cleaned_data_layers"].keys(): | ||
| # e.g. it does not exist in the mother epoch, so we add zeros | ||
| layer_data = da.zeros( | ||
| ( | ||
| metadata["number_of_lines"], | ||
| metadata["number_of_pixels"], | ||
| 1 | ||
| ) | ||
| ) | ||
| else: | ||
| layer_data = full_data[epoch]["data"][ | ||
| full_data[epoch]["cleaned_data_layers"][layer] | ||
| ].data | ||
| layer_data = layer_data.reshape( | ||
| ( | ||
| metadata["number_of_lines"], | ||
| metadata["number_of_pixels"], | ||
| 1 | ||
| ) | ||
| ) | ||
| else: | ||
| if layer not in full_data[epoch]["cleaned_data_layers"].keys(): | ||
| # e.g. it does not exist in the mother epoch, so we add zeros | ||
| cur_layer_data = da.zeros( | ||
| ( | ||
| metadata["number_of_lines"], | ||
| metadata["number_of_pixels"], | ||
| 1 | ||
| ) | ||
| ) | ||
| else: | ||
| cur_layer_data = full_data[epoch]["data"][ | ||
| full_data[epoch]["cleaned_layer_names"][layer] | ||
| ].data | ||
| cur_layer_data = cur_layer_data.reshape( | ||
| ( | ||
| metadata["number_of_lines"], | ||
| metadata["number_of_pixels"], | ||
| 1 | ||
| ) | ||
| ) | ||
| layer_data = da.concatenate( | ||
| [layer_data, cur_layer_data], axis=2 | ||
| ) | ||
| # Modify the ds_stack if mother epoch | ||
| if is_mother: | ||
| ds_stack = ds_stack.assign_attrs({"mother_epoch": epoch}) | ||
|
|
||
| ds_stack = ds_stack.assign( | ||
| {layer: (("azimuth", "range", "time"), layer_data)} | ||
| ) | ||
| # sort ds_stack by time | ||
| ds_stack = ds_stack.sortby("time") | ||
|
|
||
| # retain the additional layers in the mother znap (which is time-invariant) | ||
| for layer in full_data[mother_epoch]["cleaned_data_layers"].keys(): | ||
| if layer in ds_stack.data_vars.keys() or layer in ["i", "q"]: | ||
| continue # these are already included | ||
| ds_stack = ds_stack.assign( | ||
| { | ||
| layer: ( | ||
| ("azimuth", "range"), | ||
| full_data[mother_epoch]["data"][ | ||
| full_data[mother_epoch]["cleaned_data_layers"][layer] | ||
| ].data | ||
| ) | ||
| } | ||
| # order epoch_file_dict by epoch | ||
| epoch_file_dict = dict(sorted(epoch_file_dict.items())) | ||
|
|
||
| # Assign the mother epoch data to ds_stack | ||
| if data_mother is not None: | ||
| ds_stack = ds_stack.assign(data_mother) | ||
| else: | ||
| warning_msg = ( | ||
| "Mother epoch has not been identified. " | ||
| f"The following variables {ZNAP_DATA_VAR_MOTHER} " | ||
| "are not present in any of the ZNAP archives. " | ||
| ) | ||
| logger.warning(warning_msg) | ||
|
|
||
| # Read the metadata from the mother epoch if it exists | ||
| if metadata_file is not None: | ||
| metadata = read_metadata(metadata_file, driver="snap") | ||
| else: | ||
| warning_msg = ( | ||
| "Mother epoch has not been identified. " | ||
| "Using first epoch for metadata instead." | ||
| ) | ||
| logger.warning(warning_msg) | ||
| file_first_epoch = list(epoch_file_dict.values())[0] | ||
| metadata_file = f"{file_first_epoch}/SNAP/product_metadata.json" | ||
| metadata = read_metadata(metadata_file, driver="snap") | ||
| # Assign the metadata to ds_stack.attrs["metadata_mother"] | ||
| ds_stack = ds_stack.assign_attrs({"metadata_mother": metadata}) | ||
|
|
||
| # Rename and enforce the order of dimensions | ||
| ds_stack = ( | ||
| ds_stack.rename( | ||
| {"y": "azimuth", "x": "range"} | ||
| ).transpose( # rename the dimensions | ||
| "azimuth", "range", "time" | ||
| ) # enforce the order of dimensions | ||
| ) | ||
|
|
||
| # Get the complex data variable | ||
| ds_stack = ( | ||
| ds_stack.assign_coords( | ||
| azimuth=ds_stack["azimuth"] + metadata["first_line_number"], | ||
| range=ds_stack["range"] + metadata["first_pixel_number"], | ||
| ) # shift the azimuth and range coordinates by offset | ||
| .assign({"complex": ds_stack["i"] + 1j * ds_stack["q"]}) # assign complex | ||
| .drop_vars(["i", "q"]) # drop the original i and q variables | ||
| ) | ||
|
|
||
| # Calculate amplitude and phase from complex | ||
| ds_stack = ds_stack.slcstack._get_amplitude() | ||
| ds_stack = ds_stack.slcstack._get_phase() | ||
|
|
||
| return ds_stack | ||
|
|
||
|
|
||
| def to_binary( | ||
| output_path: str, | ||
| data: xr.Dataset | xr.DataArray, | ||
| data_var_name: str | None = None, | ||
| allow_overwrite: bool = False | ||
| output_path: str, | ||
| data: xr.Dataset | xr.DataArray, | ||
| data_var_name: str | None = None, | ||
| allow_overwrite: bool = False, | ||
| ): | ||
| """Write a zarr data layer to a binary file. | ||
|
|
||
|
|
@@ -439,7 +390,7 @@ def to_binary( | |
| - When `data` is not an `xr.Dataset` or `xr.DataArray` | ||
|
|
||
| KeyError | ||
| When `data` is an `xr.Dataset` and `data_var_name` is not a data variable in | ||
| When `data` is an `xr.Dataset` and `data_var_name` is not a data variable in | ||
| `data` | ||
|
|
||
| OSError | ||
|
|
@@ -461,10 +412,7 @@ def to_binary( | |
|
|
||
| # Create the memmap | ||
| memmap = np.memmap( | ||
| output_path, | ||
| dtype=datalayer.dtype, | ||
| mode="w+", | ||
| shape=datalayer.shape | ||
| output_path, dtype=datalayer.dtype, mode="w+", shape=datalayer.shape | ||
| ) | ||
| # Assign the data to the memmap | ||
| memmap[:] = datalayer.data[:] | ||
|
|
@@ -771,7 +719,7 @@ def _flatten_snap_json_metadata(content: dict | list, cur_keys: tuple = ()): | |
| all_values = [] | ||
| if isinstance(content, list): | ||
| for key in range(len(content)): | ||
| cur_keys_loop = cur_keys + (str(key), ) | ||
| cur_keys_loop = cur_keys + (str(key),) | ||
| part_values = _flatten_snap_json_metadata(content[key], cur_keys_loop) | ||
| all_values = [*all_values, *part_values] | ||
| elif isinstance(content, dict): | ||
|
|
@@ -787,19 +735,19 @@ def _flatten_snap_json_metadata(content: dict | list, cur_keys: tuple = ()): | |
| val = content["data"]["elems"][0] | ||
| else: | ||
| val = content["data"]["elems"] | ||
| cur_keys_save = cur_keys + (content["name"], ) | ||
| cur_keys_save = cur_keys + (content["name"],) | ||
| all_values.append([val, cur_keys_save]) | ||
|
|
||
| else: | ||
| cur_keys_loop = cur_keys + (content["name"], ) | ||
| cur_keys_loop = cur_keys + (content["name"],) | ||
| if "elements" in content.keys(): | ||
| cur_keys_loop_el = cur_keys_loop + ("elements", ) | ||
| cur_keys_loop_el = cur_keys_loop + ("elements",) | ||
| part_values = _flatten_snap_json_metadata( | ||
| content["elements"], cur_keys_loop_el | ||
| ) | ||
| all_values = [*all_values, *part_values] | ||
| if "attributes" in content.keys(): | ||
| cur_keys_loop_at = cur_keys_loop + ("attributes", ) | ||
| cur_keys_loop_at = cur_keys_loop + ("attributes",) | ||
| part_values = _flatten_snap_json_metadata( | ||
| content["attributes"], cur_keys_loop_at | ||
| ) | ||
|
|
@@ -920,3 +868,35 @@ def _regulate_metadata(metadata, driver): | |
| logger.warning(warning_msg) | ||
|
|
||
| return metadata | ||
|
|
||
|
|
||
| def _read_one_znap_archive(file: str | Path) -> tuple[xr.Dataset, bool]: | ||
| """Read a single ZNAP archive produced by SNAP and flag if mother.""" | ||
| # Initialize is_mother flag | ||
| is_mother = False | ||
|
|
||
| # open one ZNAP archive | ||
| data = xr.open_zarr(file, consolidated=False) | ||
|
|
||
| # there are two types of layers: full data layers [x, y], and tiepoint [xt, yt] | ||
| # we only preserve the full data layers, as the others cannot be placed | ||
| # drop all non-x/y dims, this also drops tiepoint vars | ||
| dims_non_xy = [dim for dim in data.dims if dim not in ["x", "y"]] | ||
| data = data.drop_dims(dims_non_xy) | ||
|
|
||
| # Rename the data variables according to RE_PATTERNS_SNAP_DATALAYER | ||
| for layer in list(data.data_vars): | ||
| for key, pattern in RE_PATTERNS_SNAP_DATALAYER.items(): | ||
| if re.match(pattern, layer): | ||
| data = data.rename({layer: key}) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we only rename the hardcoded layers |
||
| break | ||
|
|
||
|
Copilot marked this conversation as resolved.
|
||
| # Check if mother epoch | ||
| is_mother = any(v in data.data_vars for v in ZNAP_DATA_VAR_MOTHER) | ||
| # Assign indices as coordinates to x and y | ||
| # This facilitates the concatenation | ||
| data = data.assign_coords( | ||
| {"x": range(data.sizes["x"]), "y": range(data.sizes["y"])} | ||
| ) | ||
|
|
||
| return data, is_mother | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to discuss this. I agree my original implementation of retrieving this from the data layer names is not ideal, but currently this metadata field always corresponds to the mother, so I do not think we can use this. There are three questions here:
We could also consider the user to define the epochs of each
znaparchive as a second variable or a dictionary-like structure. Let's discussThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussion with @rogerkuou : we cannot assume this field to be correct by default since our best attempt at a sarxarray workflow apparently produces incorrect metadata in this field. Let's therefore use the second best option: in the metadata there is a field called:
The timestamp in the product file is always correct since this is configured by ESA. Let's read it from there