
Support running on distributed compute engines like Dask, and Spark via Zap #283


Merged · 1 commit · Oct 5, 2018
44 changes: 35 additions & 9 deletions scanpy/preprocessing/simple.py
@@ -3,7 +3,6 @@
Compositions of these functions are found in sc.preprocess.recipes.
"""

-import numpy as np
import scipy as sp
import warnings
from scipy.sparse import issparse
@@ -14,9 +13,36 @@
from .. import logging as logg
from ..utils import sanitize_anndata

# import dask if it is available
try:
    import dask.array as da
except ImportError:
    da = None

# import zap (which wraps numpy) if available, otherwise fall back to plain numpy
try:
    import zap.base as np
except ImportError:
    import numpy as np

N_PCS = 50  # default number of PCs


def materialize_as_ndarray(a):
    """Convert distributed arrays to ndarrays."""
    if type(a) in (list, tuple):
        if da is not None and any(isinstance(arr, da.Array) for arr in a):
            # compute all dask arrays in the collection in a single pass
            return da.compute(*a, sync=True)
        # np is zap.base or plain numpy; asarray handles both
        return tuple(np.asarray(arr) for arr in a)
    if da is not None and isinstance(a, da.Array):
        return a.compute()
    return np.asarray(a)


def filter_cells(data, min_counts=None, min_genes=None, max_counts=None,
                 max_genes=None, copy=False):
    """Filter cell outliers based on counts and numbers of genes expressed.
@@ -96,7 +122,7 @@ def filter_cells(data, min_counts=None, min_genes=None, max_counts=None,
        raise ValueError('Provide one of min_counts, min_genes, max_counts or max_genes.')
    if isinstance(data, AnnData):
        adata = data.copy() if copy else data
-       cell_subset, number = filter_cells(adata.X, min_counts, min_genes, max_counts, max_genes)
+       cell_subset, number = materialize_as_ndarray(filter_cells(adata.X, min_counts, min_genes, max_counts, max_genes))
        if min_genes is None and max_genes is None: adata.obs['n_counts'] = number
        else: adata.obs['n_genes'] = number
        adata._inplace_subset_obs(cell_subset)
@@ -175,9 +201,9 @@ def filter_genes(data, min_counts=None, min_cells=None, max_counts=None,

    if isinstance(data, AnnData):
        adata = data.copy() if copy else data
-       gene_subset, number = filter_genes(adata.X, min_cells=min_cells,
+       gene_subset, number = materialize_as_ndarray(filter_genes(adata.X, min_cells=min_cells,
            min_counts=min_counts, max_cells=max_cells,
-           max_counts=max_counts)
+           max_counts=max_counts))
        if min_cells is None and max_cells is None:
            adata.var['n_counts'] = number
        else:
@@ -299,7 +325,7 @@ def filter_genes_dispersion(data,
    logg.msg('extracting highly variable genes',
             r=True, v=4)
    X = data  # no copy necessary, X remains unchanged in the following
-   mean, var = _get_mean_var(X)
+   mean, var = materialize_as_ndarray(_get_mean_var(X))
    # now actually compute the dispersion
    mean[mean == 0] = 1e-12  # set entries equal to zero to small value
    dispersion = var / mean
@@ -514,8 +540,8 @@ def pca(data, n_comps=None, zero_center=True, svd_solver='auto', random_state=0,
        If an :class:`~anndata.AnnData` is passed, determines whether a copy
        is returned. Is ignored otherwise.
    chunked : `bool`, optional (default: `False`)
        If `True`, perform an incremental PCA on segments of `chunk_size`. The
        incremental PCA automatically zero centers and ignores settings of
        `random_seed` and `svd_solver`. If `False`, perform a full PCA.
    chunk_size : `int`, optional (default: `None`)
        Number of observations to include in each chunk. Required if `chunked`
@@ -679,7 +705,7 @@ def normalize_per_cell(data, counts_per_cell_after=None, counts_per_cell=None,
    if isinstance(data, AnnData):
        logg.msg('normalizing by total count per cell', r=True)
        adata = data.copy() if copy else data
-       cell_subset, counts_per_cell = filter_cells(adata.X, min_counts=1)
+       cell_subset, counts_per_cell = materialize_as_ndarray(filter_cells(adata.X, min_counts=1))
        adata.obs[key_n_counts] = counts_per_cell
        adata._inplace_subset_obs(cell_subset)
        normalize_per_cell(adata.X, counts_per_cell_after,
@@ -716,7 +742,7 @@ def normalize_per_cell(data, counts_per_cell_after=None, counts_per_cell=None,
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        counts_per_cell /= counts_per_cell_after
-       if not issparse(X): X /= counts_per_cell[:, np.newaxis]
+       if not issparse(X): X /= materialize_as_ndarray(counts_per_cell[:, np.newaxis])
        else: sparsefuncs.inplace_row_scale(X, 1/counts_per_cell)
    return X if copy else None

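Taken together, the fallback import and `materialize_as_ndarray` are the entire integration surface: the module keeps calling `np.*` as before, and laziness is only forced at the handful of call sites wrapped above. A minimal sketch of the intended behavior, assuming dask is installed (shapes and values are illustrative only):

import dask.array as da
import numpy as np
from scanpy.preprocessing.simple import materialize_as_ndarray

lazy = da.ones((4, 3), chunks=(2, 3))  # small lazy array standing in for adata.X
eager = materialize_as_ndarray(lazy)   # forces the computation
assert isinstance(eager, np.ndarray) and eager.shape == (4, 3)

# a tuple of lazy results is computed in a single pass over the data
row_sums, col_sums = materialize_as_ndarray((lazy.sum(axis=1), lazy.sum(axis=0)))
assert row_sums.shape == (4,) and col_sums.shape == (3,)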
3 changes: 3 additions & 0 deletions scanpy/tests/_data/10x-10k-subset.zarr/.zgroup
@@ -0,0 +1,3 @@
{
    "zarr_format": 2
}
22 changes: 22 additions & 0 deletions scanpy/tests/_data/10x-10k-subset.zarr/X/.zarray
@@ -0,0 +1,22 @@
{
    "chunks": [
        2000,
        1000
    ],
    "compressor": {
        "blocksize": 0,
        "clevel": 5,
        "cname": "lz4",
        "id": "blosc",
        "shuffle": 1
    },
    "dtype": "<f4",
    "fill_value": 0.0,
    "filters": null,
    "order": "C",
    "shape": [
        10000,
        1000
    ],
    "zarr_format": 2
}
Binary file added scanpy/tests/_data/10x-10k-subset.zarr/X/0.0
Binary file not shown.
Binary file added scanpy/tests/_data/10x-10k-subset.zarr/X/1.0
Binary file not shown.
Binary file added scanpy/tests/_data/10x-10k-subset.zarr/X/2.0
Binary file not shown.
Binary file added scanpy/tests/_data/10x-10k-subset.zarr/X/3.0
Binary file not shown.
Binary file added scanpy/tests/_data/10x-10k-subset.zarr/X/4.0
Binary file not shown.
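For reference, the `.zarray` metadata above describes `X` as a 10000 × 1000 float32 matrix stored as five blosc/lz4-compressed chunks of 2000 × 1000 — precisely the five binary chunk files `0.0` through `4.0` added here. A quick sketch of opening the store, eagerly via zarr or lazily via dask (paths assume the test-data layout in this PR):

import zarr
import dask.array as da

x = zarr.open('scanpy/tests/_data/10x-10k-subset.zarr/X', mode='r')
print(x.shape, x.chunks, x.dtype)  # (10000, 1000) (2000, 1000) float32

lazy_x = da.from_zarr('scanpy/tests/_data/10x-10k-subset.zarr/X')  # one dask block per zarr chunk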
25 changes: 25 additions & 0 deletions scanpy/tests/_data/10x-10k-subset.zarr/obs/.zarray
@@ -0,0 +1,25 @@
{
    "chunks": [
        10000
    ],
    "compressor": {
        "blocksize": 0,
        "clevel": 5,
        "cname": "lz4",
        "id": "blosc",
        "shuffle": 1
    },
    "dtype": [
        [
            "index",
            "|S18"
        ]
    ],
    "fill_value": "AAAAAAAAAAAAAAAAAAAAAAAA",
    "filters": null,
    "order": "C",
    "shape": [
        10000
    ],
    "zarr_format": 2
}
Binary file added scanpy/tests/_data/10x-10k-subset.zarr/obs/0
Binary file not shown.
29 changes: 29 additions & 0 deletions scanpy/tests/_data/10x-10k-subset.zarr/var/.zarray
@@ -0,0 +1,29 @@
{
    "chunks": [
        1000
    ],
    "compressor": {
        "blocksize": 0,
        "clevel": 5,
        "cname": "lz4",
        "id": "blosc",
        "shuffle": 1
    },
    "dtype": [
        [
            "index",
            "|S13"
        ],
        [
            "gene_ids",
            "|S18"
        ]
    ],
    "fill_value": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
    "filters": null,
    "order": "C",
    "shape": [
        1000
    ],
    "zarr_format": 2
}
Binary file added scanpy/tests/_data/10x-10k-subset.zarr/var/0
Binary file not shown.
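Unlike `X`, the `obs` and `var` arrays use structured dtypes, so each record bundles the index with any annotation columns (`gene_ids` for `var`) as fixed-width byte strings. A sketch of inspecting them with zarr, using the field names given in the metadata above:

import zarr

var = zarr.open('scanpy/tests/_data/10x-10k-subset.zarr/var', mode='r')[:]
print(var.dtype.names)   # ('index', 'gene_ids')
print(var['index'][:3])  # first few gene symbols, as fixed-width bytes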
86 changes: 86 additions & 0 deletions scanpy/tests/preprocessing_distributed.py
@@ -0,0 +1,86 @@
from importlib.util import find_spec
from pathlib import Path

import anndata as ad
import numpy.testing as npt
import pytest

from scanpy.api.pp import *
from scanpy.preprocessing.simple import materialize_as_ndarray

HERE = Path(__file__).parent / Path('_data/')
input_file = str(Path(HERE, "10x-10k-subset.zarr"))

@pytest.mark.skipif(not all((find_spec('dask'), find_spec('zap'), find_spec('zarr'))), reason='Dask, Zap, and Zarr all required')
class TestPreprocessingDistributed:
    @pytest.fixture()
    def adata(self):
        a = ad.read_zarr(input_file)  # regular anndata
        a.X = a.X[:]  # convert to numpy array
        return a

    @pytest.fixture(params=["direct", "dask"])
    def adata_dist(self, request):
        # regular anndata except for X, which we replace on the next line
        a = ad.read_zarr(input_file)
        input_file_X = input_file + "/X"
        if request.param == "direct":
            import zap.direct

            a.X = zap.direct.from_zarr(input_file_X)
            yield a
        elif request.param == "dask":
            import dask.array as da

            a.X = da.from_zarr(input_file_X)
            yield a

    def test_log1p(self, adata, adata_dist):
        log1p(adata_dist)
        result = materialize_as_ndarray(adata_dist.X)
        log1p(adata)
        assert result.shape == adata.shape
        assert result.shape == (adata.n_obs, adata.n_vars)
        npt.assert_allclose(result, adata.X)

    def test_normalize_per_cell(self, adata, adata_dist):
        normalize_per_cell(adata_dist)
        result = materialize_as_ndarray(adata_dist.X)
        normalize_per_cell(adata)
        assert result.shape == adata.shape
        assert result.shape == (adata.n_obs, adata.n_vars)
        npt.assert_allclose(result, adata.X)

    def test_filter_cells(self, adata, adata_dist):
        filter_cells(adata_dist, min_genes=3)
        result = materialize_as_ndarray(adata_dist.X)
        filter_cells(adata, min_genes=3)
        assert result.shape == adata.shape
        assert result.shape == (adata.n_obs, adata.n_vars)
        npt.assert_allclose(result, adata.X)

    def test_filter_genes(self, adata, adata_dist):
        filter_genes(adata_dist, min_cells=2)
        result = materialize_as_ndarray(adata_dist.X)
        filter_genes(adata, min_cells=2)
        assert result.shape == adata.shape
        assert result.shape == (adata.n_obs, adata.n_vars)
        npt.assert_allclose(result, adata.X)

    def test_write_zarr(self, adata, adata_dist):
        import dask.array as da
        import zarr

        log1p(adata_dist)
        temp_store = zarr.TempStore()
        chunks = adata_dist.X.chunks
        # write metadata using regular anndata
        adata.write_zarr(temp_store, chunks)
        if isinstance(adata_dist.X, da.Array):
            adata_dist.X.to_zarr(temp_store.dir_path("X"))
        else:
            adata_dist.X.to_zarr(temp_store.dir_path("X"), chunks)
        # read back as zarr directly and check it is the same as adata.X
        adata_log1p = ad.read_zarr(temp_store)
        log1p(adata)
        npt.assert_allclose(adata_log1p.X, adata.X)
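Outside of pytest, the workflow these tests exercise looks roughly like the sketch below: read the metadata eagerly, swap in a lazy `X`, run the usual preprocessing calls, and materialize only at the end (assumes dask and zarr are installed; the path is the test dataset above):

import anndata as ad
import dask.array as da
from scanpy.api.pp import log1p, normalize_per_cell
from scanpy.preprocessing.simple import materialize_as_ndarray

adata = ad.read_zarr('scanpy/tests/_data/10x-10k-subset.zarr')      # metadata in memory
adata.X = da.from_zarr('scanpy/tests/_data/10x-10k-subset.zarr/X')  # X stays lazy

normalize_per_cell(adata)  # builds a task graph rather than computing immediately
log1p(adata)
X = materialize_as_ndarray(adata.X)  # compute once, at the end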