Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions modin/data_management/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pandas.compat import string_types
from pandas.core.dtypes.cast import find_common_type
from pandas.core.dtypes.common import (_get_dtype_from_object, is_list_like)
from pandas.core.index import _ensure_index

from .partitioning.partition_collections import BlockPartitions, RayBlockPartitions
from .partitioning.remote_partition import RayRemotePartition
Expand All @@ -26,7 +27,7 @@ def __init__(self, block_partitions_object, index, columns, dtypes=None):
if dtypes is not None:
self._dtype_cache = dtypes

# dtypes
# Index, columns and dtypes objects
_dtype_cache = None

def _get_dtype(self):
Expand All @@ -45,37 +46,41 @@ def _set_dtype(self, dtypes):

dtypes = property(_get_dtype, _set_dtype)

# Index and columns objects
# These objects are currently not distributed.
# Note: These are more performant as pandas.Series objects than they are as
# pandas.DataFrame objects.
#
# _index_cache is a pandas.Series that holds the index
_index_cache = None
# _columns_cache is a pandas.Series that holds the columns
_columns_cache = None

def _get_index(self):
    """Return the index labels of this data manager.

    After the refactor, `_index_cache` holds the pandas Index itself
    (not a Series wrapper), so it is returned directly.
    """
    # Diff artifact removed: an unreachable duplicate
    # `return self._index_cache.index` (the pre-refactor form) was left
    # interleaved with this line.
    return self._index_cache

def _get_columns(self):
    """Return the column labels of this data manager.

    `_columns_cache` holds the pandas Index itself (not a Series
    wrapper), so it is returned directly.
    """
    # Diff artifact removed: an unreachable duplicate
    # `return self._columns_cache.index` (the pre-refactor form) was
    # left interleaved with this line.
    return self._columns_cache

def _validate_set_axis(self, new_labels, old_labels):
new_labels = _ensure_index(new_labels)
old_len = len(old_labels)
new_len = len(new_labels)
if old_len != new_len:
raise ValueError('Length mismatch: Expected axis has %d elements, '
'new values have %d elements' % (old_len, new_len))
return new_labels

def _set_index(self, new_index):
    """Replace the index labels of this data manager.

    On first assignment the labels are simply coerced to a pandas
    Index; on later assignments the new labels are validated against
    the current length first.
    """
    # Diff artifact removed: the pre-refactor branch
    # (`if self._index_cache is not None: self._index_cache.index = ...`
    # / `self._index_cache = pandas.Series(index=new_index)`) was left
    # interleaved with the corrected branch below.
    if self._index_cache is None:
        self._index_cache = _ensure_index(new_index)
    else:
        new_index = self._validate_set_axis(new_index, self._index_cache)
        self._index_cache = new_index

def _set_columns(self, new_columns):
    """Replace the column labels of this data manager.

    Mirrors `_set_index`: coerce on first assignment, validate length
    on subsequent assignments.
    """
    # Diff artifact removed: the pre-refactor branch
    # (`self._columns_cache.index = ...` /
    # `self._columns_cache = pandas.Series(index=new_columns)`) was
    # left interleaved with the corrected branch below.
    if self._columns_cache is None:
        self._columns_cache = _ensure_index(new_columns)
    else:
        new_columns = self._validate_set_axis(new_columns, self._columns_cache)
        self._columns_cache = new_columns

columns = property(_get_columns, _set_columns)
index = property(_get_index, _set_index)

# END Index, columns, and dtypes objects

def compute_index(self, axis, data_object, compute_diff=True):
Expand Down Expand Up @@ -1011,20 +1016,20 @@ def head(self, n):
# ensure that we extract the correct data on each node. The index
# on a transposed manager is already set to the correct value, so
# we need to only take the head of that instead of re-transposing.
result = cls(self.data.transpose().take(1, n).transpose(), self.index[:n], self.columns, self.dtypes)
result = cls(self.data.transpose().take(1, n).transpose(), self.index[:n], self.columns, self._dtype_cache)
result._is_transposed = True
else:
result = cls(self.data.take(0, n), self.index[:n], self.columns, self.dtypes)
result = cls(self.data.take(0, n), self.index[:n], self.columns, self._dtype_cache)
return result

def tail(self, n):
    """Return a new data manager holding the last `n` rows.

    Passes `self._dtype_cache` (which may be None) to the constructor
    instead of `self.dtypes`, so dtypes are not eagerly computed just
    to take a tail.
    """
    cls = type(self)
    # See head for an explanation of the transposed behavior
    if self._is_transposed:
        # Diff artifact removed: a duplicate of this line passing
        # `self.dtypes` (pre-refactor) preceded the corrected line.
        result = cls(self.data.transpose().take(1, -n).transpose(),
                     self.index[-n:], self.columns, self._dtype_cache)
        result._is_transposed = True
    else:
        result = cls(self.data.take(0, -n), self.index[-n:],
                     self.columns, self._dtype_cache)
    return result

def front(self, n):
Expand Down
45 changes: 43 additions & 2 deletions modin/data_management/partitioning/partition_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import print_function

import numpy as np
import ray
import pandas

from .remote_partition import RayRemotePartition
Expand Down Expand Up @@ -99,7 +100,7 @@ def block_lengths(self):
# The first column will have the correct lengths. We have an
# invariant that requires that all blocks be the same length in a
# row of blocks.
self._lengths_cache = [obj.length for obj in self.partitions.T[0]]
self._lengths_cache = [obj.length().get() for obj in self.partitions.T[0]]
return self._lengths_cache

# Widths of the blocks
Expand All @@ -116,7 +117,7 @@ def block_widths(self):
# The first column will have the correct lengths. We have an
# invariant that requires that all blocks be the same width in a
# column of blocks.
Copy link

@osalpekar osalpekar Sep 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe width and length should be functions instead of attributes?

self._widths_cache = [obj.width for obj in self.partitions[0]]
self._widths_cache = [obj.width().get() for obj in self.partitions[0]]
return self._widths_cache

def full_reduce(self, map_func, reduce_func, axis):
Expand Down Expand Up @@ -672,6 +673,9 @@ def __getitem__(self, key):
cls = type(self)
return cls(self.partitions[key])

def __len__(self):
    # Total row count: sum of per-block-row lengths across the partition grid.
    return sum(self.block_lengths)


class RayBlockPartitions(BlockPartitions):
"""This method implements the interface in `BlockPartitions`."""
Expand All @@ -682,6 +686,43 @@ class RayBlockPartitions(BlockPartitions):
def __init__(self, partitions):
    # partitions: grid of remote partition objects.
    # NOTE(review): appears to be a 2D array (indexed via `.T[0]` and
    # `[0]` in block_lengths/block_widths) — confirm.
    self.partitions = partitions

# We override these for performance reasons.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the critical change that made it work?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that’s the case, can we drop block partitions implementation all together?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. we were doing a ray.get each time which was scaling poorly.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we cache the remote function?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually cache the lengths themselves with this structure. We also cache the object IDs from the lengths right now so we can do a ray.get on them as well. So to answer your question, we already do cache the result of the remote function.

# Lengths of the blocks
_lengths_cache = None

# These are set up as properties so that we only use them when we need
# them. We also do not want to trigger this computation on object creation.
@property
def block_lengths(self):
    """The number of rows in each row of the block grid.

    Results are memoized in `_lengths_cache` so the remote lengths are
    fetched from Ray at most once.
    """
    if self._lengths_cache is None:
        # Every block in a row has the same length, so the first
        # column of the grid is sufficient.
        length_oids = [part.length().oid for part in self.partitions.T[0]]
        # One batched ray.get instead of one round trip per block.
        self._lengths_cache = ray.get(length_oids)
    return self._lengths_cache

# Widths of the blocks
_widths_cache = None

@property
def block_widths(self):
    """The number of columns in each column of the block grid.

    Results are memoized in `_widths_cache` so the remote widths are
    fetched from Ray at most once.
    """
    if self._widths_cache is None:
        # Every block in a column has the same width, so the first row
        # of the grid is sufficient.
        width_oids = [part.width().oid for part in self.partitions[0]]
        # One batched ray.get instead of one round trip per block.
        self._widths_cache = ray.get(width_oids)
    return self._widths_cache

@property
def column_partitions(self):
"""A list of `RayColumnPartition` objects."""
Expand Down
6 changes: 2 additions & 4 deletions modin/data_management/partitioning/remote_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,24 +110,22 @@ def width_extraction_fn(cls):
_length_cache = None
_width_cache = None

def length(self):
    """Return the (lazy) length of this partition.

    Returns the object produced by `self.apply(...)` rather than a
    materialized int, so callers can batch retrieval (e.g. via
    `.get()` or `ray.get` on many partitions at once). The result is
    cached in `_length_cache`.
    """
    # Diff artifacts removed: the removed `@property` decorator and the
    # pre-refactor eager `self.apply(preprocessed_func).get()` line
    # were left interleaved with the corrected code.
    if self._length_cache is None:
        cls = type(self)
        func = cls.length_extraction_fn()
        preprocessed_func = cls.preprocess_func(func)
        self._length_cache = self.apply(preprocessed_func)
    return self._length_cache

def width(self):
    """Return the (lazy) width of this partition.

    Mirrors `length`: returns the object produced by `self.apply(...)`
    so callers can batch retrieval, and caches it in `_width_cache`.
    """
    # Diff artifacts removed: the removed `@property` decorator and the
    # pre-refactor eager `self.apply(preprocessed_func).get()` line
    # were left interleaved with the corrected code.
    if self._width_cache is None:
        cls = type(self)
        func = cls.width_extraction_fn()
        preprocessed_func = cls.preprocess_func(func)
        self._width_cache = self.apply(preprocessed_func)
    return self._width_cache


Expand Down
Loading