Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move framework.distribute to env #6022

Merged
merged 39 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
07ec82b
move get_local_rank
luqiang-guo Aug 24, 2021
22b6cff
move get_rank
luqiang-guo Aug 24, 2021
f3d2255
move get_world_size
luqiang-guo Aug 24, 2021
d5e2814
move is_multi_client
luqiang-guo Aug 24, 2021
820c57d
split_sbp
luqiang-guo Aug 24, 2021
2e62b60
delete comment
luqiang-guo Aug 24, 2021
c85c7fe
Merge branch 'master' into dev_move_framework.distribute_to_env
luqiang-guo Aug 24, 2021
cb6b344
fix error
luqiang-guo Aug 24, 2021
f1e07f3
fix error
luqiang-guo Aug 24, 2021
aa2d7fa
Merge branch 'dev_move_framework.distribute_to_env' of https://github…
luqiang-guo Aug 24, 2021
a1fea45
of_format
luqiang-guo Aug 24, 2021
1dae5d2
of_format
luqiang-guo Aug 24, 2021
706acae
fix error
luqiang-guo Aug 24, 2021
9533fb4
Merge branch 'master' into dev_move_framework.distribute_to_env
luqiang-guo Aug 24, 2021
9ff18b6
Merge branch 'master' into dev_move_framework.distribute_to_env
luqiang-guo Aug 24, 2021
4b72527
Merge branch 'master' into dev_move_framework.distribute_to_env
luqiang-guo Aug 24, 2021
9608df1
Merge branch 'master' into dev_move_framework.distribute_to_env
oneflow-ci-bot Aug 24, 2021
b3208c2
Merge branch 'master' into dev_move_framework.distribute_to_env
oneflow-ci-bot Aug 24, 2021
c4dbcaa
Merge branch 'master' into dev_move_framework.distribute_to_env
oneflow-ci-bot Aug 24, 2021
a9a0f98
merge master
luqiang-guo Aug 25, 2021
014db0b
Merge branch 'master' into dev_move_framework.distribute_to_env
luqiang-guo Aug 25, 2021
33b256b
Merge branch 'master' into dev_move_framework.distribute_to_env
luqiang-guo Aug 25, 2021
1208fe6
Merge branch 'master' into dev_move_framework.distribute_to_env
oneflow-ci-bot Aug 25, 2021
183574e
Merge branch 'master' into dev_move_framework.distribute_to_env
oneflow-ci-bot Aug 25, 2021
5ccffa0
Merge branch 'master' into dev_move_framework.distribute_to_env
oneflow-ci-bot Aug 25, 2021
e8e44b6
fix error
luqiang-guo Aug 25, 2021
58250a2
Merge branch 'dev_move_framework.distribute_to_env' of https://github…
luqiang-guo Aug 25, 2021
fb5be84
Merge branch 'master' into dev_move_framework.distribute_to_env
luqiang-guo Aug 25, 2021
5bc1af5
fix get_rank()()
luqiang-guo Aug 26, 2021
d56a273
Merge branch 'master' into dev_move_framework.distribute_to_env
luqiang-guo Aug 26, 2021
7787561
fix merge master error
luqiang-guo Aug 26, 2021
1cf26a9
Merge branch 'master' into dev_move_framework.distribute_to_env
luqiang-guo Aug 26, 2021
87283a1
fix merge error
luqiang-guo Aug 26, 2021
7ba6cde
Merge branch 'dev_move_framework.distribute_to_env' of https://github…
luqiang-guo Aug 26, 2021
12eb69b
Merge branch 'master' into dev_move_framework.distribute_to_env
luqiang-guo Aug 26, 2021
e87db69
Merge branch 'master' into dev_move_framework.distribute_to_env
oneflow-ci-bot Aug 26, 2021
98b9833
Merge branch 'master' into dev_move_framework.distribute_to_env
oneflow-ci-bot Aug 26, 2021
aa8a7e6
Merge branch 'master' into dev_move_framework.distribute_to_env
oneflow-ci-bot Aug 26, 2021
278b126
Merge branch 'master' into dev_move_framework.distribute_to_env
oneflow-ci-bot Aug 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/oneflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ def is_normal_exit(self):
def atexit_hook(hook):
if hook.is_normal_exit():
if oneflow._oneflow_internal.IsEnvInited():
if oneflow.framework.distribute.is_multi_client():
if oneflow.env.is_multi_client():
oneflow._oneflow_internal.eager.multi_client.Sync()
elif oneflow.framework.distribute.get_rank() == 0:
elif oneflow.env.get_rank() == 0:
oneflow._oneflow_internal.eager.single_client.Sync()
oneflow.framework.session_context.TryCloseDefaultSession()
if hook.is_normal_exit():
Expand Down
6 changes: 0 additions & 6 deletions python/oneflow/distributed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,3 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
from oneflow.framework.distribute import (
get_local_rank,
get_rank,
get_world_size,
is_multi_client,
)
30 changes: 30 additions & 0 deletions python/oneflow/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,33 @@
from oneflow.framework.env_util import api_logbuflevel as logbuflevel
from oneflow.framework.env_util import api_logtostderr as logtostderr
from oneflow.framework.env_util import api_machine as machine

import oneflow._oneflow_internal


def get_local_rank():
    """Return the local rank of the current process.

    Delegates to ``oneflow._oneflow_internal.GetLocalRank()``.
    NOTE(review): presumably this is the rank of the process within its
    own node (as opposed to the global rank) — confirm against the C++
    binding.
    """
    return oneflow._oneflow_internal.GetLocalRank()


def get_rank():
    """Return the rank of the current process group.

    Returns:
        The rank of the process group.

    """
    rank = oneflow._oneflow_internal.GetRank()
    return rank


def get_world_size():
    """Return the number of processes in the current process group.

    Returns:
        The world size of the process group.

    """
    world_size = oneflow._oneflow_internal.GetWorldSize()
    return world_size


def is_multi_client():
    """Return whether the runtime environment is in multi-client mode.

    Delegates to ``oneflow._oneflow_internal.IsMultiClient()``.
    NOTE(review): callers in this PR use the result to choose between
    multi-client and single-client sync paths — the exact semantics of
    "multi-client" come from the C++ runtime; confirm there.
    """
    return oneflow._oneflow_internal.IsMultiClient()
8 changes: 4 additions & 4 deletions python/oneflow/framework/check_point_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def _LoadSingleVariable(
path: Optional[str], consistent_src_rank: Optional[int] = None
) -> "flow.Tensor":
if consistent_src_rank is not None:
rank = flow.framework.distribute.get_rank()
rank = flow.env.get_rank()
if rank == consistent_src_rank:
assert isinstance(path, str)
file_backed_blob = FileBackendVariableBlob(path)
Expand All @@ -124,7 +124,7 @@ def _LoadSingleVariable(


def _broadcast_py_object(obj, src: int = 0):
rank = flow.framework.distribute.get_rank()
rank = flow.env.get_rank()
if src == rank:
obj_bytes = pickle.dumps(obj)
return pickle.loads(flow._oneflow_internal.cpu_broadcast(obj_bytes, src))
Expand All @@ -136,7 +136,7 @@ def Load(
path: str, consistent_src_rank: Optional[int] = None,
) -> Dict[str, "flow.Tensor"]:
assert os.path.isdir(path), "Directory {} doesn't exist!".format(path)
rank = flow.framework.distribute.get_rank()
rank = flow.env.get_rank()
var_dict = {}
if consistent_src_rank is None or rank == consistent_src_rank:
all_files = os.listdir(path)
Expand Down Expand Up @@ -169,7 +169,7 @@ def save(
not var.is_consistent
), f"local tensor is needed, but {name} is a consistent tensor"

rank = flow.framework.distribute.get_rank()
rank = flow.env.get_rank()
if consistent_mode and rank != consistent_dst_rank:
return

Expand Down
28 changes: 0 additions & 28 deletions python/oneflow/framework/distribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,6 @@
import oneflow._oneflow_internal


def get_local_rank():
return oneflow._oneflow_internal.GetLocalRank()


def get_rank():
"""Returns the rank of current process group.

Returns:
The rank of the process group.

"""
return oneflow._oneflow_internal.GetRank()


def get_world_size():
"""Returns the number of processes in the current process group.

Returns:
The world size of the process group.

"""
return oneflow._oneflow_internal.GetWorldSize()


def is_multi_client():
return oneflow._oneflow_internal.IsMultiClient()


def split_sbp(axis: int) -> oneflow._oneflow_internal.sbp.sbp:
"""Generate a split scheme in which op will be splitted at `axis`.

Expand Down
6 changes: 3 additions & 3 deletions python/oneflow/framework/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def node_size():


def has_world_size():
if oneflow.distributed.is_multi_client():
if oneflow.env.is_multi_client():
return True
if os.getenv("ONEFLOW_TEST_WORLD_SIZE"):
assert os.getenv(
Expand All @@ -125,8 +125,8 @@ def has_world_size():


def world_size():
if oneflow.distributed.is_multi_client():
return oneflow.distributed.get_world_size()
if oneflow.env.is_multi_client():
return oneflow.env.get_world_size()
return int(os.getenv("ONEFLOW_TEST_WORLD_SIZE"))


Expand Down
2 changes: 1 addition & 1 deletion python/oneflow/nn/graph/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import oneflow._oneflow_internal
import oneflow.framework.graph_build_util as graph_build_util
from oneflow.framework.distribute import get_rank
from oneflow.env import get_rank
from oneflow.framework.tensor import Tensor, TensorTuple
from oneflow.nn.module import Module
from oneflow.nn.parameter import Parameter
Expand Down
2 changes: 1 addition & 1 deletion python/oneflow/nn/graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import oneflow.framework.c_api_util as c_api_util
import oneflow.framework.graph_build_util as graph_build_util
import oneflow.framework.session_context as session_ctx
from oneflow.framework.distribute import get_rank
from oneflow.env import get_rank
from oneflow.framework.tensor import Tensor, TensorTuple
from oneflow.framework.multi_client_session import MultiClientSession
from oneflow.framework.tensor_tuple_util import convert_to_tensor_tuple
Expand Down
2 changes: 1 addition & 1 deletion python/oneflow/nn/modules/all_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ def __init__(self, parallel_conf_str: str):

def forward(self, x):
assert x.device.type == "cuda"
assert x.device.index == flow.framework.distribute.get_local_rank()
assert x.device.index == flow.env.get_local_rank()
return self._op(x)[0]
2 changes: 1 addition & 1 deletion python/oneflow/nn/parallel/ddp.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def allreduce(grad):
def DistributedDataParallel(
module: "flow.nn.Module", *, broadcast_buffers: bool = True
):
world_size = flow.distributed.get_world_size()
world_size = flow.env.get_world_size()
with flow.no_grad():
for x in module.parameters():
requires_grad = x.requires_grad
Expand Down
10 changes: 5 additions & 5 deletions python/oneflow/test/graph/test_graph_asymmetric_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def __init__(self):
flow.nn.init.constant_(self.linear2.weight, 2.3)

def forward(self, x, y):
# print("local_x in rank : ", flow.distributed.get_rank(), " is : ", x)
# print("local_y in rank : ", flow.distributed.get_rank(), " is : ", y)
# print("local_x in rank : ", flow.env.get_rank(), " is : ", x)
# print("local_y in rank : ", flow.env.get_rank(), " is : ", y)
out0 = x + y
out1 = self.linear1(out0)
out2 = self.linear2(out1)
Expand Down Expand Up @@ -92,11 +92,11 @@ def build(self, x, y):
graph_local_out = graph_out.to_local()
# NOTE(chengcheng): MUST call for each rank sync correct input copy
graph_local_out_np = graph_local_out.numpy()
# print("graph_local_out in rank ", flow.distributed.get_rank(), " is : ", graph_local_out)
if flow.distributed.get_rank() == 0:
# print("graph_local_out in rank ", flow.env.get_rank(), " is : ", graph_local_out)
if flow.env.get_rank() == 0:
test_case.assertTrue(graph_local_out.shape.numel() == 0)
test_case.assertTrue(graph_local_out_np.size == np.array([]).size)
elif flow.distributed.get_rank() == 1:
elif flow.env.get_rank() == 1:
test_case.assertTrue(
np.allclose(
graph_local_out.numpy(), local_out.numpy(), atol=1e-4, rtol=1e-4
Expand Down
2 changes: 1 addition & 1 deletion python/oneflow/test/graph/test_input_op_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
@flow.unittest.skip_unless_1n1d()
class TestFeedInputTensor(unittest.TestCase):
def test_feed_input_tensor(test_case):
test_case.assertTrue(oneflow.distributed.is_multi_client())
test_case.assertTrue(oneflow.env.is_multi_client())
test_case.assertTrue(oneflow.framework.env_util.HasAllMultiClientEnvVars())
x = flow.Tensor(1, 1, 10, 10)
flow.nn.init.uniform_(x, a=-1.0, b=1.0)
Expand Down
4 changes: 2 additions & 2 deletions python/oneflow/test/graph/test_multi_client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@
@flow.unittest.skip_unless_1n1d()
class TestMultiClientSession(unittest.TestCase):
def test_case1(self):
self.assertTrue(flow.distributed.is_multi_client())
self.assertTrue(flow.env.is_multi_client())
sess = session_ctx.GetDefaultSession()
self.assertTrue(isinstance(sess, MultiClientSession))
sess.TryInit()
self.assertEqual(sess.status, sess.Status.INITED)

def test_case2(self):
print("test_case2")
self.assertTrue(flow.distributed.is_multi_client())
self.assertTrue(flow.env.is_multi_client())
sess = session_ctx.GetDefaultSession()
self.assertTrue(isinstance(sess, MultiClientSession))
sess.TryInit()
Expand Down
2 changes: 1 addition & 1 deletion python/oneflow/test/graph/test_output_op_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
@flow.unittest.skip_unless_1n1d()
class TestFetchOutputTensor(unittest.TestCase):
def test_fetch_output_tensor(test_case):
test_case.assertTrue(oneflow.distributed.is_multi_client())
test_case.assertTrue(oneflow.env.is_multi_client())
test_case.assertTrue(oneflow.framework.env_util.HasAllMultiClientEnvVars())
x = flow.Tensor(1, 1, 10, 10)
flow.nn.init.uniform_(x, a=-1.0, b=1.0)
Expand Down
8 changes: 4 additions & 4 deletions python/oneflow/test/graph/test_to_consistent.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class ToConsistentGraphTestCase(oneflow.unittest.TestCase):
def test_fwd_P2B(test_case):
""" compare eager fwd and lazy bwd
"""
rank = flow.distributed.get_rank()
rank = flow.env.get_rank()
# pid = os.getpid()
# print(f"[{pid}][{rank}] ToConsistentGraphTestCase.test_fwd_P2B")

Expand Down Expand Up @@ -230,7 +230,7 @@ def test_fwd_P2B(test_case):
def test_bwd_P2B(test_case):
""" compare eager bwd and lazy bwd
"""
rank = flow.distributed.get_rank()
rank = flow.env.get_rank()
# pid = os.getpid()
# print(f"[{pid}][{rank}] ToConsistentGraphTestCase.test_bwd_P2B")

Expand Down Expand Up @@ -275,7 +275,7 @@ def test_bwd_P2B(test_case):
def test_multi_graph(test_case):
""" compare two lazy fwd
"""
rank = flow.distributed.get_rank()
rank = flow.env.get_rank()
# pid = os.getpid()
# print(f"[{pid}][{rank}] ToConsistentGraphTestCase.test_multi_graph")

Expand Down Expand Up @@ -370,7 +370,7 @@ def test_free_tensor_to_consistent(test_case):

# @unittest.skipIf(True, "")
def test_to_placement(test_case):
rank = flow.distributed.get_rank()
rank = flow.env.get_rank()
# pid = os.getpid()
# print(f"[{pid}][{rank}] ToConsistentGraphTestCase.test_to_placement")

Expand Down
2 changes: 1 addition & 1 deletion python/oneflow/test/graph/test_user_op_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def _get_c_tensor(t):


def _test_user_op_graph(test_case, is_cuda):
test_case.assertTrue(oneflow.distributed.is_multi_client())
test_case.assertTrue(oneflow.env.is_multi_client())
test_case.assertTrue(oneflow.framework.env_util.HasAllMultiClientEnvVars())

x0 = flow.tensor(np.random.rand(20, 30), dtype=flow.float32)
Expand Down
2 changes: 1 addition & 1 deletion python/oneflow/test/graph/test_variable_op_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
@flow.unittest.skip_unless_1n1d()
class TestFeedVariableTensor(unittest.TestCase):
def test_feed_var_tensor(test_case):
test_case.assertTrue(oneflow.distributed.is_multi_client())
test_case.assertTrue(oneflow.env.is_multi_client())
test_case.assertTrue(oneflow.framework.env_util.HasAllMultiClientEnvVars())
x = flow.Tensor(1, 1, 10, 10)
flow.nn.init.uniform_(x, a=-1.0, b=1.0)
Expand Down
6 changes: 3 additions & 3 deletions python/oneflow/test/modules/test_allreduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ class TestAllReduce(flow.unittest.TestCase):
def test_all_reduce(test_case):
arr_rank1 = np.array([1, 2])
arr_rank2 = np.array([3, 4])
if flow.distributed.get_rank() == 0:
if flow.env.get_rank() == 0:
x = flow.Tensor(arr_rank1)
elif flow.distributed.get_rank() == 1:
elif flow.env.get_rank() == 1:
x = flow.Tensor(arr_rank2)
else:
raise ValueError
Expand All @@ -41,7 +41,7 @@ def test_all_reduce(test_case):
@flow.unittest.skip_unless_2n2d()
def test_all_reduce_2nodes(test_case):
np_arr = np.array([1, 2])
x = flow.Tensor(np_arr * (flow.distributed.get_rank() + 1))
x = flow.Tensor(np_arr * (flow.env.get_rank() + 1))
x = x.to("cuda")
y = flow.F.all_reduce(x)
test_case.assertTrue(np.allclose(y.numpy(), np_arr * 10))
Expand Down
2 changes: 1 addition & 1 deletion python/oneflow/test/modules/test_coco_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def build(self):
@flow.unittest.skip_unless_1n2d()
class COCODataLoaderDistributedTestCase(oneflow.unittest.TestCase):
def test_case1(test_case):
rank = flow.distributed.get_rank()
rank = flow.env.get_rank()
# pid = os.getpid()
# print(f"[{pid}][{rank}] COCODataLoaderDistributedTestCase.test_case1")

Expand Down
Loading