Skip to content

Commit

Permalink
perf(eda): optimize plot_missing performance by tweaking dask
Browse files Browse the repository at this point in the history
  • Loading branch information
dovahcrow committed Jul 17, 2020
1 parent bbeec05 commit d766977
Showing 4 changed files with 179 additions and 141 deletions.
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
@@ -108,4 +108,4 @@ release version:


@ensure-git-clean:
if [ ! -z "$(git status --porcelain)" ]; then echo "Git tree is not clean, commit first"; exit 1; fi
if [ ! -z "$(git status --porcelain)" ]; then echo "Git tree is not clean, commit first"; exit 1; fi
7 changes: 1 addition & 6 deletions dataprep/eda/missing/__init__.py
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@ def plot_missing(
y: Optional[str] = None,
*,
bins: int = 30,
ncols: int = 30,
ndist_sample: int = 100,
dtype: Optional[DTypeDef] = None,
) -> Report:
@@ -39,8 +38,6 @@ def plot_missing(
a valid column name of the data frame
y
a valid column name of the data frame
ncols
The number of columns in the figure
bins
The number of rows in the figure
ndist_sample
@@ -59,8 +56,6 @@ def plot_missing(
>>> plot_missing(df, "HDI_for_year")
>>> plot_missing(df, "HDI_for_year", "population")
"""
itmdt = compute_missing(
df, x, y, dtype=dtype, bins=bins, ncols=ncols, ndist_sample=ndist_sample
)
itmdt = compute_missing(df, x, y, dtype=dtype, bins=bins, ndist_sample=ndist_sample)
fig = render_missing(itmdt)
return Report(fig)
187 changes: 115 additions & 72 deletions dataprep/eda/missing/compute.py
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
This module implements the plot_missing(df) function's
calculating intermediate part
"""
from typing import Optional, Tuple, Union, List
from typing import List, Optional, Tuple, Union, Callable

import dask
import dask.array as da
@@ -12,20 +12,20 @@
from scipy.stats import rv_histogram

from ...errors import UnreachableError
from ..utils import to_dask
from ..dtypes import (
is_dtype,
detect_dtype,
is_pandas_categorical,
Continuous,
Nominal,
DTypeDef,
Nominal,
detect_dtype,
is_dtype,
is_pandas_categorical,
)
from ..intermediate import Intermediate, ColumnsMetadata
from ..intermediate import ColumnsMetadata, Intermediate
from ..utils import to_dask

__all__ = ["compute_missing"]

LABELS = ["Origin", "DropMissing"]
LABELS = ["With Missing", "Missing Dropped"]


def histogram(
@@ -74,88 +74,146 @@ def histogram(
raise UnreachableError()


def missing_perc_blockwise(block: np.ndarray) -> np.ndarray:
def missing_impact(df: dd.DataFrame, bins: int) -> Intermediate:
"""
Calculate the data for visualizing the plot_missing(df).
This contains the missing spectrum, missing bar chart and missing heatmap.
"""
cols = df.columns.values
(nulldf,) = dask.persist(df.isnull())
nullity = nulldf.to_dask_array(lengths=True)

null_perc = nullity.sum(axis=0) / nullity.shape[0]

tasks = (
missing_spectrum(nullity, cols, bins=bins),
null_perc,
missing_bars(null_perc, cols),
missing_heatmap(nulldf, null_perc, cols),
)

spectrum, null_perc, bars, heatmap = dd.compute(*tasks)

return Intermediate(
data_total_missing={col: null_perc[idx] for idx, col in enumerate(cols)},
data_spectrum=spectrum,
data_bars=bars,
data_heatmap=heatmap,
visual_type="missing_impact",
)


def missing_perc_blockwise(bin_size: int) -> Callable[[np.ndarray], np.ndarray]:
"""
Compute the missing percentage in a block
"""
return block.sum(axis=0, keepdims=True) / len(block)

def imp(block: np.ndarray) -> np.ndarray:
nbins = block.shape[0] // bin_size

sep = nbins * bin_size
block1 = block[:sep].reshape((bin_size, nbins, *block.shape[1:]))
ret = block1.sum(axis=0) / bin_size

def missing_spectrum(
df: dd.DataFrame, bins: int, ncols: int
) -> Tuple[dd.DataFrame, dd.DataFrame]:
# remaining data that cannot be fit into a single bin
if block.shape[0] != sep:
ret_remainder = block[sep:].sum(axis=0, keepdims=True) / (
block.shape[0] - sep
)
ret = np.concatenate([ret, ret_remainder], axis=0)

return ret

return imp


def missing_spectrum( # pylint: disable=too-many-locals
data: da.Array, cols: np.ndarray, bins: int
) -> dd.DataFrame:
"""
Calculate a missing spectrum for each column
"""
# pylint: disable=too-many-locals
num_bins = min(bins, len(df) - 1)

df = df.iloc[:, :ncols]
cols = df.columns[:ncols]
ncols = len(cols)
nrows = len(df)
chunk_size = len(df) // num_bins
data = df.isnull().to_dask_array()
data.compute_chunk_sizes()
data = data.rechunk((chunk_size, None))
nrows, ncols = data.shape
num_bins = min(bins, nrows - 1)
bin_size = nrows // num_bins
chunk_size = min(
1024 * 1024 * 128, nrows * ncols
) # max 1024 x 1024 x 128 Bytes bool values
nbins_per_chunk = max(chunk_size // (bin_size * data.shape[1]), 1)

notnull_counts = data.sum(axis=0) / data.shape[0]
total_missing_percs = {col: notnull_counts[idx] for idx, col in enumerate(cols)}
chunk_size = nbins_per_chunk * bin_size

spectrum_missing_percs = data.map_blocks(
missing_perc_blockwise, chunks=(1, data.shape[1]), dtype=float
data = data.rechunk((chunk_size, None))

sep = nrows // chunk_size * chunk_size
spectrum_missing_percs = data[:sep].map_blocks(
missing_perc_blockwise(bin_size),
chunks=(nbins_per_chunk, *data.shape[1:]),
dtype=float,
)
nsegments = len(spectrum_missing_percs)

locs0 = da.arange(nsegments) * chunk_size
locs1 = da.minimum(locs0 + chunk_size, nrows)
locs_middle = locs0 + chunk_size / 2
# calculation for the last chunk
if sep != nrows:
spectrum_missing_percs_remain = data[sep:].map_blocks(
missing_perc_blockwise(bin_size),
chunks=(int(np.ceil((nrows - sep) / bin_size)), *data.shape[1:]),
dtype=float,
)
spectrum_missing_percs = da.concatenate(
[spectrum_missing_percs, spectrum_missing_percs_remain], axis=0
)

num_bins = spectrum_missing_percs.shape[0]

locs0 = da.arange(num_bins) * bin_size
locs1 = da.minimum(locs0 + bin_size, nrows)
locs_middle = locs0 + bin_size / 2

df = dd.from_dask_array(
da.repeat(da.from_array(cols.values, (1,)), nsegments), columns=["column"],
da.repeat(da.from_array(cols, (1,)), num_bins), columns=["column"],
)

df = df.assign(
location=da.tile(locs_middle, ncols),
missing_rate=spectrum_missing_percs.T.ravel(),
missing_rate=spectrum_missing_percs.T.ravel().rechunk(locs_middle.shape[0]),
loc_start=da.tile(locs0, ncols),
loc_end=da.tile(locs1, ncols),
)

return df, total_missing_percs
return df


def missing_heatmap(df: dd.DataFrame) -> Optional[pd.DataFrame]:
def missing_bars(null_perc: da.Array, cols: np.ndarray) -> pd.DataFrame:
"""
Calculate a heatmap visualization of nullity correlation in the given DataFrame
Calculate a bar chart visualization of nullity correlation in the given DataFrame
"""
notnull_perc = 1 - null_perc

# Remove completely filled or completely empty variables.
nonnulls = da.stack([df[col].isnull().sum() == 0 for col in df.columns])
allnulls = da.stack([df[col].isnull().sum() == len(df) for col in df.columns])

sel = ~(nonnulls | allnulls)
df = dd.from_dask_array(
da.stack([null_perc, notnull_perc, da.from_array(cols, (1,))], axis=1),
columns=["missing", "not missing", "columns"],
)

cols = df.columns[sel.compute()] # TODO: Can we remove the compute here?
if len(cols) == 0:
return None
df = df.set_index("columns")

corr_mat = df[cols].isnull().corr()
return corr_mat
return df


def missing_bars(df: dd.DataFrame) -> pd.DataFrame:
def missing_heatmap(
nulldf: dd.DataFrame, null_perc: da.Array, cols: np.ndarray
) -> Optional[pd.DataFrame]:
"""
Calculate a bar chart visualization of nullity correlation in the given DataFrame
Calculate a heatmap visualization of nullity correlation in the given DataFrame
"""
nullity_counts = df.isnull().sum() / len(df)
non_nullity_counts = 1 - nullity_counts
df = nullity_counts.to_frame()
df = df.assign(notnull=non_nullity_counts)

df.columns = ["missing", "not missing"]
# Remove completely filled or completely empty variables.
sel = ~((null_perc == 0) | (null_perc == 1))
cols = cols[sel.compute()] # TODO: Can we remove the compute here?
if len(cols) == 0:
return None

return df
corr_mat = nulldf[cols].corr()
return corr_mat


def missing_impact_1vn( # pylint: disable=too-many-locals
@@ -364,7 +422,6 @@ def compute_missing(
y: Optional[str] = None,
*,
bins: int = 30,
ncols: int = 30,
ndist_sample: int = 100,
dtype: Optional[DTypeDef] = None,
) -> Intermediate:
@@ -381,9 +438,7 @@ def compute_missing(
a valid column name of the data frame
y
a valid column name of the data frame
ncols
The number of columns in the figure
bins
bins
The number of rows in the figure
ndist_sample
The number of sample points
@@ -413,16 +468,4 @@ def compute_missing(
df, dtype=dtype, x=x, y=y, bins=bins, ndist_sample=ndist_sample
)
else:
spectrum, total_missing, bars, heatmap = dd.compute(
*missing_spectrum(df, bins=bins, ncols=ncols),
missing_bars(df),
missing_heatmap(df),
)

return Intermediate(
data_total_missing=total_missing,
data_spectrum=spectrum,
data_bars=bars,
data_heatmap=heatmap,
visual_type="missing_impact",
)
return missing_impact(df, bins=bins)
Loading

0 comments on commit d766977

Please sign in to comment.