
[fleet_executor] handle empty addr for single card train #37150


Merged
2 commits merged on Nov 12, 2021
24 changes: 15 additions & 9 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.cc
@@ -43,24 +43,30 @@ void FleetExecutor::InitMessageBus() {
std::unordered_map<int64_t, std::string> rank_to_addr;
std::string addr;
for (const auto& rank_info : exe_desc_.cluster_info()) {
// init the dns map
int64_t rank = rank_info.rank();
std::string ip_port = rank_info.ip_port();
ss << rank << "\t->\t" << ip_port << "\n";
// TODO(Yuang): replace the first 'rank' with real interceptor id
// TODO(Yuang): init interceptor_id_to_rank out of this loop
interceptor_id_to_rank.insert(std::make_pair(rank, rank));
rank_to_addr.insert(std::make_pair(rank, ip_port));
if (rank == cur_rank) {
addr = ip_port;
}
}
PADDLE_ENFORCE_NE(
addr, "",
platform::errors::NotFound(
"Current rank is %s, which ip_port cannot be found in the config.",
cur_rank));
VLOG(3) << "Current rank is " << cur_rank << " and the ip_port is " << addr
<< ".";
VLOG(3) << "The number of ranks are " << interceptor_id_to_rank.size() << ".";
if (addr == "") {
PADDLE_ENFORCE_EQ(
rank_to_addr.size(), 0,
platform::errors::NotFound("Empty address is not valid for "
"paddle.distributed.launch method."));
PADDLE_ENFORCE_EQ(
cur_rank, 0,
platform::errors::NotFound("Address is empty but cur rank is not 0."));
}
VLOG(3) << "Current rank is " << cur_rank << " and the ip_port is "
<< (addr == "" ? "empty" : addr) << ".";
VLOG(3) << "The number of ranks are "
<< (rank_to_addr.size() == 0 ? 1 : rank_to_addr.size()) << ".";
VLOG(5) << ss.str();
MessageBus& message_bus_instance = MessageBus::Instance();
if (!message_bus_instance.IsInit()) {
8 changes: 8 additions & 0 deletions paddle/fluid/distributed/fleet_executor/message_bus.cc
@@ -89,6 +89,10 @@ bool MessageBus::Send(const InterceptorMessage& interceptor_message) {
}

void MessageBus::ListenPort() {
if (addr_ == "") {
VLOG(3) << "No need listen to port since training on single card.";
return;
}
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
// function keep listen the port and handle the message
@@ -125,6 +129,10 @@ bool MessageBus::IsSameRank(int64_t src_id, int64_t dst_id) {
dst_rank, interceptor_id_to_rank_.end(),
platform::errors::NotFound(
"Cannot find rank for dst interceptor id %lld. Init error.", dst_id));
if (addr_ == "") {
Contributor

This can be optimized: there is no need to check whether the addr matches; checking src_rank == dst_rank directly is enough.

Contributor Author

TODO:
1. Remove the src_id's ip check, or move it elsewhere.
2. Remove this check, since it does the same thing as the check at line 147.

(A sketch of the suggested simplification follows this file's diff.)

// single card training, must be same rank
return true;
}
const auto& src_ip = rank_to_addr_.find(src_rank->second);
PADDLE_ENFORCE_NE(src_ip, rank_to_addr_.end(),
platform::errors::NotFound(
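For reference, the optimization suggested in the review thread above could look roughly like the standalone sketch below. This is only an illustration, not Paddle's actual MessageBus::IsSameRank implementation; the function name IsSameRankSketch and its map parameter are hypothetical. The point is that once both interceptor ids resolve to ranks, comparing the ranks directly also covers the single-card (empty addr_) case, so no separate branch is needed.

// Standalone sketch of the reviewer's suggestion (illustrative only, not the
// actual MessageBus::IsSameRank implementation).
#include <cstdint>
#include <unordered_map>

bool IsSameRankSketch(
    int64_t src_id, int64_t dst_id,
    const std::unordered_map<int64_t, int64_t>& interceptor_id_to_rank) {
  auto src_rank = interceptor_id_to_rank.find(src_id);
  auto dst_rank = interceptor_id_to_rank.find(dst_id);
  // The real code enforces that both ids are present; the sketch just bails out.
  if (src_rank == interceptor_id_to_rank.end() ||
      dst_rank == interceptor_id_to_rank.end()) {
    return false;
  }
  // Single-card training has exactly one rank, so this comparison already
  // covers the addr_ == "" case without a dedicated check.
  return src_rank->second == dst_rank->second;
}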
1 change: 1 addition & 0 deletions python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -141,6 +141,7 @@ if(((NOT WITH_ROCM) AND (NOT WITH_GPU)) OR WIN32)
LIST(REMOVE_ITEM TEST_OPS test_fleet_gradient_scale)
LIST(REMOVE_ITEM TEST_OPS test_disable_signal_handler)
LIST(REMOVE_ITEM TEST_OPS test_fleet_executor)
LIST(REMOVE_ITEM TEST_OPS test_fleet_executor_multi_devices)
endif()

# Temporally disable test_deprecated_decorator
13 changes: 1 addition & 12 deletions python/paddle/fluid/tests/unittests/test_fleet_executor.py
@@ -13,7 +13,6 @@
# limitations under the License.

import unittest
import os
import paddle
import paddle.fluid as fluid

@@ -32,17 +31,7 @@ def run_fleet_executor(self, place):
}
exe.run(empty_program, feed={'x': [1]})

def test_executor_on_multi_devices(self):
places = [fluid.CPUPlace()]
if fluid.is_compiled_with_cuda():
places.append(fluid.CUDAPlace(0))
for place in places:
self.run_fleet_executor(place)

def test_dist_executor_on_multi_devices(self):
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ[
"PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002"
def test_executor_on_single_device(self):
places = [fluid.CPUPlace()]
if fluid.is_compiled_with_cuda():
places.append(fluid.CUDAPlace(0))
47 changes: 47 additions & 0 deletions python/paddle/fluid/tests/unittests/test_fleet_executor_multi_devices.py
@@ -0,0 +1,47 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import os
import paddle
import paddle.fluid as fluid

paddle.enable_static()


class TestFleetExecutor(unittest.TestCase):
def run_fleet_executor(self, place):
exe = paddle.static.Executor(place)
empty_program = paddle.static.Program()
with fluid.program_guard(empty_program, empty_program):
x = fluid.layers.data(name='x', shape=[1], dtype=paddle.float32)
empty_program._pipeline_opt = {
"fleet_opt": True,
"section_program": empty_program
}
exe.run(empty_program, feed={'x': [1]})

def test_dist_executor_on_multi_devices(self):
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ[
"PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002"
places = [fluid.CPUPlace()]
if fluid.is_compiled_with_cuda():
places.append(fluid.CUDAPlace(0))
for place in places:
self.run_fleet_executor(place)


if __name__ == "__main__":
unittest.main()