
[new-exec] enable the new standalone executor by default #41179


Merged: 14 commits, Apr 5, 2022
17 changes: 12 additions & 5 deletions paddle/fluid/framework/new_executor/data_transfer.cc
@@ -319,6 +319,7 @@ void ApplyDataTransform(const OpKernelType& expected_kernel_key,
}
}

bool transfered = false;
DataTranferHelper data_transfer_helper(place, var_scope);
for (auto& var_name_item : *ins_map_temp) {
bool should_skip_input =
@@ -334,6 +335,9 @@ void ApplyDataTransform(const OpKernelType& expected_kernel_key,
if (var->IsType<LoDTensor>() || var->IsType<phi::SelectedRows>()) {
tensor_in = GetLoDTensorOrSelectedRowsValueFromVar(*var);
} else if (var->IsType<LoDTensorArray>()) {
if (var->Get<LoDTensorArray>().size() == 0) {
continue;
}
tensor_in =
static_cast<const Tensor*>(&(var->Get<LoDTensorArray>()[0]));
} else {
@@ -389,6 +393,7 @@ void ApplyDataTransform(const OpKernelType& expected_kernel_key,
}

if (is_transferred) {
transfered = true;
// update RuntimeContext.inputs and original op_func_node inputs
op_func_node->input_index[var_name_item.first][i] =
var_scope->VarId(new_var_name);
@@ -426,11 +431,13 @@ void ApplyDataTransform(const OpKernelType& expected_kernel_key,
}
}

// NOTE(zhiqiu): UPDATE the corresponding OeratorBase to make it consistent
// with instruction. (hot fix, it is not good design here)
op_func_node->operator_base_ =
std::shared_ptr<OperatorBase>(framework::OpRegistry::CreateOp(
op_base->Type(), new_ins, new_outs, op_base->Attrs()));
if (transfered) {
// NOTE(zhiqiu): UPDATE the corresponding OeratorBase to make it consistent
// with instruction. (hot fix, it is not good design here)
op_func_node->operator_base_ =
std::shared_ptr<OperatorBase>(framework::OpRegistry::CreateOp(
op_base->Type(), new_ins, new_outs, op_base->Attrs()));
}
op_func_node->no_data_transform_index = std::move(no_data_transform_index);
}
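
The core of this change is the new `transfered` flag: the OperatorBase is only re-created when some input was actually replaced by a data transform (and empty LoDTensorArray inputs are now skipped instead of dereferencing element 0). A minimal standalone sketch of that guard pattern, with a hypothetical `Op` struct standing in for the Paddle types:

```cpp
#include <map>
#include <memory>
#include <string>
#include <vector>

struct Op {
  std::string type;
  std::map<std::string, std::vector<std::string>> inputs;
};

// Rename inputs according to `renames`, but only rebuild the op when at least
// one name actually changed (this mirrors the `transfered` flag above).
std::shared_ptr<Op> ApplyRenames(
    const std::shared_ptr<Op>& op,
    const std::map<std::string, std::string>& renames) {
  bool transferred = false;
  auto new_inputs = op->inputs;
  for (auto& slot : new_inputs) {
    for (auto& name : slot.second) {
      auto it = renames.find(name);
      if (it != renames.end() && it->second != name) {
        name = it->second;
        transferred = true;
      }
    }
  }
  if (!transferred) {
    return op;  // nothing was transferred, keep the original instance
  }
  return std::make_shared<Op>(Op{op->type, std::move(new_inputs)});
}
```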

23 changes: 17 additions & 6 deletions paddle/fluid/framework/new_executor/interpretercore.cc
@@ -300,8 +300,16 @@ void InterpreterCore::Convert(
gc_event_.emplace_back(vec_instruction_[i].DeviceContext().GetPlace(),
platform::GenerateDeviceEventFlag());
}
bool inplaced = false;
for (auto inst : vec_instruction_) {
if (inst.OpBase()->Type() == "share_buffer" ||
inst.OpBase()->Type() == "share_data") {
VLOG(4) << "Already inplaced, skip inplace now.";
inplaced = true;
}
}

if (FLAGS_new_executor_use_inplace) {
if (FLAGS_new_executor_use_inplace && !inplaced) {
BuildInplace();
}
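
The scan above disables the extra inplace pass when the program already contains buffer-sharing ops. A condensed sketch of the same check over op type names (not the actual InterpreterCore API):

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Returns true if any op in the program already shares buffers, in which case
// BuildInplace() would be redundant.
bool AlreadyInplaced(const std::vector<std::string>& op_types) {
  return std::any_of(op_types.begin(), op_types.end(),
                     [](const std::string& type) {
                       return type == "share_buffer" || type == "share_data";
                     });
}
```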

@@ -516,6 +524,12 @@ void InterpreterCore::RunInstruction(const Instruction& instr_node) {

void InterpreterCore::ExecuteInstructionList(
const std::vector<Instruction>& vec_instr) {
unfinished_op_numer_ = vec_instr.size();
if (unfinished_op_numer_ == 0) {
VLOG(4) << "No op to run, return";
return;
}

// NOTE(zhiqiu): get the prepared deps from std::future, and async prepare
// those for the next step
auto atomic_deps = async_work_queue_->AtomicDeps();
@@ -524,8 +538,6 @@
async_work_queue_->PrepareAtomicDeps(dependecy_count_);
async_work_queue_->PrepareAtomicVarRef(global_scope_->VecMetaInfo());

unfinished_op_numer_ = vec_instr.size();

exception_holder_.Clear();

for (size_t i = 0; i < dependecy_count_.size(); ++i) {
@@ -561,12 +573,11 @@ void InterpreterCore::RunNextInstructions(
const Instruction& instr, std::queue<size_t>* reserved_next_ops,
std::vector<std::atomic<size_t>>* atomic_deps,
std::vector<std::atomic<size_t>>* atomic_var_ref) {
VLOG(4) << "atomic 1:" << atomic_deps;
auto& next_instr = instr.NextInstructions();

auto IsReady = [atomic_deps](size_t next_id) {
VLOG(4) << "atomic:" << atomic_deps << " " << &(*atomic_deps)[next_id]
<< " " << next_id;
VLOG(4) << "atomic:" << atomic_deps << " op_id: " << next_id
<< ", remain deps: " << (*atomic_deps)[next_id];
return (*atomic_deps)[next_id].fetch_sub(1, std::memory_order_relaxed) == 1;
};
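
IsReady relies on the usual atomic countdown: every op keeps a remaining-dependency counter, and the predecessor whose fetch_sub brings it to zero (i.e. fetch_sub returns 1) is the one that schedules it. A self-contained sketch of that pattern, with made-up counts rather than values from the PR:

```cpp
#include <atomic>
#include <cstddef>
#include <iostream>
#include <vector>

int main() {
  // Remaining-dependency counters for three ops: op1 waits on two ops,
  // op2 waits on one op (toy values).
  std::vector<std::atomic<size_t>> deps(3);
  deps[1] = 2;
  deps[2] = 1;

  // An op becomes ready exactly when the last of its dependencies finishes.
  auto is_ready = [&deps](size_t next_id) {
    return deps[next_id].fetch_sub(1, std::memory_order_relaxed) == 1;
  };

  std::cout << is_ready(1) << "\n";  // 0: one predecessor still pending
  std::cout << is_ready(1) << "\n";  // 1: last dependency done, op1 is ready
  std::cout << is_ready(2) << "\n";  // 1: op2 had a single dependency
}
```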

151 changes: 146 additions & 5 deletions paddle/fluid/framework/new_executor/interpretercore_util.cc
@@ -428,19 +428,19 @@ void build_op_func_list(const platform::Place& place,
op_func_node.dev_ctx_ = dev_ctx;
VLOG(3) << op_with_kernel->Type()
<< " : expected_kernel_key : " << expected_kernel_key;
auto exec_ctx =
ExecutionContext(*op_with_kernel, scope, *dev_ctx, runtime_context);

// see OperatorWithKernel::RunImpl in operator.cc for why
if (!(op->HasAttr(kAllKernelsMustComputeRuntimeShape) &&
op->Attr<bool>(kAllKernelsMustComputeRuntimeShape))) {
InterpretercoreInferShapeContext infer_shape_ctx(*op, runtime_context);
// TODO(Aurelius84): In case of control flow ops, they are NOT
// inheritted
// from OperatorWithKernel.
// inheritted from OperatorWithKernel.
op_with_kernel->Info().infer_shape_(&infer_shape_ctx);
}

auto exec_ctx =
ExecutionContext(*op_with_kernel, scope, *dev_ctx, runtime_context);

auto run_phi_kernel = false;
if (phi::KernelFactory::Instance().HasCompatiblePhiKernel(
op_with_kernel->Type())) {
@@ -476,7 +476,6 @@
op_with_kernel->BuildPhiKernelContext(runtime_context, dev_ctx,
&pt_kernel_context);
op_func_node.pt_kernel_ = op_with_kernel->PhiKernel();

(*op_func_node.pt_kernel_)(&pt_kernel_context);
} else {
auto kernels_iter = all_op_kernels.find(op->Type());
@@ -711,6 +710,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
const std::set<std::string> random_op_set = {
"bernoulli", "poisson", "multinomial", "gaussian_random",
"uniform_random", "randint", "randperm", "exponential"};

int dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
if (random_op_set.count(vec_instruction[op_idx].OpBase()->Type())) {
@@ -721,6 +721,147 @@
}
}

// add dependency for communication op
const std::string communication_op_prefix = "c_";
dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
if (vec_instruction[op_idx].OpBase()->Type().find(
communication_op_prefix) != std::string::npos) {
if (dependence_op_idx != -1) {
op2dependences[op_idx].insert(dependence_op_idx);
}
dependence_op_idx = op_idx;
}
}

// TODO(zhiqiu): there are still some cases not handled
// add dependency for c_sync_comm_stream

// in program, we can add only one c_sync_comm_stream to sync all
// communication ops.
// c_allreduce_sum(a)
// c_allreduce_sum(b)
// c_allreduce_sum(c)
// c_sync_comm_stream(a)
const std::string kSyncComm = "c_sync_comm_stream";
dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
if (vec_instruction[op_idx].OpBase()->Type() == kSyncComm) {
dependence_op_idx = op_idx;
} else {
if (dependence_op_idx != -1) {
VLOG(4) << "Add depend from "
<< vec_instruction[dependence_op_idx].OpBase()->Type() << " to "
<< vec_instruction[op_idx].OpBase()->Type();
op2dependences[op_idx].insert(dependence_op_idx);
}
}
}

// add dependency for coalesce_tensor
const std::string kCoalesceTensor = "coalesce_tensor";
for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
if (vec_instruction[op_idx].OpBase()->Type() == kCoalesceTensor) {
VLOG(4) << "Add depend for " << kCoalesceTensor << " " << op_idx;
auto fused_out = vec_instruction[op_idx].Outputs().at("FusedOutput")[0];
auto outputs = vec_instruction[op_idx].Outputs().at("Output");

auto is_read = [](const Instruction& inst, int var_id) -> bool {
for (auto pair : inst.Inputs()) {
for (auto item : pair.second) {
if (item == var_id) {
return true;
}
}
}
return false;
};

auto is_write = [](const Instruction& inst, int var_id) -> bool {
for (auto pair : inst.Outputs()) {
for (auto item : pair.second) {
if (item == var_id) {
return true;
}
}
}
return false;
};

// find first op that reads fused_out
auto first_read_fused_out_op = -1;
for (auto j = op_idx + 1; j < vec_instruction.size(); ++j) {
if (is_read(vec_instruction[j], fused_out)) {
first_read_fused_out_op = j;
break;
}
}

if (UNLIKELY(first_read_fused_out_op == -1)) {
VLOG(4) << "No op read FusedOutput";
continue;
}

// find ops that write 'outputs' between (op_index,
// first_read_fused_out_op)
// add depend: them->first_read_fused_out_op
for (auto j = op_idx + 1;
j < static_cast<size_t>(first_read_fused_out_op); ++j) {
for (auto var_id : outputs) {
if (is_write(vec_instruction[j], var_id)) {
op2dependences[first_read_fused_out_op].insert(j);
VLOG(4) << j << " -> " << first_read_fused_out_op;
VLOG(4)
<< "Add depend from " << vec_instruction[j].OpBase()->Type()
<< " to "
<< vec_instruction[first_read_fused_out_op].OpBase()->Type();
}
}
}

// find first op that reads 'outputs' between (first_read_fused_out_op, end)
// add depend: first_read_fused_out_op -> first op that reads 'outputs'

// special case for consecutive communication ops, for example,
// FusedOutput = c_sync_calc_stream(FusedOutput)
// FusedOutput = c_allreduce_sum(FusedOutput)
// FusedOutput = c_sync_comm_stream(FusedOutput)
// we should take the last one to add depend instead of
// 'first_read_fused_out_op'
size_t target = first_read_fused_out_op;
for (size_t j = first_read_fused_out_op + 1; j < vec_instruction.size();
++j) {
if (j == target + 1 &&
vec_instruction[target].OpBase()->Type().find(
communication_op_prefix) != std::string::npos &&
vec_instruction[j].OpBase()->Type().find(communication_op_prefix) !=
std::string::npos) {
VLOG(4) << "Found consecutive communication ops, "
<< vec_instruction[target].OpBase()->Type() << " -> "
<< vec_instruction[j].OpBase()->Type();
target = j;
continue;
}

for (auto var_id : outputs) {
if (is_read(vec_instruction[j], var_id)) {
op2dependences[j].insert(target);
VLOG(4) << target << " -> " << j;
VLOG(4) << "Add depend from "
<< vec_instruction[target].OpBase()->Type() << " to "
<< vec_instruction[j].OpBase()->Type();
}
}
}
}
}
for (auto pair : op2dependences) {
VLOG(10) << pair.first << " Depends on " << pair.second.size();
std::ostringstream oss;
std::copy(pair.second.begin(), pair.second.end(),
std::ostream_iterator<int>(oss, " "));
VLOG(10) << oss.str();
}
return std::move(get_downstream_map(op2dependences));
}
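
All of the new rules above feed op2dependences, which maps each op to the set of upstream ops it must wait for. Assuming get_downstream_map simply inverts that relation (upstream op -> ops that can only run after it), a rough standalone sketch of the conversion, plus the per-op counters that later become the atomic deps, looks like this:

```cpp
#include <iostream>
#include <list>
#include <map>
#include <set>
#include <vector>

// op2dependences: op -> ops it depends on.  downstream: op -> ops behind it.
std::map<int, std::list<int>> GetDownstreamMap(
    const std::map<int, std::set<int>>& op2dependences) {
  std::map<int, std::list<int>> downstream;
  for (const auto& item : op2dependences) {
    for (int dep : item.second) {
      downstream[dep].push_back(item.first);
    }
  }
  return downstream;
}

int main() {
  // Toy graph: op2 waits for op0 and op1, op3 waits for op2.
  std::map<int, std::set<int>> op2dependences = {{2, {0, 1}}, {3, {2}}};

  auto downstream = GetDownstreamMap(op2dependences);
  for (const auto& item : downstream) {
    std::cout << item.first << " ->";
    for (int d : item.second) std::cout << " " << d;
    std::cout << "\n";  // 0 -> 2, 1 -> 2, 2 -> 3
  }

  // Remaining-dependency counts, i.e. the values ExecuteInstructionList
  // stores into its atomic counters before scheduling starts.
  std::vector<size_t> dep_count(4, 0);
  for (const auto& item : op2dependences) {
    dep_count[item.first] = item.second.size();
  }
}
```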

2 changes: 1 addition & 1 deletion paddle/fluid/memory/allocation/allocator_facade.cc
@@ -85,7 +85,7 @@ PADDLE_DEFINE_EXPORTED_bool(use_virtual_memory_auto_growth, false,
// NOTE(Ruibiao): This FLAGS is just to be compatibled with
// the old single-stream CUDA allocator. It will be removed
// after StreamSafeCudaAllocator has been fully tested.
PADDLE_DEFINE_EXPORTED_bool(use_stream_safe_cuda_allocator, false,
PADDLE_DEFINE_EXPORTED_bool(use_stream_safe_cuda_allocator, true,
"Enable StreamSafeCUDAAllocator");

PADDLE_DEFINE_EXPORTED_bool(use_cuda_managed_memory, false,
48 changes: 17 additions & 31 deletions paddle/fluid/operators/pscore/distributed_lookup_table_op.h
@@ -26,17 +26,13 @@ template <typename DeviceContext, typename T>
class DistributedLookupTableKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext &context) const override {
auto &scope = context.scope();

auto padding_idx = context.Attr<int64_t>("padding_idx");
auto table_id = context.Attr<int>("table_id");
bool is_test = context.Attr<bool>("is_test");

auto embedding_name = context.InputNames("W").front();
auto *var = context.InputVar("W");
int64_t emb_dim = 0;

auto *var = scope.FindVar(embedding_name);

if (var->IsType<framework::LoDTensor>()) {
emb_dim = var->Get<framework::LoDTensor>().dims()[1];
} else if (var->IsType<phi::SelectedRows>()) {
@@ -61,35 +57,31 @@ class DistributedLookupTableKernel : public framework::OpKernel<T> {
} else {
auto inputs_variable = context.MultiInputVar("Ids");
auto outputs_variable = context.MultiOutputVar("Outputs");
auto inputs_name = context.InputNames("Ids");
auto outputs_name = context.OutputNames("Outputs");

auto cpu_place = platform::CPUPlace();
framework::Scope *tmp_scope = scope.NewTmpScope().release();

std::vector<const framework::LoDTensor *> tmp_input_vec;
auto input_var_size = inputs_variable.size();
std::vector<framework::LoDTensor *> tmp_output_vec;
auto output_var_size = outputs_variable.size();

std::vector<std::shared_ptr<framework::LoDTensor>> tmp_tensors;

// create temp input
for (size_t idx = 0; idx < input_var_size; ++idx) {
framework::Variable *tmp_input_var = tmp_scope->Var(inputs_name[idx]);
framework::LoDTensor *tmp_input_tensor =
tmp_input_var->GetMutable<framework::LoDTensor>();
tmp_tensors.emplace_back(std::make_shared<framework::LoDTensor>());
auto *p = tmp_tensors.back().get();
framework::TensorCopy(inputs_variable[idx]->Get<framework::LoDTensor>(),
cpu_place, context.device_context(),
tmp_input_tensor);
tmp_input_vec.push_back(tmp_input_tensor);
cpu_place, context.device_context(), p);
tmp_input_vec.push_back(p);
}

// create temp output
for (size_t idx = 0; idx < output_var_size; ++idx) {
framework::Variable *tmp_output_var = tmp_scope->Var(outputs_name[idx]);
framework::LoDTensor *tmp_output_tensor =
tmp_output_var->GetMutable<framework::LoDTensor>();
tmp_output_tensor->Resize(outputs[idx]->dims());
tmp_output_vec.push_back(tmp_output_tensor);
tmp_tensors.emplace_back(std::make_shared<framework::LoDTensor>());
auto *p = tmp_tensors.back().get();
p->Resize(outputs[idx]->dims());
tmp_output_vec.push_back(p);
}

// use fleet->PullSparse
@@ -100,27 +92,21 @@ class DistributedLookupTableKernel : public framework::OpKernel<T> {

// cp temp to origin
for (size_t idx = 0; idx < output_var_size; ++idx) {
framework::Variable *tmp_output_var = tmp_scope->Var(outputs_name[idx]);
framework::LoDTensor *tmp_output_tensor =
tmp_output_var->GetMutable<framework::LoDTensor>();
framework::TensorCopy(
*tmp_output_tensor, context.GetPlace(), context.device_context(),
*tmp_output_vec[idx], context.GetPlace(), context.device_context(),
outputs_variable[idx]->GetMutable<framework::LoDTensor>());
}
delete tmp_scope;
}

auto id_names = context.InputNames("Ids");
auto out_names = context.OutputNames("Outputs");
auto lookup_table_version =
context.Attr<std::string>("lookup_table_version");
auto id_vars = context.MultiInputVar("Ids");
auto out_vars = context.MultiOutputVar("Outputs");

if (lookup_table_version == "lookup_table_v2") {
for (size_t i = 0; i < id_names.size(); ++i) {
auto *id_var = scope.FindVar(id_names[i]);
auto *out_var = scope.FindVar(out_names[i]);
auto *id_tensor = id_var->GetMutable<framework::LoDTensor>();
auto *out_tensor = out_var->GetMutable<framework::LoDTensor>();
for (size_t i = 0; i < id_vars.size(); ++i) {
auto *id_tensor = id_vars[i]->GetMutable<framework::LoDTensor>();
auto *out_tensor = out_vars[i]->GetMutable<framework::LoDTensor>();

auto id_dims = id_tensor->dims();
out_tensor->Resize(phi::make_ddim({static_cast<int64_t>(id_dims[0]),
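
The kernel no longer creates a temporary Scope (and no longer needs the manual delete tmp_scope); the temporaries are owned by a local vector of shared_ptr and released when Compute returns. A rough sketch of that ownership pattern, with a placeholder Tensor type rather than framework::LoDTensor:

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

struct Tensor {
  std::vector<int64_t> dims;
};

// Create `n` temporaries whose storage is owned by `owner`; callers only see
// raw pointers, and everything is released automatically when `owner` dies.
std::vector<Tensor*> MakeTemporaries(
    size_t n, std::vector<std::shared_ptr<Tensor>>* owner) {
  std::vector<Tensor*> raw;
  for (size_t i = 0; i < n; ++i) {
    owner->emplace_back(std::make_shared<Tensor>());
    raw.push_back(owner->back().get());
  }
  return raw;
}

int main() {
  std::vector<std::shared_ptr<Tensor>> tmp_tensors;  // owns all temporaries
  auto tmp_inputs = MakeTemporaries(3, &tmp_tensors);
  auto tmp_outputs = MakeTemporaries(3, &tmp_tensors);
  // ... copy inputs in, run the sparse pull, copy outputs back out ...
  (void)tmp_inputs;
  (void)tmp_outputs;
}  // no manual delete needed, unlike the old tmp_scope
```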