Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RankTaskGraph #9108

Closed
wants to merge 41 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
560a511
implement RankTaskGraph
lixinqi Sep 19, 2022
51034f0
RankCompiler
lixinqi Sep 19, 2022
3736494
fix compiler complaints
lixinqi Sep 20, 2022
5807882
CompTaskNode::ConsumeFakeRegsts
lixinqi Sep 20, 2022
c07ea5e
TransportTaskProto::lbi
lixinqi Sep 20, 2022
1fd10ef
makes sure all ranks know all var_op_names
lixinqi Sep 22, 2022
74c96df
RankTaskGraph::ForEachDutyRank
lixinqi Sep 22, 2022
e89143b
PortableCtrlEdge
lixinqi Sep 23, 2022
1b10509
compile in MultiThreadLoop
lixinqi Sep 23, 2022
44bf12b
CompileMode
lixinqi Sep 23, 2022
bd50bc7
rebuild new_task_id_ before ProduceRegst
lixinqi Sep 26, 2022
7853956
RankTaskGraph::InitRegstDescsConsumers()
lixinqi Sep 26, 2022
b725318
PlanUtil::GenReachableTaskPairs
lixinqi Sep 27, 2022
45bc629
disable checking consumer_task_regst_desc_id_size
lixinqi Sep 27, 2022
3c4ea9d
TaskNode::InitConsumedRegstsFromProto
lixinqi Sep 27, 2022
9880ba4
remove RegstDesc::InitConsumersFromProto
lixinqi Sep 27, 2022
20175fc
refactor CompTaskNode::ConsumeFakeRegstsIf
lixinqi Sep 27, 2022
fbff274
refactor CompTaskNode::ConsumeFakeRegsts
lixinqi Sep 27, 2022
ede3cd2
remove Plan::fake_consumed_regst_desc_id
lixinqi Sep 27, 2022
3ba45e5
revert part of code in job/plan_util.cpp
lixinqi Sep 28, 2022
2e9ab1a
refacotr ParallelDesc::TryGetParallelId
lixinqi Sep 28, 2022
93a7947
cut boxing_task_graph by rank
lixinqi Sep 29, 2022
818d14d
make sure TaskIdGenerator::Generator is thread safe
lixinqi Oct 8, 2022
8ca22bf
atomic<int64_t> mem_block_id
lixinqi Oct 8, 2022
2adbb13
chunk id add lock
strint Oct 8, 2022
ccf9bea
get chunk proto with lock
strint Oct 9, 2022
2c577df
create chunk with lock
strint Oct 9, 2022
fa49459
mutable std::mutex
lixinqi Oct 9, 2022
a4e67b0
Rank task graph merge master (#9440)
strint Nov 22, 2022
7d69c25
fix conflict
strint Nov 22, 2022
d4782a7
auto format by CI
oneflow-ci-bot Nov 22, 2022
eb76987
fix conflict
strint Nov 22, 2022
bb9e65e
Merge branch 'rank_task_graph' of https://github.com/Oneflow-Inc/onef…
strint Nov 22, 2022
6b575fc
fix conflict
strint Nov 22, 2022
13ba2ac
auto format by CI
oneflow-ci-bot Nov 22, 2022
92face0
fix conflict
strint Nov 22, 2022
1b2edca
fix
strint Nov 22, 2022
3910af6
address pr comments
lixinqi Nov 24, 2022
a37f9f8
fix bug
strint Dec 13, 2022
132a8a7
Rank task graph fix (#9749)
strint Feb 28, 2023
f1352e6
rm useless
strint Feb 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ if(BUILD_PROFILER)
add_definitions(-DOF_ENABLE_PROFILER)
endif()

if(BUILD_DEBUG_LAZY_RUNTIME)
add_definitions(-DOF_DEBUG_LAZY_RUNTIME)
endif()

if(OF_SOFTMAX_USE_FAST_MATH)
add_definitions(-DOF_SOFTMAX_USE_FAST_MATH)
endif()
Expand Down
29 changes: 29 additions & 0 deletions oneflow/core/common/async_deallocate_context.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright 2020 The OneFlow Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/common/async_deallocate_context.h"
#include "oneflow/core/thread/thread_pool.h"

namespace oneflow {

AsyncDeallocateContext::AsyncDeallocateContext() : thread_pool_(std::make_unique<ThreadPool>(1)) {}

AsyncDeallocateContext::~AsyncDeallocateContext() {}

void AsyncDeallocateContext::LazyDeallocate(std::function<void()> LazyDeallocator) {
thread_pool_->AddWork(LazyDeallocator);
}

} // namespace oneflow
38 changes: 38 additions & 0 deletions oneflow/core/common/async_deallocate_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright 2020 The OneFlow Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_COMMON_ASYNC_DEALLOCATE_CONTEXT_H_
#define ONEFLOW_CORE_COMMON_ASYNC_DEALLOCATE_CONTEXT_H_

#include "oneflow/core/common/deallocate_context.h"

namespace oneflow {

class ThreadPool;

class AsyncDeallocateContext final : public DeallocateContext {
public:
AsyncDeallocateContext();
~AsyncDeallocateContext() override;

void LazyDeallocate(std::function<void()> LazyDeallocator) override;

private:
std::shared_ptr<ThreadPool> thread_pool_;
};

} // namespace oneflow

#endif // ONEFLOW_CORE_COMMON_ASYNC_DEALLOCATE_CONTEXT_H_
26 changes: 26 additions & 0 deletions oneflow/core/common/balanced_splitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ BalancedSplitter::BalancedSplitter(int64_t total_num, int64_t split_num) {
base_part_size_ = total_num / split_num;
base_begin_idx_ = total_num % split_num;
split_num_ = split_num;
CHECK_EQ(this->total_num(), total_num);
}

int64_t BalancedSplitter::total_num() const { return At(split_num_ - 1).end(); }

Range BalancedSplitter::At(int64_t idx) const {
CHECK_LT(idx, split_num_);
int64_t left_bound = -1;
Expand All @@ -46,4 +49,27 @@ Range BalancedSplitter::At(int64_t first_idx, int64_t last_idx) const {
return Range(first_range.begin(), last_range.end());
}

int64_t BalancedSplitter::RecursiveBinarySearchIndex(int64_t value) const {
CHECK_GE(value, 0);
CHECK_LT(value, total_num());
return RecursiveBinarySearchIndex(value, 0, split_num_);
}

int64_t BalancedSplitter::RecursiveBinarySearchIndex(int64_t value, int begin_idx,
int end_idx) const {
CHECK_LT(begin_idx, end_idx);
if (begin_idx + 1 == end_idx) {
Range ret = At(begin_idx);
CHECK_GE(value, ret.begin());
CHECK_LT(value, ret.end());
return begin_idx;
}
int middle_idx = (begin_idx + end_idx) / 2;
if (value >= At(middle_idx).begin()) {
return RecursiveBinarySearchIndex(value, middle_idx, end_idx);
} else {
return RecursiveBinarySearchIndex(value, begin_idx, middle_idx);
}
}

} // namespace oneflow
6 changes: 6 additions & 0 deletions oneflow/core/common/balanced_splitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ class BalancedSplitter final {
Range At(int64_t idx) const;
Range At(int64_t first_idx, int64_t last_idx) const;

int64_t RecursiveBinarySearchIndex(int64_t value) const;
int64_t total_num() const;

private:
// search in [begin_idx, end_idx)
int64_t RecursiveBinarySearchIndex(int64_t value, int begin_idx, int end_idx) const;

int64_t base_part_size_;
int64_t base_begin_idx_;
int64_t split_num_;
Expand Down
13 changes: 13 additions & 0 deletions oneflow/core/common/balanced_splitter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,17 @@ TEST(BalancedSplitter, split_2_to_3_part) {
ASSERT_TRUE(splitter.At(2) == Range(2, 2));
}

TEST(BalancedSplitter, RecursiveBinarySearchIndex) {
const size_t total_num = 937;
const size_t split_num = 11;
BalancedSplitter bs(total_num, split_num);
ASSERT_TRUE(bs.total_num() == total_num);
for (size_t i = 0; i < split_num; ++i) {
Range range = bs.At(i);
for (size_t value = range.begin(); value < range.end(); ++value) {
ASSERT_TRUE(bs.RecursiveBinarySearchIndex(value) == i);
}
}
}

} // namespace oneflow
50 changes: 50 additions & 0 deletions oneflow/core/common/deallocate_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2020 The OneFlow Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_COMMON_DEALLOCATE_CONTEXT_H_
#define ONEFLOW_CORE_COMMON_DEALLOCATE_CONTEXT_H_

#include <functional>
#include <memory>

namespace oneflow {

class DeallocateContext {
public:
DeallocateContext() = default;
virtual ~DeallocateContext() = default;

template<typename T>
void Deallocate(std::shared_ptr<T>&& ptr) {
std::shared_ptr<T> data = ptr;
ptr.reset();
LazyDeallocate([data] { const_cast<std::shared_ptr<T>*>(&data)->reset(); });
data.reset();
}

virtual void LazyDeallocate(std::function<void()> LazyDeallocator) = 0;
};

class NaiveDeallocateContext final : public DeallocateContext {
public:
NaiveDeallocateContext() = default;
~NaiveDeallocateContext() = default;

void LazyDeallocate(std::function<void()> LazyDeallocator) { LazyDeallocator(); }
};

} // namespace oneflow

#endif // ONEFLOW_CORE_COMMON_DEALLOCATE_CONTEXT_H_
11 changes: 11 additions & 0 deletions oneflow/core/common/env_var/env_var.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ int64_t ThreadLocalEnvInteger();

DEFINE_THREAD_LOCAL_ENV_INTEGER(ONEFLOW_THRAED_LOCAL_CACHED_SIZE, 128 * 1024);

template<typename env_var>
const std::string& ThreadLocalEnvString();

#define DEFINE_THREAD_LOCAL_ENV_STRING(env_var, default_value) \
struct env_var {}; \
template<> \
inline const std::string& ThreadLocalEnvString<env_var>() { \
thread_local std::string value = GetStringFromEnv(OF_PP_STRINGIZE(env_var), default_value); \
return value; \
}

} // namespace oneflow

#endif // ONEFLOW_CORE_COMMON_ENV_VAR_ENV_VAR_H_
29 changes: 29 additions & 0 deletions oneflow/core/common/env_var/lazy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright 2020 The OneFlow Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_COMMON_ENV_VAR_LAZY_H_
#define ONEFLOW_CORE_COMMON_ENV_VAR_LAZY_H_

#include <string>
#include "oneflow/core/common/env_var/env_var.h"

namespace oneflow {

DEFINE_THREAD_LOCAL_ENV_STRING(ONEFLOW_LAZY_COMPILE_MODE, "naive");
DEFINE_THREAD_LOCAL_ENV_INTEGER(ONEFLOW_LAZY_COMPILE_RPC_THREAD_NUM, 16);

} // namespace oneflow

#endif // ONEFLOW_CORE_COMMON_ENV_VAR_LAZY_H_
41 changes: 41 additions & 0 deletions oneflow/core/common/id_pairs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2020 The OneFlow Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_COMMON_ID_PAIRS_H_
#define ONEFLOW_CORE_COMMON_ID_PAIRS_H_

#include "oneflow/core/common/id_pairs.pb.h"
#include <unordered_set>
#include <utility>

namespace oneflow {

void InitIdPairs(const std::unordered_set<std::pair<int64_t, int64_t>>& pairs, IdPairs* proto) {
for (const auto& pair : pairs) {
auto* proto_pair = proto->mutable_int64_pair()->Add();
proto_pair->set_first(pair.first);
proto_pair->set_second(pair.second);
}
}

void MergeIdPairs(const IdPairs& id_pairs, std::unordered_set<std::pair<int64_t, int64_t>>* pairs) {
for (const auto& pair : id_pairs.int64_pair()) {
pairs->emplace(std::make_pair(pair.first(), pair.second()));
}
}

} // namespace oneflow

#endif // ONEFLOW_CORE_COMMON_ID_PAIRS_H_
11 changes: 11 additions & 0 deletions oneflow/core/common/id_pairs.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
syntax = "proto2";
package oneflow;

message Int64Pair {
required int64 first = 1;
required int64 second = 2;
}

message IdPairs {
repeated Int64Pair int64_pair = 1;
}
6 changes: 1 addition & 5 deletions oneflow/core/framework/instructions_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ Maybe<void> InstructionsBuilder::SoftSyncNNGraphBuffers(
namespace {

int64_t NewSymbolId() {
static std::atomic<int64_t> cnt(0);
static std::atomic<int64_t> cnt(1);
return cnt.fetch_add(1, std::memory_order_relaxed);
}

Expand All @@ -225,10 +225,6 @@ Maybe<Scope> InstructionsBuilder::GetScopeSymbol(const ScopeProto& scope_proto)
return Singleton<symbol::Storage<Scope>>::Get()->FindOrCreate(scope_proto, &NewSymbolId);
}

Maybe<OperatorConfSymbol> InstructionsBuilder::GetOpConfSymbol(const OperatorConf& op_conf) {
return Singleton<symbol::Storage<OperatorConfSymbol>>::Get()->FindOrCreate(op_conf, &NewSymbolId);
}

Maybe<Scope> InstructionsBuilder::BuildInitialScope(
int64_t session_id, const JobConfigProto& job_conf, const std::string& device_tag,
const std::vector<std::string>& machine_device_ids, const std::shared_ptr<Shape>& hierarchy,
Expand Down
Loading