
Distributed sklearn kmeans based on ray and modin #429

Closed

Conversation

PivovarA
Contributor

  • Added a oneCCL transceiver.
  • Rewrote dist_kmeans.h. The Spark sample and daal4py used map_tree, but that is based on send and receive operations, which oneCCL does not support yet. The oneDAL KMeans MPI sample does not use map_tree, so I used it as a basis.
  • Added ray_partition_actor and a Ray context. At the moment ray_context is basic and serves mostly to extract some information from the Ray cluster. This functionality is planned to be expanded in the future.
  • Added to build.sh the ability to work with oneCCL. At the moment, correct operation requires a oneCCL build from https://github.com/oneapi-src/oneCCL/tree/2021.1-beta07-1
    with the CCLROOT variable set. In addition, the oneCCL transceiver must be selected:
    export D4P_TRANSCEIVER=oneccl_transceiver
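The build prerequisites described in the last bullet can be sketched as a shell setup. This is a hedged sketch, not the PR's exact build script: the clone/install paths are hypothetical placeholders, and only the CCLROOT and D4P_TRANSCEIVER variables are taken from the description above.

```shell
# Build oneCCL from the tagged branch (paths here are illustrative; adjust to your checkout)
git clone --branch 2021.1-beta07-1 https://github.com/oneapi-src/oneCCL.git
cd oneCCL && mkdir build && cd build
cmake .. && make -j && make install

# Point daal4py's build.sh at the oneCCL install and select the oneCCL transceiver
export CCLROOT=/path/to/oneCCL/install   # hypothetical install prefix
export D4P_TRANSCEIVER=oneccl_transceiver
```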

import modin.pandas as pd


@ray.remote(num_cpus=1)
Contributor


What about modin on dask?


Unwrapping partitions of a DataFrame works for the Dask engine as well. Some changes would be needed here, but it looks feasible. Is that planned as part of this PR or a follow-up?

Comment on lines 25 to 28
using namespace std;
using namespace daal;
using namespace daal::algorithms;
using namespace daal::data_management;
Contributor


Please do not use multiple and nested namespaces. Spell them out where used.

// reduce all partial results
auto pres = map_reduce_tree::map_reduce_tree<Algo>::reduce(algo, s1_result);
// finalize and check convergence/end of iteration
auto res = tcvr->gather(s1_result);
Contributor


👎 Tree-reduce is more efficient.
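The reviewer's point can be illustrated with a minimal pure-Python sketch (this is not daal4py's actual `map_reduce_tree` implementation): a tree reduce combines partial results pairwise over log2(n) rounds, whereas gathering everything to one rank and folding sequentially puts all n-1 combines on a single node.

```python
# Sketch: tree-reduce vs. flat gather-and-reduce over partial results.
def tree_reduce(values, combine):
    """Pairwise-combine values in ceil(log2(n)) rounds, as a tree reduction would.

    Each round, adjacent pairs are combined; in a real distributed run the
    pairs in one round execute in parallel on different nodes.
    """
    vals = list(values)
    while len(vals) > 1:
        nxt = []
        for i in range(0, len(vals) - 1, 2):
            nxt.append(combine(vals[i], vals[i + 1]))  # one pair per node, in parallel
        if len(vals) % 2:
            nxt.append(vals[-1])  # odd element carries over to the next round
        vals = nxt
    return vals[0]

def gather_reduce(values, combine):
    """Gather all partials to one place and fold sequentially (the gather pattern)."""
    result = values[0]
    for v in values[1:]:
        result = combine(result, v)  # n - 1 sequential combines on one node
    return result

partials = [1, 2, 3, 4, 5, 6, 7, 8]
add = lambda a, b: a + b
assert tree_reduce(partials, add) == gather_reduce(partials, add) == 36
```

With n partials, the tree needs only ceil(log2(n)) combine rounds on the critical path instead of n - 1 sequential combines, which is why the gather-based reduction above drew the reviewer's objection.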

Comment on lines +1 to +41
#
#*******************************************************************************
# Copyright 2014-2020 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#******************************************************************************/

import pandas

class RowPartitionsActor:
    def __init__(self, node):
        self.row_parts_ = []
        self.node_ = node

    def set_row_parts(self, *row_parts):
        self.row_parts_ = pandas.concat(list(row_parts), axis=0)

    def set_row_parts_list(self, *row_parts_list):
        self.row_parts_ = list(row_parts_list)

    def append_row_part(self, row_part):
        self.row_parts_.append(row_part)

    def concat_row_parts(self):
        self.row_parts_ = pandas.concat(self.row_parts_, axis=0)

    def get_row_parts(self):
        return self.row_parts_

    def get_actor_ip(self):
        return self.node_
Contributor

@fschlimb fschlimb Dec 18, 2020


Why is this here and in ray?
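For context, the actor class under discussion can be exercised locally with plain pandas, without any Dask or Ray wiring. This is a hedged sketch: the class body is reduced to the methods used here, and the node IP is purely illustrative.

```python
# Local sketch of the RowPartitionsActor pattern: collect row partitions,
# then concatenate them into a single DataFrame.
import pandas

class RowPartitionsActor:
    def __init__(self, node):
        self.row_parts_ = []
        self.node_ = node

    def append_row_part(self, row_part):
        self.row_parts_.append(row_part)

    def concat_row_parts(self):
        self.row_parts_ = pandas.concat(self.row_parts_, axis=0)

    def get_row_parts(self):
        return self.row_parts_

    def get_actor_ip(self):
        return self.node_

actor = RowPartitionsActor(node="10.0.0.1")          # node IP is illustrative
actor.append_row_part(pandas.DataFrame({"x": [1, 2]}))
actor.append_row_part(pandas.DataFrame({"x": [3]}))
actor.concat_row_parts()
assert len(actor.get_row_parts()) == 3               # three rows after concat
```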


Contributor


Please add a brief description of what this file contains and how it works. 2-3 lines suffice.


Contributor


Please add a brief description of what this file contains and how it works. 2-3 lines suffice.


Contributor


Please add a brief description of what this file contains and how it works. 2-3 lines suffice.
Please also add docstrings.

Contributor

@fschlimb fschlimb left a comment


All the actor and modin code should be pushed down into daal4py itself. I see no reason why this should be available only for sklearn; once it is in daal4py itself, sklearn is supported automatically.
Also, a merge requires a complete CCL transceiver.
With the above we would have support for modin in all distributed algorithms.

Comment on lines +66 to +70
row_parts_last_idx = (
    len(row_partitions) // num_nodes
    if len(row_partitions) % num_nodes == 0
    else len(row_partitions) // num_nodes + 1
)


Suggested change
row_parts_last_idx = (
    len(row_partitions) // num_nodes
    if len(row_partitions) % num_nodes == 0
    else len(row_partitions) // num_nodes + 1
)
row_parts_last_idx = (
    len(row_partitions) // num_nodes
    if len(row_partitions) % num_nodes < num_nodes // 2 + 1
    else len(row_partitions) // num_nodes + 1
)

With the `== 0` check, a situation is possible where some actors end up empty (11 partitions and 5 actors, for example).
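The empty-actor effect this comment describes can be checked with a small sketch. This is plain Python with a hypothetical `chunk_sizes` helper that mimics the slicing loop in the PR; only the chunk-size formula is taken from the diff above.

```python
# Demonstrate how the original chunk-size formula leaves actors empty.
def chunk_sizes(n_partitions, n_actors, chunk):
    """Partition counts each actor receives when slicing in steps of `chunk`."""
    sizes, i = [], 0
    for _ in range(n_actors):
        sizes.append(max(0, min(n_partitions, i + chunk) - i))
        i += chunk
    return sizes

n_partitions, n_actors = 11, 5

# Original formula: ceiling division whenever there is any remainder.
chunk = (n_partitions // n_actors
         if n_partitions % n_actors == 0
         else n_partitions // n_actors + 1)

print(chunk_sizes(n_partitions, n_actors, chunk))  # [3, 3, 3, 2, 0] - the last actor gets nothing
```

With 11 partitions and 5 actors the formula yields a chunk size of 3, so the first four actors consume all 11 partitions and the fifth actor receives an empty slice, which is exactly the case the reviewer flags.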

Comment on lines +73 to +81
for actor in actors:
    actor.set_row_parts(
        [r.result() for r in row_partitions[
            slice(i, i + row_parts_last_idx)
            if i + row_parts_last_idx < len(row_partitions)
            else slice(i, len(row_partitions))
        ]]
    )
    i += row_parts_last_idx


Need to change slices according to comment above.

Comment on lines +67 to +83
row_parts_last_idx = (
    len(row_partitions) // num_nodes
    if len(row_partitions) % num_nodes == 0
    else len(row_partitions) // num_nodes + 1
)

i = 0
for actor in actors:
    actor.set_row_parts._remote(
        args=(
            row_partitions[
                slice(i, i + row_parts_last_idx)
                if i + row_parts_last_idx < len(row_partitions)
                else slice(i, len(row_partitions))
            ])
    )
    i += row_parts_last_idx


The same as for dask.

actors.append(dask_client.submit(RowPartitionsActor, None, workers=set([ip]), actor=True))
actors = [actor.result() for actor in actors]

row_partitions = unwrap_row_partitions(X)
Contributor


...(X, axis=0)

actors
), f"number of nodes {num_nodes} is not equal to number of actors {len(actors)}"

row_partitions = unwrap_row_partitions(X)
Contributor


...(X, axis=0)


@YarShev YarShev Feb 16, 2021


unwrap_row_partitions is an old function that is no longer current. It should be replaced with unwrap_partitions.

@ethanglaser
Contributor

Out of date with the repository. If continued work on this is desired, a new ticket and PR can be opened.

8 participants