Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 144 additions & 164 deletions sarxarray/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
RE_PATTERNS_DORIS5,
RE_PATTERNS_DORIS5_IFG,
RE_PATTERNS_SNAP,
RE_PATTERNS_SNAP_DATALAYER,
TIME_FORMAT_DORIS4,
TIME_FORMAT_DORIS5,
TIME_FORMAT_SNAP,
TIME_STAMP_KEY,
ZNAP_DATA_VAR_MOTHER,
_dtypes,
_memsize_chunk_mb,
)
Expand Down Expand Up @@ -228,7 +230,7 @@ def from_snap_dataset(snap_znap_archives: list[str, Path]) -> xr.Dataset:
"""
# Check if snap_znap_archives is a non empty Iterable and not a string
if not hasattr(snap_znap_archives, "__iter__") or isinstance(
snap_znap_archives, str
snap_znap_archives, str
):
raise ValueError(
"snap_znap_archives should be a non-empty Iterable and not a string."
Expand All @@ -238,176 +240,125 @@ def from_snap_dataset(snap_znap_archives: list[str, Path]) -> xr.Dataset:
if len(snap_znap_archives) == 0:
raise ValueError("snap_znap_archives should be a non-empty Iterable.")

# in the .znaps, the dimension x is range, y is azimuth
# xt and yt are interpolated values at coarse resolution
full_data = {}
mother_epoch = None

# Loop over all ZNAP archives and read into ds_stack
ds_stack = None # Stack of all epochs as xr.Dataset
data_mother = None # Mother epoch specific xr.Dataset
metadata_file = None # Metadata file, read from mother epoch
epoch_file_dict = {} # Map of epoch to file. snap_znap_archives may not be sorted
for file in snap_znap_archives:
data = xr.open_zarr(file, consolidated=False)
data_layers = data.original_raster_data_node_order
cur_epoch = data_layers[0].split("_")[-1]
epoch = datetime.strptime(cur_epoch, "%d%b%Y").strftime("%Y%m%d")

# there are two types of layers: full data layers [x, y], and tiepoint [xt, yt]
# we only preserve the full data layers, as the others cannot be placed
cleaned_data_layer = {}
for layer in data_layers:
dims = data[layer].dims
if "x" in dims and "y" in dims and len(dims) == 2:
cleaned_name = layer
cleaned_name = cleaned_name.replace(f"_{cur_epoch}", "")
for pol in ["VV", "VH", "HH", "HV"]:
cleaned_name = cleaned_name.replace(f"_{pol}", "")

cleaned_data_layer[cleaned_name] = layer
if cleaned_name in [
"latitude", "longitude", "elevation"
] and mother_epoch is None: # these are expected only at the mother
mother_epoch = epoch

full_data[epoch] = {
"data": data,
"cleaned_data_layers": cleaned_data_layer,
"filename": file
}

# sort the provided files chronologically
sorted_epochs = list(sorted(list(full_data.keys())))

# fallback for when mother epoch is not in the stack
if mother_epoch is None:
warning_msg = (
f"Mother has not been identified. Using first epoch {sorted_epochs[0]} "
"for metadata instead."
)
logger.warning(warning_msg)
mother_epoch = sorted_epochs[0]
daughter_epochs = [epoch for epoch in sorted_epochs if epoch != mother_epoch]

# read metadata to find the azimuth and range coordinate offsets
metadata_file = f"{full_data[mother_epoch]['filename']}/SNAP/product_metadata.json"
metadata = read_metadata(metadata_file, driver="snap")

# Initialize stack as a Dataset
coords = {
"azimuth": range(
metadata["first_line_number"],
metadata["first_line_number"] + metadata["number_of_lines"]
),
"range": range(
metadata["first_pixel_number"],
metadata["first_pixel_number"] + metadata["number_of_pixels"]
),
"time": sorted_epochs,
}
ds_stack = xr.Dataset(coords=coords)

# handle the complex data, i = real, q = complex
slcs = None
for epoch in sorted_epochs:
cplx = (
full_data[epoch]["data"][full_data[epoch]["cleaned_data_layers"]["i"]].data
+ 1j *
full_data[epoch]["data"][full_data[epoch]["cleaned_data_layers"]["q"]].data
)
# Read the ZNAP archive and check if it is a mother epoch
data, is_mother = _read_one_znap_archive(file)

# Get current epoch
cur_epoch = data.attrs["time_coverage_start"]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to discuss this. I agree my original implementation of retrieving this from the data layer names is not ideal, but currently this metadata field always corresponds to the mother, so I do not think we can use this. There are three questions here:

  • Can we assume this field is always present?
  • If so, can we assume that this field always refers to the daughter image?
  • If both are yes, is this just a TU Delft-specific config related to our processing infrastructure, or is this generically true?

We could also consider the user to define the epochs of each znap archive as a second variable or a dictionary-like structure. Let's discuss

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussion with @rogerkuou : we cannot assume this field to be correct by default since our best attempt at a sarxarray workflow apparently produces incorrect metadata in this field. Let's therefore use the second best option: in the metadata there is a field called:

    "elements" : [ {
      "name" : "sources",
      "attributes" : [ {
        "name" : "sourceProduct",
        "readOnly" : false,
        "data" : {
          "type" : "ascii",
          "elems" : "product:S1A_IW_SLC__1SDV_20230506T055033_20230506T055100_048409_05D2A0_F5DE"
        }
      } ]

The timestamp in the product file is always correct since this is configured by ESA. Let's read it from there

time_stamp = datetime.strptime(cur_epoch, "%Y-%m-%dT%H:%M:%S.%fZ")
epoch = time_stamp.strftime("%Y%m%d")

# register the epoch and file in a dictionary for later use
epoch_file_dict[epoch] = file

# If mother epoch
# keep variables ZNAP_DATA_VAR_MOTHER separately
# Assign an all zero h2ph variable
if is_mother:
# check if mother epoch has already been found
if data_mother is not None:
raise ValueError(
"Multiple mother epochs found. "
"Please check the ZNAP archives."
"Only one mother epoch can have the following variables: "
f"{ZNAP_DATA_VAR_MOTHER}. "
)

if slcs is None:
slc = cplx.reshape(
(metadata["number_of_lines"], metadata["number_of_pixels"], 1)
data_mother = data[ZNAP_DATA_VAR_MOTHER]
data = data.drop_vars(ZNAP_DATA_VAR_MOTHER)
data = data.assign(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be generic, not just h2ph. I propose to move this outside the loop when we can be sure that data is fully formed, and we can simply add all the missing fields present in data as zero layers

{"h2ph": (("y", "x"), np.zeros((data.sizes["y"], data.sizes["x"])))}
)
slcs = slc
metadata_file = f"{file}/SNAP/product_metadata.json"

# Assign to ds_stack along the time dimension
if ds_stack is None: # first epoch, initialize ds_stack
ds_stack = data.expand_dims(time=[time_stamp])
# Drop attrs inherited from the first epoch
Comment thread
rogerkuou marked this conversation as resolved.
ds_stack.attrs = {}
# Always initialize mother_epoch to None
# If first epoch is mother, it will be set to epoch below
ds_stack = ds_stack.assign_attrs({"mother_epoch": None})
else:
slc = cplx.reshape(
(metadata["number_of_lines"], metadata["number_of_pixels"], 1)
ds_stack = xr.concat(
[ds_stack, data.expand_dims(time=[time_stamp])],
dim="time",
combine_attrs="override", # override the attrs of the first epoch
)
slcs = da.concatenate([slcs, slc], axis=2)

ds_stack = ds_stack.assign({"complex": (("azimuth", "range", "time"), slcs)})

# retain the additional layers in the daughter znaps (which are time-variant)
if len(daughter_epochs) > 0:
for layer in full_data[daughter_epochs[0]]["cleaned_data_layers"].keys():
if layer in ["i", "q"]:
continue # these are handled separately above
layer_data = None
for epoch in sorted_epochs:
if layer_data is None:
if layer not in full_data[epoch]["cleaned_data_layers"].keys():
# e.g. it does not exist in the mother epoch, so we add zeros
layer_data = da.zeros(
(
metadata["number_of_lines"],
metadata["number_of_pixels"],
1
)
)
else:
layer_data = full_data[epoch]["data"][
full_data[epoch]["cleaned_data_layers"][layer]
].data
layer_data = layer_data.reshape(
(
metadata["number_of_lines"],
metadata["number_of_pixels"],
1
)
)
else:
if layer not in full_data[epoch]["cleaned_data_layers"].keys():
# e.g. it does not exist in the mother epoch, so we add zeros
cur_layer_data = da.zeros(
(
metadata["number_of_lines"],
metadata["number_of_pixels"],
1
)
)
else:
cur_layer_data = full_data[epoch]["data"][
full_data[epoch]["cleaned_layer_names"][layer]
].data
cur_layer_data = cur_layer_data.reshape(
(
metadata["number_of_lines"],
metadata["number_of_pixels"],
1
)
)
layer_data = da.concatenate(
[layer_data, cur_layer_data], axis=2
)
# Modify the ds_stack if mother epoch
if is_mother:
ds_stack = ds_stack.assign_attrs({"mother_epoch": epoch})

ds_stack = ds_stack.assign(
{layer: (("azimuth", "range", "time"), layer_data)}
)
# sort ds_stack by time
ds_stack = ds_stack.sortby("time")

# retain the additional layers in the mother znap (which is time-invariant)
for layer in full_data[mother_epoch]["cleaned_data_layers"].keys():
if layer in ds_stack.data_vars.keys() or layer in ["i", "q"]:
continue # these are already included
ds_stack = ds_stack.assign(
{
layer: (
("azimuth", "range"),
full_data[mother_epoch]["data"][
full_data[mother_epoch]["cleaned_data_layers"][layer]
].data
)
}
# order epoch_file_dict by epoch
epoch_file_dict = dict(sorted(epoch_file_dict.items()))

# Assign the mother epoch data to ds_stack
if data_mother is not None:
ds_stack = ds_stack.assign(data_mother)
else:
warning_msg = (
"Mother epoch has not been identified. "
f"The following variables {ZNAP_DATA_VAR_MOTHER} "
"are not present in any of the ZNAP archives. "
)
logger.warning(warning_msg)

# Read the metadata from the mother epoch if it exists
if metadata_file is not None:
metadata = read_metadata(metadata_file, driver="snap")
else:
warning_msg = (
"Mother epoch has not been identified. "
"Using first epoch for metadata instead."
)
logger.warning(warning_msg)
file_first_epoch = list(epoch_file_dict.values())[0]
metadata_file = f"{file_first_epoch}/SNAP/product_metadata.json"
metadata = read_metadata(metadata_file, driver="snap")
# Assign the metadata to ds_stack.attrs["metadata_mother"]
ds_stack = ds_stack.assign_attrs({"metadata_mother": metadata})

# Rename and enforce the order of dimensions
ds_stack = (
ds_stack.rename(
{"y": "azimuth", "x": "range"}
).transpose( # rename the dimensions
"azimuth", "range", "time"
) # enforce the order of dimensions
)

# Get the complex data variable
ds_stack = (
ds_stack.assign_coords(
azimuth=ds_stack["azimuth"] + metadata["first_line_number"],
range=ds_stack["range"] + metadata["first_pixel_number"],
) # shift the azimuth and range coordinates by offset
.assign({"complex": ds_stack["i"] + 1j * ds_stack["q"]}) # assign complex
.drop_vars(["i", "q"]) # drop the original i and q variables
)

# Calculate amplitude and phase from complex
ds_stack = ds_stack.slcstack._get_amplitude()
ds_stack = ds_stack.slcstack._get_phase()

return ds_stack


def to_binary(
output_path: str,
data: xr.Dataset | xr.DataArray,
data_var_name: str | None = None,
allow_overwrite: bool = False
output_path: str,
data: xr.Dataset | xr.DataArray,
data_var_name: str | None = None,
allow_overwrite: bool = False,
):
"""Write a zarr data layer to a binary file.

Expand Down Expand Up @@ -439,7 +390,7 @@ def to_binary(
- When `data` is not an `xr.Dataset` or `xr.DataArray`

KeyError
When `data` is an `xr.Dataset` and `data_var_name` is not a data variable in
When `data` is an `xr.Dataset` and `data_var_name` is not a data variable in
`data`

OSError
Expand All @@ -461,10 +412,7 @@ def to_binary(

# Create the memmap
memmap = np.memmap(
output_path,
dtype=datalayer.dtype,
mode="w+",
shape=datalayer.shape
output_path, dtype=datalayer.dtype, mode="w+", shape=datalayer.shape
)
# Assign the data to the memmap
memmap[:] = datalayer.data[:]
Expand Down Expand Up @@ -771,7 +719,7 @@ def _flatten_snap_json_metadata(content: dict | list, cur_keys: tuple = ()):
all_values = []
if isinstance(content, list):
for key in range(len(content)):
cur_keys_loop = cur_keys + (str(key), )
cur_keys_loop = cur_keys + (str(key),)
part_values = _flatten_snap_json_metadata(content[key], cur_keys_loop)
all_values = [*all_values, *part_values]
elif isinstance(content, dict):
Expand All @@ -787,19 +735,19 @@ def _flatten_snap_json_metadata(content: dict | list, cur_keys: tuple = ()):
val = content["data"]["elems"][0]
else:
val = content["data"]["elems"]
cur_keys_save = cur_keys + (content["name"], )
cur_keys_save = cur_keys + (content["name"],)
all_values.append([val, cur_keys_save])

else:
cur_keys_loop = cur_keys + (content["name"], )
cur_keys_loop = cur_keys + (content["name"],)
if "elements" in content.keys():
cur_keys_loop_el = cur_keys_loop + ("elements", )
cur_keys_loop_el = cur_keys_loop + ("elements",)
part_values = _flatten_snap_json_metadata(
content["elements"], cur_keys_loop_el
)
all_values = [*all_values, *part_values]
if "attributes" in content.keys():
cur_keys_loop_at = cur_keys_loop + ("attributes", )
cur_keys_loop_at = cur_keys_loop + ("attributes",)
part_values = _flatten_snap_json_metadata(
content["attributes"], cur_keys_loop_at
)
Expand Down Expand Up @@ -920,3 +868,35 @@ def _regulate_metadata(metadata, driver):
logger.warning(warning_msg)

return metadata


def _read_one_znap_archive(file: str | Path) -> tuple[xr.Dataset, bool]:
"""Read a single ZNAP archive produced by SNAP and flag if mother."""
# Initialize is_mother flag
is_mother = False

# open one ZNAP archive
data = xr.open_zarr(file, consolidated=False)

# there are two types of layers: full data layers [x, y], and tiepoint [xt, yt]
# we only preserve the full data layers, as the others cannot be placed
# drop all non-x/y dims, this also drops tiepoint vars
dims_non_xy = [dim for dim in data.dims if dim not in ["x", "y"]]
data = data.drop_dims(dims_non_xy)

# Rename the data variables according to RE_PATTERNS_SNAP_DATALAYER
for layer in list(data.data_vars):
for key, pattern in RE_PATTERNS_SNAP_DATALAYER.items():
if re.match(pattern, layer):
data = data.rename({layer: key})

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we only rename the hardcoded layers i, q, h2ph, latitude, longitude, and elevation. In my original implementation I scanned the current datalayer name for presence of polarisation and a date instead of hardcoding it, and stripped them when found. This means that it is not limited to these six layers, and can be used by anyone in a generic sense. I am in favour of reverting to a more generic implementation to facilitate other users as well

break

Comment thread
Copilot marked this conversation as resolved.
# Check if mother epoch
is_mother = any(v in data.data_vars for v in ZNAP_DATA_VAR_MOTHER)
# Assign indices as coordinates to x and y
# This facilitates the concatenation
data = data.assign_coords(
{"x": range(data.sizes["x"]), "y": range(data.sizes["y"])}
)

return data, is_mother
Loading
Loading