Vcfzarr concatenate and rechunk #324

Merged 7 commits on Nov 3, 2020

160 changes: 159 additions & 1 deletion sgkit/io/utils.py
@@ -1,7 +1,11 @@
from typing import Mapping, Optional, Tuple
from typing import Any, Dict, Mapping, Optional, Sequence, Tuple

import dask.array as da
import dask.dataframe as dd
import fsspec
import numpy as np
import xarray as xr
import zarr

from ..typing import ArrayLike, DType
from ..utils import encode_array, max_str_len
@@ -37,3 +41,157 @@ def encode_contigs(contig: ArrayLike) -> Tuple[np.ndarray, np.ndarray]:
else:
ids, names = encode_array(np.asarray(contig, dtype=str))
return ids, names


def zarrs_to_dataset(
urls: Sequence[str],
chunk_length: int = 10_000,
chunk_width: int = 1_000,
storage_options: Optional[Dict[str, str]] = None,
) -> xr.Dataset:
"""Combine multiple Zarr stores into a single Xarray dataset.

The Zarr stores are concatenated and rechunked to produce a single combined dataset.

Parameters
----------
urls
A list of URLs to the Zarr stores to combine, typically the return value of
:func:`vcf_to_zarrs`.
chunk_length
Length (number of variants) of chunks in which data are stored, by default 10,000.
chunk_width
Width (number of samples) to use when storing chunks in output, by default 1,000.
storage_options
Any additional parameters for the storage backend (see ``fsspec.open``).

Returns
-------
A dataset representing the combined dataset.
"""

storage_options = storage_options or {}

datasets = [xr.open_zarr(fsspec.get_mapper(path, **storage_options), concat_characters=False) for path in urls] # type: ignore[no-untyped-call]

# Combine the datasets into one
ds = xr.concat(datasets, dim="variants", data_vars="minimal")

# This is a workaround to make rechunking work when the temp_chunk_length is different to chunk_length
# See https://github.com/pydata/xarray/issues/4380
for data_var in ds.data_vars:
if "variants" in ds[data_var].dims:
del ds[data_var].encoding["chunks"]

# Rechunk to uniform chunk size
ds: xr.Dataset = ds.chunk({"variants": chunk_length, "samples": chunk_width})

# Set variable length strings to fixed length ones to avoid xarray/conventions.py:188 warning
# (Also avoids this issue: https://github.com/pydata/xarray/issues/3476)
max_variant_id_length = max(ds.attrs["max_variant_id_length"] for ds in datasets)
max_variant_allele_length = max(
ds.attrs["max_variant_allele_length"] for ds in datasets
)
ds["variant_id"] = ds["variant_id"].astype(f"S{max_variant_id_length}") # type: ignore[no-untyped-call]
ds["variant_allele"] = ds["variant_allele"].astype(f"S{max_variant_allele_length}") # type: ignore[no-untyped-call]
del ds.attrs["max_variant_id_length"]
del ds.attrs["max_variant_allele_length"]

return ds


def concatenate_and_rechunk(
zarrs: Sequence[zarr.Array],
chunks: Optional[Tuple[int, ...]] = None,
dtype: DType = None,
) -> da.Array:
"""Perform a concatenate and rechunk operation on a collection of Zarr arrays
to produce an array with a uniform chunking, suitable for saving as
a single Zarr array.

In contrast to Dask's ``rechunk`` method, the Dask computation graph
is embarrassingly parallel and will make efficient use of memory,
since no Zarr chunks are cached by the Dask scheduler.

The Zarr arrays must have matching shapes except in the first
dimension.

Parameters
----------
zarrs
Collection of Zarr arrays to concatenate.
chunks : Optional[Tuple[int, ...]], optional
The chunks to apply to the concatenated arrays. If not specified
the chunks for the first array will be applied to the concatenated
array.
dtype
The dtype of the concatenated array, by default the same as the
first array.

Returns
-------
A Dask array, suitable for saving as a single Zarr array.

Raises
------
ValueError
If the Zarr arrays do not have matching shapes (except in the first
dimension).
"""

if len(set([z.shape[1:] for z in zarrs])) > 1:
shapes = [z.shape for z in zarrs]
raise ValueError(
f"Zarr arrays must have matching shapes (except in the first dimension): {shapes}"
)

lengths = np.array([z.shape[0] for z in zarrs])
lengths0 = np.insert(lengths, 0, 0, axis=0)
offsets = np.cumsum(lengths0)
total_length = offsets[-1]

shape = (total_length, *zarrs[0].shape[1:])
chunks = chunks or zarrs[0].chunks
dtype = dtype or zarrs[0].dtype

ar = da.empty(shape, chunks=chunks)

def load_chunk(
x: ArrayLike,
zarrs: Sequence[zarr.Array],
offsets: ArrayLike,
block_info: Dict[Any, Any],
) -> ArrayLike:
return _slice_zarrs(zarrs, offsets, block_info[0]["array-location"])

return ar.map_blocks(load_chunk, zarrs=zarrs, offsets=offsets, dtype=dtype)


def _zarr_index(offsets: ArrayLike, pos: int) -> int:
"""Return the index of the zarr file that pos falls in"""
index: int = np.searchsorted(offsets, pos, side="right") - 1
return index


def _slice_zarrs(
zarrs: Sequence[zarr.Array], offsets: ArrayLike, locs: Sequence[Tuple[int, ...]]
) -> ArrayLike:
"""Slice concatenated zarrs by locs"""
# convert array locations to slices
locs = [slice(*loc) for loc in locs]
# determine which zarr files are needed
start, stop = locs[0].start, locs[0].stop # stack on first axis
i0 = _zarr_index(offsets, start)
i1 = _zarr_index(offsets, stop)
if i0 == i1: # within a single zarr file
sel = slice(start - offsets[i0], stop - offsets[i0])
return zarrs[i0][(sel, *locs[1:])]
else: # more than one zarr file
slices = []
slices.append((i0, slice(start - offsets[i0], None)))
for i in range(i0 + 1, i1): # entire zarr
slices.append((i, slice(None)))
if stop > offsets[i1]:
slices.append((i1, slice(0, stop - offsets[i1])))
parts = [zarrs[i][(sel, *locs[1:])] for (i, sel) in slices]
return np.concatenate(parts)
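
Taken together, the two helpers added to sgkit/io/utils.py support a concatenate-then-rechunk workflow. A minimal sketch of how they might be called is below; the store paths, the chunk sizes, and the "call_genotype" array name are illustrative assumptions rather than part of this diff.

import zarr
from sgkit.io.utils import concatenate_and_rechunk, zarrs_to_dataset

# Hypothetical per-partition Zarr stores, e.g. as returned by vcf_to_zarrs.
urls = ["part-0.zarr", "part-1.zarr"]

# High-level path: combine whole stores into one Xarray dataset, rechunked
# uniformly along the variants and samples dimensions, then persist it.
ds = zarrs_to_dataset(urls, chunk_length=10_000, chunk_width=1_000)
ds.to_zarr("combined.zarr", mode="w")

# Lower-level path: concatenate matching Zarr arrays along their first
# dimension; "call_genotype" and the (variants, samples, ploidy) chunking
# are assumed here purely for illustration.
arrays = [zarr.open_group(url)["call_genotype"] for url in urls]
combined = concatenate_and_rechunk(arrays, chunks=(10_000, 1_000, 2))
combined.to_zarr("call_genotype_combined.zarr")  # evaluates the parallel graph
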
3 changes: 2 additions & 1 deletion sgkit/io/vcf/__init__.py
@@ -1,8 +1,9 @@
import platform

try:
from ..utils import zarrs_to_dataset
from .vcf_partition import partition_into_regions
from .vcf_reader import vcf_to_zarr, vcf_to_zarrs, zarrs_to_dataset
from .vcf_reader import vcf_to_zarr, vcf_to_zarrs

__all__ = [
"partition_into_regions",
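
One consequence of the import shuffle above, sketched here as an assumption (the rest of __all__ is collapsed in this view, and the try block only succeeds when the VCF extra, cyvcf2, is installed): zarrs_to_dataset is still re-exported from sgkit.io.vcf while now also being importable from the shared IO utilities module.

# Both names should resolve to the same function object after this change.
from sgkit.io.utils import zarrs_to_dataset as from_io_utils
from sgkit.io.vcf import zarrs_to_dataset as from_vcf_namespace
assert from_vcf_namespace is from_io_utils
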
58 changes: 1 addition & 57 deletions sgkit/io/vcf/vcf_reader.py
@@ -9,6 +9,7 @@
import xarray as xr
from cyvcf2 import VCF, Variant

from sgkit.io.utils import zarrs_to_dataset
from sgkit.io.vcf.utils import build_url, chunks, temporary_directory, url_filename
from sgkit.model import DIM_VARIANT, create_genotype_call_dataset
from sgkit.typing import PathType
@@ -275,63 +276,6 @@ def vcf_to_zarrs(
return parts


def zarrs_to_dataset(
urls: Sequence[str],
chunk_length: int = 10_000,
chunk_width: int = 1_000,
storage_options: Optional[Dict[str, str]] = None,
) -> xr.Dataset:
"""Combine multiple Zarr stores into a single Xarray dataset.

The Zarr stores are concatenated and rechunked to produce a single combined dataset.

Parameters
----------
urls
A list of URLs to the Zarr stores to combine, typically the return value of
:func:`vcf_to_zarrs`.
chunk_length
Length (number of variants) of chunks in which data are stored, by default 10,000.
chunk_width
Width (number of samples) to use when storing chunks in output, by default 1,000.
storage_options
Any additional parameters for the storage backend (see ``fsspec.open``).

Returns
-------
A dataset representing the combined dataset.
"""

storage_options = storage_options or {}

datasets = [xr.open_zarr(fsspec.get_mapper(path, **storage_options)) for path in urls] # type: ignore[no-untyped-call]

# Combine the datasets into one
ds = xr.concat(datasets, dim="variants", data_vars="minimal")

# This is a workaround to make rechunking work when the temp_chunk_length is different to chunk_length
# See https://github.com/pydata/xarray/issues/4380
for data_var in ds.data_vars:
if "variants" in ds[data_var].dims:
del ds[data_var].encoding["chunks"]

# Rechunk to uniform chunk size
ds: xr.Dataset = ds.chunk({"variants": chunk_length, "samples": chunk_width})

# Set variable length strings to fixed length ones to avoid xarray/conventions.py:188 warning
# (Also avoids this issue: https://github.com/pydata/xarray/issues/3476)
max_variant_id_length = max(ds.attrs["max_variant_id_length"] for ds in datasets)
max_variant_allele_length = max(
ds.attrs["max_variant_allele_length"] for ds in datasets
)
ds["variant_id"] = ds["variant_id"].astype(f"S{max_variant_id_length}") # type: ignore[no-untyped-call]
ds["variant_allele"] = ds["variant_allele"].astype(f"S{max_variant_allele_length}") # type: ignore[no-untyped-call]
del ds.attrs["max_variant_id_length"]
del ds.attrs["max_variant_allele_length"]

return ds


def vcf_to_zarr(
input: Union[PathType, Sequence[PathType]],
output: Union[PathType, MutableMapping[str, bytes]],