-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Distributed sklearn kmeans based on ray and modin #429
Conversation
import modin.pandas as pd | ||
|
||
|
||
@ray.remote(num_cpus=1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about modin on dask?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unwrapping partitions of a DataFrame
works for dask engine as well. Some changes are needed to do here. It looks like it is feasible. Is it planned to be as part of this PR or another?
src/dist_kmeans.h
Outdated
using namespace std; | ||
using namespace daal; | ||
using namespace daal::algorithms; | ||
using namespace daal::data_management; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do not use multiple and nested namespaces. Spell them out where used.
// reduce all partial results | ||
auto pres = map_reduce_tree::map_reduce_tree<Algo>::reduce(algo, s1_result); | ||
// finalize and check convergence/end of iteration | ||
auto res = tcvr->gather(s1_result); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👎 Tree-reduce is more efficient.
# | ||
#******************************************************************************* | ||
# Copyright 2014-2020 Intel Corporation | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
#******************************************************************************/ | ||
|
||
import pandas | ||
|
||
class RowPartitionsActor: | ||
def __init__(self, node): | ||
self.row_parts_ = [] | ||
self.node_ = node | ||
|
||
def set_row_parts(self, *row_parts): | ||
self.row_parts_ = pandas.concat(list(row_parts), axis=0) | ||
|
||
def set_row_parts_list(self, *row_parts_list): | ||
self.row_parts_ = list(row_parts_list) | ||
|
||
def append_row_part(self, row_part): | ||
self.row_parts_.append(row_part) | ||
|
||
def concat_row_parts(self): | ||
self.row_parts_ = pandas.concat(self.row_parts_, axis=0) | ||
|
||
def get_row_parts(self): | ||
return self.row_parts_ | ||
|
||
def get_actor_ip(self): | ||
return self.node_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this here and in ray?
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
#******************************************************************************/ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pls add a brief description of what this file contains and how this works. 2.3 lines suffice.
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
#******************************************************************************/ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pls add a brief description of what this file contains and how this works. 2.3 lines suffice.
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
#******************************************************************************/ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pls add a brief description of what this file contains and how this works. 2.3 lines suffice.
Pls also add docstrings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the actor and modin code should be pushed down to daal4py itself. I see no reason why this should only be available for sklearn. When in daal4py itself, sklearn is automatically supported.
Also, a merge requires a complete CCL transceiver.
With the above we'd have a support for modin in all distributed algos.
row_parts_last_idx = ( | ||
len(row_partitions) // num_nodes | ||
if len(row_partitions) % num_nodes == 0 | ||
else len(row_partitions) // num_nodes + 1 | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
row_parts_last_idx = ( | |
len(row_partitions) // num_nodes | |
if len(row_partitions) % num_nodes == 0 | |
else len(row_partitions) // num_nodes + 1 | |
) | |
row_parts_last_idx = ( | |
len(row_partitions) // num_nodes | |
if len(row_partitions) % num_nodes < num_nodes // 2 + 1 | |
else len(row_partitions) // num_nodes + 1 | |
) |
In case .. == 0
situation is possible, when nodes will be empty (11 partitions and 5 actors, for example).
for actor in actors: | ||
actor.set_row_parts( | ||
[r.result() for r in row_partitions[ | ||
slice(i, i + row_parts_last_idx) | ||
if i + row_parts_last_idx < len(row_partitions) | ||
else slice(i, len(row_partitions)) | ||
]] | ||
) | ||
i += row_parts_last_idx |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to change slices according to comment above.
row_parts_last_idx = ( | ||
len(row_partitions) // num_nodes | ||
if len(row_partitions) % num_nodes == 0 | ||
else len(row_partitions) // num_nodes + 1 | ||
) | ||
|
||
i = 0 | ||
for actor in actors: | ||
actor.set_row_parts._remote( | ||
args=( | ||
row_partitions[ | ||
slice(i, i + row_parts_last_idx) | ||
if i + row_parts_last_idx < len(row_partitions) | ||
else slice(i, len(row_partitions)) | ||
]) | ||
) | ||
i += row_parts_last_idx |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same as for dask.
actors.append(dask_client.submit(RowPartitionsActor, None, workers=set([ip]), actor=True)) | ||
actors = [actor.result() for actor in actors] | ||
|
||
row_partitions = unwrap_row_partitions(X) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...(X, axis=0)
actors | ||
), f"number of nodes {num_nodes} is not equal to number of actors {len(actors)}" | ||
|
||
row_partitions = unwrap_row_partitions(X) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...(X, axis=0)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unwrap_row_partitions
is old and not actual function. It should be replaced to unwrap_partitions
.
Out of date with repository - if continued work on this is desired a new ticket+PR can be opened |
And also set the variable CCLROOT. In addition, it needs to use oneccl_transceiver:
export D4P_TRANSCEIVER = oneccl_transceiver