Skip to content

Commit

Permalink
support graph inference (PaddlePaddle#34)
Browse files Browse the repository at this point in the history
* gpu_graph_infer

* simplify infer

* fix

* remove logs

* remove logs

* change logs
  • Loading branch information
seemingwang authored Jun 16, 2022
1 parent 8ce3cf9 commit f19ca37
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 100 deletions.
229 changes: 150 additions & 79 deletions paddle/fluid/framework/data_feed.cu

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion paddle/fluid/framework/data_feed.h
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ class GraphDataGenerator {
int FillWalkBuf(std::shared_ptr<phi::Allocation> d_walk);
int FillFeatureBuf(int64_t* d_walk, int64_t* d_feature, size_t key_num);
int FillFeatureBuf(std::shared_ptr<phi::Allocation> d_walk,
std::shared_ptr<phi::Allocation> d_feature);
std::shared_ptr<phi::Allocation> d_feature);
void FillOneStep(uint64_t* start_ids, uint64_t* walk, int len,
NeighborSampleResult& sample_res, int cur_degree, int step,
int* len_per_row);
Expand Down Expand Up @@ -944,6 +944,7 @@ class GraphDataGenerator {

std::set<int> finish_node_type_;
std::unordered_map<int, size_t> node_type_start_;
std::vector<int> infer_node_type_start_;

std::shared_ptr<phi::Allocation> d_ins_buf_;
std::shared_ptr<phi::Allocation> d_feature_buf_;
Expand All @@ -962,6 +963,7 @@ class GraphDataGenerator {
int debug_mode_;
std::vector<int> first_node_type_;
std::vector<std::vector<int>> meta_path_;
bool gpu_graph_training_;
};

class DataFeed {
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/framework/data_feed.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ message GraphConfig {
optional int32 debug_mode = 7 [ default = 0 ];
optional string first_node_type = 8;
optional string meta_path = 9;
optional bool gpu_graph_training = 10 [ default = true ];
}

message DataFeedDesc {
Expand Down
68 changes: 51 additions & 17 deletions paddle/fluid/framework/device_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,42 +32,62 @@ void DeviceWorker::SetDataFeed(DataFeed* data_feed) {
}

template <typename T>
std::string PrintLodTensorType(Tensor* tensor, int64_t start, int64_t end) {
std::string PrintLodTensorType(Tensor* tensor, int64_t start, int64_t end,
char separator = ':',
bool need_leading_separator = true) {
auto count = tensor->numel();
if (start < 0 || end > count) {
VLOG(3) << "access violation";
return "access violation";
}
if (start >= end) return "";
std::ostringstream os;
if (!need_leading_separator) {
os << tensor->data<T>()[start];
start++;
}
for (int64_t i = start; i < end; i++) {
os << ":" << tensor->data<T>()[i];
// os << ":" << tensor->data<T>()[i];
os << separator << tensor->data<T>()[i];
}
return os.str();
}

std::string PrintLodTensorIntType(Tensor* tensor, int64_t start, int64_t end) {
std::string PrintLodTensorIntType(Tensor* tensor, int64_t start, int64_t end,
char separator = ':',
bool need_leading_separator = true) {
auto count = tensor->numel();
if (start < 0 || end > count) {
VLOG(3) << "access violation";
return "access violation";
}
if (start >= end) return "";
std::ostringstream os;
if (!need_leading_separator) {
os << static_cast<uint64_t>(tensor->data<int64_t>()[start]);
start++;
}
for (int64_t i = start; i < end; i++) {
os << ":" << static_cast<uint64_t>(tensor->data<int64_t>()[i]);
// os << ":" << static_cast<uint64_t>(tensor->data<int64_t>()[i]);
os << separator << static_cast<uint64_t>(tensor->data<int64_t>()[i]);
}
return os.str();
}

std::string PrintLodTensor(Tensor* tensor, int64_t start, int64_t end) {
std::string PrintLodTensor(Tensor* tensor, int64_t start, int64_t end,
char separator, bool need_leading_separator) {
std::string out_val;
if (framework::TransToProtoVarType(tensor->dtype()) == proto::VarType::FP32) {
out_val = PrintLodTensorType<float>(tensor, start, end);
out_val = PrintLodTensorType<float>(tensor, start, end, separator,
need_leading_separator);
} else if (framework::TransToProtoVarType(tensor->dtype()) ==
proto::VarType::INT64) {
out_val = PrintLodTensorIntType(tensor, start, end);
out_val = PrintLodTensorIntType(tensor, start, end, separator,
need_leading_separator);
} else if (framework::TransToProtoVarType(tensor->dtype()) ==
proto::VarType::FP64) {
out_val = PrintLodTensorType<double>(tensor, start, end);
out_val = PrintLodTensorType<double>(tensor, start, end, separator,
need_leading_separator);
} else {
out_val = "unsupported type";
}
Expand Down Expand Up @@ -122,6 +142,11 @@ void DeviceWorker::DumpParam(const Scope& scope, const int batch_id) {
}

void DeviceWorker::InitRandomDumpConfig(const TrainerDesc& desc) {
bool is_dump_in_simple_mode = desc.is_dump_in_simple_mode();
if (is_dump_in_simple_mode) {
dump_mode_ = 3;
return;
}
bool enable_random_dump = desc.enable_random_dump();
if (!enable_random_dump) {
dump_mode_ = 0;
Expand All @@ -139,7 +164,7 @@ void DeviceWorker::DumpField(const Scope& scope, int dump_mode,
int dump_interval) { // dump_mode: 0: no random,
// 1: random with insid hash,
// 2: random with random number,
// 3: simple mode
size_t batch_size = device_reader_->GetCurBatchSize();
auto& ins_id_vec = device_reader_->GetInsIdVec();
auto& ins_content_vec = device_reader_->GetInsContentVec();
Expand All @@ -163,12 +188,15 @@ void DeviceWorker::DumpField(const Scope& scope, int dump_mode,
}
hit[i] = true;
}
for (size_t i = 0; i < ins_id_vec.size(); i++) {
if (!hit[i]) {
continue;

if (dump_mode_ != 3) {
for (size_t i = 0; i < ins_id_vec.size(); i++) {
if (!hit[i]) {
continue;
}
ars[i] += ins_id_vec[i];
ars[i] = ars[i] + "\t" + ins_content_vec[i];
}
ars[i] += ins_id_vec[i];
ars[i] = ars[i] + "\t" + ins_content_vec[i];
}
for (auto& field : *dump_fields_) {
Variable* var = scope.FindVar(field);
Expand All @@ -195,14 +223,20 @@ void DeviceWorker::DumpField(const Scope& scope, int dump_mode,
"wrong ";
continue;
}

for (size_t i = 0; i < batch_size; ++i) {
if (!hit[i]) {
continue;
}
auto bound = GetTensorBound(tensor, i);
ars[i] = ars[i] + "\t" + field + ":" +
std::to_string(bound.second - bound.first);
ars[i] += PrintLodTensor(tensor, bound.first, bound.second);
if (dump_mode_ == 3) {
if (ars[i].size() > 0) ars[i] += "\t";
ars[i] += PrintLodTensor(tensor, bound.first, bound.second, ' ', false);
} else {
ars[i] = ars[i] + "\t" + field + ":" +
std::to_string(bound.second - bound.first);
ars[i] += PrintLodTensor(tensor, bound.first, bound.second);
}
}
}
// #pragma omp parallel for
Expand Down
4 changes: 3 additions & 1 deletion paddle/fluid/framework/device_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ class Scope;
namespace paddle {
namespace framework {

// Formats the elements of `tensor` in the index range [start, end) as text.
// `separator` is written between elements; when `need_leading_separator` is
// true it is also written before the first element.
//
// Only one prototype is declared: keeping a separate three-argument overload
// next to this defaulted one would make every three-argument call ambiguous.
//
// NOTE(review): with these defaults a three-argument call yields
// comma-separated values with no leading separator (e.g. "1,2,3"). Callers
// that append the result directly after other text, such as a
// "field:<count>" prefix, get no delimiter between that text and the first
// value -- confirm each such call site passes the separator it needs.
std::string PrintLodTensor(Tensor* tensor, int64_t start, int64_t end,
                           char separator = ',',
                           bool need_leading_separator = false);
std::pair<int64_t, int64_t> GetTensorBound(LoDTensor* tensor, int index);
bool CheckValidOutput(LoDTensor* tensor, size_t batch_size);

Expand Down
1 change: 0 additions & 1 deletion paddle/fluid/framework/trainer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ void TrainerBase::DumpWork(int tid) {
int err_no = 0;
// GetDumpPath is implemented in each Trainer
std::string path = GetDumpPath(tid);

std::shared_ptr<FILE> fp = fs_open_write(path, &err_no, dump_converter_);
while (1) {
std::string out_str;
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/framework/trainer_desc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ message TrainerDesc {

// add for gpu
optional string fleet_desc = 37;

optional bool is_dump_in_simple_mode = 38 [ default = false ];
// device worker parameters
optional HogwildWorkerParameter hogwild_param = 101;
optional DownpourWorkerParameter downpour_param = 103;
Expand Down
2 changes: 2 additions & 0 deletions python/paddle/fluid/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,8 @@ def set_graph_config(self, config):
self.proto_desc.graph_config.first_node_type = config.get(
"first_node_type", "")
self.proto_desc.graph_config.meta_path = config.get("meta_path", "")
self.proto_desc.graph_config.gpu_graph_training = config.get(
"gpu_graph_training", True)


class QueueDataset(DatasetBase):
Expand Down
3 changes: 3 additions & 0 deletions python/paddle/fluid/trainer_desc.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ def _set_dump_fields(self, dump_fields):
for field in dump_fields:
self.proto_desc.dump_fields.append(field)

def _set_is_dump_in_simple_mode(self, is_dump_in_simple_mode):
self.proto_desc.is_dump_in_simple_mode = is_dump_in_simple_mode

def _set_dump_fields_path(self, path):
self.proto_desc.dump_fields_path = path

Expand Down
3 changes: 3 additions & 0 deletions python/paddle/fluid/trainer_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ def _create_trainer(self, opt_info=None):
trainer._set_worker_places(opt_info["worker_places"])
if opt_info.get("use_ps_gpu") is not None:
trainer._set_use_ps_gpu(opt_info["use_ps_gpu"])
if opt_info.get("is_dump_in_simple_mode") is not None:
trainer._set_is_dump_in_simple_mode(opt_info[
"is_dump_in_simple_mode"])
if opt_info.get("enable_random_dump") is not None:
trainer._set_enable_random_dump(opt_info[
"enable_random_dump"])
Expand Down

0 comments on commit f19ca37

Please sign in to comment.